001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs;
019
020import java.io.EOFException;
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.nio.ByteBuffer;
024import java.util.AbstractMap;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.EnumSet;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.Set;
037import java.util.concurrent.Callable;
038import java.util.concurrent.CancellationException;
039import java.util.concurrent.CompletionService;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorCompletionService;
043import java.util.concurrent.Future;
044import java.util.concurrent.ThreadLocalRandom;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.atomic.AtomicBoolean;
047
048import com.google.common.base.Preconditions;
049import org.apache.commons.io.IOUtils;
050import org.apache.hadoop.classification.InterfaceAudience;
051import org.apache.hadoop.fs.ByteBufferReadable;
052import org.apache.hadoop.fs.ByteBufferUtil;
053import org.apache.hadoop.fs.CanSetDropBehind;
054import org.apache.hadoop.fs.CanSetReadahead;
055import org.apache.hadoop.fs.CanUnbuffer;
056import org.apache.hadoop.fs.ChecksumException;
057import org.apache.hadoop.fs.FSInputStream;
058import org.apache.hadoop.fs.FileEncryptionInfo;
059import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
060import org.apache.hadoop.fs.ReadOption;
061import org.apache.hadoop.fs.StorageType;
062import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
063import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
064import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
065import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
066import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
067import org.apache.hadoop.hdfs.protocol.LocatedBlock;
068import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
069import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
071import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
072import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
073import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
074import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
075import org.apache.hadoop.io.ByteBufferPool;
076import org.apache.hadoop.ipc.RPC;
077import org.apache.hadoop.ipc.RemoteException;
078import org.apache.hadoop.ipc.RetriableException;
079import org.apache.hadoop.net.NetUtils;
080import org.apache.hadoop.security.token.SecretManager.InvalidToken;
081import org.apache.hadoop.security.token.Token;
082import org.apache.hadoop.util.IdentityHashStore;
083import org.apache.hadoop.util.StopWatch;
084import org.apache.htrace.core.SpanId;
085import org.apache.htrace.core.TraceScope;
086import org.apache.htrace.core.Tracer;
087
088import com.google.common.annotations.VisibleForTesting;
089
090import javax.annotation.Nonnull;
091
092/****************************************************************
 * DFSInputStream provides bytes from a named file.  It handles
 * negotiation with the namenode and the various datanodes as necessary.
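 *
 * Instances are normally obtained through the FileSystem API rather than
 * constructed directly. A minimal usage sketch (the path below is
 * hypothetical):
 * <pre>
 *   FileSystem fs = FileSystem.get(conf);
 *   try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) {
 *     byte[] buf = new byte[4096];
 *     int n = in.read(buf, 0, buf.length);      // stateful read
 *     int m = in.read(0L, buf, 0, buf.length);  // positional read (pread)
 *   }
 * </pre>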
095 ****************************************************************/
096@InterfaceAudience.Private
097public class DFSInputStream extends FSInputStream
098    implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
099    HasEnhancedByteBufferAccess, CanUnbuffer {
100  @VisibleForTesting
101  public static boolean tcpReadsDisabledForTesting = false;
102  private long hedgedReadOpsLoopNumForTesting = 0;
103  protected final DFSClient dfsClient;
104  protected AtomicBoolean closed = new AtomicBoolean(false);
105  protected final String src;
106  protected final boolean verifyChecksum;
107
  // state used by stateful read only:
  // (protected by lock on this)
  ////
111  private DatanodeInfo currentNode = null;
112  protected LocatedBlock currentLocatedBlock = null;
113  protected long pos = 0;
114  protected long blockEnd = -1;
115  private BlockReader blockReader = null;
116  ////
117
118  // state shared by stateful and positional read:
119  // (protected by lock on infoLock)
120  ////
121  protected LocatedBlocks locatedBlocks = null;
122  private long lastBlockBeingWrittenLength = 0;
123  private FileEncryptionInfo fileEncryptionInfo = null;
124  protected CachingStrategy cachingStrategy;
125  ////
126
127  protected final ReadStatistics readStatistics = new ReadStatistics();
128  // lock for state shared between read and pread
129  // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
130  //       (it's OK to acquire this lock when the lock on <this> is held)
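  // e.g. synchronized(this) { synchronized(infoLock) { ... } } is safe,
  // while synchronized(infoLock) { synchronized(this) { ... } } risks
  // deadlock.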
131  protected final Object infoLock = new Object();
132
133  /**
134   * Track the ByteBuffers that we have handed out to readers.
135   *
136   * The value type can be either ByteBufferPool or ClientMmap, depending on
   * whether this is a memory-mapped buffer or not.
138   */
139  private IdentityHashStore<ByteBuffer, Object> extendedReadBuffers;
140
141  private synchronized IdentityHashStore<ByteBuffer, Object>
142        getExtendedReadBuffers() {
143    if (extendedReadBuffers == null) {
144      extendedReadBuffers = new IdentityHashStore<>(0);
145    }
146    return extendedReadBuffers;
147  }
148
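  /**
   * Per-stream read statistics: total, local, short-circuit and zero-copy
   * byte counts. Applications typically reach these through
   * {@code HdfsDataInputStream#getReadStatistics()} rather than using this
   * class directly.
   */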
149  public static class ReadStatistics {
150    public ReadStatistics() {
151      clear();
152    }
153
154    public ReadStatistics(ReadStatistics rhs) {
155      this.totalBytesRead = rhs.getTotalBytesRead();
156      this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
157      this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
158      this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
159    }
160
161    /**
162     * @return The total bytes read.  This will always be at least as
163     * high as the other numbers, since it includes all of them.
164     */
165    public long getTotalBytesRead() {
166      return totalBytesRead;
167    }
168
169    /**
170     * @return The total local bytes read.  This will always be at least
171     * as high as totalShortCircuitBytesRead, since all short-circuit
172     * reads are also local.
173     */
174    public long getTotalLocalBytesRead() {
175      return totalLocalBytesRead;
176    }
177
178    /**
179     * @return The total short-circuit local bytes read.
180     */
181    public long getTotalShortCircuitBytesRead() {
182      return totalShortCircuitBytesRead;
183    }
184
185    /**
186     * @return The total number of zero-copy bytes read.
187     */
188    public long getTotalZeroCopyBytesRead() {
189      return totalZeroCopyBytesRead;
190    }
191
192    /**
193     * @return The total number of bytes read which were not local.
194     */
195    public long getRemoteBytesRead() {
196      return totalBytesRead - totalLocalBytesRead;
197    }
198
199    void addRemoteBytes(long amt) {
200      this.totalBytesRead += amt;
201    }
202
203    void addLocalBytes(long amt) {
204      this.totalBytesRead += amt;
205      this.totalLocalBytesRead += amt;
206    }
207
208    void addShortCircuitBytes(long amt) {
209      this.totalBytesRead += amt;
210      this.totalLocalBytesRead += amt;
211      this.totalShortCircuitBytesRead += amt;
212    }
213
214    void addZeroCopyBytes(long amt) {
215      this.totalBytesRead += amt;
216      this.totalLocalBytesRead += amt;
217      this.totalShortCircuitBytesRead += amt;
218      this.totalZeroCopyBytesRead += amt;
219    }
220
221    void clear() {
222      this.totalBytesRead = 0;
223      this.totalLocalBytesRead = 0;
224      this.totalShortCircuitBytesRead = 0;
225      this.totalZeroCopyBytesRead = 0;
226    }
227
228    private long totalBytesRead;
229
230    private long totalLocalBytesRead;
231
232    private long totalShortCircuitBytesRead;
233
234    private long totalZeroCopyBytesRead;
235  }
236
237  /**
238   * This variable tracks the number of failures since the start of the
239   * most recent user-facing operation. That is to say, it should be reset
240   * whenever the user makes a call on this stream, and if at any point
241   * during the retry logic, the failure count exceeds a threshold,
242   * the errors will be thrown back to the operation.
243   *
   * Specifically, this counts the number of times the client has gone
   * back to the namenode to get a new list of block locations, and is
   * capped at maxBlockAcquireFailures.
247   */
248  protected int failures = 0;
249
  /* XXX Use of ConcurrentHashMap is a temporary fix. Need to fix
   * parallel accesses to DFSInputStream (through preads) properly */
252  private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
253             new ConcurrentHashMap<>();
254
255  private byte[] oneByteBuf; // used for 'int read()'
256
257  void addToDeadNodes(DatanodeInfo dnInfo) {
258    deadNodes.put(dnInfo, dnInfo);
259  }
260
261  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
262      LocatedBlocks locatedBlocks) throws IOException {
263    this.dfsClient = dfsClient;
264    this.verifyChecksum = verifyChecksum;
265    this.src = src;
266    synchronized (infoLock) {
267      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
268    }
269    this.locatedBlocks = locatedBlocks;
270    openInfo(false);
271  }
272
273  /**
274   * Grab the open-file info from namenode
275   * @param refreshLocatedBlocks whether to re-fetch locatedblocks
276   */
277  void openInfo(boolean refreshLocatedBlocks) throws IOException {
278    final DfsClientConf conf = dfsClient.getConf();
279    synchronized(infoLock) {
280      lastBlockBeingWrittenLength =
281          fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
282      int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
283      while (retriesForLastBlockLength > 0) {
        // Getting the last block length as -1 is a special case. When the
        // cluster restarts, DNs may not report blocks immediately. At that
        // point partial block locations are not yet available on the NN for
        // getting the length, so retry a configured number of times
        // (3 by default) to get the length.
288        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry " + retriesForLastBlockLength + " more times");
292          waitFor(conf.getRetryIntervalForGetLastBlockLength());
293          lastBlockBeingWrittenLength =
294              fetchLocatedBlocksAndGetLastBlockLength(true);
295        } else {
296          break;
297        }
298        retriesForLastBlockLength--;
299      }
300      if (retriesForLastBlockLength == 0) {
301        throw new IOException("Could not obtain the last block locations.");
302      }
303    }
304  }
305
306  private void waitFor(int waitTime) throws IOException {
307    try {
308      Thread.sleep(waitTime);
309    } catch (InterruptedException e) {
310      throw new IOException(
311          "Interrupted while getting the last block length.");
312    }
313  }
314
315  private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
316      throws IOException {
317    LocatedBlocks newInfo = locatedBlocks;
318    if (locatedBlocks == null || refresh) {
319      newInfo = dfsClient.getLocatedBlocks(src, 0);
320    }
321    DFSClient.LOG.debug("newInfo = {}", newInfo);
322    if (newInfo == null) {
323      throw new IOException("Cannot open filename " + src);
324    }
325
326    if (locatedBlocks != null) {
327      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
328      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
329      while (oldIter.hasNext() && newIter.hasNext()) {
330        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
331          throw new IOException("Blocklist for " + src + " has changed!");
332        }
333      }
334    }
335    locatedBlocks = newInfo;
336    long lastBlockBeingWrittenLength = 0;
337    if (!locatedBlocks.isLastBlockComplete()) {
338      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
339      if (last != null) {
340        if (last.getLocations().length == 0) {
341          if (last.getBlockSize() == 0) {
342            // if the length is zero, then no data has been written to
343            // datanode. So no need to wait for the locations.
344            return 0;
345          }
346          return -1;
347        }
348        final long len = readBlockLength(last);
349        last.getBlock().setNumBytes(len);
350        lastBlockBeingWrittenLength = len;
351      }
352    }
353
354    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
355
356    return lastBlockBeingWrittenLength;
357  }
358
359  /** Read the block length from one of the datanodes. */
360  private long readBlockLength(LocatedBlock locatedblock) throws IOException {
361    assert locatedblock != null : "LocatedBlock cannot be null";
362    int replicaNotFoundCount = locatedblock.getLocations().length;
363
364    final DfsClientConf conf = dfsClient.getConf();
365    final int timeout = conf.getSocketTimeout();
    LinkedList<DatanodeInfo> nodeList =
        new LinkedList<>(Arrays.asList(locatedblock.getLocations()));
    LinkedList<DatanodeInfo> retryList = new LinkedList<>();
369    boolean isRetry = false;
370    StopWatch sw = new StopWatch();
371    while (nodeList.size() > 0) {
372      DatanodeInfo datanode = nodeList.pop();
373      ClientDatanodeProtocol cdp = null;
374      try {
375        cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
376            dfsClient.getConfiguration(), timeout,
377            conf.isConnectToDnViaHostname(), locatedblock);
378
379        final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
380
381        if (n >= 0) {
382          return n;
383        }
384      } catch (IOException ioe) {
385        if (ioe instanceof RemoteException) {
386          if (((RemoteException) ioe).unwrapRemoteException() instanceof
387              ReplicaNotFoundException) {
388            // replica is not on the DN. We will treat it as 0 length
389            // if no one actually has a replica.
390            replicaNotFoundCount--;
391          } else if (((RemoteException) ioe).unwrapRemoteException() instanceof
392              RetriableException) {
393            // add to the list to be retried if necessary.
394            retryList.add(datanode);
395          }
396        }
397        DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
398              + " for block {}", datanode, locatedblock.getBlock(), ioe);
399      } finally {
400        if (cdp != null) {
401          RPC.stopProxy(cdp);
402        }
403      }
404
405      // Ran out of nodes, but there are retriable nodes.
406      if (nodeList.size() == 0 && retryList.size() > 0) {
407        nodeList.addAll(retryList);
408        retryList.clear();
409        isRetry = true;
410      }
411
412      if (isRetry) {
413        // start the stop watch if not already running.
414        if (!sw.isRunning()) {
415          sw.start();
416        }
417        try {
418          Thread.sleep(500); // delay between retries.
419        } catch (InterruptedException e) {
420          throw new IOException("Interrupted while getting the length.");
421        }
422      }
423
424      // see if we ran out of retry time
425      if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeout) {
426        break;
427      }
428    }
429
    // The namenode told us about these locations, but none of them knows
    // about the replica: we hit the race between pipeline creation start
    // and end. We require every location to report ReplicaNotFoundException
    // because some other exception could have happened on a DN that does
    // have the replica, and we want to surface that error instead.
434    if (replicaNotFoundCount == 0) {
435      return 0;
436    }
437
438    throw new IOException("Cannot obtain block length for " + locatedblock);
439  }
440
441  public long getFileLength() {
442    synchronized(infoLock) {
443      return locatedBlocks == null? 0:
444          locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
445    }
446  }
447
448  // Short circuit local reads are forbidden for files that are
449  // under construction.  See HDFS-2757.
450  boolean shortCircuitForbidden() {
451    synchronized(infoLock) {
452      return locatedBlocks.isUnderConstruction();
453    }
454  }
455
456  /**
457   * Returns the datanode from which the stream is currently reading.
458   */
459  public synchronized DatanodeInfo getCurrentDatanode() {
460    return currentNode;
461  }
462
463  /**
464   * Returns the block containing the target position.
465   */
  public synchronized ExtendedBlock getCurrentBlock() {
467    if (currentLocatedBlock == null){
468      return null;
469    }
470    return currentLocatedBlock.getBlock();
471  }
472
473  /**
   * Return the collection of blocks that have already been located.
475   */
476  public List<LocatedBlock> getAllBlocks() throws IOException {
477    return getBlockRange(0, getFileLength());
478  }
479
480  /**
481   * Get block at the specified position.
482   * Fetch it from the namenode if not cached.
483   *
   * @param offset offset in the file; the block containing this offset
   *               is returned
485   * @return located block
486   * @throws IOException
487   */
488  protected LocatedBlock getBlockAt(long offset) throws IOException {
489    synchronized(infoLock) {
490      assert (locatedBlocks != null) : "locatedBlocks is null";
491
492      final LocatedBlock blk;
493
494      //check offset
495      if (offset < 0 || offset >= getFileLength()) {
496        throw new IOException("offset < 0 || offset >= getFileLength(), offset="
497            + offset
498            + ", locatedBlocks=" + locatedBlocks);
499      }
500      else if (offset >= locatedBlocks.getFileLength()) {
501        // offset to the portion of the last block,
502        // which is not known to the name-node yet;
503        // getting the last block
504        blk = locatedBlocks.getLastLocatedBlock();
505      }
506      else {
507        // search cached blocks first
508        blk = fetchBlockAt(offset, 0, true);
509      }
510      return blk;
511    }
512  }
513
514  /** Fetch a block from namenode and cache it */
515  protected LocatedBlock fetchBlockAt(long offset) throws IOException {
516    return fetchBlockAt(offset, 0, false); // don't use cache
517  }
518
519  /** Fetch a block from namenode and cache it */
520  private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
521      throws IOException {
522    synchronized(infoLock) {
523      int targetBlockIdx = locatedBlocks.findBlock(offset);
524      if (targetBlockIdx < 0) { // block is not cached
525        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
526        useCache = false;
527      }
528      if (!useCache) { // fetch blocks
529        final LocatedBlocks newBlocks = (length == 0)
530            ? dfsClient.getLocatedBlocks(src, offset)
531            : dfsClient.getLocatedBlocks(src, offset, length);
532        if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
533          throw new EOFException("Could not find target position " + offset);
534        }
535        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
536      }
537      return locatedBlocks.get(targetBlockIdx);
538    }
539  }
540
541  /**
542   * Get blocks in the specified range.
543   * Fetch them from the namenode if not cached. This function
   * does not allow a read request beyond the EOF.
545   * @param offset starting offset in file
546   * @param length length of data
   * @return consecutive segment of located blocks
548   * @throws IOException
549   */
550  private List<LocatedBlock> getBlockRange(long offset,
551      long length)  throws IOException {
552    // getFileLength(): returns total file length
553    // locatedBlocks.getFileLength(): returns length of completed blocks
554    if (offset >= getFileLength()) {
555      throw new IOException("Offset: " + offset +
556        " exceeds file length: " + getFileLength());
557    }
558    synchronized(infoLock) {
559      final List<LocatedBlock> blocks;
560      final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
561      final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
562      final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
563
564      if (readOffsetWithinCompleteBlk) {
565        //get the blocks of finalized (completed) block range
566        blocks = getFinalizedBlockRange(offset,
567          Math.min(length, lengthOfCompleteBlk - offset));
568      } else {
569        blocks = new ArrayList<>(1);
570      }
571
572      // get the blocks from incomplete block range
573      if (readLengthPastCompleteBlk) {
574        blocks.add(locatedBlocks.getLastLocatedBlock());
575      }
576
577      return blocks;
578    }
579  }
580
581  /**
582   * Get blocks in the specified range.
583   * Includes only the complete blocks.
584   * Fetch them from the namenode if not cached.
585   */
586  private List<LocatedBlock> getFinalizedBlockRange(
587      long offset, long length) throws IOException {
588    synchronized(infoLock) {
589      assert (locatedBlocks != null) : "locatedBlocks is null";
590      List<LocatedBlock> blockRange = new ArrayList<>();
591      // search cached blocks first
592      long remaining = length;
593      long curOff = offset;
594      while(remaining > 0) {
595        LocatedBlock blk = fetchBlockAt(curOff, remaining, true);
596        assert curOff >= blk.getStartOffset() : "Block not found";
597        blockRange.add(blk);
598        long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
599        remaining -= bytesRead;
600        curOff += bytesRead;
601      }
602      return blockRange;
603    }
604  }
605
606  /**
   * Open a BlockReader to a DataNode so that the block containing the
   * target position can be read. The block ID and the locations of its
   * replicas are obtained from the namenode.
609   */
610  private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
611    if (target >= getFileLength()) {
612      throw new IOException("Attempted to read past end of file");
613    }
614
615    // Will be getting a new BlockReader.
616    closeCurrentBlockReaders();
617
618    //
619    // Connect to best DataNode for desired Block, with potential offset
620    //
621    DatanodeInfo chosenNode;
622    int refetchToken = 1; // only need to get a new access token once
623    int refetchEncryptionKey = 1; // only need to get a new encryption key once
624
625    boolean connectFailedOnce = false;
626
627    while (true) {
628      //
629      // Compute desired block
630      //
631      LocatedBlock targetBlock = getBlockAt(target);
632
633      // update current position
634      this.pos = target;
635      this.blockEnd = targetBlock.getStartOffset() +
636            targetBlock.getBlockSize() - 1;
637      this.currentLocatedBlock = targetBlock;
638
639      long offsetIntoBlock = target - targetBlock.getStartOffset();
640
641      DNAddrPair retval = chooseDataNode(targetBlock, null);
642      chosenNode = retval.info;
643      InetSocketAddress targetAddr = retval.addr;
644      StorageType storageType = retval.storageType;
645
646      try {
647        blockReader = getBlockReader(targetBlock, offsetIntoBlock,
648            targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
649            storageType, chosenNode);
650        if(connectFailedOnce) {
651          DFSClient.LOG.info("Successfully connected to " + targetAddr +
652                             " for " + targetBlock.getBlock());
653        }
654        return chosenNode;
655      } catch (IOException ex) {
656        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
657          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
658              + "encryption key was invalid when connecting to " + targetAddr
659              + " : " + ex);
660          // The encryption key used is invalid.
661          refetchEncryptionKey--;
662          dfsClient.clearDataEncryptionKey();
663        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
664          refetchToken--;
665          fetchBlockAt(target);
666        } else {
667          connectFailedOnce = true;
          DFSClient.LOG.warn("Failed to connect to " + targetAddr
              + " for block " + targetBlock.getBlock()
              + ", add to deadNodes and continue. " + ex, ex);
670          // Put chosen node into dead list, continue
671          addToDeadNodes(chosenNode);
672        }
673      }
674    }
675  }
676
677  protected BlockReader getBlockReader(LocatedBlock targetBlock,
678      long offsetInBlock, long length, InetSocketAddress targetAddr,
679      StorageType storageType, DatanodeInfo datanode) throws IOException {
680    ExtendedBlock blk = targetBlock.getBlock();
681    Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
682    CachingStrategy curCachingStrategy;
683    boolean shortCircuitForbidden;
684    synchronized (infoLock) {
685      curCachingStrategy = cachingStrategy;
686      shortCircuitForbidden = shortCircuitForbidden();
687    }
688    return new BlockReaderFactory(dfsClient.getConf()).
689        setInetSocketAddress(targetAddr).
690        setRemotePeerFactory(dfsClient).
691        setDatanodeInfo(datanode).
692        setStorageType(storageType).
693        setFileName(src).
694        setBlock(blk).
695        setBlockToken(accessToken).
696        setStartOffset(offsetInBlock).
697        setVerifyChecksum(verifyChecksum).
698        setClientName(dfsClient.clientName).
699        setLength(length).
700        setCachingStrategy(curCachingStrategy).
701        setAllowShortCircuitLocalReads(!shortCircuitForbidden).
702        setClientCacheContext(dfsClient.getClientContext()).
703        setUserGroupInformation(dfsClient.ugi).
704        setConfiguration(dfsClient.getConfiguration()).
705        setTracer(dfsClient.getTracer()).
706        build();
707  }
708
709  /**
710   * Close it down!
711   */
712  @Override
713  public synchronized void close() throws IOException {
714    if (!closed.compareAndSet(false, true)) {
715      DFSClient.LOG.debug("DFSInputStream has been closed already");
716      return;
717    }
718    dfsClient.checkOpen();
719
720    if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
721      final StringBuilder builder = new StringBuilder();
722      extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
723        private String prefix = "";
724        @Override
725        public void accept(ByteBuffer k, Object v) {
726          builder.append(prefix).append(k);
727          prefix = ", ";
728        }
729      });
730      DFSClient.LOG.warn("closing file " + src + ", but there are still " +
731          "unreleased ByteBuffers allocated by read().  " +
732          "Please release " + builder.toString() + ".");
733    }
734    closeCurrentBlockReaders();
735    super.close();
736  }
737
738  @Override
739  public synchronized int read() throws IOException {
740    if (oneByteBuf == null) {
741      oneByteBuf = new byte[1];
742    }
    int ret = read(oneByteBuf, 0, 1);
    return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
745  }
746
747  /**
748   * Wraps different possible read implementations so that readBuffer can be
749   * strategy-agnostic.
750   */
751  interface ReaderStrategy {
752    int doRead(BlockReader blockReader, int off, int len)
753        throws IOException;
754
755    /**
756     * Copy data from the src ByteBuffer into the read buffer.
757     * @param src The src buffer where the data is copied from
     * @param offset Useful only when the ReaderStrategy is based on a byte
     *               array. Indicates the offset into the byte array to copy to.
     * @param length Useful only when the ReaderStrategy is based on a byte
     *               array. Indicates the length of the data to copy.
     * @return the number of bytes actually copied
762     */
763    int copyFrom(ByteBuffer src, int offset, int length);
764  }
765
766  protected void updateReadStatistics(ReadStatistics readStatistics,
767        int nRead, BlockReader blockReader) {
768    if (nRead <= 0) return;
769    synchronized(infoLock) {
770      if (blockReader.isShortCircuit()) {
771        readStatistics.addShortCircuitBytes(nRead);
772      } else if (blockReader.isLocal()) {
773        readStatistics.addLocalBytes(nRead);
774      } else {
775        readStatistics.addRemoteBytes(nRead);
776      }
777    }
778  }
779
780  /**
781   * Used to read bytes into a byte[]
782   */
783  private class ByteArrayStrategy implements ReaderStrategy {
784    final byte[] buf;
785
786    public ByteArrayStrategy(byte[] buf) {
787      this.buf = buf;
788    }
789
790    @Override
791    public int doRead(BlockReader blockReader, int off, int len)
792        throws IOException {
793      int nRead = blockReader.read(buf, off, len);
794      updateReadStatistics(readStatistics, nRead, blockReader);
795      return nRead;
796    }
797
798    @Override
799    public int copyFrom(ByteBuffer src, int offset, int length) {
800      ByteBuffer writeSlice = src.duplicate();
801      writeSlice.get(buf, offset, length);
802      return length;
803    }
804  }
805
806  /**
807   * Used to read bytes into a user-supplied ByteBuffer
808   */
809  protected class ByteBufferStrategy implements ReaderStrategy {
810    final ByteBuffer buf;
811    ByteBufferStrategy(ByteBuffer buf) {
812      this.buf = buf;
813    }
814
815    @Override
816    public int doRead(BlockReader blockReader, int off, int len)
817        throws IOException {
818      int oldpos = buf.position();
819      int oldlimit = buf.limit();
820      boolean success = false;
821      try {
822        int ret = blockReader.read(buf);
823        success = true;
824        updateReadStatistics(readStatistics, ret, blockReader);
825        if (ret == 0) {
          DFSClient.LOG.warn("blockReader.read(ByteBuffer) returned zero bytes");
827        }
828        return ret;
829      } finally {
830        if (!success) {
831          // Reset to original state so that retries work correctly.
832          buf.position(oldpos);
833          buf.limit(oldlimit);
834        }
835      }
836    }
837
838    @Override
839    public int copyFrom(ByteBuffer src, int offset, int length) {
840      ByteBuffer writeSlice = src.duplicate();
841      int remaining = Math.min(buf.remaining(), writeSlice.remaining());
842      writeSlice.limit(writeSlice.position() + remaining);
843      buf.put(writeSlice);
844      return remaining;
845    }
846  }
847
  /* This is used by regular read() and handles ChecksumExceptions.
849   * name readBuffer() is chosen to imply similarity to readBuffer() in
850   * ChecksumFileSystem
851   */
852  private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
853      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
854      throws IOException {
855    IOException ioe;
856
857    /* we retry current node only once. So this is set to true only here.
858     * Intention is to handle one common case of an error that is not a
859     * failure on datanode or client : when DataNode closes the connection
     * since the client is idle. If there are other cases of "non-errors"
     * then a datanode might be retried by setting this to true again.
862     */
863    boolean retryCurrentNode = true;
864
865    while (true) {
866      // retry as many times as seekToNewSource allows.
867      try {
868        return reader.doRead(blockReader, off, len);
869      } catch ( ChecksumException ce ) {
870        DFSClient.LOG.warn("Found Checksum error for "
871            + getCurrentBlock() + " from " + currentNode
872            + " at " + ce.getPos());
873        ioe = ce;
874        retryCurrentNode = false;
875        // we want to remember which block replicas we have tried
876        addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
877            corruptedBlockMap);
878      } catch ( IOException e ) {
879        if (!retryCurrentNode) {
880          DFSClient.LOG.warn("Exception while reading from "
881              + getCurrentBlock() + " of " + src + " from "
882              + currentNode, e);
883        }
884        ioe = e;
885      }
886      boolean sourceFound;
887      if (retryCurrentNode) {
888        /* possibly retry the same node so that transient errors don't
889         * result in application level failures (e.g. Datanode could have
890         * closed the connection because the client is idle for too long).
891         */
892        sourceFound = seekToBlockSource(pos);
893      } else {
894        addToDeadNodes(currentNode);
895        sourceFound = seekToNewSource(pos);
896      }
897      if (!sourceFound) {
898        throw ioe;
899      }
900      retryCurrentNode = false;
901    }
902  }
903
  protected synchronized int readWithStrategy(ReaderStrategy strategy,
      int off, int len) throws IOException {
905    dfsClient.checkOpen();
906    if (closed.get()) {
907      throw new IOException("Stream closed");
908    }
909    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
910    failures = 0;
911    if (pos < getFileLength()) {
912      int retries = 2;
913      while (retries > 0) {
914        try {
915          // currentNode can be left as null if previous read had a checksum
916          // error on the same block. See HDFS-3067
917          if (pos > blockEnd || currentNode == null) {
918            currentNode = blockSeekTo(pos);
919          }
920          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
921          synchronized(infoLock) {
922            if (locatedBlocks.isLastBlockComplete()) {
923              realLen = (int) Math.min(realLen,
924                  locatedBlocks.getFileLength() - pos);
925            }
926          }
927          int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
928
929          if (result >= 0) {
930            pos += result;
931          } else {
            // got an EOS from the reader though we expect more data from it.
933            throw new IOException("Unexpected EOS from the reader");
934          }
935          if (dfsClient.stats != null) {
936            dfsClient.stats.incrementBytesRead(result);
937          }
938          return result;
939        } catch (ChecksumException ce) {
940          throw ce;
941        } catch (IOException e) {
942          if (retries == 1) {
943            DFSClient.LOG.warn("DFS Read", e);
944          }
945          blockEnd = -1;
946          if (currentNode != null) { addToDeadNodes(currentNode); }
947          if (--retries == 0) {
948            throw e;
949          }
950        } finally {
          // Check whether we need to report corrupt block replicas,
          // whether the read was successful or a ChecksumException occurred.
953          reportCheckSumFailure(corruptedBlockMap,
954              currentLocatedBlock.getLocations().length);
955        }
956      }
957    }
958    return -1;
959  }
960
961  /**
   * Read up to {@code len} bytes into the given buffer, starting at
   * offset {@code off} in the buffer.
963   */
964  @Override
  public synchronized int read(@Nonnull final byte[] buf, int off, int len)
966      throws IOException {
967    validatePositionedReadArgs(pos, buf, off, len);
968    if (len == 0) {
969      return 0;
970    }
971    ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
972    try (TraceScope ignored =
973             dfsClient.newPathTraceScope("DFSInputStream#byteArrayRead", src)) {
974      return readWithStrategy(byteArrayReader, off, len);
975    }
976  }
977
978  @Override
979  public synchronized int read(final ByteBuffer buf) throws IOException {
980    ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
981    try (TraceScope ignored =
982             dfsClient.newPathTraceScope("DFSInputStream#byteBufferRead", src)){
983      return readWithStrategy(byteBufferReader, 0, buf.remaining());
984    }
985  }
986
987
988  /**
989   * Add corrupted block replica into map.
990   */
991  protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
992      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
993    Set<DatanodeInfo> dnSet;
    if (corruptedBlockMap.containsKey(blk)) {
      dnSet = corruptedBlockMap.get(blk);
    } else {
      dnSet = new HashSet<>();
    }
999    if (!dnSet.contains(node)) {
1000      dnSet.add(node);
1001      corruptedBlockMap.put(blk, dnSet);
1002    }
1003  }
1004
1005  private DNAddrPair chooseDataNode(LocatedBlock block,
1006      Collection<DatanodeInfo> ignoredNodes) throws IOException {
1007    while (true) {
1008      DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
1009      if (result != null) {
1010        return result;
1011      } else {
1012        String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
1013            deadNodes, ignoredNodes);
1014        String blockInfo = block.getBlock() + " file=" + src;
1015        if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
1016          String description = "Could not obtain block: " + blockInfo;
1017          DFSClient.LOG.warn(description + errMsg
1018              + ". Throwing a BlockMissingException");
1019          throw new BlockMissingException(src, description,
1020              block.getStartOffset());
1021        }
1022
1023        DatanodeInfo[] nodes = block.getLocations();
1024        if (nodes == null || nodes.length == 0) {
1025          DFSClient.LOG.info("No node available for " + blockInfo);
1026        }
1027        DFSClient.LOG.info("Could not obtain " + block.getBlock()
1028            + " from any node: " + errMsg
1029            + ". Will get new block locations from namenode and retry...");
1030        try {
          // Introduce a random factor to the wait time before another retry.
          // The wait time depends on the number of failures and a random
          // factor. The first time a BlockMissingException is hit, the wait
          // time is a random number between 0..3000 ms. If the first retry
          // still fails, we wait a 3000 ms grace period before the 2nd
          // retry, and the waiting window is expanded to 6000 ms to reduce
          // the request rate on the server. Similarly, the 3rd retry waits
          // a 6000 ms grace period before retrying with the window expanded
          // to 9000 ms.
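          // In the code below this works out to
          //   waitTime = timeWindow * failures
          //            + timeWindow * (failures + 1) * rand[0, 1)
          // With the default 3000 ms window this is roughly 0-3000 ms for
          // the first failure, 3000-9000 ms for the second, and
          // 6000-15000 ms for the third.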
1040          final int timeWindow = dfsClient.getConf().getTimeWindow();
          // grace period for the last round of attempt
          double waitTime = timeWindow * failures +
              // expanding time window for each failure
              timeWindow * (failures + 1) *
              ThreadLocalRandom.current().nextDouble();
          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1)
              + " IOException, will wait for " + waitTime + " msec.");
1046          Thread.sleep((long)waitTime);
1047        } catch (InterruptedException ignored) {
1048        }
1049        deadNodes.clear(); //2nd option is to remove only nodes[blockId]
1050        openInfo(true);
1051        block = refreshLocatedBlock(block);
1052        failures++;
1053      }
1054    }
1055  }
1056
1057  /**
1058   * Get the best node from which to stream the data.
1059   * @param block LocatedBlock, containing nodes in priority order.
1060   * @param ignoredNodes Do not choose nodes in this array (may be null)
1061   * @return The DNAddrPair of the best node. Null if no node can be chosen.
1062   */
1063  protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
1064      Collection<DatanodeInfo> ignoredNodes) {
1065    DatanodeInfo[] nodes = block.getLocations();
1066    StorageType[] storageTypes = block.getStorageTypes();
1067    DatanodeInfo chosenNode = null;
1068    StorageType storageType = null;
1069    if (nodes != null) {
1070      for (int i = 0; i < nodes.length; i++) {
1071        if (!deadNodes.containsKey(nodes[i])
1072            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
1073          chosenNode = nodes[i];
1074          // Storage types are ordered to correspond with nodes, so use the same
1075          // index to get storage type.
1076          if (storageTypes != null && i < storageTypes.length) {
1077            storageType = storageTypes[i];
1078          }
1079          break;
1080        }
1081      }
1082    }
1083    if (chosenNode == null) {
1084      DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() +
1085          " after checking nodes = " + Arrays.toString(nodes) +
1086          ", ignoredNodes = " + ignoredNodes);
1087      return null;
1088    }
1089    final String dnAddr =
1090        chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
1091    DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
1092    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
1093    return new DNAddrPair(chosenNode, targetAddr, storageType);
1094  }
1095
  private static String getBestNodeDNAddrPairErrorString(
      DatanodeInfo[] nodes,
      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
      Collection<DatanodeInfo> ignoredNodes) {
1099    StringBuilder errMsgr = new StringBuilder(
1100        " No live nodes contain current block ");
1101    errMsgr.append("Block locations:");
1102    for (DatanodeInfo datanode : nodes) {
1103      errMsgr.append(" ");
1104      errMsgr.append(datanode.toString());
1105    }
1106    errMsgr.append(" Dead nodes: ");
1107    for (DatanodeInfo datanode : deadNodes.keySet()) {
1108      errMsgr.append(" ");
1109      errMsgr.append(datanode.toString());
1110    }
1111    if (ignoredNodes != null) {
1112      errMsgr.append(" Ignored nodes: ");
1113      for (DatanodeInfo datanode : ignoredNodes) {
1114        errMsgr.append(" ");
1115        errMsgr.append(datanode.toString());
1116      }
1117    }
1118    return errMsgr.toString();
1119  }
1120
1121  protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
1122      byte[] buf, int offset,
1123      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
1124      throws IOException {
1125    block = refreshLocatedBlock(block);
1126    while (true) {
1127      DNAddrPair addressPair = chooseDataNode(block, null);
1128      try {
1129        actualGetFromOneDataNode(addressPair, block, start, end,
1130            buf, offset, corruptedBlockMap);
1131        return;
1132      } catch (IOException e) {
1133        // Ignore. Already processed inside the function.
1134        // Loop through to try the next node.
1135      }
1136    }
1137  }
1138
1139  private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
1140      final LocatedBlock block, final long start, final long end,
1141      final ByteBuffer bb,
1142      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
1143      final int hedgedReadId) {
1144    final SpanId parentSpanId = Tracer.getCurrentSpanId();
1145    return new Callable<ByteBuffer>() {
1146      @Override
1147      public ByteBuffer call() throws Exception {
1148        byte[] buf = bb.array();
1149        int offset = bb.position();
1150        try (TraceScope ignored = dfsClient.getTracer().
1151            newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
1152          actualGetFromOneDataNode(datanode, block, start, end, buf,
1153              offset, corruptedBlockMap);
1154          return bb;
1155        }
1156      }
1157    };
1158  }
1159
1160  /**
1161   * Used when reading contiguous blocks
1162   */
1163  private void actualGetFromOneDataNode(final DNAddrPair datanode,
1164      LocatedBlock block, final long start, final long end, byte[] buf,
1165      int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
1166      throws IOException {
1167    final int length = (int) (end - start + 1);
1168    actualGetFromOneDataNode(datanode, block, start, end, buf,
1169        new int[]{offset}, new int[]{length}, corruptedBlockMap);
1170  }
1171
1172  /**
1173   * Read data from one DataNode.
1174   * @param datanode the datanode from which to read data
1175   * @param block the located block containing the requested data
   * @param startInBlk the starting offset within the block
   * @param endInBlk the ending offset within the block (inclusive)
1178   * @param buf the given byte array into which the data is read
1179   * @param offsets the data may be read into multiple segments of the buf
1180   *                (when reading a striped block). this array indicates the
1181   *                offset of each buf segment.
1182   * @param lengths the length of each buf segment
1183   * @param corruptedBlockMap map recording list of datanodes with corrupted
1184   *                          block replica
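   *
   * For a plain contiguous read the caller passes a single segment, i.e.
   * one offset and one length equal to {@code endInBlk - startInBlk + 1};
   * striped reads may pass several non-overlapping segments.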
1185   */
1186  void actualGetFromOneDataNode(final DNAddrPair datanode,
1187      LocatedBlock block, final long startInBlk, final long endInBlk,
1188      byte[] buf, int[] offsets, int[] lengths,
1189      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
1190      throws IOException {
1191    DFSClientFaultInjector.get().startFetchFromDatanode();
1192    int refetchToken = 1; // only need to get a new access token once
1193    int refetchEncryptionKey = 1; // only need to get a new encryption key once
1194    final int len = (int) (endInBlk - startInBlk + 1);
1195    checkReadPortions(offsets, lengths, len);
1196
1197    while (true) {
1198      // cached block locations may have been updated by chooseDataNode()
1199      // or fetchBlockAt(). Always get the latest list of locations at the
1200      // start of the loop.
1201      block = refreshLocatedBlock(block);
1202      BlockReader reader = null;
1203      try {
1204        DFSClientFaultInjector.get().fetchFromDatanodeException();
1205        reader = getBlockReader(block, startInBlk, len, datanode.addr,
1206            datanode.storageType, datanode.info);
1207        for (int i = 0; i < offsets.length; i++) {
1208          int nread = reader.readAll(buf, offsets[i], lengths[i]);
1209          updateReadStatistics(readStatistics, nread, reader);
1210          if (nread != lengths[i]) {
            throw new IOException("truncated return from reader.read(): " +
                "expected " + lengths[i] + ", got " + nread);
1213          }
1214        }
1215        DFSClientFaultInjector.get().readFromDatanodeDelay();
1216        return;
1217      } catch (ChecksumException e) {
1218        String msg = "fetchBlockByteRange(). Got a checksum exception for "
1219            + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
1220            + datanode.info;
1221        DFSClient.LOG.warn(msg);
1222        // we want to remember what we have tried
1223        addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
1224            corruptedBlockMap);
1225        addToDeadNodes(datanode.info);
1226        throw new IOException(msg);
1227      } catch (IOException e) {
1228        if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
1229          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
1230              + "encryption key was invalid when connecting to " + datanode.addr
1231              + " : " + e);
1232          // The encryption key used is invalid.
1233          refetchEncryptionKey--;
1234          dfsClient.clearDataEncryptionKey();
1235        } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) {
1236          refetchToken--;
1237          try {
1238            fetchBlockAt(block.getStartOffset());
1239          } catch (IOException fbae) {
1240            // ignore IOE, since we can retry it later in a loop
1241          }
1242        } else {
1243          String msg = "Failed to connect to " + datanode.addr + " for file "
1244              + src + " for block " + block.getBlock() + ":" + e;
1245          DFSClient.LOG.warn("Connection failure: " + msg, e);
1246          addToDeadNodes(datanode.info);
1247          throw new IOException(msg);
1248        }
1249      } finally {
1250        if (reader != null) {
1251          reader.close();
1252        }
1253      }
1254    }
1255  }
1256
1257  /**
1258   * Refresh cached block locations.
1259   * @param block The currently cached block locations
1260   * @return Refreshed block locations
1261   * @throws IOException
1262   */
1263  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
1264      throws IOException {
1265    return getBlockAt(block.getStartOffset());
1266  }
1267
1268  /**
1269   * This method verifies that the read portions are valid and do not overlap
1270   * with each other.
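   * For example, offsets {0, 100} with lengths {100, 50} and totalLen 150
   * is valid, whereas overlapping portions such as offsets {0, 50} with
   * lengths {100, 50} are rejected.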
1271   */
1272  private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
1273    Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
1274    int sum = 0;
1275    for (int i = 0; i < lengths.length; i++) {
1276      if (i > 0) {
1277        int gap = offsets[i] - offsets[i - 1];
1278        // make sure read portions do not overlap with each other
1279        Preconditions.checkArgument(gap >= lengths[i - 1]);
1280      }
1281      sum += lengths[i];
1282    }
1283    Preconditions.checkArgument(sum == totalLen);
1284  }
1285
1286  /**
   * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
   * 'hedged' read if the first read is taking longer than a configured
   * amount of time. We then wait on whichever read returns first.
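   *
   * Hedged reads are only attempted when the client has been configured
   * with a hedged read thread pool; the trigger threshold and pool size
   * come from the client configuration (the
   * {@code dfs.client.hedged.read.*} properties).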
1290   */
1291  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
1292      long end, byte[] buf, int offset,
1293      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
1294      throws IOException {
1295    final DfsClientConf conf = dfsClient.getConf();
1296    ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
1297    CompletionService<ByteBuffer> hedgedService =
1298        new ExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool());
1299    ArrayList<DatanodeInfo> ignored = new ArrayList<>();
1300    ByteBuffer bb;
1301    int len = (int) (end - start + 1);
1302    int hedgedReadId = 0;
1303    block = refreshLocatedBlock(block);
1304    while (true) {
1305      // see HDFS-6591, this metric is used to verify/catch unnecessary loops
1306      hedgedReadOpsLoopNumForTesting++;
1307      DNAddrPair chosenNode = null;
1308      // there is no request already executing.
1309      if (futures.isEmpty()) {
1310        // chooseDataNode is a commitment. If no node, we go to
1311        // the NN to reget block locations. Only go here on first read.
1312        chosenNode = chooseDataNode(block, ignored);
1313        bb = ByteBuffer.allocate(len);
1314        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
1315            chosenNode, block, start, end, bb,
1316            corruptedBlockMap, hedgedReadId++);
1317        Future<ByteBuffer> firstRequest = hedgedService
1318            .submit(getFromDataNodeCallable);
1319        futures.add(firstRequest);
1320        try {
1321          Future<ByteBuffer> future = hedgedService.poll(
1322              conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
1323          if (future != null) {
1324            ByteBuffer result = future.get();
1325            System.arraycopy(result.array(), result.position(), buf, offset,
1326                len);
1327            return;
1328          }
1329          DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
1330              + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
1331          // Ignore this node on next go around.
1332          ignored.add(chosenNode.info);
1333          dfsClient.getHedgedReadMetrics().incHedgedReadOps();
1334          // continue; no need to refresh block locations
1335        } catch (InterruptedException | ExecutionException e) {
1336          // Ignore
1337        }
1338      } else {
1339        // We are starting up a 'hedged' read. We have a read already
1340        // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
1341        // If no nodes to do hedged reads against, pass.
1342        try {
1343          chosenNode = getBestNodeDNAddrPair(block, ignored);
1344          if (chosenNode == null) {
1345            chosenNode = chooseDataNode(block, ignored);
1346          }
1347          bb = ByteBuffer.allocate(len);
1348          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
1349              chosenNode, block, start, end, bb,
1350              corruptedBlockMap, hedgedReadId++);
1351          Future<ByteBuffer> oneMoreRequest = hedgedService
1352              .submit(getFromDataNodeCallable);
1353          futures.add(oneMoreRequest);
1354        } catch (IOException ioe) {
1355          DFSClient.LOG.debug("Failed getting node for hedged read: {}",
1356              ioe.getMessage());
1357        }
        // Whether or not a new hedged request was submitted, wait for the
        // fastest of the outstanding requests to complete, use its result,
        // and cancel the rest.
1360        try {
1361          ByteBuffer result = getFirstToComplete(hedgedService, futures);
1362          // cancel the rest.
1363          cancelAll(futures);
1364          dfsClient.getHedgedReadMetrics().incHedgedReadWins();
1365          System.arraycopy(result.array(), result.position(), buf, offset,
1366              len);
1367          return;
1368        } catch (InterruptedException ie) {
1369          // Ignore and retry
1370        }
1371        // We got here if exception. Ignore this node on next go around IFF
1372        // we found a chosenNode to hedge read against.
1373        if (chosenNode != null && chosenNode.info != null) {
1374          ignored.add(chosenNode.info);
1375        }
1376      }
1377    }
1378  }
1379
1380  @VisibleForTesting
1381  public long getHedgedReadOpsLoopNumForTesting() {
1382    return hedgedReadOpsLoopNumForTesting;
1383  }
1384
1385  private ByteBuffer getFirstToComplete(
1386      CompletionService<ByteBuffer> hedgedService,
1387      ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
1388    if (futures.isEmpty()) {
1389      throw new InterruptedException("let's retry");
1390    }
1391    Future<ByteBuffer> future = null;
1392    try {
1393      future = hedgedService.take();
1394      ByteBuffer bb = future.get();
1395      futures.remove(future);
1396      return bb;
1397    } catch (ExecutionException | CancellationException e) {
1398      // already logged in the Callable
1399      futures.remove(future);
1400    }
1401
1402    throw new InterruptedException("let's retry");
1403  }
1404
1405  private void cancelAll(List<Future<ByteBuffer>> futures) {
1406    for (Future<ByteBuffer> future : futures) {
      // Unfortunately, hdfs reads do not take kindly to interruption.
      // Threads return a variety of interrupted-type exceptions but
      // also complaints about invalid protobufs -- likely because the read
      // is interrupted before it gets the whole protobuf.  Also verbose
      // WARN logging.  So, for now, do not interrupt a running read.
1412      future.cancel(false);
1413    }
1414  }
1415
1416  /**
1417   * Should the block access token be refetched on an exception?
1418   *
1419   * @param ex the exception received
1420   * @param targetAddr the target datanode address from which the exception came
1421   * @return true if the block access token has expired or is invalid and
1422   *         should be refetched
1423   */
1424  protected static boolean tokenRefetchNeeded(IOException ex,
1425      InetSocketAddress targetAddr) {
1426    /*
1427     * Get a new access token and retry. A retry is needed in two cases: 1)
1428     * both the NN and DN restarted while the DFSClient was holding a cached
1429     * access token; 2) the NN failed to update its access key at the
1430     * pre-set interval (by a wide margin) and subsequently restarted. In
1431     * the latter case, the DN re-registers itself with the NN and receives
1432     * a new access key, but the DN deletes the old access key from its
1433     * memory since it is considered expired based on the estimated
1434     * expiration date.
1435     */
1436    if (ex instanceof InvalidBlockTokenException ||
1437        ex instanceof InvalidToken) {
1438      DFSClient.LOG.debug(
1439          "Access token was invalid when connecting to {}: {}",
1440          targetAddr, ex);
1441      return true;
1442    }
1443    return false;
1444  }
1445
1446  /**
1447   * Read bytes starting from the specified position.
1448   *
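   * <p>A minimal usage sketch (hypothetical; assumes {@code in} is an open
   * stream over a file at least 4 KB long):
   * <pre>
   *   byte[] buf = new byte[1024];
   *   // Positioned read: reads up to buf.length bytes starting at file
   *   // offset 3072, without moving the stream's own position (getPos()).
   *   int nRead = in.read(3072L, buf, 0, buf.length);
   * </pre>
   *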
1449   * @param position start read from this position
1450   * @param buffer read buffer
1451   * @param offset offset into buffer
1452   * @param length number of bytes to read
1453   *
1454   * @return actual number of bytes read
1455   */
1456  @Override
1457  public int read(long position, byte[] buffer, int offset, int length)
1458      throws IOException {
1459    validatePositionedReadArgs(position, buffer, offset, length);
1460    if (length == 0) {
1461      return 0;
1462    }
1463    try (TraceScope ignored = dfsClient.
1464        newPathTraceScope("DFSInputStream#byteArrayPread", src)) {
1465      return pread(position, buffer, offset, length);
1466    }
1467  }
1468
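  /**
   * Positioned-read implementation: resolve the located blocks covering
   * [position, position + length), fetch each block's byte range (using
   * hedged reads when enabled) and report any corrupted replicas detected
   * along the way.
   */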
1469  private int pread(long position, byte[] buffer, int offset, int length)
1470      throws IOException {
1471    // sanity checks
1472    dfsClient.checkOpen();
1473    if (closed.get()) {
1474      throw new IOException("Stream closed");
1475    }
1476    failures = 0;
1477    long filelen = getFileLength();
1478    if ((position < 0) || (position >= filelen)) {
1479      return -1;
1480    }
1481    int realLen = length;
1482    if ((position + length) > filelen) {
1483      realLen = (int)(filelen - position);
1484    }
1485
1486    // determine the block and byte range within the block
1487    // corresponding to position and realLen
1488    List<LocatedBlock> blockRange = getBlockRange(position, realLen);
1489    int remaining = realLen;
1490    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
1491    for (LocatedBlock blk : blockRange) {
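      // Offset of 'position' within this block, and how many of the
      // remaining requested bytes fall inside this block.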
1492      long targetStart = position - blk.getStartOffset();
1493      long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
1494      try {
1495        if (dfsClient.isHedgedReadsEnabled()) {
1496          hedgedFetchBlockByteRange(blk, targetStart,
1497              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
1498        } else {
1499          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
1500              buffer, offset, corruptedBlockMap);
1501        }
1502      } finally {
1503        // Check and report if any block replicas are corrupted.
1504        // BlockMissingException may be caught if all block replicas are
1505        // corrupted.
1506        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
1507      }
1508
1509      remaining -= bytesToRead;
1510      position += bytesToRead;
1511      offset += bytesToRead;
1512    }
1513    assert remaining == 0 : "Wrong number of bytes read.";
1514    if (dfsClient.stats != null) {
1515      dfsClient.stats.incrementBytesRead(realLen);
1516    }
1517    return realLen;
1518  }
1519
1520  /**
1521   * DFSInputStream reports checksum failures.
1522   * Case I : the client has tried multiple datanodes and at least one of the
1523   * attempts has succeeded. We report the other failures as corrupted blocks
1524   * to the namenode.
1525   * Case II: the client has tried all datanodes, and all attempts failed. We
1526   * only report the failure if the total number of replicas is 1. We do not
1527   * report otherwise, since the failures may be due to the client itself being
1528   * unable to read.
1529   * @param corruptedBlockMap map of corrupted blocks
1530   * @param dataNodeCount number of datanodes that hold the block replicas
1531   */
1532  protected void reportCheckSumFailure(
1533      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
1534      int dataNodeCount) {
1535    if (corruptedBlockMap.isEmpty()) {
1536      return;
1537    }
1538    Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
1539        .entrySet().iterator();
1540    Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
1541    ExtendedBlock blk = entry.getKey();
1542    Set<DatanodeInfo> dnSet = entry.getValue();
1543    if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
1544        || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
1545      DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
1546      int i = 0;
1547      for (DatanodeInfo dn : dnSet) {
1548        locs[i++] = dn;
1549      }
1550      LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
1551      dfsClient.reportChecksumFailure(src, lblocks);
1552    }
1553    corruptedBlockMap.clear();
1554  }
1555
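  /**
   * Skip forward by up to {@code n} bytes by seeking. The skip is clamped to
   * the remaining file length; a negative {@code n} returns -1 and a zero
   * {@code n} returns 0.
   */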
1556  @Override
1557  public long skip(long n) throws IOException {
1558    if (n > 0) {
1559      long curPos = getPos();
1560      long fileLen = getFileLength();
1561      if (n + curPos > fileLen) {
1562        n = fileLen - curPos;
1563      }
1564      seek(curPos + n);
1565      return n;
1566    }
1567    return n < 0 ? -1 : 0;
1568  }
1569
1570  /**
1571   * Seek to a new arbitrary location
1572   */
1573  @Override
1574  public synchronized void seek(long targetPos) throws IOException {
1575    if (targetPos > getFileLength()) {
1576      throw new EOFException("Cannot seek after EOF");
1577    }
1578    if (targetPos < 0) {
1579      throw new EOFException("Cannot seek to negative offset");
1580    }
1581    if (closed.get()) {
1582      throw new IOException("Stream is closed!");
1583    }
1584    boolean done = false;
1585    if (pos <= targetPos && targetPos <= blockEnd) {
1586      //
1587      // If this seek is to a positive position in the current
1588      // block, and this piece of data might already be lying in
1589      // the TCP buffer, then just eat up the intervening data.
1590      //
1591      int diff = (int)(targetPos - pos);
1592      if (diff <= blockReader.available()) {
1593        try {
1594          pos += blockReader.skip(diff);
1595          if (pos == targetPos) {
1596            done = true;
1597          } else {
1598            // The range was already checked. If the block reader returns
1599            // something unexpected instead of throwing an exception, it is
1600            // most likely a bug.
1601            String errMsg = "BlockReader failed to seek to " +
1602                targetPos + ". Instead, it sought to " + pos + ".";
1603            DFSClient.LOG.warn(errMsg);
1604            throw new IOException(errMsg);
1605          }
1606        } catch (IOException e) { // let the following read retry
1607          DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
1608              + "{}", targetPos, getCurrentBlock(), src, currentNode, e);
1609        }
1610      }
1611    }
1612    if (!done) {
1613      pos = targetPos;
1614      blockEnd = -1;
1615    }
1616  }
1617
1618  /**
1619   * Same as {@link #seekToNewSource(long)} except that it does not exclude
1620   * the current datanode and might connect to the same node.
1621   */
1622  private boolean seekToBlockSource(long targetPos)
1623      throws IOException {
1624    currentNode = blockSeekTo(targetPos);
1625    return true;
1626  }
1627
1628  /**
1629   * Seek to given position on a node other than the current node.  If
1630   * a node other than the current node is found, then returns true.
1631   * If another node could not be found, then returns false.
1632   */
1633  @Override
1634  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
1635    if (currentNode == null) {
1636      return seekToBlockSource(targetPos);
1637    }
1638    boolean markedDead = deadNodes.containsKey(currentNode);
1639    addToDeadNodes(currentNode);
1640    DatanodeInfo oldNode = currentNode;
1641    DatanodeInfo newNode = blockSeekTo(targetPos);
1642    if (!markedDead) {
1643      /* remove it from deadNodes. blockSeekTo could have cleared
1644       * deadNodes and added currentNode again. That's ok. */
1645      deadNodes.remove(oldNode);
1646    }
1647    if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
1648      currentNode = newNode;
1649      return true;
1650    } else {
1651      return false;
1652    }
1653  }
1654
1655  /** Return the current position in the stream. */
1657  @Override
1658  public synchronized long getPos() {
1659    return pos;
1660  }
1661
1662  /** Return the size of the remaining available bytes
1663   * if the size is less than or equal to {@link Integer#MAX_VALUE},
1664   * otherwise, return {@link Integer#MAX_VALUE}.
1665   */
1666  @Override
1667  public synchronized int available() throws IOException {
1668    if (closed.get()) {
1669      throw new IOException("Stream closed");
1670    }
1671
1672    final long remaining = getFileLength() - pos;
1673    return remaining <= Integer.MAX_VALUE ? (int)remaining : Integer.MAX_VALUE;
1674  }
1675
1676  /**
1677   * We definitely don't support marks
1678   */
1679  @Override
1680  public boolean markSupported() {
1681    return false;
1682  }
1683  @Override
1684  public void mark(int readLimit) {
1685  }
1686  @Override
1687  public void reset() throws IOException {
1688    throw new IOException("Mark/reset not supported");
1689  }
1690
1691  /** Utility class to encapsulate data node info and its address. */
1692  static final class DNAddrPair {
1693    final DatanodeInfo info;
1694    final InetSocketAddress addr;
1695    final StorageType storageType;
1696
1697    DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
1698        StorageType storageType) {
1699      this.info = info;
1700      this.addr = addr;
1701      this.storageType = storageType;
1702    }
1703  }
1704
1705  /**
1706   * Get statistics about the reads which this DFSInputStream has done.
1707   */
1708  public ReadStatistics getReadStatistics() {
1709    synchronized(infoLock) {
1710      return new ReadStatistics(readStatistics);
1711    }
1712  }
1713
1714  /**
1715   * Clear statistics about the reads which this DFSInputStream has done.
1716   */
1717  public void clearReadStatistics() {
1718    synchronized(infoLock) {
1719      readStatistics.clear();
1720    }
1721  }
1722
1723  public FileEncryptionInfo getFileEncryptionInfo() {
1724    synchronized(infoLock) {
1725      return fileEncryptionInfo;
1726    }
1727  }
1728
1729  protected void closeCurrentBlockReaders() {
1730    if (blockReader == null) return;
1731    // Close the current block reader so that the new caching settings can
1732    // take effect immediately.
1733    try {
1734      blockReader.close();
1735    } catch (IOException e) {
1736      DFSClient.LOG.error("error closing blockReader", e);
1737    }
1738    blockReader = null;
1739    blockEnd = -1;
1740  }
1741
1742  @Override
1743  public synchronized void setReadahead(Long readahead)
1744      throws IOException {
1745    synchronized (infoLock) {
1746      this.cachingStrategy =
1747          new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
1748    }
1749    closeCurrentBlockReaders();
1750  }
1751
1752  @Override
1753  public synchronized void setDropBehind(Boolean dropBehind)
1754      throws IOException {
1755    synchronized (infoLock) {
1756      this.cachingStrategy =
1757          new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
1758    }
1759    closeCurrentBlockReaders();
1760  }
1761
1762  /**
1763   * The immutable empty buffer we return when we reach EOF when doing a
1764   * zero-copy read.
1765   */
1766  private static final ByteBuffer EMPTY_BUFFER =
1767      ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
1768
1769  @Override
1770  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
1771      int maxLength, EnumSet<ReadOption> opts)
1772          throws IOException, UnsupportedOperationException {
1773    if (maxLength == 0) {
1774      return EMPTY_BUFFER;
1775    } else if (maxLength < 0) {
1776      throw new IllegalArgumentException("can't read a negative " +
1777          "number of bytes.");
1778    }
1779    if ((blockReader == null) || (blockEnd == -1)) {
1780      if (pos >= getFileLength()) {
1781        return null;
1782      }
1783      /*
1784       * If we don't have a blockReader, or the one we have has no more bytes
1785       * left to read, we call seekToBlockSource to get a new blockReader and
1786       * recalculate blockEnd.  Note that we assume we're not at EOF here
1787       * (we check this above).
1788       */
1789      if ((!seekToBlockSource(pos)) || (blockReader == null)) {
1790        throw new IOException("failed to allocate new BlockReader " +
1791            "at position " + pos);
1792      }
1793    }
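    // Prefer the zero-copy (mmap) path when short-circuit mmap reads are
    // enabled; if it cannot serve this request, fall back to a normal,
    // copying read via ByteBufferUtil.fallbackRead.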
1794    ByteBuffer buffer = null;
1795    if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) {
1796      buffer = tryReadZeroCopy(maxLength, opts);
1797    }
1798    if (buffer != null) {
1799      return buffer;
1800    }
1801    buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
1802    if (buffer != null) {
1803      getExtendedReadBuffers().put(buffer, bufferPool);
1804    }
1805    return buffer;
1806  }
1807
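  /**
   * Attempt a zero-copy (mmap-backed) read of up to {@code maxLength} bytes
   * from the current position. Returns null when the request cannot be served
   * from a memory map -- e.g. no bytes of the current block remain, the 2 GB
   * MappedByteBuffer limit would be exceeded, or the block reader cannot
   * supply a client mmap -- in which case the caller falls back to a copying
   * read.
   */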
1808  private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
1809      EnumSet<ReadOption> opts) throws IOException {
1810    // Copy 'pos' and 'blockEnd' to local variables to make it easier for the
1811    // JVM to optimize this function.
1812    final long curPos = pos;
1813    final long curEnd = blockEnd;
1814    final long blockStartInFile = currentLocatedBlock.getStartOffset();
1815    final long blockPos = curPos - blockStartInFile;
1816
1817    // Shorten this read if the end of the block is nearby.
1818    long length63;
1819    if ((curPos + maxLength) <= (curEnd + 1)) {
1820      length63 = maxLength;
1821    } else {
1822      length63 = 1 + curEnd - curPos;
1823      if (length63 <= 0) {
1824        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {}"
1825                + " of {}; {} bytes left in block. blockPos={}; curPos={};"
1826                + " curEnd={}",
1827            curPos, src, length63, blockPos, curPos, curEnd);
1828        return null;
1829      }
1830      DFSClient.LOG.debug("Reducing read length from {} to {} to avoid going "
1831              + "more than one byte past the end of the block.  blockPos={}; "
1832              + "curPos={}; curEnd={}",
1833          maxLength, length63, blockPos, curPos, curEnd);
1834    }
1835    // Make sure that we don't go beyond 31-bit offsets in the MappedByteBuffer.
1836    int length;
1837    if (blockPos + length63 <= Integer.MAX_VALUE) {
1838      length = (int)length63;
1839    } else {
1840      long length31 = Integer.MAX_VALUE - blockPos;
1841      if (length31 <= 0) {
1842        // Java ByteBuffers can't be longer than 2 GB, because they use
1843        // 4-byte signed integers to represent capacity, etc.
1844        // So we can't mmap the parts of the block higher than the 2 GB offset.
1845        // FIXME: we could work around this with multiple memory maps.
1846        // See HDFS-5101.
1847        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {} "
1848            + "of {}; 31-bit MappedByteBuffer limit exceeded.  blockPos={}, "
1849            + "curEnd={}", curPos, src, blockPos, curEnd);
1850        return null;
1851      }
1852      length = (int)length31;
1853      DFSClient.LOG.debug("Reducing read length from {} to {} to avoid 31-bit "
1854          + "limit.  blockPos={}; curPos={}; curEnd={}",
1855          maxLength, length, blockPos, curPos, curEnd);
1856    }
1857    final ClientMmap clientMmap = blockReader.getClientMmap(opts);
1858    if (clientMmap == null) {
1859      DFSClient.LOG.debug("unable to perform a zero-copy read from offset {} of"
1860          + " {}; BlockReader#getClientMmap returned null.", curPos, src);
1861      return null;
1862    }
1863    boolean success = false;
1864    ByteBuffer buffer;
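    // Advance the stream position past the region being handed out, then
    // expose a read-only slice of the mmap covering exactly
    // [blockPos, blockPos + length) within the block.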
1865    try {
1866      seek(curPos + length);
1867      buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
1868      buffer.position((int)blockPos);
1869      buffer.limit((int)(blockPos + length));
1870      getExtendedReadBuffers().put(buffer, clientMmap);
1871      synchronized (infoLock) {
1872        readStatistics.addZeroCopyBytes(length);
1873      }
1874      DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the "
1875          + "zero-copy read path.  blockEnd = {}", length, curPos, blockEnd);
1876      success = true;
1877    } finally {
1878      if (!success) {
1879        IOUtils.closeQuietly(clientMmap);
1880      }
1881    }
1882    return buffer;
1883  }
1884
1885  @Override
1886  public synchronized void releaseBuffer(ByteBuffer buffer) {
1887    if (buffer == EMPTY_BUFFER) return;
1888    Object val = getExtendedReadBuffers().remove(buffer);
1889    if (val == null) {
1890      throw new IllegalArgumentException("tried to release a buffer " +
1891          "that was not created by this stream, " + buffer);
1892    }
1893    if (val instanceof ClientMmap) {
1894      IOUtils.closeQuietly((ClientMmap)val);
1895    } else if (val instanceof ByteBufferPool) {
1896      ((ByteBufferPool)val).putBuffer(buffer);
1897    }
1898  }
1899
1900  @Override
1901  public synchronized void unbuffer() {
1902    closeCurrentBlockReaders();
1903  }
1904}