001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.client.impl;
019
020import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
021
022import java.io.BufferedOutputStream;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.FileInputStream;
026import java.io.IOException;
027import java.lang.reflect.Constructor;
028import java.net.InetSocketAddress;
029import java.util.List;
030
031import com.google.common.io.ByteArrayDataOutput;
032import com.google.common.io.ByteStreams;
033import org.apache.commons.lang.mutable.MutableBoolean;
034import org.apache.hadoop.classification.InterfaceAudience;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.StorageType;
037import org.apache.hadoop.hdfs.BlockReader;
038import org.apache.hadoop.hdfs.ClientContext;
039import org.apache.hadoop.hdfs.DFSClient;
040import org.apache.hadoop.hdfs.DFSInputStream;
041import org.apache.hadoop.hdfs.DFSUtilClient;
042import org.apache.hadoop.hdfs.ExtendedBlockId;
043import org.apache.hadoop.hdfs.RemotePeerFactory;
044import org.apache.hadoop.hdfs.ReplicaAccessor;
045import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
046import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
047import org.apache.hadoop.hdfs.net.DomainPeer;
048import org.apache.hadoop.hdfs.net.Peer;
049import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
050import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
051import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
052import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
053import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
054import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
055import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
056import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
057import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
058import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
059import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
060import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
061import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
062import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
063import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
064import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
065import org.apache.hadoop.hdfs.util.IOUtilsClient;
066import org.apache.hadoop.ipc.RemoteException;
067import org.apache.hadoop.net.unix.DomainSocket;
068import org.apache.hadoop.security.AccessControlException;
069import org.apache.hadoop.security.UserGroupInformation;
070import org.apache.hadoop.security.token.SecretManager.InvalidToken;
071import org.apache.hadoop.security.token.Token;
072import org.apache.hadoop.util.PerformanceAdvisory;
073import org.apache.hadoop.util.Time;
074
075import com.google.common.annotations.VisibleForTesting;
076import com.google.common.base.Preconditions;
077import org.apache.htrace.core.Tracer;
078
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082
083/**
084 * Utility class to create BlockReader implementations.
085 */
086@InterfaceAudience.Private
087public class BlockReaderFactory implements ShortCircuitReplicaCreator {
088  static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
089
090  public static class FailureInjector {
091    public void injectRequestFileDescriptorsFailure() throws IOException {
092      // do nothing
093    }
094    public boolean getSupportsReceiptVerification() {
095      return true;
096    }
097  }
098
099  @VisibleForTesting
100  static ShortCircuitReplicaCreator
101      createShortCircuitReplicaInfoCallback = null;
102
103  private final DfsClientConf conf;
104
105  /**
106   * Injects failures into specific operations during unit tests.
107   */
108  private static FailureInjector failureInjector = new FailureInjector();
109
110  /**
111   * The file name, for logging and debugging purposes.
112   */
113  private String fileName;
114
115  /**
116   * The block ID and block pool ID to use.
117   */
118  private ExtendedBlock block;
119
120  /**
121   * The block token to use for security purposes.
122   */
123  private Token<BlockTokenIdentifier> token;
124
125  /**
126   * The offset within the block to start reading at.
127   */
128  private long startOffset;
129
130  /**
131   * If false, we won't try to verify the block checksum.
132   */
133  private boolean verifyChecksum;
134
135  /**
136   * The name of this client.
137   */
138  private String clientName;
139
140  /**
141   * The DataNode we're talking to.
142   */
143  private DatanodeInfo datanode;
144
145  /**
146   * StorageType of replica on DataNode.
147   */
148  private StorageType storageType;
149
150  /**
151   * If false, we won't try short-circuit local reads.
152   */
153  private boolean allowShortCircuitLocalReads;
154
155  /**
156   * The ClientContext to use for things like the PeerCache.
157   */
158  private ClientContext clientContext;
159
160  /**
161   * Number of bytes to read. Must be set to a non-negative value.
162   */
163  private long length = -1;
164
165  /**
166   * Caching strategy to use when reading the block.
167   */
168  private CachingStrategy cachingStrategy;
169
170  /**
171   * Socket address to use to connect to peer.
172   */
173  private InetSocketAddress inetSocketAddress;
174
175  /**
176   * Remote peer factory to use to create a peer, if needed.
177   */
178  private RemotePeerFactory remotePeerFactory;
179
180  /**
181   * UserGroupInformation to use for legacy block reader local objects,
182   * if needed.
183   */
184  private UserGroupInformation userGroupInformation;
185
186  /**
187   * Configuration to use for legacy block reader local objects, if needed.
188   */
189  private Configuration configuration;
190
191  /**
192   * The HTrace tracer to use.
193   */
194  private Tracer tracer;
195
196  /**
197   * Information about the domain socket path we should use to connect to the
198   * local peer-- or null if we haven't examined the local domain socket.
199   */
200  private DomainSocketFactory.PathInfo pathInfo;
201
202  /**
203   * The remaining number of times that we'll try to pull a socket out of the
204   * cache.
205   */
206  private int remainingCacheTries;
207
208  public BlockReaderFactory(DfsClientConf conf) {
209    this.conf = conf;
210    this.remainingCacheTries = conf.getNumCachedConnRetry();
211  }
212
213  public BlockReaderFactory setFileName(String fileName) {
214    this.fileName = fileName;
215    return this;
216  }
217
218  public BlockReaderFactory setBlock(ExtendedBlock block) {
219    this.block = block;
220    return this;
221  }
222
223  public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
224    this.token = token;
225    return this;
226  }
227
228  public BlockReaderFactory setStartOffset(long startOffset) {
229    this.startOffset = startOffset;
230    return this;
231  }
232
233  public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
234    this.verifyChecksum = verifyChecksum;
235    return this;
236  }
237
238  public BlockReaderFactory setClientName(String clientName) {
239    this.clientName = clientName;
240    return this;
241  }
242
243  public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
244    this.datanode = datanode;
245    return this;
246  }
247
248  public BlockReaderFactory setStorageType(StorageType storageType) {
249    this.storageType = storageType;
250    return this;
251  }
252
253  public BlockReaderFactory setAllowShortCircuitLocalReads(
254      boolean allowShortCircuitLocalReads) {
255    this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
256    return this;
257  }
258
259  public BlockReaderFactory setClientCacheContext(
260      ClientContext clientContext) {
261    this.clientContext = clientContext;
262    return this;
263  }
264
265  public BlockReaderFactory setLength(long length) {
266    this.length = length;
267    return this;
268  }
269
270  public BlockReaderFactory setCachingStrategy(
271      CachingStrategy cachingStrategy) {
272    this.cachingStrategy = cachingStrategy;
273    return this;
274  }
275
276  public BlockReaderFactory setInetSocketAddress (
277      InetSocketAddress inetSocketAddress) {
278    this.inetSocketAddress = inetSocketAddress;
279    return this;
280  }
281
282  public BlockReaderFactory setUserGroupInformation(
283      UserGroupInformation userGroupInformation) {
284    this.userGroupInformation = userGroupInformation;
285    return this;
286  }
287
288  public BlockReaderFactory setRemotePeerFactory(
289      RemotePeerFactory remotePeerFactory) {
290    this.remotePeerFactory = remotePeerFactory;
291    return this;
292  }
293
294  public BlockReaderFactory setConfiguration(
295      Configuration configuration) {
296    this.configuration = configuration;
297    return this;
298  }
299
300  public BlockReaderFactory setTracer(Tracer tracer) {
301    this.tracer = tracer;
302    return this;
303  }
304
305  @VisibleForTesting
306  public static void setFailureInjectorForTesting(FailureInjector injector) {
307    failureInjector = injector;
308  }
309
310  /**
311   * Build a BlockReader with the given options.
312   *
313   * This function will do the best it can to create a block reader that meets
314   * all of our requirements.  We prefer short-circuit block readers
315   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
316   * former avoid the overhead of socket communication.  If short-circuit is
317   * unavailable, our next fallback is data transfer over UNIX domain sockets,
318   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
319   * work, we will try to create a remote block reader that operates over TCP
320   * sockets.
321   *
322   * There are a few caches that are important here.
323   *
324   * The ShortCircuitCache stores file descriptor objects which have been passed
325   * from the DataNode.
326   *
327   * The DomainSocketFactory stores information about UNIX domain socket paths
328   * that we not been able to use in the past, so that we don't waste time
329   * retrying them over and over.  (Like all the caches, it does have a timeout,
330   * though.)
331   *
332   * The PeerCache stores peers that we have used in the past.  If we can reuse
333   * one of these peers, we avoid the overhead of re-opening a socket.  However,
334   * if the socket has been timed out on the remote end, our attempt to reuse
335   * the socket may end with an IOException.  For that reason, we limit our
336   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
337   * that, we create new sockets.  This avoids the problem where a thread tries
338   * to talk to a peer that it hasn't talked to in a while, and has to clean out
339   * every entry in a socket cache full of stale entries.
340   *
341   * @return The new BlockReader.  We will not return null.
342   *
343   * @throws InvalidToken
344   *             If the block token was invalid.
345   *         InvalidEncryptionKeyException
346   *             If the encryption key was invalid.
347   *         Other IOException
348   *             If there was another problem.
349   */
350  public BlockReader build() throws IOException {
351    Preconditions.checkNotNull(configuration);
352    Preconditions
353        .checkState(length >= 0, "Length must be set to a non-negative value");
354    BlockReader reader = tryToCreateExternalBlockReader();
355    if (reader != null) {
356      return reader;
357    }
358    final ShortCircuitConf scConf = conf.getShortCircuitConf();
359    if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
360      if (clientContext.getUseLegacyBlockReaderLocal()) {
361        reader = getLegacyBlockReaderLocal();
362        if (reader != null) {
363          LOG.trace("{}: returning new legacy block reader local.", this);
364          return reader;
365        }
366      } else {
367        reader = getBlockReaderLocal();
368        if (reader != null) {
369          LOG.trace("{}: returning new block reader local.", this);
370          return reader;
371        }
372      }
373    }
374    if (scConf.isDomainSocketDataTraffic()) {
375      reader = getRemoteBlockReaderFromDomain();
376      if (reader != null) {
377        LOG.trace("{}: returning new remote block reader using UNIX domain "
378            + "socket on {}", this, pathInfo.getPath());
379        return reader;
380      }
381    }
382    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
383        "TCP reads were disabled for testing, but we failed to " +
384        "do a non-TCP read.");
385    return getRemoteBlockReaderFromTcp();
386  }
387
388  private BlockReader tryToCreateExternalBlockReader() {
389    List<Class<? extends ReplicaAccessorBuilder>> clses =
390        conf.getReplicaAccessorBuilderClasses();
391    for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
392      try {
393        ByteArrayDataOutput bado = ByteStreams.newDataOutput();
394        token.write(bado);
395        byte tokenBytes[] = bado.toByteArray();
396
397        Constructor<? extends ReplicaAccessorBuilder> ctor =
398            cls.getConstructor();
399        ReplicaAccessorBuilder builder = ctor.newInstance();
400        long visibleLength = startOffset + length;
401        ReplicaAccessor accessor = builder.
402            setAllowShortCircuitReads(allowShortCircuitLocalReads).
403            setBlock(block.getBlockId(), block.getBlockPoolId()).
404            setGenerationStamp(block.getGenerationStamp()).
405            setBlockAccessToken(tokenBytes).
406            setClientName(clientName).
407            setConfiguration(configuration).
408            setFileName(fileName).
409            setVerifyChecksum(verifyChecksum).
410            setVisibleLength(visibleLength).
411            build();
412        if (accessor == null) {
413          LOG.trace("{}: No ReplicaAccessor created by {}",
414              this, cls.getName());
415        } else {
416          return new ExternalBlockReader(accessor, visibleLength, startOffset);
417        }
418      } catch (Throwable t) {
419        LOG.warn("Failed to construct new object of type " +
420            cls.getName(), t);
421      }
422    }
423    return null;
424  }
425
426
427  /**
428   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
429   * This block reader implements the path-based style of local reads
430   * first introduced in HDFS-2246.
431   */
432  private BlockReader getLegacyBlockReaderLocal() throws IOException {
433    LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
434    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
435      LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
436          + "{} is not local", this, inetSocketAddress);
437      return null;
438    }
439    if (clientContext.getDisableLegacyBlockReaderLocal()) {
440      PerformanceAdvisory.LOG.debug("{}: can't construct " +
441          "BlockReaderLocalLegacy because " +
442          "disableLegacyBlockReaderLocal is set.", this);
443      return null;
444    }
445    IOException ioe;
446    try {
447      return BlockReaderLocalLegacy.newBlockReader(conf,
448          userGroupInformation, configuration, fileName, block, token,
449          datanode, startOffset, length, storageType, tracer);
450    } catch (RemoteException remoteException) {
451      ioe = remoteException.unwrapRemoteException(
452                InvalidToken.class, AccessControlException.class);
453    } catch (IOException e) {
454      ioe = e;
455    }
456    if ((!(ioe instanceof AccessControlException)) &&
457        isSecurityException(ioe)) {
458      // Handle security exceptions.
459      // We do not handle AccessControlException here, since
460      // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
461      // that the user is not in dfs.block.local-path-access.user, a condition
462      // which requires us to disable legacy SCR.
463      throw ioe;
464    }
465    LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
466        "Disabling legacy local reads.", ioe);
467    clientContext.setDisableLegacyBlockReaderLocal();
468    return null;
469  }
470
471  private BlockReader getBlockReaderLocal() throws InvalidToken {
472    LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
473        + " reads.", this);
474    if (pathInfo == null) {
475      pathInfo = clientContext.getDomainSocketFactory()
476          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
477    }
478    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
479      PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
480              "giving up on BlockReaderLocal.", this, pathInfo);
481      return null;
482    }
483    ShortCircuitCache cache = clientContext.getShortCircuitCache();
484    ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
485        block.getBlockPoolId());
486    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
487    InvalidToken exc = info.getInvalidTokenException();
488    if (exc != null) {
489      LOG.trace("{}: got InvalidToken exception while trying to construct "
490          + "BlockReaderLocal via {}", this, pathInfo.getPath());
491      throw exc;
492    }
493    if (info.getReplica() == null) {
494      PerformanceAdvisory.LOG.debug("{}: failed to get " +
495          "ShortCircuitReplica. Cannot construct " +
496          "BlockReaderLocal via {}", this, pathInfo.getPath());
497      return null;
498    }
499    return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
500        setFilename(fileName).
501        setBlock(block).
502        setStartOffset(startOffset).
503        setShortCircuitReplica(info.getReplica()).
504        setVerifyChecksum(verifyChecksum).
505        setCachingStrategy(cachingStrategy).
506        setStorageType(storageType).
507        setTracer(tracer).
508        build();
509  }
510
511  /**
512   * Fetch a pair of short-circuit block descriptors from a local DataNode.
513   *
514   * @return    Null if we could not communicate with the datanode,
515   *            a new ShortCircuitReplicaInfo object otherwise.
516   *            ShortCircuitReplicaInfo objects may contain either an
517   *            InvalidToken exception, or a ShortCircuitReplica object ready to
518   *            use.
519   */
520  @Override
521  public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
522    if (createShortCircuitReplicaInfoCallback != null) {
523      ShortCircuitReplicaInfo info =
524          createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
525      if (info != null) return info;
526    }
527    LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
528    BlockReaderPeer curPeer;
529    while (true) {
530      curPeer = nextDomainPeer();
531      if (curPeer == null) break;
532      if (curPeer.fromCache) remainingCacheTries--;
533      DomainPeer peer = (DomainPeer)curPeer.peer;
534      Slot slot = null;
535      ShortCircuitCache cache = clientContext.getShortCircuitCache();
536      try {
537        MutableBoolean usedPeer = new MutableBoolean(false);
538        slot = cache.allocShmSlot(datanode, peer, usedPeer,
539            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
540            clientName);
541        if (usedPeer.booleanValue()) {
542          LOG.trace("{}: allocShmSlot used up our previous socket {}.  "
543              + "Allocating a new one...", this, peer.getDomainSocket());
544          curPeer = nextDomainPeer();
545          if (curPeer == null) break;
546          peer = (DomainPeer)curPeer.peer;
547        }
548        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
549        clientContext.getPeerCache().put(datanode, peer);
550        return info;
551      } catch (IOException e) {
552        if (slot != null) {
553          cache.freeSlot(slot);
554        }
555        if (curPeer.fromCache) {
556          // Handle an I/O error we got when using a cached socket.
557          // These are considered less serious, because the socket may be stale.
558          LOG.debug("{}: closing stale domain peer {}", this, peer, e);
559          IOUtilsClient.cleanup(LOG, peer);
560        } else {
561          // Handle an I/O error we got when using a newly created socket.
562          // We temporarily disable the domain socket path for a few minutes in
563          // this case, to prevent wasting more time on it.
564          LOG.warn(this + ": I/O error requesting file descriptors.  " +
565              "Disabling domain socket " + peer.getDomainSocket(), e);
566          IOUtilsClient.cleanup(LOG, peer);
567          clientContext.getDomainSocketFactory()
568              .disableDomainSocketPath(pathInfo.getPath());
569          return null;
570        }
571      }
572    }
573    return null;
574  }
575
576  /**
577   * Request file descriptors from a DomainPeer.
578   *
579   * @param peer   The peer to use for communication.
580   * @param slot   If non-null, the shared memory slot to associate with the
581   *               new ShortCircuitReplica.
582   *
583   * @return  A ShortCircuitReplica object if we could communicate with the
584   *          datanode; null, otherwise.
585   * @throws  IOException If we encountered an I/O exception while communicating
586   *          with the datanode.
587   */
588  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
589          Slot slot) throws IOException {
590    ShortCircuitCache cache = clientContext.getShortCircuitCache();
591    final DataOutputStream out =
592        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
593    SlotId slotId = slot == null ? null : slot.getSlotId();
594    new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
595        failureInjector.getSupportsReceiptVerification());
596    DataInputStream in = new DataInputStream(peer.getInputStream());
597    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
598        PBHelperClient.vintPrefixed(in));
599    DomainSocket sock = peer.getDomainSocket();
600    failureInjector.injectRequestFileDescriptorsFailure();
601    switch (resp.getStatus()) {
602    case SUCCESS:
603      byte buf[] = new byte[1];
604      FileInputStream[] fis = new FileInputStream[2];
605      sock.recvFileInputStreams(fis, buf, 0, buf.length);
606      ShortCircuitReplica replica = null;
607      try {
608        ExtendedBlockId key =
609            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
610        if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
611          LOG.trace("Sending receipt verification byte for slot {}", slot);
612          sock.getOutputStream().write(0);
613        }
614        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
615            Time.monotonicNow(), slot);
616        return new ShortCircuitReplicaInfo(replica);
617      } catch (IOException e) {
618        // This indicates an error reading from disk, or a format error.  Since
619        // it's not a socket communication problem, we return null rather than
620        // throwing an exception.
621        LOG.warn(this + ": error creating ShortCircuitReplica.", e);
622        return null;
623      } finally {
624        if (replica == null) {
625          IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
626        }
627      }
628    case ERROR_UNSUPPORTED:
629      if (!resp.hasShortCircuitAccessVersion()) {
630        LOG.warn("short-circuit read access is disabled for " +
631            "DataNode " + datanode + ".  reason: " + resp.getMessage());
632        clientContext.getDomainSocketFactory()
633            .disableShortCircuitForPath(pathInfo.getPath());
634      } else {
635        LOG.warn("short-circuit read access for the file " +
636            fileName + " is disabled for DataNode " + datanode +
637            ".  reason: " + resp.getMessage());
638      }
639      return null;
640    case ERROR_ACCESS_TOKEN:
641      String msg = "access control error while " +
642          "attempting to set up short-circuit access to " +
643          fileName + resp.getMessage();
644      LOG.debug("{}:{}", this, msg);
645      return new ShortCircuitReplicaInfo(new InvalidToken(msg));
646    default:
647      LOG.warn(this + ": unknown response code " + resp.getStatus() +
648          " while attempting to set up short-circuit access. " +
649          resp.getMessage());
650      clientContext.getDomainSocketFactory()
651          .disableShortCircuitForPath(pathInfo.getPath());
652      return null;
653    }
654  }
655
656  /**
657   * Get a BlockReaderRemote that communicates over a UNIX domain socket.
658   *
659   * @return The new BlockReader, or null if we failed to create the block
660   * reader.
661   *
662   * @throws InvalidToken    If the block token was invalid.
663   * Potentially other security-related execptions.
664   */
665  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
666    if (pathInfo == null) {
667      pathInfo = clientContext.getDomainSocketFactory()
668          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
669    }
670    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
671      PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
672          "remote block reader because the UNIX domain socket at {}" +
673           " is not usable.", this, pathInfo);
674      return null;
675    }
676    LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
677        + "socket at {}", this, pathInfo.getPath());
678
679    while (true) {
680      BlockReaderPeer curPeer = nextDomainPeer();
681      if (curPeer == null) break;
682      if (curPeer.fromCache) remainingCacheTries--;
683      DomainPeer peer = (DomainPeer)curPeer.peer;
684      BlockReader blockReader = null;
685      try {
686        blockReader = getRemoteBlockReader(peer);
687        return blockReader;
688      } catch (IOException ioe) {
689        IOUtilsClient.cleanup(LOG, peer);
690        if (isSecurityException(ioe)) {
691          LOG.trace("{}: got security exception while constructing a remote "
692                  + " block reader from the unix domain socket at {}",
693              this, pathInfo.getPath(), ioe);
694          throw ioe;
695        }
696        if (curPeer.fromCache) {
697          // Handle an I/O error we got when using a cached peer.  These are
698          // considered less serious because the underlying socket may be stale.
699          LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
700        } else {
701          // Handle an I/O error we got when using a newly created domain peer.
702          // We temporarily disable the domain socket path for a few minutes in
703          // this case, to prevent wasting more time on it.
704          LOG.warn("I/O error constructing remote block reader.  Disabling " +
705              "domain socket " + peer.getDomainSocket(), ioe);
706          clientContext.getDomainSocketFactory()
707              .disableDomainSocketPath(pathInfo.getPath());
708          return null;
709        }
710      } finally {
711        if (blockReader == null) {
712          IOUtilsClient.cleanup(LOG, peer);
713        }
714      }
715    }
716    return null;
717  }
718
719  /**
720   * Get a BlockReaderRemote that communicates over a TCP socket.
721   *
722   * @return The new BlockReader.  We will not return null, but instead throw
723   *         an exception if this fails.
724   *
725   * @throws InvalidToken
726   *             If the block token was invalid.
727   *         InvalidEncryptionKeyException
728   *             If the encryption key was invalid.
729   *         Other IOException
730   *             If there was another problem.
731   */
732  private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
733    LOG.trace("{}: trying to create a remote block reader from a TCP socket",
734        this);
735    BlockReader blockReader = null;
736    while (true) {
737      BlockReaderPeer curPeer = null;
738      Peer peer = null;
739      try {
740        curPeer = nextTcpPeer();
741        if (curPeer.fromCache) remainingCacheTries--;
742        peer = curPeer.peer;
743        blockReader = getRemoteBlockReader(peer);
744        return blockReader;
745      } catch (IOException ioe) {
746        if (isSecurityException(ioe)) {
747          LOG.trace("{}: got security exception while constructing a remote "
748              + "block reader from {}", this, peer, ioe);
749          throw ioe;
750        }
751        if ((curPeer != null) && curPeer.fromCache) {
752          // Handle an I/O error we got when using a cached peer.  These are
753          // considered less serious, because the underlying socket may be
754          // stale.
755          LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
756        } else {
757          // Handle an I/O error we got when using a newly created peer.
758          LOG.warn("I/O error constructing remote block reader.", ioe);
759          throw ioe;
760        }
761      } finally {
762        if (blockReader == null) {
763          IOUtilsClient.cleanup(LOG, peer);
764        }
765      }
766    }
767  }
768
769  public static class BlockReaderPeer {
770    final Peer peer;
771    final boolean fromCache;
772
773    BlockReaderPeer(Peer peer, boolean fromCache) {
774      this.peer = peer;
775      this.fromCache = fromCache;
776    }
777  }
778
779  /**
780   * Get the next DomainPeer-- either from the cache or by creating it.
781   *
782   * @return the next DomainPeer, or null if we could not construct one.
783   */
784  private BlockReaderPeer nextDomainPeer() {
785    if (remainingCacheTries > 0) {
786      Peer peer = clientContext.getPeerCache().get(datanode, true);
787      if (peer != null) {
788        LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
789        return new BlockReaderPeer(peer, true);
790      }
791    }
792    DomainSocket sock = clientContext.getDomainSocketFactory().
793        createSocket(pathInfo, conf.getSocketTimeout());
794    if (sock == null) return null;
795    return new BlockReaderPeer(new DomainPeer(sock), false);
796  }
797
798  /**
799   * Get the next TCP-based peer-- either from the cache or by creating it.
800   *
801   * @return the next Peer, or null if we could not construct one.
802   *
803   * @throws IOException  If there was an error while constructing the peer
804   *                      (such as an InvalidEncryptionKeyException)
805   */
806  private BlockReaderPeer nextTcpPeer() throws IOException {
807    if (remainingCacheTries > 0) {
808      Peer peer = clientContext.getPeerCache().get(datanode, false);
809      if (peer != null) {
810        LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
811        return new BlockReaderPeer(peer, true);
812      }
813    }
814    try {
815      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
816          datanode);
817      LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
818      return new BlockReaderPeer(peer, false);
819    } catch (IOException e) {
820      LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
821          + "{}", datanode);
822      throw e;
823    }
824  }
825
826  /**
827   * Determine if an exception is security-related.
828   *
829   * We need to handle these exceptions differently than other IOExceptions.
830   * They don't indicate a communication problem.  Instead, they mean that there
831   * is some action the client needs to take, such as refetching block tokens,
832   * renewing encryption keys, etc.
833   *
834   * @param ioe    The exception
835   * @return       True only if the exception is security-related.
836   */
837  private static boolean isSecurityException(IOException ioe) {
838    return (ioe instanceof InvalidToken) ||
839            (ioe instanceof InvalidEncryptionKeyException) ||
840            (ioe instanceof InvalidBlockTokenException) ||
841            (ioe instanceof AccessControlException);
842  }
843
844  @SuppressWarnings("deprecation")
845  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
846    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
847      return BlockReaderRemote.newBlockReader(fileName,
848          block, token, startOffset, length, conf.getIoBufferSize(),
849          verifyChecksum, clientName, peer, datanode,
850          clientContext.getPeerCache(), cachingStrategy, tracer);
851    } else {
852      return BlockReaderRemote2.newBlockReader(
853          fileName, block, token, startOffset, length,
854          verifyChecksum, clientName, peer, datanode,
855          clientContext.getPeerCache(), cachingStrategy, tracer);
856    }
857  }
858
859  @Override
860  public String toString() {
861    return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
862  }
863
864  /**
865   * File name to print when accessing a block directly (from servlets)
866   * @param s Address of the block location
867   * @param poolId Block pool ID of the block
868   * @param blockId Block ID of the block
869   * @return string that has a file name for debug purposes
870   */
871  public static String getFileName(final InetSocketAddress s,
872      final String poolId, final long blockId) {
873    return s.toString() + ":" + poolId + ":" + blockId;
874  }
875}