001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs; 019 020import java.io.EOFException; 021import java.io.IOException; 022import java.net.InetSocketAddress; 023import java.nio.ByteBuffer; 024import java.util.AbstractMap; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.EnumSet; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.Set; 037import java.util.concurrent.Callable; 038import java.util.concurrent.CancellationException; 039import java.util.concurrent.CompletionService; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.ExecutorCompletionService; 043import java.util.concurrent.Future; 044import java.util.concurrent.ThreadLocalRandom; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.atomic.AtomicBoolean; 047 048import com.google.common.base.Preconditions; 049import org.apache.commons.io.IOUtils; 050import org.apache.hadoop.classification.InterfaceAudience; 051import org.apache.hadoop.fs.ByteBufferReadable; 052import org.apache.hadoop.fs.ByteBufferUtil; 053import org.apache.hadoop.fs.CanSetDropBehind; 054import org.apache.hadoop.fs.CanSetReadahead; 055import org.apache.hadoop.fs.CanUnbuffer; 056import org.apache.hadoop.fs.ChecksumException; 057import org.apache.hadoop.fs.FSInputStream; 058import org.apache.hadoop.fs.FileEncryptionInfo; 059import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; 060import org.apache.hadoop.fs.ReadOption; 061import org.apache.hadoop.fs.StorageType; 062import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; 063import org.apache.hadoop.hdfs.client.impl.DfsClientConf; 064import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; 065import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 066import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 067import org.apache.hadoop.hdfs.protocol.LocatedBlock; 068import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 069import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 071import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; 072import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 073import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; 074import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; 075import org.apache.hadoop.io.ByteBufferPool; 076import org.apache.hadoop.ipc.RPC; 077import 
org.apache.hadoop.ipc.RemoteException; 078import org.apache.hadoop.ipc.RetriableException; 079import org.apache.hadoop.net.NetUtils; 080import org.apache.hadoop.security.token.SecretManager.InvalidToken; 081import org.apache.hadoop.security.token.Token; 082import org.apache.hadoop.util.IdentityHashStore; 083import org.apache.hadoop.util.StopWatch; 084import org.apache.htrace.core.SpanId; 085import org.apache.htrace.core.TraceScope; 086import org.apache.htrace.core.Tracer; 087 088import com.google.common.annotations.VisibleForTesting; 089 090import javax.annotation.Nonnull; 091 092/**************************************************************** 093 * DFSInputStream provides bytes from a named file. It handles 094 * negotiation of the namenode and various datanodes as necessary. 095 ****************************************************************/ 096@InterfaceAudience.Private 097public class DFSInputStream extends FSInputStream 098 implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, 099 HasEnhancedByteBufferAccess, CanUnbuffer { 100 @VisibleForTesting 101 public static boolean tcpReadsDisabledForTesting = false; 102 private long hedgedReadOpsLoopNumForTesting = 0; 103 protected final DFSClient dfsClient; 104 protected AtomicBoolean closed = new AtomicBoolean(false); 105 protected final String src; 106 protected final boolean verifyChecksum; 107 108 // state by stateful read only: 109 // (protected by lock on this) 110 ///// 111 private DatanodeInfo currentNode = null; 112 protected LocatedBlock currentLocatedBlock = null; 113 protected long pos = 0; 114 protected long blockEnd = -1; 115 private BlockReader blockReader = null; 116 //// 117 118 // state shared by stateful and positional read: 119 // (protected by lock on infoLock) 120 //// 121 protected LocatedBlocks locatedBlocks = null; 122 private long lastBlockBeingWrittenLength = 0; 123 private FileEncryptionInfo fileEncryptionInfo = null; 124 protected CachingStrategy cachingStrategy; 125 //// 126 127 protected final ReadStatistics readStatistics = new ReadStatistics(); 128 // lock for state shared between read and pread 129 // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks 130 // (it's OK to acquire this lock when the lock on <this> is held) 131 protected final Object infoLock = new Object(); 132 133 /** 134 * Track the ByteBuffers that we have handed out to readers. 135 * 136 * The value type can be either ByteBufferPool or ClientMmap, depending on 137 * whether we this is a memory-mapped buffer or not. 138 */ 139 private IdentityHashStore<ByteBuffer, Object> extendedReadBuffers; 140 141 private synchronized IdentityHashStore<ByteBuffer, Object> 142 getExtendedReadBuffers() { 143 if (extendedReadBuffers == null) { 144 extendedReadBuffers = new IdentityHashStore<>(0); 145 } 146 return extendedReadBuffers; 147 } 148 149 public static class ReadStatistics { 150 public ReadStatistics() { 151 clear(); 152 } 153 154 public ReadStatistics(ReadStatistics rhs) { 155 this.totalBytesRead = rhs.getTotalBytesRead(); 156 this.totalLocalBytesRead = rhs.getTotalLocalBytesRead(); 157 this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead(); 158 this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead(); 159 } 160 161 /** 162 * @return The total bytes read. This will always be at least as 163 * high as the other numbers, since it includes all of them. 164 */ 165 public long getTotalBytesRead() { 166 return totalBytesRead; 167 } 168 169 /** 170 * @return The total local bytes read. 
This will always be at least 171 * as high as totalShortCircuitBytesRead, since all short-circuit 172 * reads are also local. 173 */ 174 public long getTotalLocalBytesRead() { 175 return totalLocalBytesRead; 176 } 177 178 /** 179 * @return The total short-circuit local bytes read. 180 */ 181 public long getTotalShortCircuitBytesRead() { 182 return totalShortCircuitBytesRead; 183 } 184 185 /** 186 * @return The total number of zero-copy bytes read. 187 */ 188 public long getTotalZeroCopyBytesRead() { 189 return totalZeroCopyBytesRead; 190 } 191 192 /** 193 * @return The total number of bytes read which were not local. 194 */ 195 public long getRemoteBytesRead() { 196 return totalBytesRead - totalLocalBytesRead; 197 } 198 199 void addRemoteBytes(long amt) { 200 this.totalBytesRead += amt; 201 } 202 203 void addLocalBytes(long amt) { 204 this.totalBytesRead += amt; 205 this.totalLocalBytesRead += amt; 206 } 207 208 void addShortCircuitBytes(long amt) { 209 this.totalBytesRead += amt; 210 this.totalLocalBytesRead += amt; 211 this.totalShortCircuitBytesRead += amt; 212 } 213 214 void addZeroCopyBytes(long amt) { 215 this.totalBytesRead += amt; 216 this.totalLocalBytesRead += amt; 217 this.totalShortCircuitBytesRead += amt; 218 this.totalZeroCopyBytesRead += amt; 219 } 220 221 void clear() { 222 this.totalBytesRead = 0; 223 this.totalLocalBytesRead = 0; 224 this.totalShortCircuitBytesRead = 0; 225 this.totalZeroCopyBytesRead = 0; 226 } 227 228 private long totalBytesRead; 229 230 private long totalLocalBytesRead; 231 232 private long totalShortCircuitBytesRead; 233 234 private long totalZeroCopyBytesRead; 235 } 236 237 /** 238 * This variable tracks the number of failures since the start of the 239 * most recent user-facing operation. That is to say, it should be reset 240 * whenever the user makes a call on this stream, and if at any point 241 * during the retry logic, the failure count exceeds a threshold, 242 * the errors will be thrown back to the operation. 243 * 244 * Specifically this counts the number of times the client has gone 245 * back to the namenode to get a new list of block locations, and is 246 * capped at maxBlockAcquireFailures 247 */ 248 protected int failures = 0; 249 250 /* XXX Use of CocurrentHashMap is temp fix. 
Need to fix 251 * parallel accesses to DFSInputStream (through ptreads) properly */ 252 private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = 253 new ConcurrentHashMap<>(); 254 255 private byte[] oneByteBuf; // used for 'int read()' 256 257 void addToDeadNodes(DatanodeInfo dnInfo) { 258 deadNodes.put(dnInfo, dnInfo); 259 } 260 261 DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, 262 LocatedBlocks locatedBlocks) throws IOException { 263 this.dfsClient = dfsClient; 264 this.verifyChecksum = verifyChecksum; 265 this.src = src; 266 synchronized (infoLock) { 267 this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy(); 268 } 269 this.locatedBlocks = locatedBlocks; 270 openInfo(false); 271 } 272 273 /** 274 * Grab the open-file info from namenode 275 * @param refreshLocatedBlocks whether to re-fetch locatedblocks 276 */ 277 void openInfo(boolean refreshLocatedBlocks) throws IOException { 278 final DfsClientConf conf = dfsClient.getConf(); 279 synchronized(infoLock) { 280 lastBlockBeingWrittenLength = 281 fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks); 282 int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); 283 while (retriesForLastBlockLength > 0) { 284 // Getting last block length as -1 is a special case. When cluster 285 // restarts, DNs may not report immediately. At this time partial block 286 // locations will not be available with NN for getting the length. Lets 287 // retry for 3 times to get the length. 288 if (lastBlockBeingWrittenLength == -1) { 289 DFSClient.LOG.warn("Last block locations not available. " 290 + "Datanodes might not have reported blocks completely." 291 + " Will retry for " + retriesForLastBlockLength + " times"); 292 waitFor(conf.getRetryIntervalForGetLastBlockLength()); 293 lastBlockBeingWrittenLength = 294 fetchLocatedBlocksAndGetLastBlockLength(true); 295 } else { 296 break; 297 } 298 retriesForLastBlockLength--; 299 } 300 if (retriesForLastBlockLength == 0) { 301 throw new IOException("Could not obtain the last block locations."); 302 } 303 } 304 } 305 306 private void waitFor(int waitTime) throws IOException { 307 try { 308 Thread.sleep(waitTime); 309 } catch (InterruptedException e) { 310 throw new IOException( 311 "Interrupted while getting the last block length."); 312 } 313 } 314 315 private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) 316 throws IOException { 317 LocatedBlocks newInfo = locatedBlocks; 318 if (locatedBlocks == null || refresh) { 319 newInfo = dfsClient.getLocatedBlocks(src, 0); 320 } 321 DFSClient.LOG.debug("newInfo = {}", newInfo); 322 if (newInfo == null) { 323 throw new IOException("Cannot open filename " + src); 324 } 325 326 if (locatedBlocks != null) { 327 Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); 328 Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); 329 while (oldIter.hasNext() && newIter.hasNext()) { 330 if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) { 331 throw new IOException("Blocklist for " + src + " has changed!"); 332 } 333 } 334 } 335 locatedBlocks = newInfo; 336 long lastBlockBeingWrittenLength = 0; 337 if (!locatedBlocks.isLastBlockComplete()) { 338 final LocatedBlock last = locatedBlocks.getLastLocatedBlock(); 339 if (last != null) { 340 if (last.getLocations().length == 0) { 341 if (last.getBlockSize() == 0) { 342 // if the length is zero, then no data has been written to 343 // datanode. So no need to wait for the locations. 
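// (Editorial note: the 0 returned just below treats the empty in-progress
// block as zero-length, whereas the -1 returned a few lines further down is
// the special value that makes openInfo() above retry until the DataNodes
// have reported the last block's locations.)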
344 return 0; 345 } 346 return -1; 347 } 348 final long len = readBlockLength(last); 349 last.getBlock().setNumBytes(len); 350 lastBlockBeingWrittenLength = len; 351 } 352 } 353 354 fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo(); 355 356 return lastBlockBeingWrittenLength; 357 } 358 359 /** Read the block length from one of the datanodes. */ 360 private long readBlockLength(LocatedBlock locatedblock) throws IOException { 361 assert locatedblock != null : "LocatedBlock cannot be null"; 362 int replicaNotFoundCount = locatedblock.getLocations().length; 363 364 final DfsClientConf conf = dfsClient.getConf(); 365 final int timeout = conf.getSocketTimeout(); 366 LinkedList<DatanodeInfo> nodeList = new LinkedList<DatanodeInfo>( 367 Arrays.asList(locatedblock.getLocations())); 368 LinkedList<DatanodeInfo> retryList = new LinkedList<DatanodeInfo>(); 369 boolean isRetry = false; 370 StopWatch sw = new StopWatch(); 371 while (nodeList.size() > 0) { 372 DatanodeInfo datanode = nodeList.pop(); 373 ClientDatanodeProtocol cdp = null; 374 try { 375 cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode, 376 dfsClient.getConfiguration(), timeout, 377 conf.isConnectToDnViaHostname(), locatedblock); 378 379 final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); 380 381 if (n >= 0) { 382 return n; 383 } 384 } catch (IOException ioe) { 385 if (ioe instanceof RemoteException) { 386 if (((RemoteException) ioe).unwrapRemoteException() instanceof 387 ReplicaNotFoundException) { 388 // replica is not on the DN. We will treat it as 0 length 389 // if no one actually has a replica. 390 replicaNotFoundCount--; 391 } else if (((RemoteException) ioe).unwrapRemoteException() instanceof 392 RetriableException) { 393 // add to the list to be retried if necessary. 394 retryList.add(datanode); 395 } 396 } 397 DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}" 398 + " for block {}", datanode, locatedblock.getBlock(), ioe); 399 } finally { 400 if (cdp != null) { 401 RPC.stopProxy(cdp); 402 } 403 } 404 405 // Ran out of nodes, but there are retriable nodes. 406 if (nodeList.size() == 0 && retryList.size() > 0) { 407 nodeList.addAll(retryList); 408 retryList.clear(); 409 isRetry = true; 410 } 411 412 if (isRetry) { 413 // start the stop watch if not already running. 414 if (!sw.isRunning()) { 415 sw.start(); 416 } 417 try { 418 Thread.sleep(500); // delay between retries. 419 } catch (InterruptedException e) { 420 throw new IOException("Interrupted while getting the length."); 421 } 422 } 423 424 // see if we ran out of retry time 425 if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeout) { 426 break; 427 } 428 } 429 430 // Namenode told us about these locations, but none know about the replica 431 // means that we hit the race between pipeline creation start and end. 432 // we require all 3 because some other exception could have happened 433 // on a DN that has it. we want to report that error 434 if (replicaNotFoundCount == 0) { 435 return 0; 436 } 437 438 throw new IOException("Cannot obtain block length for " + locatedblock); 439 } 440 441 public long getFileLength() { 442 synchronized(infoLock) { 443 return locatedBlocks == null? 0: 444 locatedBlocks.getFileLength() + lastBlockBeingWrittenLength; 445 } 446 } 447 448 // Short circuit local reads are forbidden for files that are 449 // under construction. See HDFS-2757. 
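// (Editorial note: a short-circuit read bypasses the DataNode's data transfer
// protocol and reads the replica file directly from local disk; this client
// does not attempt that while the block is still being written. See HDFS-2757
// for the original discussion.)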
450 boolean shortCircuitForbidden() { 451 synchronized(infoLock) { 452 return locatedBlocks.isUnderConstruction(); 453 } 454 } 455 456 /** 457 * Returns the datanode from which the stream is currently reading. 458 */ 459 public synchronized DatanodeInfo getCurrentDatanode() { 460 return currentNode; 461 } 462 463 /** 464 * Returns the block containing the target position. 465 */ 466 synchronized public ExtendedBlock getCurrentBlock() { 467 if (currentLocatedBlock == null){ 468 return null; 469 } 470 return currentLocatedBlock.getBlock(); 471 } 472 473 /** 474 * Return collection of blocks that has already been located. 475 */ 476 public List<LocatedBlock> getAllBlocks() throws IOException { 477 return getBlockRange(0, getFileLength()); 478 } 479 480 /** 481 * Get block at the specified position. 482 * Fetch it from the namenode if not cached. 483 * 484 * @param offset block corresponding to this offset in file is returned 485 * @return located block 486 * @throws IOException 487 */ 488 protected LocatedBlock getBlockAt(long offset) throws IOException { 489 synchronized(infoLock) { 490 assert (locatedBlocks != null) : "locatedBlocks is null"; 491 492 final LocatedBlock blk; 493 494 //check offset 495 if (offset < 0 || offset >= getFileLength()) { 496 throw new IOException("offset < 0 || offset >= getFileLength(), offset=" 497 + offset 498 + ", locatedBlocks=" + locatedBlocks); 499 } 500 else if (offset >= locatedBlocks.getFileLength()) { 501 // offset to the portion of the last block, 502 // which is not known to the name-node yet; 503 // getting the last block 504 blk = locatedBlocks.getLastLocatedBlock(); 505 } 506 else { 507 // search cached blocks first 508 blk = fetchBlockAt(offset, 0, true); 509 } 510 return blk; 511 } 512 } 513 514 /** Fetch a block from namenode and cache it */ 515 protected LocatedBlock fetchBlockAt(long offset) throws IOException { 516 return fetchBlockAt(offset, 0, false); // don't use cache 517 } 518 519 /** Fetch a block from namenode and cache it */ 520 private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache) 521 throws IOException { 522 synchronized(infoLock) { 523 int targetBlockIdx = locatedBlocks.findBlock(offset); 524 if (targetBlockIdx < 0) { // block is not cached 525 targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); 526 useCache = false; 527 } 528 if (!useCache) { // fetch blocks 529 final LocatedBlocks newBlocks = (length == 0) 530 ? dfsClient.getLocatedBlocks(src, offset) 531 : dfsClient.getLocatedBlocks(src, offset, length); 532 if (newBlocks == null || newBlocks.locatedBlockCount() == 0) { 533 throw new EOFException("Could not find target position " + offset); 534 } 535 locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); 536 } 537 return locatedBlocks.get(targetBlockIdx); 538 } 539 } 540 541 /** 542 * Get blocks in the specified range. 543 * Fetch them from the namenode if not cached. This function 544 * will not get a read request beyond the EOF. 
545 * @param offset starting offset in file 546 * @param length length of data 547 * @return consequent segment of located blocks 548 * @throws IOException 549 */ 550 private List<LocatedBlock> getBlockRange(long offset, 551 long length) throws IOException { 552 // getFileLength(): returns total file length 553 // locatedBlocks.getFileLength(): returns length of completed blocks 554 if (offset >= getFileLength()) { 555 throw new IOException("Offset: " + offset + 556 " exceeds file length: " + getFileLength()); 557 } 558 synchronized(infoLock) { 559 final List<LocatedBlock> blocks; 560 final long lengthOfCompleteBlk = locatedBlocks.getFileLength(); 561 final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk; 562 final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk; 563 564 if (readOffsetWithinCompleteBlk) { 565 //get the blocks of finalized (completed) block range 566 blocks = getFinalizedBlockRange(offset, 567 Math.min(length, lengthOfCompleteBlk - offset)); 568 } else { 569 blocks = new ArrayList<>(1); 570 } 571 572 // get the blocks from incomplete block range 573 if (readLengthPastCompleteBlk) { 574 blocks.add(locatedBlocks.getLastLocatedBlock()); 575 } 576 577 return blocks; 578 } 579 } 580 581 /** 582 * Get blocks in the specified range. 583 * Includes only the complete blocks. 584 * Fetch them from the namenode if not cached. 585 */ 586 private List<LocatedBlock> getFinalizedBlockRange( 587 long offset, long length) throws IOException { 588 synchronized(infoLock) { 589 assert (locatedBlocks != null) : "locatedBlocks is null"; 590 List<LocatedBlock> blockRange = new ArrayList<>(); 591 // search cached blocks first 592 long remaining = length; 593 long curOff = offset; 594 while(remaining > 0) { 595 LocatedBlock blk = fetchBlockAt(curOff, remaining, true); 596 assert curOff >= blk.getStartOffset() : "Block not found"; 597 blockRange.add(blk); 598 long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff; 599 remaining -= bytesRead; 600 curOff += bytesRead; 601 } 602 return blockRange; 603 } 604 } 605 606 /** 607 * Open a DataInputStream to a DataNode so that it can be read from. 608 * We get block ID and the IDs of the destinations at startup, from the namenode. 609 */ 610 private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { 611 if (target >= getFileLength()) { 612 throw new IOException("Attempted to read past end of file"); 613 } 614 615 // Will be getting a new BlockReader. 
616 closeCurrentBlockReaders(); 617 618 // 619 // Connect to best DataNode for desired Block, with potential offset 620 // 621 DatanodeInfo chosenNode; 622 int refetchToken = 1; // only need to get a new access token once 623 int refetchEncryptionKey = 1; // only need to get a new encryption key once 624 625 boolean connectFailedOnce = false; 626 627 while (true) { 628 // 629 // Compute desired block 630 // 631 LocatedBlock targetBlock = getBlockAt(target); 632 633 // update current position 634 this.pos = target; 635 this.blockEnd = targetBlock.getStartOffset() + 636 targetBlock.getBlockSize() - 1; 637 this.currentLocatedBlock = targetBlock; 638 639 long offsetIntoBlock = target - targetBlock.getStartOffset(); 640 641 DNAddrPair retval = chooseDataNode(targetBlock, null); 642 chosenNode = retval.info; 643 InetSocketAddress targetAddr = retval.addr; 644 StorageType storageType = retval.storageType; 645 646 try { 647 blockReader = getBlockReader(targetBlock, offsetIntoBlock, 648 targetBlock.getBlockSize() - offsetIntoBlock, targetAddr, 649 storageType, chosenNode); 650 if(connectFailedOnce) { 651 DFSClient.LOG.info("Successfully connected to " + targetAddr + 652 " for " + targetBlock.getBlock()); 653 } 654 return chosenNode; 655 } catch (IOException ex) { 656 if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { 657 DFSClient.LOG.info("Will fetch a new encryption key and retry, " 658 + "encryption key was invalid when connecting to " + targetAddr 659 + " : " + ex); 660 // The encryption key used is invalid. 661 refetchEncryptionKey--; 662 dfsClient.clearDataEncryptionKey(); 663 } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) { 664 refetchToken--; 665 fetchBlockAt(target); 666 } else { 667 connectFailedOnce = true; 668 DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block" 669 + ", add to deadNodes and continue. " + ex, ex); 670 // Put chosen node into dead list, continue 671 addToDeadNodes(chosenNode); 672 } 673 } 674 } 675 } 676 677 protected BlockReader getBlockReader(LocatedBlock targetBlock, 678 long offsetInBlock, long length, InetSocketAddress targetAddr, 679 StorageType storageType, DatanodeInfo datanode) throws IOException { 680 ExtendedBlock blk = targetBlock.getBlock(); 681 Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken(); 682 CachingStrategy curCachingStrategy; 683 boolean shortCircuitForbidden; 684 synchronized (infoLock) { 685 curCachingStrategy = cachingStrategy; 686 shortCircuitForbidden = shortCircuitForbidden(); 687 } 688 return new BlockReaderFactory(dfsClient.getConf()). 689 setInetSocketAddress(targetAddr). 690 setRemotePeerFactory(dfsClient). 691 setDatanodeInfo(datanode). 692 setStorageType(storageType). 693 setFileName(src). 694 setBlock(blk). 695 setBlockToken(accessToken). 696 setStartOffset(offsetInBlock). 697 setVerifyChecksum(verifyChecksum). 698 setClientName(dfsClient.clientName). 699 setLength(length). 700 setCachingStrategy(curCachingStrategy). 701 setAllowShortCircuitLocalReads(!shortCircuitForbidden). 702 setClientCacheContext(dfsClient.getClientContext()). 703 setUserGroupInformation(dfsClient.ugi). 704 setConfiguration(dfsClient.getConfiguration()). 705 setTracer(dfsClient.getTracer()). 706 build(); 707 } 708 709 /** 710 * Close it down! 
711 */ 712 @Override 713 public synchronized void close() throws IOException { 714 if (!closed.compareAndSet(false, true)) { 715 DFSClient.LOG.debug("DFSInputStream has been closed already"); 716 return; 717 } 718 dfsClient.checkOpen(); 719 720 if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) { 721 final StringBuilder builder = new StringBuilder(); 722 extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() { 723 private String prefix = ""; 724 @Override 725 public void accept(ByteBuffer k, Object v) { 726 builder.append(prefix).append(k); 727 prefix = ", "; 728 } 729 }); 730 DFSClient.LOG.warn("closing file " + src + ", but there are still " + 731 "unreleased ByteBuffers allocated by read(). " + 732 "Please release " + builder.toString() + "."); 733 } 734 closeCurrentBlockReaders(); 735 super.close(); 736 } 737 738 @Override 739 public synchronized int read() throws IOException { 740 if (oneByteBuf == null) { 741 oneByteBuf = new byte[1]; 742 } 743 int ret = read( oneByteBuf, 0, 1 ); 744 return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff); 745 } 746 747 /** 748 * Wraps different possible read implementations so that readBuffer can be 749 * strategy-agnostic. 750 */ 751 interface ReaderStrategy { 752 int doRead(BlockReader blockReader, int off, int len) 753 throws IOException; 754 755 /** 756 * Copy data from the src ByteBuffer into the read buffer. 757 * @param src The src buffer where the data is copied from 758 * @param offset Useful only when the ReadStrategy is based on a byte array. 759 * Indicate the offset of the byte array for copy. 760 * @param length Useful only when the ReadStrategy is based on a byte array. 761 * Indicate the length of the data to copy. 762 */ 763 int copyFrom(ByteBuffer src, int offset, int length); 764 } 765 766 protected void updateReadStatistics(ReadStatistics readStatistics, 767 int nRead, BlockReader blockReader) { 768 if (nRead <= 0) return; 769 synchronized(infoLock) { 770 if (blockReader.isShortCircuit()) { 771 readStatistics.addShortCircuitBytes(nRead); 772 } else if (blockReader.isLocal()) { 773 readStatistics.addLocalBytes(nRead); 774 } else { 775 readStatistics.addRemoteBytes(nRead); 776 } 777 } 778 } 779 780 /** 781 * Used to read bytes into a byte[] 782 */ 783 private class ByteArrayStrategy implements ReaderStrategy { 784 final byte[] buf; 785 786 public ByteArrayStrategy(byte[] buf) { 787 this.buf = buf; 788 } 789 790 @Override 791 public int doRead(BlockReader blockReader, int off, int len) 792 throws IOException { 793 int nRead = blockReader.read(buf, off, len); 794 updateReadStatistics(readStatistics, nRead, blockReader); 795 return nRead; 796 } 797 798 @Override 799 public int copyFrom(ByteBuffer src, int offset, int length) { 800 ByteBuffer writeSlice = src.duplicate(); 801 writeSlice.get(buf, offset, length); 802 return length; 803 } 804 } 805 806 /** 807 * Used to read bytes into a user-supplied ByteBuffer 808 */ 809 protected class ByteBufferStrategy implements ReaderStrategy { 810 final ByteBuffer buf; 811 ByteBufferStrategy(ByteBuffer buf) { 812 this.buf = buf; 813 } 814 815 @Override 816 public int doRead(BlockReader blockReader, int off, int len) 817 throws IOException { 818 int oldpos = buf.position(); 819 int oldlimit = buf.limit(); 820 boolean success = false; 821 try { 822 int ret = blockReader.read(buf); 823 success = true; 824 updateReadStatistics(readStatistics, ret, blockReader); 825 if (ret == 0) { 826 DFSClient.LOG.warn("zero"); 827 } 828 return ret; 829 } finally { 830 if 
(!success) { 831 // Reset to original state so that retries work correctly. 832 buf.position(oldpos); 833 buf.limit(oldlimit); 834 } 835 } 836 } 837 838 @Override 839 public int copyFrom(ByteBuffer src, int offset, int length) { 840 ByteBuffer writeSlice = src.duplicate(); 841 int remaining = Math.min(buf.remaining(), writeSlice.remaining()); 842 writeSlice.limit(writeSlice.position() + remaining); 843 buf.put(writeSlice); 844 return remaining; 845 } 846 } 847 848 /* This is used by regular read() and handles ChecksumExceptions. 849 * name readBuffer() is chosen to imply similarity to readBuffer() in 850 * ChecksumFileSystem 851 */ 852 private synchronized int readBuffer(ReaderStrategy reader, int off, int len, 853 Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) 854 throws IOException { 855 IOException ioe; 856 857 /* we retry current node only once. So this is set to true only here. 858 * Intention is to handle one common case of an error that is not a 859 * failure on datanode or client : when DataNode closes the connection 860 * since client is idle. If there are other cases of "non-errors" then 861 * a datanode might be retried by setting this to true again. 862 */ 863 boolean retryCurrentNode = true; 864 865 while (true) { 866 // retry as many times as seekToNewSource allows. 867 try { 868 return reader.doRead(blockReader, off, len); 869 } catch ( ChecksumException ce ) { 870 DFSClient.LOG.warn("Found Checksum error for " 871 + getCurrentBlock() + " from " + currentNode 872 + " at " + ce.getPos()); 873 ioe = ce; 874 retryCurrentNode = false; 875 // we want to remember which block replicas we have tried 876 addIntoCorruptedBlockMap(getCurrentBlock(), currentNode, 877 corruptedBlockMap); 878 } catch ( IOException e ) { 879 if (!retryCurrentNode) { 880 DFSClient.LOG.warn("Exception while reading from " 881 + getCurrentBlock() + " of " + src + " from " 882 + currentNode, e); 883 } 884 ioe = e; 885 } 886 boolean sourceFound; 887 if (retryCurrentNode) { 888 /* possibly retry the same node so that transient errors don't 889 * result in application level failures (e.g. Datanode could have 890 * closed the connection because the client is idle for too long). 891 */ 892 sourceFound = seekToBlockSource(pos); 893 } else { 894 addToDeadNodes(currentNode); 895 sourceFound = seekToNewSource(pos); 896 } 897 if (!sourceFound) { 898 throw ioe; 899 } 900 retryCurrentNode = false; 901 } 902 } 903 904 protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException { 905 dfsClient.checkOpen(); 906 if (closed.get()) { 907 throw new IOException("Stream closed"); 908 } 909 Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>(); 910 failures = 0; 911 if (pos < getFileLength()) { 912 int retries = 2; 913 while (retries > 0) { 914 try { 915 // currentNode can be left as null if previous read had a checksum 916 // error on the same block. See HDFS-3067 917 if (pos > blockEnd || currentNode == null) { 918 currentNode = blockSeekTo(pos); 919 } 920 int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); 921 synchronized(infoLock) { 922 if (locatedBlocks.isLastBlockComplete()) { 923 realLen = (int) Math.min(realLen, 924 locatedBlocks.getFileLength() - pos); 925 } 926 } 927 int result = readBuffer(strategy, off, realLen, corruptedBlockMap); 928 929 if (result >= 0) { 930 pos += result; 931 } else { 932 // got an EOS from reader though we expect more data on it.
933 throw new IOException("Unexpected EOS from the reader"); 934 } 935 if (dfsClient.stats != null) { 936 dfsClient.stats.incrementBytesRead(result); 937 } 938 return result; 939 } catch (ChecksumException ce) { 940 throw ce; 941 } catch (IOException e) { 942 if (retries == 1) { 943 DFSClient.LOG.warn("DFS Read", e); 944 } 945 blockEnd = -1; 946 if (currentNode != null) { addToDeadNodes(currentNode); } 947 if (--retries == 0) { 948 throw e; 949 } 950 } finally { 951 // Check if need to report block replicas corruption either read 952 // was successful or ChecksumException occured. 953 reportCheckSumFailure(corruptedBlockMap, 954 currentLocatedBlock.getLocations().length); 955 } 956 } 957 } 958 return -1; 959 } 960 961 /** 962 * Read the entire buffer. 963 */ 964 @Override 965 public synchronized int read(@Nonnull final byte buf[], int off, int len) 966 throws IOException { 967 validatePositionedReadArgs(pos, buf, off, len); 968 if (len == 0) { 969 return 0; 970 } 971 ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf); 972 try (TraceScope ignored = 973 dfsClient.newPathTraceScope("DFSInputStream#byteArrayRead", src)) { 974 return readWithStrategy(byteArrayReader, off, len); 975 } 976 } 977 978 @Override 979 public synchronized int read(final ByteBuffer buf) throws IOException { 980 ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf); 981 try (TraceScope ignored = 982 dfsClient.newPathTraceScope("DFSInputStream#byteBufferRead", src)){ 983 return readWithStrategy(byteBufferReader, 0, buf.remaining()); 984 } 985 } 986 987 988 /** 989 * Add corrupted block replica into map. 990 */ 991 protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, 992 Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { 993 Set<DatanodeInfo> dnSet; 994 if((corruptedBlockMap.containsKey(blk))) { 995 dnSet = corruptedBlockMap.get(blk); 996 }else { 997 dnSet = new HashSet<>(); 998 } 999 if (!dnSet.contains(node)) { 1000 dnSet.add(node); 1001 corruptedBlockMap.put(blk, dnSet); 1002 } 1003 } 1004 1005 private DNAddrPair chooseDataNode(LocatedBlock block, 1006 Collection<DatanodeInfo> ignoredNodes) throws IOException { 1007 while (true) { 1008 DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes); 1009 if (result != null) { 1010 return result; 1011 } else { 1012 String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), 1013 deadNodes, ignoredNodes); 1014 String blockInfo = block.getBlock() + " file=" + src; 1015 if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { 1016 String description = "Could not obtain block: " + blockInfo; 1017 DFSClient.LOG.warn(description + errMsg 1018 + ". Throwing a BlockMissingException"); 1019 throw new BlockMissingException(src, description, 1020 block.getStartOffset()); 1021 } 1022 1023 DatanodeInfo[] nodes = block.getLocations(); 1024 if (nodes == null || nodes.length == 0) { 1025 DFSClient.LOG.info("No node available for " + blockInfo); 1026 } 1027 DFSClient.LOG.info("Could not obtain " + block.getBlock() 1028 + " from any node: " + errMsg 1029 + ". Will get new block locations from namenode and retry..."); 1030 try { 1031 // Introducing a random factor to the wait time before another retry. 1032 // The wait time is dependent on # of failures and a random factor. 1033 // At the first time of getting a BlockMissingException, the wait time 1034 // is a random number between 0..3000 ms. If the first retry 1035 // still fails, we will wait 3000 ms grace period before the 2nd retry. 
1036 // Also at the second retry, the waiting window is expanded to 6000 ms 1037 // alleviating the request rate from the server. Similarly the 3rd retry 1038 // will wait 6000ms grace period before retry and the waiting window is 1039 // expanded to 9000ms. 1040 final int timeWindow = dfsClient.getConf().getTimeWindow(); 1041 double waitTime = timeWindow * failures + // grace period for the last round of attempt 1042 // expanding time window for each failure 1043 timeWindow * (failures + 1) * 1044 ThreadLocalRandom.current().nextDouble(); 1045 DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); 1046 Thread.sleep((long)waitTime); 1047 } catch (InterruptedException ignored) { 1048 } 1049 deadNodes.clear(); //2nd option is to remove only nodes[blockId] 1050 openInfo(true); 1051 block = refreshLocatedBlock(block); 1052 failures++; 1053 } 1054 } 1055 } 1056 1057 /** 1058 * Get the best node from which to stream the data. 1059 * @param block LocatedBlock, containing nodes in priority order. 1060 * @param ignoredNodes Do not choose nodes in this array (may be null) 1061 * @return The DNAddrPair of the best node. Null if no node can be chosen. 1062 */ 1063 protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block, 1064 Collection<DatanodeInfo> ignoredNodes) { 1065 DatanodeInfo[] nodes = block.getLocations(); 1066 StorageType[] storageTypes = block.getStorageTypes(); 1067 DatanodeInfo chosenNode = null; 1068 StorageType storageType = null; 1069 if (nodes != null) { 1070 for (int i = 0; i < nodes.length; i++) { 1071 if (!deadNodes.containsKey(nodes[i]) 1072 && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) { 1073 chosenNode = nodes[i]; 1074 // Storage types are ordered to correspond with nodes, so use the same 1075 // index to get storage type. 
1076 if (storageTypes != null && i < storageTypes.length) { 1077 storageType = storageTypes[i]; 1078 } 1079 break; 1080 } 1081 } 1082 } 1083 if (chosenNode == null) { 1084 DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() + 1085 " after checking nodes = " + Arrays.toString(nodes) + 1086 ", ignoredNodes = " + ignoredNodes); 1087 return null; 1088 } 1089 final String dnAddr = 1090 chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname()); 1091 DFSClient.LOG.debug("Connecting to datanode {}", dnAddr); 1092 InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); 1093 return new DNAddrPair(chosenNode, targetAddr, storageType); 1094 } 1095 1096 private static String getBestNodeDNAddrPairErrorString( 1097 DatanodeInfo nodes[], AbstractMap<DatanodeInfo, 1098 DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) { 1099 StringBuilder errMsgr = new StringBuilder( 1100 " No live nodes contain current block "); 1101 errMsgr.append("Block locations:"); 1102 for (DatanodeInfo datanode : nodes) { 1103 errMsgr.append(" "); 1104 errMsgr.append(datanode.toString()); 1105 } 1106 errMsgr.append(" Dead nodes: "); 1107 for (DatanodeInfo datanode : deadNodes.keySet()) { 1108 errMsgr.append(" "); 1109 errMsgr.append(datanode.toString()); 1110 } 1111 if (ignoredNodes != null) { 1112 errMsgr.append(" Ignored nodes: "); 1113 for (DatanodeInfo datanode : ignoredNodes) { 1114 errMsgr.append(" "); 1115 errMsgr.append(datanode.toString()); 1116 } 1117 } 1118 return errMsgr.toString(); 1119 } 1120 1121 protected void fetchBlockByteRange(LocatedBlock block, long start, long end, 1122 byte[] buf, int offset, 1123 Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) 1124 throws IOException { 1125 block = refreshLocatedBlock(block); 1126 while (true) { 1127 DNAddrPair addressPair = chooseDataNode(block, null); 1128 try { 1129 actualGetFromOneDataNode(addressPair, block, start, end, 1130 buf, offset, corruptedBlockMap); 1131 return; 1132 } catch (IOException e) { 1133 // Ignore. Already processed inside the function. 1134 // Loop through to try the next node. 1135 } 1136 } 1137 } 1138 1139 private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode, 1140 final LocatedBlock block, final long start, final long end, 1141 final ByteBuffer bb, 1142 final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 1143 final int hedgedReadId) { 1144 final SpanId parentSpanId = Tracer.getCurrentSpanId(); 1145 return new Callable<ByteBuffer>() { 1146 @Override 1147 public ByteBuffer call() throws Exception { 1148 byte[] buf = bb.array(); 1149 int offset = bb.position(); 1150 try (TraceScope ignored = dfsClient.getTracer(). 1151 newScope("hedgedRead" + hedgedReadId, parentSpanId)) { 1152 actualGetFromOneDataNode(datanode, block, start, end, buf, 1153 offset, corruptedBlockMap); 1154 return bb; 1155 } 1156 } 1157 }; 1158 } 1159 1160 /** 1161 * Used when reading contiguous blocks 1162 */ 1163 private void actualGetFromOneDataNode(final DNAddrPair datanode, 1164 LocatedBlock block, final long start, final long end, byte[] buf, 1165 int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) 1166 throws IOException { 1167 final int length = (int) (end - start + 1); 1168 actualGetFromOneDataNode(datanode, block, start, end, buf, 1169 new int[]{offset}, new int[]{length}, corruptedBlockMap); 1170 } 1171 1172 /** 1173 * Read data from one DataNode. 
1174 * @param datanode the datanode from which to read data 1175 * @param block the located block containing the requested data 1176 * @param startInBlk the startInBlk offset of the block 1177 * @param endInBlk the endInBlk offset of the block 1178 * @param buf the given byte array into which the data is read 1179 * @param offsets the data may be read into multiple segments of the buf 1180 * (when reading a striped block). This array indicates the 1181 * offset of each buf segment. 1182 * @param lengths the length of each buf segment 1183 * @param corruptedBlockMap map recording list of datanodes with corrupted 1184 * block replica 1185 */ 1186 void actualGetFromOneDataNode(final DNAddrPair datanode, 1187 LocatedBlock block, final long startInBlk, final long endInBlk, 1188 byte[] buf, int[] offsets, int[] lengths, 1189 Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) 1190 throws IOException { 1191 DFSClientFaultInjector.get().startFetchFromDatanode(); 1192 int refetchToken = 1; // only need to get a new access token once 1193 int refetchEncryptionKey = 1; // only need to get a new encryption key once 1194 final int len = (int) (endInBlk - startInBlk + 1); 1195 checkReadPortions(offsets, lengths, len); 1196 1197 while (true) { 1198 // cached block locations may have been updated by chooseDataNode() 1199 // or fetchBlockAt(). Always get the latest list of locations at the 1200 // start of the loop. 1201 block = refreshLocatedBlock(block); 1202 BlockReader reader = null; 1203 try { 1204 DFSClientFaultInjector.get().fetchFromDatanodeException(); 1205 reader = getBlockReader(block, startInBlk, len, datanode.addr, 1206 datanode.storageType, datanode.info); 1207 for (int i = 0; i < offsets.length; i++) { 1208 int nread = reader.readAll(buf, offsets[i], lengths[i]); 1209 updateReadStatistics(readStatistics, nread, reader); 1210 if (nread != lengths[i]) { 1211 throw new IOException("truncated return from reader.read(): " + 1212 "expected " + lengths[i] + ", got " + nread); 1213 } 1214 } 1215 DFSClientFaultInjector.get().readFromDatanodeDelay(); 1216 return; 1217 } catch (ChecksumException e) { 1218 String msg = "fetchBlockByteRange(). Got a checksum exception for " 1219 + src + " at " + block.getBlock() + ":" + e.getPos() + " from " 1220 + datanode.info; 1221 DFSClient.LOG.warn(msg); 1222 // we want to remember what we have tried 1223 addIntoCorruptedBlockMap(block.getBlock(), datanode.info, 1224 corruptedBlockMap); 1225 addToDeadNodes(datanode.info); 1226 throw new IOException(msg); 1227 } catch (IOException e) { 1228 if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { 1229 DFSClient.LOG.info("Will fetch a new encryption key and retry, " 1230 + "encryption key was invalid when connecting to " + datanode.addr 1231 + " : " + e); 1232 // The encryption key used is invalid.
1233 refetchEncryptionKey--; 1234 dfsClient.clearDataEncryptionKey(); 1235 } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) { 1236 refetchToken--; 1237 try { 1238 fetchBlockAt(block.getStartOffset()); 1239 } catch (IOException fbae) { 1240 // ignore IOE, since we can retry it later in a loop 1241 } 1242 } else { 1243 String msg = "Failed to connect to " + datanode.addr + " for file " 1244 + src + " for block " + block.getBlock() + ":" + e; 1245 DFSClient.LOG.warn("Connection failure: " + msg, e); 1246 addToDeadNodes(datanode.info); 1247 throw new IOException(msg); 1248 } 1249 } finally { 1250 if (reader != null) { 1251 reader.close(); 1252 } 1253 } 1254 } 1255 } 1256 1257 /** 1258 * Refresh cached block locations. 1259 * @param block The currently cached block locations 1260 * @return Refreshed block locations 1261 * @throws IOException 1262 */ 1263 protected LocatedBlock refreshLocatedBlock(LocatedBlock block) 1264 throws IOException { 1265 return getBlockAt(block.getStartOffset()); 1266 } 1267 1268 /** 1269 * This method verifies that the read portions are valid and do not overlap 1270 * with each other. 1271 */ 1272 private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) { 1273 Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0); 1274 int sum = 0; 1275 for (int i = 0; i < lengths.length; i++) { 1276 if (i > 0) { 1277 int gap = offsets[i] - offsets[i - 1]; 1278 // make sure read portions do not overlap with each other 1279 Preconditions.checkArgument(gap >= lengths[i - 1]); 1280 } 1281 sum += lengths[i]; 1282 } 1283 Preconditions.checkArgument(sum == totalLen); 1284 } 1285 1286 /** 1287 * Like {@link #fetchBlockByteRange} except we start up a second, parallel, 1288 * 'hedged' read if the first read is taking longer than the configured amount 1289 * of time. We then wait on whichever read returns first. 1290 */ 1291 private void hedgedFetchBlockByteRange(LocatedBlock block, long start, 1292 long end, byte[] buf, int offset, 1293 Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) 1294 throws IOException { 1295 final DfsClientConf conf = dfsClient.getConf(); 1296 ArrayList<Future<ByteBuffer>> futures = new ArrayList<>(); 1297 CompletionService<ByteBuffer> hedgedService = 1298 new ExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool()); 1299 ArrayList<DatanodeInfo> ignored = new ArrayList<>(); 1300 ByteBuffer bb; 1301 int len = (int) (end - start + 1); 1302 int hedgedReadId = 0; 1303 block = refreshLocatedBlock(block); 1304 while (true) { 1305 // see HDFS-6591, this metric is used to verify/catch unnecessary loops 1306 hedgedReadOpsLoopNumForTesting++; 1307 DNAddrPair chosenNode = null; 1308 // there is no request already executing. 1309 if (futures.isEmpty()) { 1310 // chooseDataNode is a commitment. If no node, we go to 1311 // the NN to reget block locations. Only go here on first read.
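// (Editorial note: this hedged path only runs when dfsClient.isHedgedReadsEnabled()
// is true in pread(), and the poll timeout below comes from
// conf.getHedgedReadThresholdMillis(). The usual client properties behind these,
// dfs.client.hedged.read.threadpool.size and dfs.client.hedged.read.threshold.millis,
// are named here as an assumption rather than taken from this file.)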
1312 chosenNode = chooseDataNode(block, ignored); 1313 bb = ByteBuffer.allocate(len); 1314 Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( 1315 chosenNode, block, start, end, bb, 1316 corruptedBlockMap, hedgedReadId++); 1317 Future<ByteBuffer> firstRequest = hedgedService 1318 .submit(getFromDataNodeCallable); 1319 futures.add(firstRequest); 1320 try { 1321 Future<ByteBuffer> future = hedgedService.poll( 1322 conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS); 1323 if (future != null) { 1324 ByteBuffer result = future.get(); 1325 System.arraycopy(result.array(), result.position(), buf, offset, 1326 len); 1327 return; 1328 } 1329 DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged " 1330 + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info); 1331 // Ignore this node on next go around. 1332 ignored.add(chosenNode.info); 1333 dfsClient.getHedgedReadMetrics().incHedgedReadOps(); 1334 // continue; no need to refresh block locations 1335 } catch (InterruptedException | ExecutionException e) { 1336 // Ignore 1337 } 1338 } else { 1339 // We are starting up a 'hedged' read. We have a read already 1340 // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. 1341 // If no nodes to do hedged reads against, pass. 1342 try { 1343 chosenNode = getBestNodeDNAddrPair(block, ignored); 1344 if (chosenNode == null) { 1345 chosenNode = chooseDataNode(block, ignored); 1346 } 1347 bb = ByteBuffer.allocate(len); 1348 Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( 1349 chosenNode, block, start, end, bb, 1350 corruptedBlockMap, hedgedReadId++); 1351 Future<ByteBuffer> oneMoreRequest = hedgedService 1352 .submit(getFromDataNodeCallable); 1353 futures.add(oneMoreRequest); 1354 } catch (IOException ioe) { 1355 DFSClient.LOG.debug("Failed getting node for hedged read: {}", 1356 ioe.getMessage()); 1357 } 1358 // if not succeeded. Submit callables for each datanode in a loop, wait 1359 // for a fixed interval and get the result from the fastest one. 1360 try { 1361 ByteBuffer result = getFirstToComplete(hedgedService, futures); 1362 // cancel the rest. 1363 cancelAll(futures); 1364 dfsClient.getHedgedReadMetrics().incHedgedReadWins(); 1365 System.arraycopy(result.array(), result.position(), buf, offset, 1366 len); 1367 return; 1368 } catch (InterruptedException ie) { 1369 // Ignore and retry 1370 } 1371 // We got here if exception. Ignore this node on next go around IFF 1372 // we found a chosenNode to hedge read against. 
1373 if (chosenNode != null && chosenNode.info != null) { 1374 ignored.add(chosenNode.info); 1375 } 1376 } 1377 } 1378 } 1379 1380 @VisibleForTesting 1381 public long getHedgedReadOpsLoopNumForTesting() { 1382 return hedgedReadOpsLoopNumForTesting; 1383 } 1384 1385 private ByteBuffer getFirstToComplete( 1386 CompletionService<ByteBuffer> hedgedService, 1387 ArrayList<Future<ByteBuffer>> futures) throws InterruptedException { 1388 if (futures.isEmpty()) { 1389 throw new InterruptedException("let's retry"); 1390 } 1391 Future<ByteBuffer> future = null; 1392 try { 1393 future = hedgedService.take(); 1394 ByteBuffer bb = future.get(); 1395 futures.remove(future); 1396 return bb; 1397 } catch (ExecutionException | CancellationException e) { 1398 // already logged in the Callable 1399 futures.remove(future); 1400 } 1401 1402 throw new InterruptedException("let's retry"); 1403 } 1404 1405 private void cancelAll(List<Future<ByteBuffer>> futures) { 1406 for (Future<ByteBuffer> future : futures) { 1407 // Unfortunately, hdfs reads do not take kindly to interruption. 1408 // Threads return a variety of interrupted-type exceptions but 1409 // also complaints about invalid pbs -- likely because read 1410 // is interrupted before gets whole pb. Also verbose WARN 1411 // logging. So, for now, do not interrupt running read. 1412 future.cancel(false); 1413 } 1414 } 1415 1416 /** 1417 * Should the block access token be refetched on an exception 1418 * 1419 * @param ex Exception received 1420 * @param targetAddr Target datanode address from where exception was received 1421 * @return true if block access token has expired or invalid and it should be 1422 * refetched 1423 */ 1424 protected static boolean tokenRefetchNeeded(IOException ex, 1425 InetSocketAddress targetAddr) { 1426 /* 1427 * Get a new access token and retry. Retry is needed in 2 cases. 1) 1428 * When both NN and DN re-started while DFSClient holding a cached 1429 * access token. 2) In the case that NN fails to update its 1430 * access key at pre-set interval (by a wide margin) and 1431 * subsequently restarts. In this case, DN re-registers itself with 1432 * NN and receives a new access key, but DN will delete the old 1433 * access key from its memory since it's considered expired based on 1434 * the estimated expiration date. 1435 */ 1436 if (ex instanceof InvalidBlockTokenException || 1437 ex instanceof InvalidToken) { 1438 DFSClient.LOG.debug( 1439 "Access token was invalid when connecting to {}: {}", 1440 targetAddr, ex); 1441 return true; 1442 } 1443 return false; 1444 } 1445 1446 /** 1447 * Read bytes starting from the specified position. 1448 * 1449 * @param position start read from this position 1450 * @param buffer read buffer 1451 * @param offset offset into buffer 1452 * @param length number of bytes to read 1453 * 1454 * @return actual number of bytes read 1455 */ 1456 @Override 1457 public int read(long position, byte[] buffer, int offset, int length) 1458 throws IOException { 1459 validatePositionedReadArgs(position, buffer, offset, length); 1460 if (length == 0) { 1461 return 0; 1462 } 1463 try (TraceScope ignored = dfsClient. 
1464 newPathTraceScope("DFSInputStream#byteArrayPread", src)) { 1465 return pread(position, buffer, offset, length); 1466 } 1467 } 1468 1469 private int pread(long position, byte[] buffer, int offset, int length) 1470 throws IOException { 1471 // sanity checks 1472 dfsClient.checkOpen(); 1473 if (closed.get()) { 1474 throw new IOException("Stream closed"); 1475 } 1476 failures = 0; 1477 long filelen = getFileLength(); 1478 if ((position < 0) || (position >= filelen)) { 1479 return -1; 1480 } 1481 int realLen = length; 1482 if ((position + length) > filelen) { 1483 realLen = (int)(filelen - position); 1484 } 1485 1486 // determine the block and byte range within the block 1487 // corresponding to position and realLen 1488 List<LocatedBlock> blockRange = getBlockRange(position, realLen); 1489 int remaining = realLen; 1490 Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>(); 1491 for (LocatedBlock blk : blockRange) { 1492 long targetStart = position - blk.getStartOffset(); 1493 long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); 1494 try { 1495 if (dfsClient.isHedgedReadsEnabled()) { 1496 hedgedFetchBlockByteRange(blk, targetStart, 1497 targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap); 1498 } else { 1499 fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, 1500 buffer, offset, corruptedBlockMap); 1501 } 1502 } finally { 1503 // Check and report if any block replicas are corrupted. 1504 // BlockMissingException may be caught if all block replicas are 1505 // corrupted. 1506 reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length); 1507 } 1508 1509 remaining -= bytesToRead; 1510 position += bytesToRead; 1511 offset += bytesToRead; 1512 } 1513 assert remaining == 0 : "Wrong number of bytes read."; 1514 if (dfsClient.stats != null) { 1515 dfsClient.stats.incrementBytesRead(realLen); 1516 } 1517 return realLen; 1518 } 1519 1520 /** 1521 * DFSInputStream reports checksum failure. 1522 * Case I : client has tried multiple data nodes and at least one of the 1523 * attempts has succeeded. We report the other failures as corrupted blocks to 1524 * the namenode. 1525 * Case II: client has tried out all data nodes, but all failed. We 1526 * only report if the total number of replicas is 1. We do not 1527 * report otherwise, since the failure may be caused by the client itself being 1528 * unable to read.
1529 * @param corruptedBlockMap map of corrupted blocks 1530 * @param dataNodeCount number of data nodes who contains the block replicas 1531 */ 1532 protected void reportCheckSumFailure( 1533 Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 1534 int dataNodeCount) { 1535 if (corruptedBlockMap.isEmpty()) { 1536 return; 1537 } 1538 Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap 1539 .entrySet().iterator(); 1540 Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next(); 1541 ExtendedBlock blk = entry.getKey(); 1542 Set<DatanodeInfo> dnSet = entry.getValue(); 1543 if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0)) 1544 || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) { 1545 DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()]; 1546 int i = 0; 1547 for (DatanodeInfo dn:dnSet) { 1548 locs[i++] = dn; 1549 } 1550 LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) }; 1551 dfsClient.reportChecksumFailure(src, lblocks); 1552 } 1553 corruptedBlockMap.clear(); 1554 } 1555 1556 @Override 1557 public long skip(long n) throws IOException { 1558 if ( n > 0 ) { 1559 long curPos = getPos(); 1560 long fileLen = getFileLength(); 1561 if( n+curPos > fileLen ) { 1562 n = fileLen - curPos; 1563 } 1564 seek(curPos+n); 1565 return n; 1566 } 1567 return n < 0 ? -1 : 0; 1568 } 1569 1570 /** 1571 * Seek to a new arbitrary location 1572 */ 1573 @Override 1574 public synchronized void seek(long targetPos) throws IOException { 1575 if (targetPos > getFileLength()) { 1576 throw new EOFException("Cannot seek after EOF"); 1577 } 1578 if (targetPos < 0) { 1579 throw new EOFException("Cannot seek to negative offset"); 1580 } 1581 if (closed.get()) { 1582 throw new IOException("Stream is closed!"); 1583 } 1584 boolean done = false; 1585 if (pos <= targetPos && targetPos <= blockEnd) { 1586 // 1587 // If this seek is to a positive position in the current 1588 // block, and this piece of data might already be lying in 1589 // the TCP buffer, then just eat up the intervening data. 1590 // 1591 int diff = (int)(targetPos - pos); 1592 if (diff <= blockReader.available()) { 1593 try { 1594 pos += blockReader.skip(diff); 1595 if (pos == targetPos) { 1596 done = true; 1597 } else { 1598 // The range was already checked. If the block reader returns 1599 // something unexpected instead of throwing an exception, it is 1600 // most likely a bug. 1601 String errMsg = "BlockReader failed to seek to " + 1602 targetPos + ". Instead, it seeked to " + pos + "."; 1603 DFSClient.LOG.warn(errMsg); 1604 throw new IOException(errMsg); 1605 } 1606 } catch (IOException e) {//make following read to retry 1607 DFSClient.LOG.debug("Exception while seek to {} from {} of {} from " 1608 + "{}", targetPos, getCurrentBlock(), src, currentNode, e); 1609 } 1610 } 1611 } 1612 if (!done) { 1613 pos = targetPos; 1614 blockEnd = -1; 1615 } 1616 } 1617 1618 /** 1619 * Same as {@link #seekToNewSource(long)} except that it does not exclude 1620 * the current datanode and might connect to the same node. 1621 */ 1622 private boolean seekToBlockSource(long targetPos) 1623 throws IOException { 1624 currentNode = blockSeekTo(targetPos); 1625 return true; 1626 } 1627 1628 /** 1629 * Seek to given position on a node other than the current node. If 1630 * a node other than the current node is found, then returns true. 1631 * If another node could not be found, then returns false. 
  /**
   * Seek to the given position on a node other than the current node. If
   * a node other than the current node is found, then returns true.
   * If another node could not be found, then returns false.
   */
  @Override
  public synchronized boolean seekToNewSource(long targetPos)
      throws IOException {
    if (currentNode == null) {
      return seekToBlockSource(targetPos);
    }
    boolean markedDead = deadNodes.containsKey(currentNode);
    addToDeadNodes(currentNode);
    DatanodeInfo oldNode = currentNode;
    DatanodeInfo newNode = blockSeekTo(targetPos);
    if (!markedDead) {
      /* Remove it from deadNodes. blockSeekTo could have cleared
       * deadNodes and added currentNode again. That's ok. */
      deadNodes.remove(oldNode);
    }
    if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
      currentNode = newNode;
      return true;
    } else {
      return false;
    }
  }

  /**
   * Return the current offset in the file.
   */
  @Override
  public synchronized long getPos() {
    return pos;
  }

  /** Return the number of remaining available bytes
   * if that number is less than or equal to {@link Integer#MAX_VALUE};
   * otherwise, return {@link Integer#MAX_VALUE}.
   */
  @Override
  public synchronized int available() throws IOException {
    if (closed.get()) {
      throw new IOException("Stream closed");
    }

    final long remaining = getFileLength() - pos;
    return remaining <= Integer.MAX_VALUE ? (int)remaining : Integer.MAX_VALUE;
  }

  /**
   * We definitely don't support marks.
   */
  @Override
  public boolean markSupported() {
    return false;
  }

  @Override
  public void mark(int readLimit) {
  }

  @Override
  public void reset() throws IOException {
    throw new IOException("Mark/reset not supported");
  }

  /** Utility class to encapsulate data node info and its address. */
  static final class DNAddrPair {
    final DatanodeInfo info;
    final InetSocketAddress addr;
    final StorageType storageType;

    DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
        StorageType storageType) {
      this.info = info;
      this.addr = addr;
      this.storageType = storageType;
    }
  }

  /**
   * Get statistics about the reads which this DFSInputStream has done.
   */
  public ReadStatistics getReadStatistics() {
    synchronized (infoLock) {
      return new ReadStatistics(readStatistics);
    }
  }

  /**
   * Clear statistics about the reads which this DFSInputStream has done.
   */
  public void clearReadStatistics() {
    synchronized (infoLock) {
      readStatistics.clear();
    }
  }

  public FileEncryptionInfo getFileEncryptionInfo() {
    synchronized (infoLock) {
      return fileEncryptionInfo;
    }
  }
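  /*
   * Usage sketch (illustrative only): when the stream handed back by the
   * client is an HdfsDataInputStream, the per-stream read statistics exposed
   * above can be inspected after some reads, e.g. to see how much data was
   * served locally or via short-circuit reads. The path is hypothetical.
   *
   *   import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
   *
   *   try (FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
   *     byte[] buf = new byte[8192];
   *     while (in.read(buf) > 0) {
   *       // drain the file
   *     }
   *     if (in instanceof HdfsDataInputStream) {
   *       DFSInputStream.ReadStatistics stats =
   *           ((HdfsDataInputStream) in).getReadStatistics();
   *       System.out.println("total=" + stats.getTotalBytesRead()
   *           + " local=" + stats.getTotalLocalBytesRead()
   *           + " shortCircuit=" + stats.getTotalShortCircuitBytesRead());
   *     }
   *   }
   */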
  protected void closeCurrentBlockReaders() {
    if (blockReader == null) return;
    // Close the current block reader so that the new caching settings can
    // take effect immediately.
    try {
      blockReader.close();
    } catch (IOException e) {
      DFSClient.LOG.error("error closing blockReader", e);
    }
    blockReader = null;
    blockEnd = -1;
  }

  @Override
  public synchronized void setReadahead(Long readahead)
      throws IOException {
    synchronized (infoLock) {
      this.cachingStrategy =
          new CachingStrategy.Builder(this.cachingStrategy)
              .setReadahead(readahead).build();
    }
    closeCurrentBlockReaders();
  }

  @Override
  public synchronized void setDropBehind(Boolean dropBehind)
      throws IOException {
    synchronized (infoLock) {
      this.cachingStrategy =
          new CachingStrategy.Builder(this.cachingStrategy)
              .setDropBehind(dropBehind).build();
    }
    closeCurrentBlockReaders();
  }

  /**
   * The immutable empty buffer we return when we reach EOF when doing a
   * zero-copy read.
   */
  private static final ByteBuffer EMPTY_BUFFER =
      ByteBuffer.allocateDirect(0).asReadOnlyBuffer();

  @Override
  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
      int maxLength, EnumSet<ReadOption> opts)
      throws IOException, UnsupportedOperationException {
    if (maxLength == 0) {
      return EMPTY_BUFFER;
    } else if (maxLength < 0) {
      throw new IllegalArgumentException("can't read a negative " +
          "number of bytes.");
    }
    if ((blockReader == null) || (blockEnd == -1)) {
      if (pos >= getFileLength()) {
        return null;
      }
      /*
       * If we don't have a blockReader, or the one we have has no more bytes
       * left to read, we call seekToBlockSource to get a new blockReader and
       * recalculate blockEnd. Note that we assume we're not at EOF here
       * (we check this above).
       */
      if ((!seekToBlockSource(pos)) || (blockReader == null)) {
        throw new IOException("failed to allocate new BlockReader " +
            "at position " + pos);
      }
    }
    ByteBuffer buffer = null;
    if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) {
      buffer = tryReadZeroCopy(maxLength, opts);
    }
    if (buffer != null) {
      return buffer;
    }
    buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
    if (buffer != null) {
      getExtendedReadBuffers().put(buffer, bufferPool);
    }
    return buffer;
  }
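  /*
   * Usage sketch (illustrative only): the zero-copy read path above is
   * reached through HasEnhancedByteBufferAccess, with the caller supplying a
   * ByteBufferPool for the fallback copy path and releasing every buffer it
   * was handed. The pool choice and path below are hypothetical.
   *
   *   import java.nio.ByteBuffer;
   *   import java.util.EnumSet;
   *   import org.apache.hadoop.fs.ReadOption;
   *   import org.apache.hadoop.io.ElasticByteBufferPool;
   *
   *   ElasticByteBufferPool pool = new ElasticByteBufferPool();
   *   try (FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
   *     ByteBuffer buf;
   *     while ((buf = in.read(pool, 1 << 20,
   *         EnumSet.of(ReadOption.SKIP_CHECKSUMS))) != null) {
   *       process(buf);          // hypothetical consumer
   *       in.releaseBuffer(buf); // must be returned to the stream
   *     }
   *   }
   */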
  private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
      EnumSet<ReadOption> opts) throws IOException {
    // Copy 'pos' and 'blockEnd' to local variables to make it easier for the
    // JVM to optimize this function.
    final long curPos = pos;
    final long curEnd = blockEnd;
    final long blockStartInFile = currentLocatedBlock.getStartOffset();
    final long blockPos = curPos - blockStartInFile;

    // Shorten this read if the end of the block is nearby.
    long length63;
    if ((curPos + maxLength) <= (curEnd + 1)) {
      length63 = maxLength;
    } else {
      length63 = 1 + curEnd - curPos;
      if (length63 <= 0) {
        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {}"
            + " of {}; {} bytes left in block. blockPos={}; curPos={};"
            + " curEnd={}",
            curPos, src, length63, blockPos, curPos, curEnd);
        return null;
      }
      DFSClient.LOG.debug("Reducing read length from {} to {} to avoid going "
          + "more than one byte past the end of the block. blockPos={}; "
          + "curPos={}; curEnd={}",
          maxLength, length63, blockPos, curPos, curEnd);
    }
    // Make sure we don't go beyond 31-bit offsets in the MappedByteBuffer.
    int length;
    if (blockPos + length63 <= Integer.MAX_VALUE) {
      length = (int)length63;
    } else {
      long length31 = Integer.MAX_VALUE - blockPos;
      if (length31 <= 0) {
        // Java ByteBuffers can't be longer than 2 GB, because they use
        // 4-byte signed integers to represent capacity, etc.
        // So we can't mmap the parts of the block higher than the 2 GB offset.
        // FIXME: we could work around this with multiple memory maps.
        // See HDFS-5101.
        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {}"
            + " of {}; 31-bit MappedByteBuffer limit exceeded. blockPos={}, "
            + "curEnd={}", curPos, src, blockPos, curEnd);
        return null;
      }
      length = (int)length31;
      DFSClient.LOG.debug("Reducing read length from {} to {} to avoid 31-bit "
          + "limit. blockPos={}; curPos={}; curEnd={}",
          maxLength, length, blockPos, curPos, curEnd);
    }
    final ClientMmap clientMmap = blockReader.getClientMmap(opts);
    if (clientMmap == null) {
      DFSClient.LOG.debug("unable to perform a zero-copy read from offset {} of"
          + " {}; BlockReader#getClientMmap returned null.", curPos, src);
      return null;
    }
    boolean success = false;
    ByteBuffer buffer;
    try {
      seek(curPos + length);
      buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
      buffer.position((int)blockPos);
      buffer.limit((int)(blockPos + length));
      getExtendedReadBuffers().put(buffer, clientMmap);
      synchronized (infoLock) {
        readStatistics.addZeroCopyBytes(length);
      }
      DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the "
          + "zero-copy read path. blockEnd = {}", length, curPos, blockEnd);
      success = true;
    } finally {
      if (!success) {
        IOUtils.closeQuietly(clientMmap);
      }
    }
    return buffer;
  }

  @Override
  public synchronized void releaseBuffer(ByteBuffer buffer) {
    if (buffer == EMPTY_BUFFER) return;
    Object val = getExtendedReadBuffers().remove(buffer);
    if (val == null) {
      throw new IllegalArgumentException("tried to release a buffer " +
          "that was not created by this stream, " + buffer);
    }
    if (val instanceof ClientMmap) {
      IOUtils.closeQuietly((ClientMmap)val);
    } else if (val instanceof ByteBufferPool) {
      ((ByteBufferPool)val).putBuffer(buffer);
    }
  }

  @Override
  public synchronized void unbuffer() {
    closeCurrentBlockReaders();
  }
}
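/*
 * Usage sketch (illustrative only, outside the class above): callers that
 * keep many streams open for long periods can call unbuffer() between bursts
 * of reads to drop the current block reader and its socket or short-circuit
 * resources while keeping the stream, and its position, open. The path is
 * hypothetical.
 *
 *   try (FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
 *     byte[] buf = new byte[4096];
 *     in.readFully(0L, buf);
 *     in.unbuffer();              // release connections until the next read
 *     // ... much later ...
 *     in.readFully(buf.length, buf);
 *   }
 */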