001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.net.unix; 019 020import java.io.Closeable; 021import java.io.EOFException; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.io.IOUtils; 025 026import java.io.IOException; 027import java.nio.channels.ClosedChannelException; 028import java.util.Iterator; 029import java.util.LinkedList; 030import java.util.TreeMap; 031import java.util.Map; 032import java.util.concurrent.locks.Condition; 033import java.util.concurrent.locks.ReentrantLock; 034 035import org.apache.commons.lang.SystemUtils; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.apache.hadoop.util.NativeCodeLoader; 039 040import com.google.common.annotations.VisibleForTesting; 041import com.google.common.base.Preconditions; 042import com.google.common.util.concurrent.Uninterruptibles; 043 044/** 045 * The DomainSocketWatcher watches a set of domain sockets to see when they 046 * become readable, or closed. When one of those events happens, it makes a 047 * callback. 048 * 049 * See {@link DomainSocket} for more information about UNIX domain sockets. 050 */ 051@InterfaceAudience.LimitedPrivate("HDFS") 052public final class DomainSocketWatcher implements Closeable { 053 static { 054 if (SystemUtils.IS_OS_WINDOWS) { 055 loadingFailureReason = "UNIX Domain sockets are not available on Windows."; 056 } else if (!NativeCodeLoader.isNativeCodeLoaded()) { 057 loadingFailureReason = "libhadoop cannot be loaded."; 058 } else { 059 String problem; 060 try { 061 anchorNative(); 062 problem = null; 063 } catch (Throwable t) { 064 problem = "DomainSocketWatcher#anchorNative got error: " + 065 t.getMessage(); 066 } 067 loadingFailureReason = problem; 068 } 069 } 070 071 static Log LOG = LogFactory.getLog(DomainSocketWatcher.class); 072 073 /** 074 * The reason why DomainSocketWatcher is not available, or null if it is 075 * available. 076 */ 077 private final static String loadingFailureReason; 078 079 /** 080 * Initializes the native library code. 081 */ 082 private static native void anchorNative(); 083 084 public static String getLoadingFailureReason() { 085 return loadingFailureReason; 086 } 087 088 public interface Handler { 089 /** 090 * Handles an event on a socket. An event may be the socket becoming 091 * readable, or the remote end being closed. 092 * 093 * @param sock The socket that the event occurred on. 094 * @return Whether we should close the socket. 095 */ 096 boolean handle(DomainSocket sock); 097 } 098 099 /** 100 * Handler for {DomainSocketWatcher#notificationSockets[1]} 101 */ 102 private class NotificationHandler implements Handler { 103 public boolean handle(DomainSocket sock) { 104 assert(lock.isHeldByCurrentThread()); 105 try { 106 kicked = false; 107 if (LOG.isTraceEnabled()) { 108 LOG.trace(this + ": NotificationHandler: doing a read on " + 109 sock.fd); 110 } 111 if (sock.getInputStream().read() == -1) { 112 if (LOG.isTraceEnabled()) { 113 LOG.trace(this + ": NotificationHandler: got EOF on " + sock.fd); 114 } 115 throw new EOFException(); 116 } 117 if (LOG.isTraceEnabled()) { 118 LOG.trace(this + ": NotificationHandler: read succeeded on " + 119 sock.fd); 120 } 121 return false; 122 } catch (IOException e) { 123 if (LOG.isTraceEnabled()) { 124 LOG.trace(this + ": NotificationHandler: setting closed to " + 125 "true for " + sock.fd); 126 } 127 closed = true; 128 return true; 129 } 130 } 131 } 132 133 private static class Entry { 134 final DomainSocket socket; 135 final Handler handler; 136 137 Entry(DomainSocket socket, Handler handler) { 138 this.socket = socket; 139 this.handler = handler; 140 } 141 142 DomainSocket getDomainSocket() { 143 return socket; 144 } 145 146 Handler getHandler() { 147 return handler; 148 } 149 } 150 151 /** 152 * The FdSet is a set of file descriptors that gets passed to poll(2). 153 * It contains a native memory segment, so that we don't have to copy 154 * in the poll0 function. 155 */ 156 private static class FdSet { 157 private long data; 158 159 private native static long alloc0(); 160 161 FdSet() { 162 data = alloc0(); 163 } 164 165 /** 166 * Add a file descriptor to the set. 167 * 168 * @param fd The file descriptor to add. 169 */ 170 native void add(int fd); 171 172 /** 173 * Remove a file descriptor from the set. 174 * 175 * @param fd The file descriptor to remove. 176 */ 177 native void remove(int fd); 178 179 /** 180 * Get an array containing all the FDs marked as readable. 181 * Also clear the state of all FDs. 182 * 183 * @return An array containing all of the currently readable file 184 * descriptors. 185 */ 186 native int[] getAndClearReadableFds(); 187 188 /** 189 * Close the object and de-allocate the memory used. 190 */ 191 native void close(); 192 } 193 194 /** 195 * Lock which protects toAdd, toRemove, and closed. 196 */ 197 private final ReentrantLock lock = new ReentrantLock(); 198 199 /** 200 * Condition variable which indicates that toAdd and toRemove have been 201 * processed. 202 */ 203 private final Condition processedCond = lock.newCondition(); 204 205 /** 206 * Entries to add. 207 */ 208 private final LinkedList<Entry> toAdd = 209 new LinkedList<Entry>(); 210 211 /** 212 * Entries to remove. 213 */ 214 private final TreeMap<Integer, DomainSocket> toRemove = 215 new TreeMap<Integer, DomainSocket>(); 216 217 /** 218 * Maximum length of time to go between checking whether the interrupted 219 * bit has been set for this thread. 220 */ 221 private final int interruptCheckPeriodMs; 222 223 /** 224 * A pair of sockets used to wake up the thread after it has called poll(2). 225 */ 226 private final DomainSocket notificationSockets[]; 227 228 /** 229 * Whether or not this DomainSocketWatcher is closed. 230 */ 231 private boolean closed = false; 232 233 /** 234 * True if we have written a byte to the notification socket. We should not 235 * write anything else to the socket until the notification handler has had a 236 * chance to run. Otherwise, our thread might block, causing deadlock. 237 * See HADOOP-11333 for details. 238 */ 239 private boolean kicked = false; 240 241 public DomainSocketWatcher(int interruptCheckPeriodMs, String src) 242 throws IOException { 243 if (loadingFailureReason != null) { 244 throw new UnsupportedOperationException(loadingFailureReason); 245 } 246 Preconditions.checkArgument(interruptCheckPeriodMs > 0); 247 this.interruptCheckPeriodMs = interruptCheckPeriodMs; 248 notificationSockets = DomainSocket.socketpair(); 249 watcherThread.setDaemon(true); 250 watcherThread.setName(src + " DomainSocketWatcher"); 251 watcherThread 252 .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 253 @Override 254 public void uncaughtException(Thread thread, Throwable t) { 255 LOG.error(thread + " terminating on unexpected exception", t); 256 } 257 }); 258 watcherThread.start(); 259 } 260 261 /** 262 * Close the DomainSocketWatcher and wait for its thread to terminate. 263 * 264 * If there is more than one close, all but the first will be ignored. 265 */ 266 @Override 267 public void close() throws IOException { 268 lock.lock(); 269 try { 270 if (closed) return; 271 if (LOG.isDebugEnabled()) { 272 LOG.debug(this + ": closing"); 273 } 274 closed = true; 275 } finally { 276 lock.unlock(); 277 } 278 // Close notificationSockets[0], so that notificationSockets[1] gets an EOF 279 // event. This will wake up the thread immediately if it is blocked inside 280 // the select() system call. 281 notificationSockets[0].close(); 282 // Wait for the select thread to terminate. 283 Uninterruptibles.joinUninterruptibly(watcherThread); 284 } 285 286 @VisibleForTesting 287 public boolean isClosed() { 288 lock.lock(); 289 try { 290 return closed; 291 } finally { 292 lock.unlock(); 293 } 294 } 295 296 /** 297 * Add a socket. 298 * 299 * @param sock The socket to add. It is an error to re-add a socket that 300 * we are already watching. 301 * @param handler The handler to associate with this socket. This may be 302 * called any time after this function is called. 303 */ 304 public void add(DomainSocket sock, Handler handler) { 305 lock.lock(); 306 try { 307 if (closed) { 308 handler.handle(sock); 309 IOUtils.cleanup(LOG, sock); 310 return; 311 } 312 Entry entry = new Entry(sock, handler); 313 try { 314 sock.refCount.reference(); 315 } catch (ClosedChannelException e1) { 316 // If the socket is already closed before we add it, invoke the 317 // handler immediately. Then we're done. 318 handler.handle(sock); 319 return; 320 } 321 toAdd.add(entry); 322 kick(); 323 while (true) { 324 try { 325 processedCond.await(); 326 } catch (InterruptedException e) { 327 Thread.currentThread().interrupt(); 328 } 329 if (!toAdd.contains(entry)) { 330 break; 331 } 332 } 333 } finally { 334 lock.unlock(); 335 } 336 } 337 338 /** 339 * Remove a socket. Its handler will be called. 340 * 341 * @param sock The socket to remove. 342 */ 343 public void remove(DomainSocket sock) { 344 lock.lock(); 345 try { 346 if (closed) return; 347 toRemove.put(sock.fd, sock); 348 kick(); 349 while (true) { 350 try { 351 processedCond.await(); 352 } catch (InterruptedException e) { 353 Thread.currentThread().interrupt(); 354 } 355 if (!toRemove.containsKey(sock.fd)) { 356 break; 357 } 358 } 359 } finally { 360 lock.unlock(); 361 } 362 } 363 364 /** 365 * Wake up the DomainSocketWatcher thread. 366 */ 367 private void kick() { 368 assert(lock.isHeldByCurrentThread()); 369 370 if (kicked) { 371 return; 372 } 373 374 try { 375 notificationSockets[0].getOutputStream().write(0); 376 kicked = true; 377 } catch (IOException e) { 378 if (!closed) { 379 LOG.error(this + ": error writing to notificationSockets[0]", e); 380 } 381 } 382 } 383 384 /** 385 * Send callback and return whether or not the domain socket was closed as a 386 * result of processing. 387 * 388 * @param caller reason for call 389 * @param entries mapping of file descriptor to entry 390 * @param fdSet set of file descriptors 391 * @param fd file descriptor 392 * @return true if the domain socket was closed as a result of processing 393 */ 394 private boolean sendCallback(String caller, TreeMap<Integer, Entry> entries, 395 FdSet fdSet, int fd) { 396 if (LOG.isTraceEnabled()) { 397 LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd); 398 } 399 Entry entry = entries.get(fd); 400 Preconditions.checkNotNull(entry, 401 this + ": fdSet contained " + fd + ", which we were " + 402 "not tracking."); 403 DomainSocket sock = entry.getDomainSocket(); 404 if (entry.getHandler().handle(sock)) { 405 if (LOG.isTraceEnabled()) { 406 LOG.trace(this + ": " + caller + ": closing fd " + fd + 407 " at the request of the handler."); 408 } 409 if (toRemove.remove(fd) != null) { 410 if (LOG.isTraceEnabled()) { 411 LOG.trace(this + ": " + caller + " : sendCallback processed fd " + 412 fd + " in toRemove."); 413 } 414 } 415 try { 416 sock.refCount.unreferenceCheckClosed(); 417 } catch (IOException e) { 418 Preconditions.checkArgument(false, 419 this + ": file descriptor " + sock.fd + " was closed while " + 420 "still in the poll(2) loop."); 421 } 422 IOUtils.cleanup(LOG, sock); 423 fdSet.remove(fd); 424 return true; 425 } else { 426 if (LOG.isTraceEnabled()) { 427 LOG.trace(this + ": " + caller + ": sendCallback not " + 428 "closing fd " + fd); 429 } 430 return false; 431 } 432 } 433 434 /** 435 * Send callback, and if the domain socket was closed as a result of 436 * processing, then also remove the entry for the file descriptor. 437 * 438 * @param caller reason for call 439 * @param entries mapping of file descriptor to entry 440 * @param fdSet set of file descriptors 441 * @param fd file descriptor 442 */ 443 private void sendCallbackAndRemove(String caller, 444 TreeMap<Integer, Entry> entries, FdSet fdSet, int fd) { 445 if (sendCallback(caller, entries, fdSet, fd)) { 446 entries.remove(fd); 447 } 448 } 449 450 @VisibleForTesting 451 final Thread watcherThread = new Thread(new Runnable() { 452 @Override 453 public void run() { 454 if (LOG.isDebugEnabled()) { 455 LOG.debug(this + ": starting with interruptCheckPeriodMs = " + 456 interruptCheckPeriodMs); 457 } 458 final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>(); 459 FdSet fdSet = new FdSet(); 460 addNotificationSocket(entries, fdSet); 461 try { 462 while (true) { 463 lock.lock(); 464 try { 465 for (int fd : fdSet.getAndClearReadableFds()) { 466 sendCallbackAndRemove("getAndClearReadableFds", entries, fdSet, 467 fd); 468 } 469 if (!(toAdd.isEmpty() && toRemove.isEmpty())) { 470 // Handle pending additions (before pending removes). 471 for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) { 472 Entry entry = iter.next(); 473 iter.remove(); 474 DomainSocket sock = entry.getDomainSocket(); 475 Entry prevEntry = entries.put(sock.fd, entry); 476 Preconditions.checkState(prevEntry == null, 477 this + ": tried to watch a file descriptor that we " + 478 "were already watching: " + sock); 479 if (LOG.isTraceEnabled()) { 480 LOG.trace(this + ": adding fd " + sock.fd); 481 } 482 fdSet.add(sock.fd); 483 } 484 // Handle pending removals 485 while (true) { 486 Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry(); 487 if (entry == null) break; 488 sendCallbackAndRemove("handlePendingRemovals", 489 entries, fdSet, entry.getValue().fd); 490 } 491 processedCond.signalAll(); 492 } 493 // Check if the thread should terminate. Doing this check now is 494 // easier than at the beginning of the loop, since we know toAdd and 495 // toRemove are now empty and processedCond has been notified if it 496 // needed to be. 497 if (closed) { 498 if (LOG.isDebugEnabled()) { 499 LOG.debug(toString() + " thread terminating."); 500 } 501 return; 502 } 503 // Check if someone sent our thread an InterruptedException while we 504 // were waiting in poll(). 505 if (Thread.interrupted()) { 506 throw new InterruptedException(); 507 } 508 } finally { 509 lock.unlock(); 510 } 511 doPoll0(interruptCheckPeriodMs, fdSet); 512 } 513 } catch (InterruptedException e) { 514 LOG.info(toString() + " terminating on InterruptedException"); 515 } catch (Throwable e) { 516 LOG.error(toString() + " terminating on exception", e); 517 } finally { 518 lock.lock(); 519 try { 520 kick(); // allow the handler for notificationSockets[0] to read a byte 521 for (Entry entry : entries.values()) { 522 // We do not remove from entries as we iterate, because that can 523 // cause a ConcurrentModificationException. 524 sendCallback("close", entries, fdSet, entry.getDomainSocket().fd); 525 } 526 entries.clear(); 527 fdSet.close(); 528 closed = true; 529 if (!(toAdd.isEmpty() && toRemove.isEmpty())) { 530 // Items in toAdd might not be added to entries, handle it here 531 for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext();) { 532 Entry entry = iter.next(); 533 entry.getDomainSocket().refCount.unreference(); 534 entry.getHandler().handle(entry.getDomainSocket()); 535 IOUtils.cleanup(LOG, entry.getDomainSocket()); 536 iter.remove(); 537 } 538 // Items in toRemove might not be really removed, handle it here 539 while (true) { 540 Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry(); 541 if (entry == null) 542 break; 543 sendCallback("close", entries, fdSet, entry.getValue().fd); 544 } 545 } 546 processedCond.signalAll(); 547 } finally { 548 lock.unlock(); 549 } 550 } 551 } 552 }); 553 554 private void addNotificationSocket(final TreeMap<Integer, Entry> entries, 555 FdSet fdSet) { 556 entries.put(notificationSockets[1].fd, 557 new Entry(notificationSockets[1], new NotificationHandler())); 558 try { 559 notificationSockets[1].refCount.reference(); 560 } catch (IOException e) { 561 throw new RuntimeException(e); 562 } 563 fdSet.add(notificationSockets[1].fd); 564 if (LOG.isTraceEnabled()) { 565 LOG.trace(this + ": adding notificationSocket " + 566 notificationSockets[1].fd + ", connected to " + 567 notificationSockets[0].fd); 568 } 569 } 570 571 public String toString() { 572 return "DomainSocketWatcher(" + System.identityHashCode(this) + ")"; 573 } 574 575 private static native int doPoll0(int maxWaitMs, FdSet readFds) 576 throws IOException; 577}