001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.net.unix;
019
020import java.io.Closeable;
021import java.io.EOFException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.io.IOUtils;
025
026import java.io.IOException;
027import java.nio.channels.ClosedChannelException;
028import java.util.Iterator;
029import java.util.LinkedList;
030import java.util.TreeMap;
031import java.util.Map;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.ReentrantLock;
034
035import org.apache.commons.lang.SystemUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.apache.hadoop.util.NativeCodeLoader;
039
040import com.google.common.annotations.VisibleForTesting;
041import com.google.common.base.Preconditions;
042import com.google.common.util.concurrent.Uninterruptibles;
043
044/**
045 * The DomainSocketWatcher watches a set of domain sockets to see when they
046 * become readable, or closed.  When one of those events happens, it makes a
047 * callback.
048 *
049 * See {@link DomainSocket} for more information about UNIX domain sockets.
050 */
051@InterfaceAudience.LimitedPrivate("HDFS")
052public final class DomainSocketWatcher implements Closeable {
053  static {
054    if (SystemUtils.IS_OS_WINDOWS) {
055      loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
056    } else if (!NativeCodeLoader.isNativeCodeLoaded()) {
057      loadingFailureReason = "libhadoop cannot be loaded.";
058    } else {
059      String problem;
060      try {
061        anchorNative();
062        problem = null;
063      } catch (Throwable t) {
064        problem = "DomainSocketWatcher#anchorNative got error: " +
065          t.getMessage();
066      }
067      loadingFailureReason = problem;
068    }
069  }
070
071  static Log LOG = LogFactory.getLog(DomainSocketWatcher.class);
072
073  /**
074   * The reason why DomainSocketWatcher is not available, or null if it is
075   * available.
076   */
077  private final static String loadingFailureReason;
078
079  /**
080   * Initializes the native library code.
081   */
082  private static native void anchorNative();
083
084  public static String getLoadingFailureReason() {
085    return loadingFailureReason;
086  }
087
088  public interface Handler {
089    /**
090     * Handles an event on a socket.  An event may be the socket becoming
091     * readable, or the remote end being closed.
092     *
093     * @param sock    The socket that the event occurred on.
094     * @return        Whether we should close the socket.
095     */
096    boolean handle(DomainSocket sock);
097  }
098
099  /**
100   * Handler for {DomainSocketWatcher#notificationSockets[1]}
101   */
102  private class NotificationHandler implements Handler {
103    public boolean handle(DomainSocket sock) {
104      assert(lock.isHeldByCurrentThread());
105      try {
106        kicked = false;
107        if (LOG.isTraceEnabled()) {
108          LOG.trace(this + ": NotificationHandler: doing a read on " +
109            sock.fd);
110        }
111        if (sock.getInputStream().read() == -1) {
112          if (LOG.isTraceEnabled()) {
113            LOG.trace(this + ": NotificationHandler: got EOF on " + sock.fd);
114          }
115          throw new EOFException();
116        }
117        if (LOG.isTraceEnabled()) {
118          LOG.trace(this + ": NotificationHandler: read succeeded on " +
119            sock.fd);
120        }
121        return false;
122      } catch (IOException e) {
123        if (LOG.isTraceEnabled()) {
124          LOG.trace(this + ": NotificationHandler: setting closed to " +
125              "true for " + sock.fd);
126        }
127        closed = true;
128        return true;
129      }
130    }
131  }
132
133  private static class Entry {
134    final DomainSocket socket;
135    final Handler handler;
136
137    Entry(DomainSocket socket, Handler handler) {
138      this.socket = socket;
139      this.handler = handler;
140    }
141
142    DomainSocket getDomainSocket() {
143      return socket;
144    }
145
146    Handler getHandler() {
147      return handler;
148    }
149  }
150
151  /**
152   * The FdSet is a set of file descriptors that gets passed to poll(2).
153   * It contains a native memory segment, so that we don't have to copy
154   * in the poll0 function.
155   */
156  private static class FdSet {
157    private long data;
158
159    private native static long alloc0();
160
161    FdSet() {
162      data = alloc0();
163    }
164
165    /**
166     * Add a file descriptor to the set.
167     *
168     * @param fd   The file descriptor to add.
169     */
170    native void add(int fd);
171
172    /**
173     * Remove a file descriptor from the set.
174     *
175     * @param fd   The file descriptor to remove.
176     */
177    native void remove(int fd);
178
179    /**
180     * Get an array containing all the FDs marked as readable.
181     * Also clear the state of all FDs.
182     *
183     * @return     An array containing all of the currently readable file
184     *             descriptors.
185     */
186    native int[] getAndClearReadableFds();
187
188    /**
189     * Close the object and de-allocate the memory used.
190     */
191    native void close();
192  }
193
194  /**
195   * Lock which protects toAdd, toRemove, and closed.
196   */
197  private final ReentrantLock lock = new ReentrantLock();
198
199  /**
200   * Condition variable which indicates that toAdd and toRemove have been
201   * processed.
202   */
203  private final Condition processedCond = lock.newCondition();
204
205  /**
206   * Entries to add.
207   */
208  private final LinkedList<Entry> toAdd =
209      new LinkedList<Entry>();
210
211  /**
212   * Entries to remove.
213   */
214  private final TreeMap<Integer, DomainSocket> toRemove =
215      new TreeMap<Integer, DomainSocket>();
216
217  /**
218   * Maximum length of time to go between checking whether the interrupted
219   * bit has been set for this thread.
220   */
221  private final int interruptCheckPeriodMs;
222
223  /**
224   * A pair of sockets used to wake up the thread after it has called poll(2).
225   */
226  private final DomainSocket notificationSockets[];
227
228  /**
229   * Whether or not this DomainSocketWatcher is closed.
230   */
231  private boolean closed = false;
232  
233  /**
234   * True if we have written a byte to the notification socket. We should not
235   * write anything else to the socket until the notification handler has had a
236   * chance to run. Otherwise, our thread might block, causing deadlock. 
237   * See HADOOP-11333 for details.
238   */
239  private boolean kicked = false;
240
241  public DomainSocketWatcher(int interruptCheckPeriodMs, String src)
242      throws IOException {
243    if (loadingFailureReason != null) {
244      throw new UnsupportedOperationException(loadingFailureReason);
245    }
246    Preconditions.checkArgument(interruptCheckPeriodMs > 0);
247    this.interruptCheckPeriodMs = interruptCheckPeriodMs;
248    notificationSockets = DomainSocket.socketpair();
249    watcherThread.setDaemon(true);
250    watcherThread.setName(src + " DomainSocketWatcher");
251    watcherThread
252        .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
253          @Override
254          public void uncaughtException(Thread thread, Throwable t) {
255            LOG.error(thread + " terminating on unexpected exception", t);
256          }
257        });
258    watcherThread.start();
259  }
260
261  /**
262   * Close the DomainSocketWatcher and wait for its thread to terminate.
263   *
264   * If there is more than one close, all but the first will be ignored.
265   */
266  @Override
267  public void close() throws IOException {
268    lock.lock();
269    try {
270      if (closed) return;
271      if (LOG.isDebugEnabled()) {
272        LOG.debug(this + ": closing");
273      }
274      closed = true;
275    } finally {
276      lock.unlock();
277    }
278    // Close notificationSockets[0], so that notificationSockets[1] gets an EOF
279    // event.  This will wake up the thread immediately if it is blocked inside
280    // the select() system call.
281    notificationSockets[0].close();
282    // Wait for the select thread to terminate.
283    Uninterruptibles.joinUninterruptibly(watcherThread);
284  }
285
286  @VisibleForTesting
287  public boolean isClosed() {
288    lock.lock();
289    try {
290      return closed;
291    } finally {
292      lock.unlock();
293    }
294  }
295
296  /**
297   * Add a socket.
298   *
299   * @param sock     The socket to add.  It is an error to re-add a socket that
300   *                   we are already watching.
301   * @param handler  The handler to associate with this socket.  This may be
302   *                   called any time after this function is called.
303   */
304  public void add(DomainSocket sock, Handler handler) {
305    lock.lock();
306    try {
307      if (closed) {
308        handler.handle(sock);
309        IOUtils.cleanup(LOG, sock);
310        return;
311      }
312      Entry entry = new Entry(sock, handler);
313      try {
314        sock.refCount.reference();
315      } catch (ClosedChannelException e1) {
316        // If the socket is already closed before we add it, invoke the
317        // handler immediately.  Then we're done.
318        handler.handle(sock);
319        return;
320      }
321      toAdd.add(entry);
322      kick();
323      while (true) {
324        try {
325          processedCond.await();
326        } catch (InterruptedException e) {
327          Thread.currentThread().interrupt();
328        }
329        if (!toAdd.contains(entry)) {
330          break;
331        }
332      }
333    } finally {
334      lock.unlock();
335    }
336  }
337
338  /**
339   * Remove a socket.  Its handler will be called.
340   *
341   * @param sock     The socket to remove.
342   */
343  public void remove(DomainSocket sock) {
344    lock.lock();
345    try {
346      if (closed) return;
347      toRemove.put(sock.fd, sock);
348      kick();
349      while (true) {
350        try {
351          processedCond.await();
352        } catch (InterruptedException e) {
353          Thread.currentThread().interrupt();
354        }
355        if (!toRemove.containsKey(sock.fd)) {
356          break;
357        }
358      }
359    } finally {
360      lock.unlock();
361    }
362  }
363
364  /**
365   * Wake up the DomainSocketWatcher thread.
366   */
367  private void kick() {
368    assert(lock.isHeldByCurrentThread());
369    
370    if (kicked) {
371      return;
372    }
373    
374    try {
375      notificationSockets[0].getOutputStream().write(0);
376      kicked = true;
377    } catch (IOException e) {
378      if (!closed) {
379        LOG.error(this + ": error writing to notificationSockets[0]", e);
380      }
381    }
382  }
383
384  /**
385   * Send callback and return whether or not the domain socket was closed as a
386   * result of processing.
387   *
388   * @param caller reason for call
389   * @param entries mapping of file descriptor to entry
390   * @param fdSet set of file descriptors
391   * @param fd file descriptor
392   * @return true if the domain socket was closed as a result of processing
393   */
394  private boolean sendCallback(String caller, TreeMap<Integer, Entry> entries,
395      FdSet fdSet, int fd) {
396    if (LOG.isTraceEnabled()) {
397      LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd);
398    }
399    Entry entry = entries.get(fd);
400    Preconditions.checkNotNull(entry,
401        this + ": fdSet contained " + fd + ", which we were " +
402        "not tracking.");
403    DomainSocket sock = entry.getDomainSocket();
404    if (entry.getHandler().handle(sock)) {
405      if (LOG.isTraceEnabled()) {
406        LOG.trace(this + ": " + caller + ": closing fd " + fd +
407            " at the request of the handler.");
408      }
409      if (toRemove.remove(fd) != null) {
410        if (LOG.isTraceEnabled()) {
411          LOG.trace(this + ": " + caller + " : sendCallback processed fd " +
412            fd  + " in toRemove.");
413        }
414      }
415      try {
416        sock.refCount.unreferenceCheckClosed();
417      } catch (IOException e) {
418        Preconditions.checkArgument(false,
419            this + ": file descriptor " + sock.fd + " was closed while " +
420            "still in the poll(2) loop.");
421      }
422      IOUtils.cleanup(LOG, sock);
423      fdSet.remove(fd);
424      return true;
425    } else {
426      if (LOG.isTraceEnabled()) {
427        LOG.trace(this + ": " + caller + ": sendCallback not " +
428            "closing fd " + fd);
429      }
430      return false;
431    }
432  }
433
434  /**
435   * Send callback, and if the domain socket was closed as a result of
436   * processing, then also remove the entry for the file descriptor.
437   *
438   * @param caller reason for call
439   * @param entries mapping of file descriptor to entry
440   * @param fdSet set of file descriptors
441   * @param fd file descriptor
442   */
443  private void sendCallbackAndRemove(String caller,
444      TreeMap<Integer, Entry> entries, FdSet fdSet, int fd) {
445    if (sendCallback(caller, entries, fdSet, fd)) {
446      entries.remove(fd);
447    }
448  }
449
450  @VisibleForTesting
451  final Thread watcherThread = new Thread(new Runnable() {
452    @Override
453    public void run() {
454      if (LOG.isDebugEnabled()) {
455        LOG.debug(this + ": starting with interruptCheckPeriodMs = " +
456            interruptCheckPeriodMs);
457      }
458      final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
459      FdSet fdSet = new FdSet();
460      addNotificationSocket(entries, fdSet);
461      try {
462        while (true) {
463          lock.lock();
464          try {
465            for (int fd : fdSet.getAndClearReadableFds()) {
466              sendCallbackAndRemove("getAndClearReadableFds", entries, fdSet,
467                  fd);
468            }
469            if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
470              // Handle pending additions (before pending removes).
471              for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
472                Entry entry = iter.next();
473                iter.remove();
474                DomainSocket sock = entry.getDomainSocket();
475                Entry prevEntry = entries.put(sock.fd, entry);
476                Preconditions.checkState(prevEntry == null,
477                    this + ": tried to watch a file descriptor that we " +
478                    "were already watching: " + sock);
479                if (LOG.isTraceEnabled()) {
480                  LOG.trace(this + ": adding fd " + sock.fd);
481                }
482                fdSet.add(sock.fd);
483              }
484              // Handle pending removals
485              while (true) {
486                Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
487                if (entry == null) break;
488                sendCallbackAndRemove("handlePendingRemovals",
489                    entries, fdSet, entry.getValue().fd);
490              }
491              processedCond.signalAll();
492            }
493            // Check if the thread should terminate.  Doing this check now is
494            // easier than at the beginning of the loop, since we know toAdd and
495            // toRemove are now empty and processedCond has been notified if it
496            // needed to be.
497            if (closed) {
498              if (LOG.isDebugEnabled()) {
499                LOG.debug(toString() + " thread terminating.");
500              }
501              return;
502            }
503            // Check if someone sent our thread an InterruptedException while we
504            // were waiting in poll().
505            if (Thread.interrupted()) {
506              throw new InterruptedException();
507            }
508          } finally {
509            lock.unlock();
510          }
511          doPoll0(interruptCheckPeriodMs, fdSet);
512        }
513      } catch (InterruptedException e) {
514        LOG.info(toString() + " terminating on InterruptedException");
515      } catch (Throwable e) {
516        LOG.error(toString() + " terminating on exception", e);
517      } finally {
518        lock.lock();
519        try {
520          kick(); // allow the handler for notificationSockets[0] to read a byte
521          for (Entry entry : entries.values()) {
522            // We do not remove from entries as we iterate, because that can
523            // cause a ConcurrentModificationException.
524            sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
525          }
526          entries.clear();
527          fdSet.close();
528          closed = true;
529          if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
530            // Items in toAdd might not be added to entries, handle it here
531            for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext();) {
532              Entry entry = iter.next();
533              entry.getDomainSocket().refCount.unreference();
534              entry.getHandler().handle(entry.getDomainSocket());
535              IOUtils.cleanup(LOG, entry.getDomainSocket());
536              iter.remove();
537            }
538            // Items in toRemove might not be really removed, handle it here
539            while (true) {
540              Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
541              if (entry == null)
542                break;
543              sendCallback("close", entries, fdSet, entry.getValue().fd);
544            }
545          }
546          processedCond.signalAll();
547        } finally {
548          lock.unlock();
549        }
550      }
551    }
552  });
553
554  private void addNotificationSocket(final TreeMap<Integer, Entry> entries,
555      FdSet fdSet) {
556    entries.put(notificationSockets[1].fd, 
557        new Entry(notificationSockets[1], new NotificationHandler()));
558    try {
559      notificationSockets[1].refCount.reference();
560    } catch (IOException e) {
561      throw new RuntimeException(e);
562    }
563    fdSet.add(notificationSockets[1].fd);
564    if (LOG.isTraceEnabled()) {
565      LOG.trace(this + ": adding notificationSocket " +
566          notificationSockets[1].fd + ", connected to " +
567          notificationSockets[0].fd);
568    }
569  }
570
571  public String toString() {
572    return "DomainSocketWatcher(" + System.identityHashCode(this) + ")"; 
573  }
574
575  private static native int doPoll0(int maxWaitMs, FdSet readFds)
576      throws IOException;
577}