001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.client.impl; 019 020import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; 021 022import java.io.BufferedOutputStream; 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.FileInputStream; 026import java.io.IOException; 027import java.lang.reflect.Constructor; 028import java.net.InetSocketAddress; 029import java.util.List; 030 031import com.google.common.io.ByteArrayDataOutput; 032import com.google.common.io.ByteStreams; 033import org.apache.commons.lang.mutable.MutableBoolean; 034import org.apache.hadoop.classification.InterfaceAudience; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.StorageType; 037import org.apache.hadoop.hdfs.BlockReader; 038import org.apache.hadoop.hdfs.ClientContext; 039import org.apache.hadoop.hdfs.DFSClient; 040import org.apache.hadoop.hdfs.DFSInputStream; 041import org.apache.hadoop.hdfs.DFSUtilClient; 042import org.apache.hadoop.hdfs.ExtendedBlockId; 043import org.apache.hadoop.hdfs.RemotePeerFactory; 044import org.apache.hadoop.hdfs.ReplicaAccessor; 045import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; 046import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; 047import org.apache.hadoop.hdfs.net.DomainPeer; 048import org.apache.hadoop.hdfs.net.Peer; 049import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 050import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 051import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 052import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; 053import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 054import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 055import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 056import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; 057import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 058import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; 059import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; 060import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; 061import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; 062import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; 063import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; 064import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; 065import org.apache.hadoop.hdfs.util.IOUtilsClient; 066import org.apache.hadoop.ipc.RemoteException; 067import org.apache.hadoop.net.unix.DomainSocket; 068import org.apache.hadoop.security.AccessControlException; 069import org.apache.hadoop.security.UserGroupInformation; 070import org.apache.hadoop.security.token.SecretManager.InvalidToken; 071import org.apache.hadoop.security.token.Token; 072import org.apache.hadoop.util.PerformanceAdvisory; 073import org.apache.hadoop.util.Time; 074 075import com.google.common.annotations.VisibleForTesting; 076import com.google.common.base.Preconditions; 077import org.apache.htrace.core.Tracer; 078 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082 083/** 084 * Utility class to create BlockReader implementations. 085 */ 086@InterfaceAudience.Private 087public class BlockReaderFactory implements ShortCircuitReplicaCreator { 088 static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class); 089 090 public static class FailureInjector { 091 public void injectRequestFileDescriptorsFailure() throws IOException { 092 // do nothing 093 } 094 public boolean getSupportsReceiptVerification() { 095 return true; 096 } 097 } 098 099 @VisibleForTesting 100 static ShortCircuitReplicaCreator 101 createShortCircuitReplicaInfoCallback = null; 102 103 private final DfsClientConf conf; 104 105 /** 106 * Injects failures into specific operations during unit tests. 107 */ 108 private static FailureInjector failureInjector = new FailureInjector(); 109 110 /** 111 * The file name, for logging and debugging purposes. 112 */ 113 private String fileName; 114 115 /** 116 * The block ID and block pool ID to use. 117 */ 118 private ExtendedBlock block; 119 120 /** 121 * The block token to use for security purposes. 122 */ 123 private Token<BlockTokenIdentifier> token; 124 125 /** 126 * The offset within the block to start reading at. 127 */ 128 private long startOffset; 129 130 /** 131 * If false, we won't try to verify the block checksum. 132 */ 133 private boolean verifyChecksum; 134 135 /** 136 * The name of this client. 137 */ 138 private String clientName; 139 140 /** 141 * The DataNode we're talking to. 142 */ 143 private DatanodeInfo datanode; 144 145 /** 146 * StorageType of replica on DataNode. 147 */ 148 private StorageType storageType; 149 150 /** 151 * If false, we won't try short-circuit local reads. 152 */ 153 private boolean allowShortCircuitLocalReads; 154 155 /** 156 * The ClientContext to use for things like the PeerCache. 157 */ 158 private ClientContext clientContext; 159 160 /** 161 * Number of bytes to read. Must be set to a non-negative value. 162 */ 163 private long length = -1; 164 165 /** 166 * Caching strategy to use when reading the block. 167 */ 168 private CachingStrategy cachingStrategy; 169 170 /** 171 * Socket address to use to connect to peer. 172 */ 173 private InetSocketAddress inetSocketAddress; 174 175 /** 176 * Remote peer factory to use to create a peer, if needed. 177 */ 178 private RemotePeerFactory remotePeerFactory; 179 180 /** 181 * UserGroupInformation to use for legacy block reader local objects, 182 * if needed. 183 */ 184 private UserGroupInformation userGroupInformation; 185 186 /** 187 * Configuration to use for legacy block reader local objects, if needed. 188 */ 189 private Configuration configuration; 190 191 /** 192 * The HTrace tracer to use. 193 */ 194 private Tracer tracer; 195 196 /** 197 * Information about the domain socket path we should use to connect to the 198 * local peer-- or null if we haven't examined the local domain socket. 199 */ 200 private DomainSocketFactory.PathInfo pathInfo; 201 202 /** 203 * The remaining number of times that we'll try to pull a socket out of the 204 * cache. 205 */ 206 private int remainingCacheTries; 207 208 public BlockReaderFactory(DfsClientConf conf) { 209 this.conf = conf; 210 this.remainingCacheTries = conf.getNumCachedConnRetry(); 211 } 212 213 public BlockReaderFactory setFileName(String fileName) { 214 this.fileName = fileName; 215 return this; 216 } 217 218 public BlockReaderFactory setBlock(ExtendedBlock block) { 219 this.block = block; 220 return this; 221 } 222 223 public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) { 224 this.token = token; 225 return this; 226 } 227 228 public BlockReaderFactory setStartOffset(long startOffset) { 229 this.startOffset = startOffset; 230 return this; 231 } 232 233 public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) { 234 this.verifyChecksum = verifyChecksum; 235 return this; 236 } 237 238 public BlockReaderFactory setClientName(String clientName) { 239 this.clientName = clientName; 240 return this; 241 } 242 243 public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) { 244 this.datanode = datanode; 245 return this; 246 } 247 248 public BlockReaderFactory setStorageType(StorageType storageType) { 249 this.storageType = storageType; 250 return this; 251 } 252 253 public BlockReaderFactory setAllowShortCircuitLocalReads( 254 boolean allowShortCircuitLocalReads) { 255 this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; 256 return this; 257 } 258 259 public BlockReaderFactory setClientCacheContext( 260 ClientContext clientContext) { 261 this.clientContext = clientContext; 262 return this; 263 } 264 265 public BlockReaderFactory setLength(long length) { 266 this.length = length; 267 return this; 268 } 269 270 public BlockReaderFactory setCachingStrategy( 271 CachingStrategy cachingStrategy) { 272 this.cachingStrategy = cachingStrategy; 273 return this; 274 } 275 276 public BlockReaderFactory setInetSocketAddress ( 277 InetSocketAddress inetSocketAddress) { 278 this.inetSocketAddress = inetSocketAddress; 279 return this; 280 } 281 282 public BlockReaderFactory setUserGroupInformation( 283 UserGroupInformation userGroupInformation) { 284 this.userGroupInformation = userGroupInformation; 285 return this; 286 } 287 288 public BlockReaderFactory setRemotePeerFactory( 289 RemotePeerFactory remotePeerFactory) { 290 this.remotePeerFactory = remotePeerFactory; 291 return this; 292 } 293 294 public BlockReaderFactory setConfiguration( 295 Configuration configuration) { 296 this.configuration = configuration; 297 return this; 298 } 299 300 public BlockReaderFactory setTracer(Tracer tracer) { 301 this.tracer = tracer; 302 return this; 303 } 304 305 @VisibleForTesting 306 public static void setFailureInjectorForTesting(FailureInjector injector) { 307 failureInjector = injector; 308 } 309 310 /** 311 * Build a BlockReader with the given options. 312 * 313 * This function will do the best it can to create a block reader that meets 314 * all of our requirements. We prefer short-circuit block readers 315 * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the 316 * former avoid the overhead of socket communication. If short-circuit is 317 * unavailable, our next fallback is data transfer over UNIX domain sockets, 318 * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't 319 * work, we will try to create a remote block reader that operates over TCP 320 * sockets. 321 * 322 * There are a few caches that are important here. 323 * 324 * The ShortCircuitCache stores file descriptor objects which have been passed 325 * from the DataNode. 326 * 327 * The DomainSocketFactory stores information about UNIX domain socket paths 328 * that we not been able to use in the past, so that we don't waste time 329 * retrying them over and over. (Like all the caches, it does have a timeout, 330 * though.) 331 * 332 * The PeerCache stores peers that we have used in the past. If we can reuse 333 * one of these peers, we avoid the overhead of re-opening a socket. However, 334 * if the socket has been timed out on the remote end, our attempt to reuse 335 * the socket may end with an IOException. For that reason, we limit our 336 * attempts at socket reuse to dfs.client.cached.conn.retry times. After 337 * that, we create new sockets. This avoids the problem where a thread tries 338 * to talk to a peer that it hasn't talked to in a while, and has to clean out 339 * every entry in a socket cache full of stale entries. 340 * 341 * @return The new BlockReader. We will not return null. 342 * 343 * @throws InvalidToken 344 * If the block token was invalid. 345 * InvalidEncryptionKeyException 346 * If the encryption key was invalid. 347 * Other IOException 348 * If there was another problem. 349 */ 350 public BlockReader build() throws IOException { 351 Preconditions.checkNotNull(configuration); 352 Preconditions 353 .checkState(length >= 0, "Length must be set to a non-negative value"); 354 BlockReader reader = tryToCreateExternalBlockReader(); 355 if (reader != null) { 356 return reader; 357 } 358 final ShortCircuitConf scConf = conf.getShortCircuitConf(); 359 if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) { 360 if (clientContext.getUseLegacyBlockReaderLocal()) { 361 reader = getLegacyBlockReaderLocal(); 362 if (reader != null) { 363 LOG.trace("{}: returning new legacy block reader local.", this); 364 return reader; 365 } 366 } else { 367 reader = getBlockReaderLocal(); 368 if (reader != null) { 369 LOG.trace("{}: returning new block reader local.", this); 370 return reader; 371 } 372 } 373 } 374 if (scConf.isDomainSocketDataTraffic()) { 375 reader = getRemoteBlockReaderFromDomain(); 376 if (reader != null) { 377 LOG.trace("{}: returning new remote block reader using UNIX domain " 378 + "socket on {}", this, pathInfo.getPath()); 379 return reader; 380 } 381 } 382 Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting, 383 "TCP reads were disabled for testing, but we failed to " + 384 "do a non-TCP read."); 385 return getRemoteBlockReaderFromTcp(); 386 } 387 388 private BlockReader tryToCreateExternalBlockReader() { 389 List<Class<? extends ReplicaAccessorBuilder>> clses = 390 conf.getReplicaAccessorBuilderClasses(); 391 for (Class<? extends ReplicaAccessorBuilder> cls : clses) { 392 try { 393 ByteArrayDataOutput bado = ByteStreams.newDataOutput(); 394 token.write(bado); 395 byte tokenBytes[] = bado.toByteArray(); 396 397 Constructor<? extends ReplicaAccessorBuilder> ctor = 398 cls.getConstructor(); 399 ReplicaAccessorBuilder builder = ctor.newInstance(); 400 long visibleLength = startOffset + length; 401 ReplicaAccessor accessor = builder. 402 setAllowShortCircuitReads(allowShortCircuitLocalReads). 403 setBlock(block.getBlockId(), block.getBlockPoolId()). 404 setGenerationStamp(block.getGenerationStamp()). 405 setBlockAccessToken(tokenBytes). 406 setClientName(clientName). 407 setConfiguration(configuration). 408 setFileName(fileName). 409 setVerifyChecksum(verifyChecksum). 410 setVisibleLength(visibleLength). 411 build(); 412 if (accessor == null) { 413 LOG.trace("{}: No ReplicaAccessor created by {}", 414 this, cls.getName()); 415 } else { 416 return new ExternalBlockReader(accessor, visibleLength, startOffset); 417 } 418 } catch (Throwable t) { 419 LOG.warn("Failed to construct new object of type " + 420 cls.getName(), t); 421 } 422 } 423 return null; 424 } 425 426 427 /** 428 * Get {@link BlockReaderLocalLegacy} for short circuited local reads. 429 * This block reader implements the path-based style of local reads 430 * first introduced in HDFS-2246. 431 */ 432 private BlockReader getLegacyBlockReaderLocal() throws IOException { 433 LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this); 434 if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) { 435 LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address" 436 + "{} is not local", this, inetSocketAddress); 437 return null; 438 } 439 if (clientContext.getDisableLegacyBlockReaderLocal()) { 440 PerformanceAdvisory.LOG.debug("{}: can't construct " + 441 "BlockReaderLocalLegacy because " + 442 "disableLegacyBlockReaderLocal is set.", this); 443 return null; 444 } 445 IOException ioe; 446 try { 447 return BlockReaderLocalLegacy.newBlockReader(conf, 448 userGroupInformation, configuration, fileName, block, token, 449 datanode, startOffset, length, storageType, tracer); 450 } catch (RemoteException remoteException) { 451 ioe = remoteException.unwrapRemoteException( 452 InvalidToken.class, AccessControlException.class); 453 } catch (IOException e) { 454 ioe = e; 455 } 456 if ((!(ioe instanceof AccessControlException)) && 457 isSecurityException(ioe)) { 458 // Handle security exceptions. 459 // We do not handle AccessControlException here, since 460 // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate 461 // that the user is not in dfs.block.local-path-access.user, a condition 462 // which requires us to disable legacy SCR. 463 throw ioe; 464 } 465 LOG.warn(this + ": error creating legacy BlockReaderLocal. " + 466 "Disabling legacy local reads.", ioe); 467 clientContext.setDisableLegacyBlockReaderLocal(); 468 return null; 469 } 470 471 private BlockReader getBlockReaderLocal() throws InvalidToken { 472 LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit " 473 + " reads.", this); 474 if (pathInfo == null) { 475 pathInfo = clientContext.getDomainSocketFactory() 476 .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); 477 } 478 if (!pathInfo.getPathState().getUsableForShortCircuit()) { 479 PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " + 480 "giving up on BlockReaderLocal.", this, pathInfo); 481 return null; 482 } 483 ShortCircuitCache cache = clientContext.getShortCircuitCache(); 484 ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), 485 block.getBlockPoolId()); 486 ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); 487 InvalidToken exc = info.getInvalidTokenException(); 488 if (exc != null) { 489 LOG.trace("{}: got InvalidToken exception while trying to construct " 490 + "BlockReaderLocal via {}", this, pathInfo.getPath()); 491 throw exc; 492 } 493 if (info.getReplica() == null) { 494 PerformanceAdvisory.LOG.debug("{}: failed to get " + 495 "ShortCircuitReplica. Cannot construct " + 496 "BlockReaderLocal via {}", this, pathInfo.getPath()); 497 return null; 498 } 499 return new BlockReaderLocal.Builder(conf.getShortCircuitConf()). 500 setFilename(fileName). 501 setBlock(block). 502 setStartOffset(startOffset). 503 setShortCircuitReplica(info.getReplica()). 504 setVerifyChecksum(verifyChecksum). 505 setCachingStrategy(cachingStrategy). 506 setStorageType(storageType). 507 setTracer(tracer). 508 build(); 509 } 510 511 /** 512 * Fetch a pair of short-circuit block descriptors from a local DataNode. 513 * 514 * @return Null if we could not communicate with the datanode, 515 * a new ShortCircuitReplicaInfo object otherwise. 516 * ShortCircuitReplicaInfo objects may contain either an 517 * InvalidToken exception, or a ShortCircuitReplica object ready to 518 * use. 519 */ 520 @Override 521 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { 522 if (createShortCircuitReplicaInfoCallback != null) { 523 ShortCircuitReplicaInfo info = 524 createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo(); 525 if (info != null) return info; 526 } 527 LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this); 528 BlockReaderPeer curPeer; 529 while (true) { 530 curPeer = nextDomainPeer(); 531 if (curPeer == null) break; 532 if (curPeer.fromCache) remainingCacheTries--; 533 DomainPeer peer = (DomainPeer)curPeer.peer; 534 Slot slot = null; 535 ShortCircuitCache cache = clientContext.getShortCircuitCache(); 536 try { 537 MutableBoolean usedPeer = new MutableBoolean(false); 538 slot = cache.allocShmSlot(datanode, peer, usedPeer, 539 new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()), 540 clientName); 541 if (usedPeer.booleanValue()) { 542 LOG.trace("{}: allocShmSlot used up our previous socket {}. " 543 + "Allocating a new one...", this, peer.getDomainSocket()); 544 curPeer = nextDomainPeer(); 545 if (curPeer == null) break; 546 peer = (DomainPeer)curPeer.peer; 547 } 548 ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); 549 clientContext.getPeerCache().put(datanode, peer); 550 return info; 551 } catch (IOException e) { 552 if (slot != null) { 553 cache.freeSlot(slot); 554 } 555 if (curPeer.fromCache) { 556 // Handle an I/O error we got when using a cached socket. 557 // These are considered less serious, because the socket may be stale. 558 LOG.debug("{}: closing stale domain peer {}", this, peer, e); 559 IOUtilsClient.cleanup(LOG, peer); 560 } else { 561 // Handle an I/O error we got when using a newly created socket. 562 // We temporarily disable the domain socket path for a few minutes in 563 // this case, to prevent wasting more time on it. 564 LOG.warn(this + ": I/O error requesting file descriptors. " + 565 "Disabling domain socket " + peer.getDomainSocket(), e); 566 IOUtilsClient.cleanup(LOG, peer); 567 clientContext.getDomainSocketFactory() 568 .disableDomainSocketPath(pathInfo.getPath()); 569 return null; 570 } 571 } 572 } 573 return null; 574 } 575 576 /** 577 * Request file descriptors from a DomainPeer. 578 * 579 * @param peer The peer to use for communication. 580 * @param slot If non-null, the shared memory slot to associate with the 581 * new ShortCircuitReplica. 582 * 583 * @return A ShortCircuitReplica object if we could communicate with the 584 * datanode; null, otherwise. 585 * @throws IOException If we encountered an I/O exception while communicating 586 * with the datanode. 587 */ 588 private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, 589 Slot slot) throws IOException { 590 ShortCircuitCache cache = clientContext.getShortCircuitCache(); 591 final DataOutputStream out = 592 new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); 593 SlotId slotId = slot == null ? null : slot.getSlotId(); 594 new Sender(out).requestShortCircuitFds(block, token, slotId, 1, 595 failureInjector.getSupportsReceiptVerification()); 596 DataInputStream in = new DataInputStream(peer.getInputStream()); 597 BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( 598 PBHelperClient.vintPrefixed(in)); 599 DomainSocket sock = peer.getDomainSocket(); 600 failureInjector.injectRequestFileDescriptorsFailure(); 601 switch (resp.getStatus()) { 602 case SUCCESS: 603 byte buf[] = new byte[1]; 604 FileInputStream[] fis = new FileInputStream[2]; 605 sock.recvFileInputStreams(fis, buf, 0, buf.length); 606 ShortCircuitReplica replica = null; 607 try { 608 ExtendedBlockId key = 609 new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); 610 if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { 611 LOG.trace("Sending receipt verification byte for slot {}", slot); 612 sock.getOutputStream().write(0); 613 } 614 replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, 615 Time.monotonicNow(), slot); 616 return new ShortCircuitReplicaInfo(replica); 617 } catch (IOException e) { 618 // This indicates an error reading from disk, or a format error. Since 619 // it's not a socket communication problem, we return null rather than 620 // throwing an exception. 621 LOG.warn(this + ": error creating ShortCircuitReplica.", e); 622 return null; 623 } finally { 624 if (replica == null) { 625 IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]); 626 } 627 } 628 case ERROR_UNSUPPORTED: 629 if (!resp.hasShortCircuitAccessVersion()) { 630 LOG.warn("short-circuit read access is disabled for " + 631 "DataNode " + datanode + ". reason: " + resp.getMessage()); 632 clientContext.getDomainSocketFactory() 633 .disableShortCircuitForPath(pathInfo.getPath()); 634 } else { 635 LOG.warn("short-circuit read access for the file " + 636 fileName + " is disabled for DataNode " + datanode + 637 ". reason: " + resp.getMessage()); 638 } 639 return null; 640 case ERROR_ACCESS_TOKEN: 641 String msg = "access control error while " + 642 "attempting to set up short-circuit access to " + 643 fileName + resp.getMessage(); 644 LOG.debug("{}:{}", this, msg); 645 return new ShortCircuitReplicaInfo(new InvalidToken(msg)); 646 default: 647 LOG.warn(this + ": unknown response code " + resp.getStatus() + 648 " while attempting to set up short-circuit access. " + 649 resp.getMessage()); 650 clientContext.getDomainSocketFactory() 651 .disableShortCircuitForPath(pathInfo.getPath()); 652 return null; 653 } 654 } 655 656 /** 657 * Get a BlockReaderRemote that communicates over a UNIX domain socket. 658 * 659 * @return The new BlockReader, or null if we failed to create the block 660 * reader. 661 * 662 * @throws InvalidToken If the block token was invalid. 663 * Potentially other security-related execptions. 664 */ 665 private BlockReader getRemoteBlockReaderFromDomain() throws IOException { 666 if (pathInfo == null) { 667 pathInfo = clientContext.getDomainSocketFactory() 668 .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); 669 } 670 if (!pathInfo.getPathState().getUsableForDataTransfer()) { 671 PerformanceAdvisory.LOG.debug("{}: not trying to create a " + 672 "remote block reader because the UNIX domain socket at {}" + 673 " is not usable.", this, pathInfo); 674 return null; 675 } 676 LOG.trace("{}: trying to create a remote block reader from the UNIX domain " 677 + "socket at {}", this, pathInfo.getPath()); 678 679 while (true) { 680 BlockReaderPeer curPeer = nextDomainPeer(); 681 if (curPeer == null) break; 682 if (curPeer.fromCache) remainingCacheTries--; 683 DomainPeer peer = (DomainPeer)curPeer.peer; 684 BlockReader blockReader = null; 685 try { 686 blockReader = getRemoteBlockReader(peer); 687 return blockReader; 688 } catch (IOException ioe) { 689 IOUtilsClient.cleanup(LOG, peer); 690 if (isSecurityException(ioe)) { 691 LOG.trace("{}: got security exception while constructing a remote " 692 + " block reader from the unix domain socket at {}", 693 this, pathInfo.getPath(), ioe); 694 throw ioe; 695 } 696 if (curPeer.fromCache) { 697 // Handle an I/O error we got when using a cached peer. These are 698 // considered less serious because the underlying socket may be stale. 699 LOG.debug("Closed potentially stale domain peer {}", peer, ioe); 700 } else { 701 // Handle an I/O error we got when using a newly created domain peer. 702 // We temporarily disable the domain socket path for a few minutes in 703 // this case, to prevent wasting more time on it. 704 LOG.warn("I/O error constructing remote block reader. Disabling " + 705 "domain socket " + peer.getDomainSocket(), ioe); 706 clientContext.getDomainSocketFactory() 707 .disableDomainSocketPath(pathInfo.getPath()); 708 return null; 709 } 710 } finally { 711 if (blockReader == null) { 712 IOUtilsClient.cleanup(LOG, peer); 713 } 714 } 715 } 716 return null; 717 } 718 719 /** 720 * Get a BlockReaderRemote that communicates over a TCP socket. 721 * 722 * @return The new BlockReader. We will not return null, but instead throw 723 * an exception if this fails. 724 * 725 * @throws InvalidToken 726 * If the block token was invalid. 727 * InvalidEncryptionKeyException 728 * If the encryption key was invalid. 729 * Other IOException 730 * If there was another problem. 731 */ 732 private BlockReader getRemoteBlockReaderFromTcp() throws IOException { 733 LOG.trace("{}: trying to create a remote block reader from a TCP socket", 734 this); 735 BlockReader blockReader = null; 736 while (true) { 737 BlockReaderPeer curPeer = null; 738 Peer peer = null; 739 try { 740 curPeer = nextTcpPeer(); 741 if (curPeer.fromCache) remainingCacheTries--; 742 peer = curPeer.peer; 743 blockReader = getRemoteBlockReader(peer); 744 return blockReader; 745 } catch (IOException ioe) { 746 if (isSecurityException(ioe)) { 747 LOG.trace("{}: got security exception while constructing a remote " 748 + "block reader from {}", this, peer, ioe); 749 throw ioe; 750 } 751 if ((curPeer != null) && curPeer.fromCache) { 752 // Handle an I/O error we got when using a cached peer. These are 753 // considered less serious, because the underlying socket may be 754 // stale. 755 LOG.debug("Closed potentially stale remote peer {}", peer, ioe); 756 } else { 757 // Handle an I/O error we got when using a newly created peer. 758 LOG.warn("I/O error constructing remote block reader.", ioe); 759 throw ioe; 760 } 761 } finally { 762 if (blockReader == null) { 763 IOUtilsClient.cleanup(LOG, peer); 764 } 765 } 766 } 767 } 768 769 public static class BlockReaderPeer { 770 final Peer peer; 771 final boolean fromCache; 772 773 BlockReaderPeer(Peer peer, boolean fromCache) { 774 this.peer = peer; 775 this.fromCache = fromCache; 776 } 777 } 778 779 /** 780 * Get the next DomainPeer-- either from the cache or by creating it. 781 * 782 * @return the next DomainPeer, or null if we could not construct one. 783 */ 784 private BlockReaderPeer nextDomainPeer() { 785 if (remainingCacheTries > 0) { 786 Peer peer = clientContext.getPeerCache().get(datanode, true); 787 if (peer != null) { 788 LOG.trace("nextDomainPeer: reusing existing peer {}", peer); 789 return new BlockReaderPeer(peer, true); 790 } 791 } 792 DomainSocket sock = clientContext.getDomainSocketFactory(). 793 createSocket(pathInfo, conf.getSocketTimeout()); 794 if (sock == null) return null; 795 return new BlockReaderPeer(new DomainPeer(sock), false); 796 } 797 798 /** 799 * Get the next TCP-based peer-- either from the cache or by creating it. 800 * 801 * @return the next Peer, or null if we could not construct one. 802 * 803 * @throws IOException If there was an error while constructing the peer 804 * (such as an InvalidEncryptionKeyException) 805 */ 806 private BlockReaderPeer nextTcpPeer() throws IOException { 807 if (remainingCacheTries > 0) { 808 Peer peer = clientContext.getPeerCache().get(datanode, false); 809 if (peer != null) { 810 LOG.trace("nextTcpPeer: reusing existing peer {}", peer); 811 return new BlockReaderPeer(peer, true); 812 } 813 } 814 try { 815 Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, 816 datanode); 817 LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer); 818 return new BlockReaderPeer(peer, false); 819 } catch (IOException e) { 820 LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to" 821 + "{}", datanode); 822 throw e; 823 } 824 } 825 826 /** 827 * Determine if an exception is security-related. 828 * 829 * We need to handle these exceptions differently than other IOExceptions. 830 * They don't indicate a communication problem. Instead, they mean that there 831 * is some action the client needs to take, such as refetching block tokens, 832 * renewing encryption keys, etc. 833 * 834 * @param ioe The exception 835 * @return True only if the exception is security-related. 836 */ 837 private static boolean isSecurityException(IOException ioe) { 838 return (ioe instanceof InvalidToken) || 839 (ioe instanceof InvalidEncryptionKeyException) || 840 (ioe instanceof InvalidBlockTokenException) || 841 (ioe instanceof AccessControlException); 842 } 843 844 @SuppressWarnings("deprecation") 845 private BlockReader getRemoteBlockReader(Peer peer) throws IOException { 846 if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { 847 return BlockReaderRemote.newBlockReader(fileName, 848 block, token, startOffset, length, conf.getIoBufferSize(), 849 verifyChecksum, clientName, peer, datanode, 850 clientContext.getPeerCache(), cachingStrategy, tracer); 851 } else { 852 return BlockReaderRemote2.newBlockReader( 853 fileName, block, token, startOffset, length, 854 verifyChecksum, clientName, peer, datanode, 855 clientContext.getPeerCache(), cachingStrategy, tracer); 856 } 857 } 858 859 @Override 860 public String toString() { 861 return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")"; 862 } 863 864 /** 865 * File name to print when accessing a block directly (from servlets) 866 * @param s Address of the block location 867 * @param poolId Block pool ID of the block 868 * @param blockId Block ID of the block 869 * @return string that has a file name for debug purposes 870 */ 871 public static String getFileName(final InetSocketAddress s, 872 final String poolId, final long blockId) { 873 return s.toString() + ":" + poolId + ":" + blockId; 874 } 875}