/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.SocketFactory;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolIterator;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RpcNoSuchMethodException;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.net.InetAddresses;

/********************************************************
 * DFSClient can connect to a Hadoop Filesystem and
 * perform basic file tasks.  It uses the ClientProtocol
 * to communicate with a NameNode daemon, and connects
 * directly to DataNodes to read/write block data.
 *
 * Hadoop DFS users should obtain an instance of
 * DistributedFileSystem, which uses DFSClient to handle
 * filesystem tasks.
 *
 ********************************************************/
@InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    DataEncryptionKeyFactory {
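  // A minimal usage sketch (illustrative only: the URI, path, and variable
  // names are hypothetical). As the class comment says, applications should
  // normally go through DistributedFileSystem rather than use DFSClient
  // directly.
  //
  //   Configuration conf = new HdfsConfiguration();
  //   try (DFSClient client =
  //       new DFSClient(URI.create("hdfs://namenode:8020"), conf)) {
  //     try (InputStream in = client.open("/user/alice/data.txt")) {
  //       in.read();
  //     }
  //   }
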
  public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
  // 1 hour
  public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L;

  private final Configuration conf;
  private final Tracer tracer;
  private final DfsClientConf dfsClientConf;
  final ClientProtocol namenode;
  /* The service used for delegation tokens */
  private Text dtService;

  final UserGroupInformation ugi;
  volatile boolean clientRunning = true;
  volatile long lastLeaseRenewal;
  private volatile FsServerDefaults serverDefaults;
  private volatile long serverDefaultsLastUpdate;
  final String clientName;
  final SocketFactory socketFactory;
  final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
  final FileSystem.Statistics stats;
  private final String authority;
  private final Random r = new Random();
  private SocketAddress[] localInterfaceAddrs;
  private DataEncryptionKey encryptionKey;
  final SaslDataTransferClient saslClient;
  private final CachingStrategy defaultReadCachingStrategy;
  private final CachingStrategy defaultWriteCachingStrategy;
  private final ClientContext clientContext;

  private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
      new DFSHedgedReadMetrics();
  private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
  private final int smallBufferSize;

  public DfsClientConf getConf() {
    return dfsClientConf;
  }

  Configuration getConfiguration() {
    return conf;
  }

  /**
   * A map from inode IDs to {@link DFSOutputStream} objects
   * that are currently being written by this client.
   * Note that a file can only be written by a single client.
   */
  private final Map<Long, DFSOutputStream> filesBeingWritten = new HashMap<>();

  /**
   * Same as this(DFSUtilClient.getNNAddress(conf), conf);
   * @see #DFSClient(InetSocketAddress, Configuration)
   * @deprecated Deprecated at 0.21
   */
  @Deprecated
  public DFSClient(Configuration conf) throws IOException {
    this(DFSUtilClient.getNNAddress(conf), conf);
  }
267
268  public DFSClient(InetSocketAddress address, Configuration conf)
269      throws IOException {
270    this(DFSUtilClient.getNNUri(address), conf);
271  }
272
273  /**
274   * Same as this(nameNodeUri, conf, null);
275   * @see #DFSClient(URI, Configuration, FileSystem.Statistics)
276   */
277  public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {
278    this(nameNodeUri, conf, null);
279  }
280
281  /**
282   * Same as this(nameNodeUri, null, conf, stats);
283   * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics)
284   */
285  public DFSClient(URI nameNodeUri, Configuration conf,
286      FileSystem.Statistics stats) throws IOException {
287    this(nameNodeUri, null, conf, stats);
288  }
289
290  /**
291   * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
292   * If HA is enabled and a positive value is set for
293   * {@link HdfsClientConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY}
294   * in the configuration, the DFSClient will use
295   * {@link LossyRetryInvocationHandler} as its RetryInvocationHandler.
296   * Otherwise one of nameNodeUri or rpcNamenode must be null.
297   */
298  @VisibleForTesting
299  public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
300      Configuration conf, FileSystem.Statistics stats) throws IOException {
301    // Copy only the required DFSClient configuration
302    this.tracer = FsTracer.get(conf);
303    this.dfsClientConf = new DfsClientConf(conf);
304    this.conf = conf;
305    this.stats = stats;
306    this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
307    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
308    this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
309
310    this.ugi = UserGroupInformation.getCurrentUser();
311
312    this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
313    this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
314        ThreadLocalRandom.current().nextInt()  + "_" +
315        Thread.currentThread().getId();
316    int numResponseToDrop = conf.getInt(
317        DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
318        DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
319    ProxyAndInfo<ClientProtocol> proxyInfo = null;
320    AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
321
322    if (numResponseToDrop > 0) {
323      // This case is used for testing.
324      LOG.warn(DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
325          + " is set to " + numResponseToDrop
326          + ", this hacked client will proactively drop responses");
327      proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
328          nameNodeUri, ClientProtocol.class, numResponseToDrop,
329          nnFallbackToSimpleAuth);
330    }
331
332    if (proxyInfo != null) {
333      this.dtService = proxyInfo.getDelegationTokenService();
334      this.namenode = proxyInfo.getProxy();
335    } else if (rpcNamenode != null) {
336      // This case is used for testing.
337      Preconditions.checkArgument(nameNodeUri == null);
338      this.namenode = rpcNamenode;
339      dtService = null;
340    } else {
341      Preconditions.checkArgument(nameNodeUri != null,
342          "null URI");
343      proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
344          nameNodeUri, nnFallbackToSimpleAuth);
345      this.dtService = proxyInfo.getDelegationTokenService();
346      this.namenode = proxyInfo.getProxy();
347    }
348
349    String localInterfaces[] =
350        conf.getTrimmedStrings(DFS_CLIENT_LOCAL_INTERFACES);
351    localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
352    if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
353      LOG.debug("Using local interfaces [" +
354          Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
355          Joiner.on(',').join(localInterfaceAddrs) + "]");
356    }
357
358    Boolean readDropBehind =
359        (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
360            null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
361    Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
362        null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
363    Boolean writeDropBehind =
364        (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
365            null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
366    this.defaultReadCachingStrategy =
367        new CachingStrategy(readDropBehind, readahead);
368    this.defaultWriteCachingStrategy =
369        new CachingStrategy(writeDropBehind, readahead);
370    this.clientContext = ClientContext.get(
371        conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
372        dfsClientConf);
373
374    if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
375      this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
376    }
377    this.saslClient = new SaslDataTransferClient(
378        conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
379        TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
380  }
381
382  /**
383   * Return the socket addresses to use with each configured
384   * local interface. Local interfaces may be specified by IP
385   * address, IP address range using CIDR notation, interface
386   * name (e.g. eth0) or sub-interface name (e.g. eth0:0).
387   * The socket addresses consist of the IPs for the interfaces
388   * and the ephemeral port (port 0). If an IP, IP range, or
389   * interface name matches an interface with sub-interfaces
390   * only the IP of the interface is used. Sub-interfaces can
391   * be used by specifying them explicitly (by IP or name).
392   *
393   * @return SocketAddresses for the configured local interfaces,
394   *    or an empty array if none are configured
395   * @throws UnknownHostException if a given interface name is invalid
396   */
397  private static SocketAddress[] getLocalInterfaceAddrs(
398      String interfaceNames[]) throws UnknownHostException {
399    List<SocketAddress> localAddrs = new ArrayList<>();
400    for (String interfaceName : interfaceNames) {
401      if (InetAddresses.isInetAddress(interfaceName)) {
402        localAddrs.add(new InetSocketAddress(interfaceName, 0));
403      } else if (NetUtils.isValidSubnet(interfaceName)) {
404        for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
405          localAddrs.add(new InetSocketAddress(addr, 0));
406        }
407      } else {
408        for (String ip : DNS.getIPs(interfaceName, false)) {
409          localAddrs.add(new InetSocketAddress(ip, 0));
410        }
411      }
412    }
413    return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
414  }
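
  // Example configuration (a sketch; the values are illustrative): local
  // interfaces may be given by name, by address, or by CIDR range, and the
  // forms may be mixed freely in one list.
  //
  //   conf.set(HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES,
  //       "eth0,192.168.1.5,10.0.0.0/8");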

  /**
   * Select one of the configured local interfaces at random. We use a random
   * interface because other policies like round-robin are less effective
   * given that we cache connections to datanodes.
   *
   * @return one of the local interface addresses at random, or null if no
   *    local interfaces are configured
   */
  SocketAddress getRandomLocalInterfaceAddr() {
    if (localInterfaceAddrs.length == 0) {
      return null;
    }
    final int idx = r.nextInt(localInterfaceAddrs.length);
    final SocketAddress addr = localInterfaceAddrs[idx];
    LOG.debug("Using local interface {}", addr);
    return addr;
  }

  /**
   * Return the timeout that clients should use when writing to datanodes.
   * @param numNodes the number of nodes in the pipeline.
   */
  int getDatanodeWriteTimeout(int numNodes) {
    final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
    return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
  }

  int getDatanodeReadTimeout(int numNodes) {
    final int t = dfsClientConf.getSocketTimeout();
    return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
  }
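
  // Worked example (a sketch with illustrative numbers): with a configured
  // datanode socket write timeout of 60000 ms and a 3-node pipeline,
  // getDatanodeWriteTimeout(3) returns
  // 60000 + 3 * HdfsConstants.WRITE_TIMEOUT_EXTENSION; a configured timeout
  // of 0 disables the timeout entirely. getDatanodeReadTimeout is analogous,
  // using READ_TIMEOUT_EXTENSION and the socket read timeout.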

  @VisibleForTesting
  public String getClientName() {
    return clientName;
  }

  void checkOpen() throws IOException {
    if (!clientRunning) {
      throw new IOException("Filesystem closed");
    }
  }

  /** Return the lease renewer instance. The renewer thread won't start
   *  until the first output stream is created. The same instance will
   *  be returned until all output streams are closed.
   */
  public LeaseRenewer getLeaseRenewer() {
    return LeaseRenewer.getInstance(authority, ugi, this);
  }

  /** Get a lease and start automatic renewal */
  private void beginFileLease(final long inodeId, final DFSOutputStream out)
      throws IOException {
    getLeaseRenewer().put(inodeId, out, this);
  }

  /** Stop renewal of lease for the file. */
  void endFileLease(final long inodeId) {
    getLeaseRenewer().closeFile(inodeId, this);
  }

  /** Put a file. Only called from LeaseRenewer, where proper locking is
   *  enforced to consistently update its local dfsclients array and
   *  client's filesBeingWritten map.
   */
  public void putFileBeingWritten(final long inodeId,
      final DFSOutputStream out) {
    synchronized(filesBeingWritten) {
      filesBeingWritten.put(inodeId, out);
      // Update the last lease renewal time only when there were no
      // writes. Once there is at least one write stream open, the lease
      // renewer thread keeps it updated well within everyone's expiration
      // time.
      if (lastLeaseRenewal == 0) {
        updateLastLeaseRenewal();
      }
    }
  }

  /** Remove a file. Only called from LeaseRenewer. */
  public void removeFileBeingWritten(final long inodeId) {
    synchronized(filesBeingWritten) {
      filesBeingWritten.remove(inodeId);
      if (filesBeingWritten.isEmpty()) {
        lastLeaseRenewal = 0;
      }
    }
  }

  /** Is file-being-written map empty? */
  public boolean isFilesBeingWrittenEmpty() {
    synchronized(filesBeingWritten) {
      return filesBeingWritten.isEmpty();
    }
  }

  /** @return true if the client is running */
  public boolean isClientRunning() {
    return clientRunning;
  }

  long getLastLeaseRenewal() {
    return lastLeaseRenewal;
  }

  void updateLastLeaseRenewal() {
    synchronized(filesBeingWritten) {
      if (filesBeingWritten.isEmpty()) {
        return;
      }
      lastLeaseRenewal = Time.monotonicNow();
    }
  }

  /**
   * Renew leases.
   * @return true if lease was renewed. May return false if this
   * client has been closed or has no files open.
   **/
  public boolean renewLease() throws IOException {
    if (clientRunning && !isFilesBeingWrittenEmpty()) {
      try {
        namenode.renewLease(clientName);
        updateLastLeaseRenewal();
        return true;
      } catch (IOException e) {
        // Abort if the lease has already expired.
        final long elapsed = Time.monotonicNow() - getLastLeaseRenewal();
        if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) {
          LOG.warn("Failed to renew lease for " + clientName + " for "
              + (elapsed/1000) + " seconds (>= hard-limit ="
              + (HdfsConstants.LEASE_HARDLIMIT_PERIOD / 1000) + " seconds.) "
              + "Closing all files being written ...", e);
          closeAllFilesBeingWritten(true);
        } else {
          // Let the lease renewer handle it and retry.
          throw e;
        }
      }
    }
    return false;
  }

  /**
   * Close the connection to the NameNode.
   */
  void closeConnectionToNamenode() {
    RPC.stopProxy(namenode);
  }

  /** Close/abort all files being written. */
  public void closeAllFilesBeingWritten(final boolean abort) {
    for(;;) {
      final long inodeId;
      final DFSOutputStream out;
      synchronized(filesBeingWritten) {
        if (filesBeingWritten.isEmpty()) {
          return;
        }
        inodeId = filesBeingWritten.keySet().iterator().next();
        out = filesBeingWritten.remove(inodeId);
      }
      if (out != null) {
        try {
          if (abort) {
            out.abort();
          } else {
            out.close();
          }
        } catch(IOException ie) {
          LOG.error("Failed to " + (abort ? "abort" : "close") + " file: "
              + out.getSrc() + " with inode: " + inodeId, ie);
        }
      }
    }
  }

  /**
   * Close the file system, abandoning all of the leases and files being
   * created and close connections to the namenode.
   */
  @Override
  public synchronized void close() throws IOException {
    if(clientRunning) {
      closeAllFilesBeingWritten(false);
      clientRunning = false;
      getLeaseRenewer().closeClient(this);
      // close connections to the namenode
      closeConnectionToNamenode();
    }
  }

  /**
   * Close all open streams, abandoning all of the leases and files being
   * created.
   * @param abort whether streams should be aborted instead of being
   *              gracefully closed
   */
  public void closeOutputStreams(boolean abort) {
    if (clientRunning) {
      closeAllFilesBeingWritten(abort);
    }
  }

  /**
   * @see ClientProtocol#getPreferredBlockSize(String)
   */
  public long getBlockSize(String f) throws IOException {
    try (TraceScope ignored = newPathTraceScope("getBlockSize", f)) {
      return namenode.getPreferredBlockSize(f);
    } catch (IOException ie) {
      LOG.warn("Problem getting block size", ie);
      throw ie;
    }
  }

  /**
   * Get server default values for a number of configuration params.
   * @see ClientProtocol#getServerDefaults()
   */
  public FsServerDefaults getServerDefaults() throws IOException {
    long now = Time.monotonicNow();
    if ((serverDefaults == null) ||
        (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) {
      serverDefaults = namenode.getServerDefaults();
      serverDefaultsLastUpdate = now;
    }
    assert serverDefaults != null;
    return serverDefaults;
  }
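
  // For example (a sketch): the defaults are cached for
  // SERVER_DEFAULTS_VALIDITY_PERIOD, so only the first call in an hour pays
  // for the RPC.
  //
  //   FsServerDefaults d = client.getServerDefaults(); // RPC on first call
  //   long blockSize = d.getBlockSize();
  //   // A second getServerDefaults() within the hour is served from cache.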

  /**
   * Get a canonical token service name for this client's tokens.  Null should
   * be returned if the client is not using tokens.
   * @return the token service for the client
   */
  @InterfaceAudience.LimitedPrivate( { "HDFS" })
  public String getCanonicalServiceName() {
    return (dtService != null) ? dtService.toString() : null;
  }

  /**
   * @see ClientProtocol#getDelegationToken(Text)
   */
  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
      throws IOException {
    assert dtService != null;
    try (TraceScope ignored = tracer.newScope("getDelegationToken")) {
      Token<DelegationTokenIdentifier> token =
          namenode.getDelegationToken(renewer);
      if (token != null) {
        token.setService(this.dtService);
        LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
      } else {
        LOG.info("Cannot get delegation token from " + renewer);
      }
      return token;
    }
  }
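
  // A hedged usage sketch (the renewer principal and variable names are
  // illustrative): obtain a token for the current user, then renew or cancel
  // it through the Token API rather than the deprecated helpers below.
  //
  //   Token<DelegationTokenIdentifier> token =
  //       client.getDelegationToken(new Text("yarn"));
  //   long nextExpiry = token.renew(conf);
  //   token.cancel(conf);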

  /**
   * Renew a delegation token
   * @param token the token to renew
   * @return the new expiration time
   * @throws IOException
   * @deprecated Use Token.renew instead.
   */
  @Deprecated
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException {
    LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
    try {
      return token.renew(conf);
    } catch (InterruptedException ie) {
      throw new RuntimeException("caught interrupted", ie);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(InvalidToken.class,
          AccessControlException.class);
    }
  }

  /**
   * Cancel a delegation token
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use Token.cancel instead.
   */
  @Deprecated
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException {
    LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
    try {
      token.cancel(conf);
    } catch (InterruptedException ie) {
      throw new RuntimeException("caught interrupted", ie);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(InvalidToken.class,
          AccessControlException.class);
    }
  }

  @InterfaceAudience.Private
  public static class Renewer extends TokenRenewer {

    static {
      // Ensure that HDFS Configuration files are loaded before trying to use
      // the renewer.
      HdfsConfiguration.init();
    }

    @Override
    public boolean handleKind(Text kind) {
      return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind);
    }

    @SuppressWarnings("unchecked")
    @Override
    public long renew(Token<?> token, Configuration conf) throws IOException {
      Token<DelegationTokenIdentifier> delToken =
          (Token<DelegationTokenIdentifier>) token;
      ClientProtocol nn = getNNProxy(delToken, conf);
      try {
        return nn.renewDelegationToken(delToken);
      } catch (RemoteException re) {
        throw re.unwrapRemoteException(InvalidToken.class,
            AccessControlException.class);
      }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void cancel(Token<?> token, Configuration conf) throws IOException {
      Token<DelegationTokenIdentifier> delToken =
          (Token<DelegationTokenIdentifier>) token;
      LOG.info("Cancelling " +
          DelegationTokenIdentifier.stringifyToken(delToken));
      ClientProtocol nn = getNNProxy(delToken, conf);
      try {
        nn.cancelDelegationToken(delToken);
      } catch (RemoteException re) {
        throw re.unwrapRemoteException(InvalidToken.class,
            AccessControlException.class);
      }
    }

    private static ClientProtocol getNNProxy(
        Token<DelegationTokenIdentifier> token, Configuration conf)
        throws IOException {
      URI uri = HAUtilClient.getServiceUriFromToken(
          HdfsConstants.HDFS_URI_SCHEME, token);
      if (HAUtilClient.isTokenForLogicalUri(token) &&
          !HAUtilClient.isLogicalUri(conf, uri)) {
        // If the token is for a logical nameservice, but the configuration
        // we have disagrees about that, we can't actually renew it.
        // This can be the case in MR, for example, if the RM doesn't
        // have all of the HA clusters configured in its configuration.
        throw new IOException("Unable to map logical nameservice URI '" +
            uri + "' to a NameNode. Local configuration does not have " +
            "a failover proxy provider configured.");
      }

      ProxyAndInfo<ClientProtocol> info =
          NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null);
      assert info.getDelegationTokenService().equals(token.getService()) :
          "Returned service '" + info.getDelegationTokenService().toString() +
              "' doesn't match expected service '" +
              token.getService().toString() + "'";

      return info.getProxy();
    }

    @Override
    public boolean isManaged(Token<?> token) throws IOException {
      return true;
    }

  }

  /**
   * Report corrupt blocks that were discovered by the client.
   * @see ClientProtocol#reportBadBlocks(LocatedBlock[])
   */
  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
    namenode.reportBadBlocks(blocks);
  }

  public LocatedBlocks getLocatedBlocks(String src, long start)
      throws IOException {
    return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
  }

  /*
   * This is just a wrapper around callGetBlockLocations, but non-static so
   * that we can stub it out for tests.
   */
  @VisibleForTesting
  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
      throws IOException {
    try (TraceScope ignored = newPathTraceScope("getBlockLocations", src)) {
      return callGetBlockLocations(namenode, src, start, length);
    }
  }

  /**
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length)
      throws IOException {
    try {
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          UnresolvedPathException.class);
    }
  }

  /**
   * Recover a file's lease
   * @param src a file's path
   * @return true if the file is already closed
   * @throws IOException
   */
  boolean recoverLease(String src) throws IOException {
    checkOpen();

    try (TraceScope ignored = newPathTraceScope("recoverLease", src)) {
      return namenode.recoverLease(src, clientName);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(FileNotFoundException.class,
          AccessControlException.class,
          UnresolvedPathException.class);
    }
  }

  /**
   * Get block location info about file
   *
   * getBlockLocations() returns a list of hostnames that store
   * data for a specific file region.  It returns a set of hostnames
   * for every block within the indicated region.
   *
   * This function is very useful when writing code that considers
   * data-placement when performing operations.  For example, the
   * MapReduce system tries to schedule tasks on the same machines
   * as the data-block the task processes.
   */
  public BlockLocation[] getBlockLocations(String src, long start,
      long length) throws IOException {
    try (TraceScope ignored = newPathTraceScope("getBlockLocations", src)) {
      LocatedBlocks blocks = getLocatedBlocks(src, start, length);
      BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
      HdfsBlockLocation[] hdfsLocations =
          new HdfsBlockLocation[locations.length];
      for (int i = 0; i < locations.length; i++) {
        hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
      }
      return hdfsLocations;
    }
  }
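
  // A brief sketch (path and range are illustrative): fetch the hosts
  // storing the first 1 MB of a file, e.g. for locality-aware scheduling.
  //
  //   BlockLocation[] locs =
  //       client.getBlockLocations("/user/alice/data.txt", 0, 1024 * 1024);
  //   for (BlockLocation loc : locs) {
  //     String[] hosts = loc.getHosts(); // hostnames holding this block
  //   }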

  /**
   * Get block location information about a list of {@link HdfsBlockLocation}.
   * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)}
   * to get {@link BlockStorageLocation}s for blocks returned by
   * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}.
   *
   * This is done by making a round of RPCs to the associated datanodes,
   * asking for the volume of each block replica. The returned array of
   * {@link BlockStorageLocation} exposes this information as a
   * {@link VolumeId}.
   *
   * @param blockLocations
   *          target blocks on which to query volume location information
   * @return volumeBlockLocations original block array augmented with
   *         additional volume location information for each replica.
   */
  public BlockStorageLocation[] getBlockStorageLocations(
      List<BlockLocation> blockLocations) throws IOException,
      UnsupportedOperationException, InvalidBlockTokenException {
    if (!getConf().isHdfsBlocksMetadataEnabled()) {
      throw new UnsupportedOperationException("Datanode-side support for " +
          "getVolumeBlockLocations() must also be enabled in the client " +
          "configuration.");
    }
    // Downcast blockLocations and fetch out required LocatedBlock(s)
    List<LocatedBlock> blocks = new ArrayList<>();
    for (BlockLocation loc : blockLocations) {
      if (!(loc instanceof HdfsBlockLocation)) {
        throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
            "expected to be passed HdfsBlockLocations");
      }
      HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
      blocks.add(hdfsLoc.getLocatedBlock());
    }

    // Re-group the LocatedBlocks to be grouped by datanodes, with the values
    // a list of the LocatedBlocks on the datanode.
    Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
        new LinkedHashMap<>();
    for (LocatedBlock b : blocks) {
      for (DatanodeInfo info : b.getLocations()) {
        if (!datanodeBlocks.containsKey(info)) {
          datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
        }
        List<LocatedBlock> l = datanodeBlocks.get(info);
        l.add(b);
      }
    }

    // Make RPCs to the datanodes to get volume locations for their replicas
    TraceScope scope = tracer.newScope("getBlockStorageLocations");
    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
    try {
      metadatas = BlockStorageLocationUtil.
          queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
              getConf().getFileBlockStorageLocationsNumThreads(),
              getConf().getFileBlockStorageLocationsTimeoutMs(),
              getConf().isConnectToDnViaHostname(), tracer, scope.getSpanId());
      if (LOG.isTraceEnabled()) {
        LOG.trace("metadata returned: "
            + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
      }
    } finally {
      scope.close();
    }

    // Regroup the returned VolumeId metadata to again be grouped by
    // LocatedBlock rather than by datanode
    Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
        .associateVolumeIdsWithBlocks(blocks, metadatas);

    // Combine original BlockLocations with new VolumeId information
    BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
        .convertToVolumeBlockLocations(blocks, blockVolumeIds);

    return volumeBlockLocations;
  }
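
  // A sketch (assumes datanode-side HdfsBlocksMetadata support is enabled
  // along with the matching client-side setting; the path and variable names
  // are illustrative):
  //
  //   BlockLocation[] locs =
  //       client.getBlockLocations("/f", 0, Long.MAX_VALUE);
  //   BlockStorageLocation[] vols =
  //       client.getBlockStorageLocations(Arrays.asList(locs));
  //   VolumeId vol0 = vols[0].getVolumeIds()[0]; // volume of first replica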

  /**
   * Decrypts an EDEK by consulting the KeyProvider.
   */
  private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
      feInfo) throws IOException {
    try (TraceScope ignored = tracer.newScope("decryptEDEK")) {
      KeyProvider provider = getKeyProvider();
      if (provider == null) {
        throw new IOException("No KeyProvider is configured, cannot access" +
            " an encrypted file");
      }
      EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
          feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
          feInfo.getEncryptedDataEncryptionKey());
      try {
        KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
            .createKeyProviderCryptoExtension(provider);
        return cryptoProvider.decryptEncryptedKey(ekv);
      } catch (GeneralSecurityException e) {
        throw new IOException(e);
      }
    }
  }

  /**
   * Obtain the crypto protocol version from the provided FileEncryptionInfo,
   * checking to see if this version is supported by the client.
   *
   * @param feInfo FileEncryptionInfo
   * @return CryptoProtocolVersion from the feInfo
   * @throws IOException if the protocol version is unsupported.
   */
  private static CryptoProtocolVersion getCryptoProtocolVersion(
      FileEncryptionInfo feInfo) throws IOException {
    final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
    if (!CryptoProtocolVersion.supports(version)) {
      throw new IOException("Client does not support specified " +
          "CryptoProtocolVersion " + version.getDescription() + " version " +
          "number " + version.getVersion());
    }
    return version;
  }

  /**
   * Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo
   * and the available CryptoCodecs configured in the Configuration.
   *
   * @param conf   Configuration
   * @param feInfo FileEncryptionInfo
   * @return CryptoCodec
   * @throws IOException if no suitable CryptoCodec for the CipherSuite is
   *                     available.
   */
  private static CryptoCodec getCryptoCodec(Configuration conf,
      FileEncryptionInfo feInfo) throws IOException {
    final CipherSuite suite = feInfo.getCipherSuite();
    if (suite.equals(CipherSuite.UNKNOWN)) {
      throw new IOException("NameNode specified unknown CipherSuite with ID "
          + suite.getUnknownValue() + ", cannot instantiate CryptoCodec.");
    }
    final CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
    if (codec == null) {
      throw new UnknownCipherSuiteException(
          "No configuration found for the cipher suite "
              + suite.getConfigSuffix() + " prefixed with "
              + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
              + ". Please see the example configuration "
              + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
              + "at core-default.xml for details.");
    }
    return codec;
  }

  /**
   * Wraps the stream in a CryptoInputStream if the underlying file is
   * encrypted.
   */
  public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
      throws IOException {
    final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
    if (feInfo != null) {
      // File is encrypted, wrap the stream in a crypto stream.
      // Currently only one version, so no special logic based on the version #
      getCryptoProtocolVersion(feInfo);
      final CryptoCodec codec = getCryptoCodec(conf, feInfo);
      final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
      final CryptoInputStream cryptoIn =
          new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
              feInfo.getIV());
      return new HdfsDataInputStream(cryptoIn);
    } else {
      // No FileEncryptionInfo so no encryption.
      return new HdfsDataInputStream(dfsis);
    }
  }
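
  // A small sketch (assumes the file may live in an encryption zone; the path
  // is illustrative). The wrapping is transparent to callers, which read
  // encrypted and plaintext files identically:
  //
  //   DFSInputStream dfsIn = client.open("/ez/secret.txt");
  //   HdfsDataInputStream in = client.createWrappedInputStream(dfsIn);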

  /**
   * Wraps the stream in a CryptoOutputStream if the underlying file is
   * encrypted.
   */
  public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
      FileSystem.Statistics statistics) throws IOException {
    return createWrappedOutputStream(dfsos, statistics, 0);
  }

  /**
   * Wraps the stream in a CryptoOutputStream if the underlying file is
   * encrypted.
   */
  public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
      FileSystem.Statistics statistics, long startPos) throws IOException {
    final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo();
    if (feInfo != null) {
      // File is encrypted, wrap the stream in a crypto stream.
      // Currently only one version, so no special logic based on the version #
      getCryptoProtocolVersion(feInfo);
      final CryptoCodec codec = getCryptoCodec(conf, feInfo);
      KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
      final CryptoOutputStream cryptoOut =
          new CryptoOutputStream(dfsos, codec,
              decrypted.getMaterial(), feInfo.getIV(), startPos);
      return new HdfsDataOutputStream(cryptoOut, statistics, startPos);
    } else {
      // No FileEncryptionInfo present so no encryption.
      return new HdfsDataOutputStream(dfsos, statistics, startPos);
    }
  }

  public DFSInputStream open(String src) throws IOException {
    return open(src, dfsClientConf.getIoBufferSize(), true);
  }

  /**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   * @deprecated Use {@link #open(String, int, boolean)} instead.
   */
  @Deprecated
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
      FileSystem.Statistics stats) throws IOException {
    return open(src, buffersize, verifyChecksum);
  }

  /**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   */
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException {
    checkOpen();
    // Get block info from namenode
    try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
      return new DFSInputStream(this, src, verifyChecksum, null);
    }
  }

  /**
   * Get the namenode associated with this DFSClient object
   * @return the namenode associated with this DFSClient object
   */
  public ClientProtocol getNamenode() {
    return namenode;
  }

  /**
   * Call {@link #create(String, boolean, short, long, Progressable)} with
   * default <code>replication</code> and <code>blockSize</code> and null
   * <code>progress</code>.
   */
  public OutputStream create(String src, boolean overwrite)
      throws IOException {
    return create(src, overwrite, dfsClientConf.getDefaultReplication(),
        dfsClientConf.getDefaultBlockSize(), null);
  }

  /**
   * Call {@link #create(String, boolean, short, long, Progressable)} with
   * default <code>replication</code> and <code>blockSize</code>.
   */
  public OutputStream create(String src,
      boolean overwrite, Progressable progress) throws IOException {
    return create(src, overwrite, dfsClientConf.getDefaultReplication(),
        dfsClientConf.getDefaultBlockSize(), progress);
  }

  /**
   * Call {@link #create(String, boolean, short, long, Progressable)} with
   * null <code>progress</code>.
   */
  public OutputStream create(String src, boolean overwrite, short replication,
      long blockSize) throws IOException {
    return create(src, overwrite, replication, blockSize, null);
  }

  /**
   * Call {@link #create(String, boolean, short, long, Progressable, int)}
   * with default bufferSize.
   */
  public OutputStream create(String src, boolean overwrite, short replication,
      long blockSize, Progressable progress) throws IOException {
    return create(src, overwrite, replication, blockSize, progress,
        dfsClientConf.getIoBufferSize());
  }

  /**
   * Call {@link #create(String, FsPermission, EnumSet, short, long,
   * Progressable, int, ChecksumOpt)} with default <code>permission</code>
   * {@link FsPermission#getFileDefault()}.
   *
   * @param src File name
   * @param overwrite overwrite an existing file if true
   * @param replication replication factor for the file
   * @param blockSize maximum block size
   * @param progress interface for reporting client progress
   * @param buffersize underlying buffersize
   *
   * @return output stream
   */
  public OutputStream create(String src, boolean overwrite, short replication,
      long blockSize, Progressable progress, int buffersize)
      throws IOException {
    return create(src, FsPermission.getFileDefault(),
        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
        buffersize, null);
  }

  /**
   * Call {@link #create(String, FsPermission, EnumSet, boolean, short,
   * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
   * set to true.
   */
  public DFSOutputStream create(String src, FsPermission permission,
      EnumSet<CreateFlag> flag, short replication, long blockSize,
      Progressable progress, int buffersize, ChecksumOpt checksumOpt)
      throws IOException {
    return create(src, permission, flag, true,
        replication, blockSize, progress, buffersize, checksumOpt, null);
  }

  /**
   * Create a new dfs file with the specified block replication
   * with write-progress reporting and return an output stream for writing
   * into the file.
   *
   * @param src File name
   * @param permission The permission of the file being created.
   *          If null, use default permission
   *          {@link FsPermission#getFileDefault()}
   * @param flag indicates create a new file or create/overwrite an
   *          existing file or append to an existing file
   * @param createParent create missing parent directory if true
   * @param replication block replication
   * @param blockSize maximum block size
   * @param progress interface for reporting client progress
   * @param buffersize underlying buffer size
   * @param checksumOpt checksum options
   *
   * @return output stream
   *
   * @see ClientProtocol#create for detailed description of exceptions thrown
   */
  public DFSOutputStream create(String src, FsPermission permission,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      ChecksumOpt checksumOpt) throws IOException {
    return create(src, permission, flag, createParent, replication, blockSize,
        progress, buffersize, checksumOpt, null);
  }

  private FsPermission applyUMask(FsPermission permission) {
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    return permission.applyUMask(dfsClientConf.getUMask());
  }

  /**
   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
   * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
   * a hint to where the namenode should place the file blocks.
   * The favored nodes hint is not persisted in HDFS, so it may be honored
   * only at creation time. HDFS could later move the blocks away from the
   * favored nodes during balancing or re-replication. A value of null means
   * no favored nodes for this create.
   */
  public DFSOutputStream create(String src, FsPermission permission,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
      throws IOException {
    checkOpen();
    final FsPermission masked = applyUMask(permission);
    LOG.debug("{}: masked={}", src, masked);
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes));
    beginFileLease(result.getFileId(), result);
    return result;
  }
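
  // A short usage sketch (the path, permission, and sizes are illustrative):
  //
  //   DFSOutputStream out = client.create(
  //       "/user/alice/out.bin",
  //       new FsPermission((short) 0644),
  //       EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
  //       true,               // createParent
  //       (short) 3,          // replication
  //       128L * 1024 * 1024, // blockSize
  //       null,               // progress
  //       4096,               // buffersize
  //       null,               // checksumOpt (use defaults)
  //       null);              // favoredNodes (no hint)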

  private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) {
    String[] favoredNodeStrs = null;
    if (favoredNodes != null) {
      favoredNodeStrs = new String[favoredNodes.length];
      for (int i = 0; i < favoredNodes.length; i++) {
        favoredNodeStrs[i] =
            favoredNodes[i].getHostName() + ":" + favoredNodes[i].getPort();
      }
    }
    return favoredNodeStrs;
  }

  /**
   * Append to an existing file if {@link CreateFlag#APPEND} is present
   */
  private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
      Progressable progress) throws IOException {
    if (flag.contains(CreateFlag.APPEND)) {
      HdfsFileStatus stat = getFileInfo(src);
      if (stat == null) { // No file to append to
        // New file needs to be created if create option is present
        if (!flag.contains(CreateFlag.CREATE)) {
          throw new FileNotFoundException(
              "failed to append to non-existent file " + src + " on client "
                  + clientName);
        }
        return null;
      }
      return callAppend(src, flag, progress, null);
    }
    return null;
  }

  /**
   * Same as {@link #create(String, FsPermission, EnumSet, short, long,
   *  Progressable, int, ChecksumOpt)} except that the permission
   *  is absolute (i.e. it has already been masked with umask).
   */
  public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      ChecksumOpt checksumOpt) throws IOException {
    checkOpen();
    CreateFlag.validate(flag);
    DFSOutputStream result = primitiveAppend(src, flag, progress);
    if (result == null) {
      DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
      result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
          flag, createParent, replication, blockSize, progress, checksum, null);
    }
    beginFileLease(result.getFileId(), result);
    return result;
  }

  /**
   * Creates a symbolic link.
   *
   * @see ClientProtocol#createSymlink(String, String, FsPermission, boolean)
   */
  public void createSymlink(String target, String link, boolean createParent)
      throws IOException {
    try (TraceScope ignored = newPathTraceScope("createSymlink", target)) {
      final FsPermission dirPerm = applyUMask(null);
      namenode.createSymlink(target, link, dirPerm, createParent);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileAlreadyExistsException.class,
          FileNotFoundException.class,
          ParentNotDirectoryException.class,
          NSQuotaExceededException.class,
          DSQuotaExceededException.class,
          QuotaByStorageTypeExceededException.class,
          UnresolvedPathException.class,
          SnapshotAccessControlException.class);
    }
  }
1341
1342  /**
1343   * Resolve the *first* symlink, if any, in the path.
1344   *
1345   * @see ClientProtocol#getLinkTarget(String)
1346   */
1347  public String getLinkTarget(String path) throws IOException {
1348    checkOpen();
1349    try (TraceScope ignored = newPathTraceScope("getLinkTarget", path)) {
1350      return namenode.getLinkTarget(path);
1351    } catch (RemoteException re) {
1352      throw re.unwrapRemoteException(AccessControlException.class,
1353          FileNotFoundException.class);
1354    }
1355  }
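
  // A minimal symlink sketch (illustrative; the paths are hypothetical):
  //
  //   client.createSymlink("/data/current", "/data/latest", true);
  //   String target = client.getLinkTarget("/data/latest"); // "/data/current"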
1356
1357  /**
1358   * Invoke namenode append RPC.
   * It retries in case of {@link RetriableException}.
1360   */
1361  private LastBlockWithStatus callAppend(String src,
1362      EnumSetWritable<CreateFlag> flag) throws IOException {
1363    final long startTime = Time.monotonicNow();
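    // Retry budget: RetriableExceptions are retried for up to ~5 seconds,
    // sleeping 500 ms between attempts (roughly ten tries in total).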
1364    for(;;) {
1365      try {
1366        return namenode.append(src, clientName, flag);
1367      } catch(RemoteException re) {
1368        if (Time.monotonicNow() - startTime > 5000
1369            || !RetriableException.class.getName().equals(
1370                re.getClassName())) {
1371          throw re;
1372        }
1373
1374        try { // sleep and retry
1375          Thread.sleep(500);
1376        } catch (InterruptedException e) {
1377          throw DFSUtilClient.toInterruptedIOException("callAppend", e);
1378        }
1379      }
1380    }
1381  }
1382
  /** Get the stream returned by the append call. */
1384  private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag,
1385      Progressable progress, String[] favoredNodes) throws IOException {
1386    CreateFlag.validateForAppend(flag);
1387    try {
1388      final LastBlockWithStatus blkWithStatus = callAppend(src,
1389          new EnumSetWritable<>(flag, CreateFlag.class));
1390      HdfsFileStatus status = blkWithStatus.getFileStatus();
1391      if (status == null) {
1392        LOG.debug("NameNode is on an older version, request file " +
1393            "info with additional RPC call for file: {}", src);
1394        status = getFileInfo(src);
1395      }
1396      return DFSOutputStream.newStreamForAppend(this, src, flag, progress,
1397          blkWithStatus.getLastBlock(), status,
1398          dfsClientConf.createChecksum(null), favoredNodes);
1399    } catch(RemoteException re) {
1400      throw re.unwrapRemoteException(AccessControlException.class,
1401          FileNotFoundException.class,
1402          SafeModeException.class,
1403          DSQuotaExceededException.class,
1404          QuotaByStorageTypeExceededException.class,
1405          UnsupportedOperationException.class,
1406          UnresolvedPathException.class,
1407          SnapshotAccessControlException.class);
1408    }
1409  }
1410
1411  /**
1412   * Append to an existing HDFS file.
1413   *
1414   * @param src file name
1415   * @param buffersize buffer size
1416   * @param flag indicates whether to append data to a new block instead of
1417   *             the last block
1418   * @param progress for reporting write-progress; null is acceptable.
1419   * @param statistics file system statistics; null is acceptable.
1420   * @return an output stream for writing into the file
1421   *
1422   * @see ClientProtocol#append(String, String, EnumSetWritable)
1423   */
1424  public HdfsDataOutputStream append(final String src, final int buffersize,
1425      EnumSet<CreateFlag> flag, final Progressable progress,
1426      final FileSystem.Statistics statistics) throws IOException {
1427    final DFSOutputStream out = append(src, buffersize, flag, null, progress);
1428    return createWrappedOutputStream(out, statistics, out.getInitialLen());
1429  }
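
  // Illustrative call (path, buffer size and payload are hypothetical):
  //
  //   try (HdfsDataOutputStream out = client.append("/logs/app.log", 4096,
  //       EnumSet.of(CreateFlag.APPEND), null, null)) {
  //     out.write(payload);
  //   }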
1430
1431  /**
1432   * Append to an existing HDFS file.
1433   *
1434   * @param src file name
1435   * @param buffersize buffer size
1436   * @param flag indicates whether to append data to a new block instead of the
1437   *          last block
1438   * @param progress for reporting write-progress; null is acceptable.
1439   * @param statistics file system statistics; null is acceptable.
1440   * @param favoredNodes FavoredNodes for new blocks
1441   * @return an output stream for writing into the file
1442   * @see ClientProtocol#append(String, String, EnumSetWritable)
1443   */
1444  public HdfsDataOutputStream append(final String src, final int buffersize,
1445      EnumSet<CreateFlag> flag, final Progressable progress,
1446      final FileSystem.Statistics statistics,
1447      final InetSocketAddress[] favoredNodes) throws IOException {
1448    final DFSOutputStream out = append(src, buffersize, flag,
1449        getFavoredNodesStr(favoredNodes), progress);
1450    return createWrappedOutputStream(out, statistics, out.getInitialLen());
1451  }
1452
1453  private DFSOutputStream append(String src, int buffersize,
1454      EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress)
1455      throws IOException {
1456    checkOpen();
1457    final DFSOutputStream result = callAppend(src, flag, progress,
1458        favoredNodes);
1459    beginFileLease(result.getFileId(), result);
1460    return result;
1461  }
1462
1463  /**
1464   * Set replication for an existing file.
1465   * @param src file name
   * @param replication the replication factor to set for the file
1467   *
1468   * @see ClientProtocol#setReplication(String, short)
1469   */
1470  public boolean setReplication(String src, short replication)
1471      throws IOException {
1472    try (TraceScope ignored = newPathTraceScope("setReplication", src)) {
1473      return namenode.setReplication(src, replication);
1474    } catch (RemoteException re) {
1475      throw re.unwrapRemoteException(AccessControlException.class,
1476          FileNotFoundException.class,
1477          SafeModeException.class,
1478          DSQuotaExceededException.class,
1479          QuotaByStorageTypeExceededException.class,
1480          UnresolvedPathException.class,
1481          SnapshotAccessControlException.class);
1482    }
1483  }
1484
1485  /**
1486   * Set storage policy for an existing file/directory
1487   * @param src file/directory name
1488   * @param policyName name of the storage policy
1489   */
1490  public void setStoragePolicy(String src, String policyName)
1491      throws IOException {
1492    try (TraceScope ignored = newPathTraceScope("setStoragePolicy", src)) {
1493      namenode.setStoragePolicy(src, policyName);
1494    } catch (RemoteException e) {
1495      throw e.unwrapRemoteException(AccessControlException.class,
1496          FileNotFoundException.class,
1497          SafeModeException.class,
1498          NSQuotaExceededException.class,
1499          UnresolvedPathException.class,
1500          SnapshotAccessControlException.class);
1501    }
1502  }
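
  // Illustrative call (path is hypothetical); built-in policy names include
  // "HOT", "WARM" and "COLD":
  //
  //   client.setStoragePolicy("/archive/2015", "COLD");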
1503
1504  /**
1505   * Unset storage policy set for a given file/directory.
1506   * @param src file/directory name
1507   */
1508  public void unsetStoragePolicy(String src) throws IOException {
1509    checkOpen();
1510    try (TraceScope ignored = newPathTraceScope("unsetStoragePolicy", src)) {
1511      namenode.unsetStoragePolicy(src);
1512    } catch (RemoteException e) {
1513      throw e.unwrapRemoteException(AccessControlException.class,
1514          FileNotFoundException.class,
1515          SafeModeException.class,
1516          NSQuotaExceededException.class,
1517          UnresolvedPathException.class,
1518          SnapshotAccessControlException.class);
1519    }
1520  }
1521
1522  /**
1523   * @param path file/directory name
   * @return the storage policy for the specified path
1525   */
1526  public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
1527    checkOpen();
1528    try (TraceScope ignored = newPathTraceScope("getStoragePolicy", path)) {
1529      return namenode.getStoragePolicy(path);
1530    } catch (RemoteException e) {
1531      throw e.unwrapRemoteException(AccessControlException.class,
1532          FileNotFoundException.class,
1533          SafeModeException.class,
1534          UnresolvedPathException.class);
1535    }
1536  }
1537
1538  /**
1539   * @return All the existing storage policies
1540   */
1541  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
1542    try (TraceScope ignored = tracer.newScope("getStoragePolicies")) {
1543      return namenode.getStoragePolicies();
1544    }
1545  }
1546
1547  /**
1548   * Rename file or directory.
1549   * @see ClientProtocol#rename(String, String)
1550   * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
1551   */
1552  @Deprecated
1553  public boolean rename(String src, String dst) throws IOException {
1554    checkOpen();
1555    try (TraceScope ignored = newSrcDstTraceScope("rename", src, dst)) {
1556      return namenode.rename(src, dst);
1557    } catch (RemoteException re) {
1558      throw re.unwrapRemoteException(AccessControlException.class,
1559          NSQuotaExceededException.class,
1560          DSQuotaExceededException.class,
1561          QuotaByStorageTypeExceededException.class,
1562          UnresolvedPathException.class,
1563          SnapshotAccessControlException.class);
1564    }
1565  }
1566
1567  /**
   * Move blocks from srcs to trg and delete srcs.
1569   * See {@link ClientProtocol#concat}.
1570   */
1571  public void concat(String trg, String [] srcs) throws IOException {
1572    checkOpen();
1573    try (TraceScope ignored = tracer.newScope("concat")) {
1574      namenode.concat(trg, srcs);
1575    } catch (RemoteException re) {
1576      throw re.unwrapRemoteException(AccessControlException.class,
1577          UnresolvedPathException.class,
1578          SnapshotAccessControlException.class);
1579    }
  }

  /**
1582   * Rename file or directory.
1583   * @see ClientProtocol#rename2(String, String, Options.Rename...)
1584   */
1585  public void rename(String src, String dst, Options.Rename... options)
1586      throws IOException {
1587    checkOpen();
1588    try (TraceScope ignored = newSrcDstTraceScope("rename2", src, dst)) {
1589      namenode.rename2(src, dst, options);
1590    } catch (RemoteException re) {
1591      throw re.unwrapRemoteException(AccessControlException.class,
1592          DSQuotaExceededException.class,
1593          QuotaByStorageTypeExceededException.class,
1594          FileAlreadyExistsException.class,
1595          FileNotFoundException.class,
1596          ParentNotDirectoryException.class,
1597          SafeModeException.class,
1598          NSQuotaExceededException.class,
1599          UnresolvedPathException.class,
1600          SnapshotAccessControlException.class);
1601    }
1602  }
1603
1604  /**
   * Truncate a file to the indicated size.
1606   * See {@link ClientProtocol#truncate}.
1607   */
1608  public boolean truncate(String src, long newLength) throws IOException {
1609    checkOpen();
1610    if (newLength < 0) {
1611      throw new HadoopIllegalArgumentException(
1612          "Cannot truncate to a negative file size: " + newLength + ".");
1613    }
1614    try (TraceScope ignored = newPathTraceScope("truncate", src)) {
1615      return namenode.truncate(src, newLength, clientName);
1616    } catch (RemoteException re) {
1617      throw re.unwrapRemoteException(AccessControlException.class,
1618          UnresolvedPathException.class);
1619    }
1620  }
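
  // Illustrative call (path and length are hypothetical). A return value of
  // true means the truncate completed immediately; false means block
  // recovery will finish it asynchronously:
  //
  //   boolean done = client.truncate("/data/file.bin", 1024L);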
1621
1622  /**
1623   * Delete file or directory.
1624   * See {@link ClientProtocol#delete(String, boolean)}.
1625   */
1626  @Deprecated
1627  public boolean delete(String src) throws IOException {
1628    checkOpen();
1629    return delete(src, true);
1630  }
1631
1632  /**
   * Delete file or directory.
   * Deletes the contents of the directory if it is non-empty and recursive
   * is set to true.
1636   *
1637   * @see ClientProtocol#delete(String, boolean)
1638   */
1639  public boolean delete(String src, boolean recursive) throws IOException {
1640    checkOpen();
1641    try (TraceScope ignored = newPathTraceScope("delete", src)) {
1642      return namenode.delete(src, recursive);
1643    } catch (RemoteException re) {
1644      throw re.unwrapRemoteException(AccessControlException.class,
1645          FileNotFoundException.class,
1646          SafeModeException.class,
1647          UnresolvedPathException.class,
1648          SnapshotAccessControlException.class);
1649    }
1650  }
1651
  /** Implemented using getFileInfo(src). */
1654  public boolean exists(String src) throws IOException {
1655    checkOpen();
1656    return getFileInfo(src) != null;
1657  }
1658
1659  /**
   * Get a partial listing of the indicated directory.
   * No block locations need to be fetched.
1662   */
1663  public DirectoryListing listPaths(String src,  byte[] startAfter)
1664      throws IOException {
1665    return listPaths(src, startAfter, false);
1666  }
1667
1668  /**
1669   * Get a partial listing of the indicated directory
1670   *
   * It is recommended to use HdfsFileStatus.EMPTY_NAME as startAfter
   * if the application wants to fetch a listing starting from
   * the first entry in the directory.
1674   *
1675   * @see ClientProtocol#getListing(String, byte[], boolean)
1676   */
1677  public DirectoryListing listPaths(String src,  byte[] startAfter,
1678      boolean needLocation) throws IOException {
1679    checkOpen();
1680    try (TraceScope ignored = newPathTraceScope("listPaths", src)) {
1681      return namenode.getListing(src, startAfter, needLocation);
1682    } catch (RemoteException re) {
1683      throw re.unwrapRemoteException(AccessControlException.class,
1684          FileNotFoundException.class,
1685          UnresolvedPathException.class);
1686    }
1687  }
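
  // Paging sketch (the path and process() are hypothetical): start at
  // HdfsFileStatus.EMPTY_NAME and feed each batch's last name back in as
  // startAfter until hasMore() returns false.
  //
  //   byte[] startAfter = HdfsFileStatus.EMPTY_NAME;
  //   DirectoryListing batch;
  //   do {
  //     batch = client.listPaths("/user/alice", startAfter);
  //     for (HdfsFileStatus st : batch.getPartialListing()) {
  //       process(st);
  //     }
  //     startAfter = batch.getLastName();
  //   } while (batch.hasMore());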
1688
1689  /**
1690   * Get the file info for a specific file or directory.
1691   * @param src The string representation of the path to the file
1692   * @return object containing information regarding the file
1693   *         or null if file not found
1694   *
1695   * @see ClientProtocol#getFileInfo(String) for description of exceptions
1696   */
1697  public HdfsFileStatus getFileInfo(String src) throws IOException {
1698    checkOpen();
1699    try (TraceScope ignored = newPathTraceScope("getFileInfo", src)) {
1700      return namenode.getFileInfo(src);
1701    } catch (RemoteException re) {
1702      throw re.unwrapRemoteException(AccessControlException.class,
1703          FileNotFoundException.class,
1704          UnresolvedPathException.class);
1705    }
1706  }
1707
1708  /**
   * Get the close status of a file.
   * @return true if the file has already been closed
1711   */
1712  public boolean isFileClosed(String src) throws IOException{
1713    checkOpen();
1714    try (TraceScope ignored = newPathTraceScope("isFileClosed", src)) {
1715      return namenode.isFileClosed(src);
1716    } catch (RemoteException re) {
1717      throw re.unwrapRemoteException(AccessControlException.class,
1718          FileNotFoundException.class,
1719          UnresolvedPathException.class);
1720    }
1721  }
1722
1723  /**
1724   * Get the file info for a specific file or directory. If src
1725   * refers to a symlink then the FileStatus of the link is returned.
1726   * @param src path to a file or directory.
1727   *
   * For a description of the exceptions thrown,
1729   * @see ClientProtocol#getFileLinkInfo(String)
1730   */
1731  public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
1732    checkOpen();
1733    try (TraceScope ignored = newPathTraceScope("getFileLinkInfo", src)) {
1734      return namenode.getFileLinkInfo(src);
1735    } catch (RemoteException re) {
1736      throw re.unwrapRemoteException(AccessControlException.class,
1737          UnresolvedPathException.class);
1738    }
1739  }
1740
1741  @InterfaceAudience.Private
1742  public void clearDataEncryptionKey() {
1743    LOG.debug("Clearing encryption key");
1744    synchronized (this) {
1745      encryptionKey = null;
1746    }
1747  }
1748
1749  /**
1750   * @return true if data sent between this client and DNs should be encrypted,
1751   *         false otherwise.
1752   * @throws IOException in the event of error communicating with the NN
1753   */
1754  boolean shouldEncryptData() throws IOException {
1755    FsServerDefaults d = getServerDefaults();
1756    return d != null && d.getEncryptDataTransfer();
1757  }
1758
1759  @Override
1760  public DataEncryptionKey newDataEncryptionKey() throws IOException {
1761    if (shouldEncryptData()) {
1762      synchronized (this) {
1763        if (encryptionKey == null ||
1764            encryptionKey.expiryDate < Time.now()) {
1765          LOG.debug("Getting new encryption token from NN");
1766          encryptionKey = namenode.getDataEncryptionKey();
1767        }
1768        return encryptionKey;
1769      }
1770    } else {
1771      return null;
1772    }
1773  }
1774
1775  @VisibleForTesting
1776  public DataEncryptionKey getEncryptionKey() {
1777    return encryptionKey;
1778  }
1779
1780  /**
   * Get the checksum of the whole file or a range of the file. Note that the
1782   * range always starts from the beginning of the file.
1783   * @param src The file path
1784   * @param length the length of the range, i.e., the range is [0, length]
1785   * @return The checksum
1786   * @see DistributedFileSystem#getFileChecksum(Path)
1787   */
1788  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
1789      throws IOException {
1790    checkOpen();
1791    Preconditions.checkArgument(length >= 0);
1792    //get block locations for the file range
1793    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
1794        length);
1795    if (null == blockLocations) {
1796      throw new FileNotFoundException("File does not exist: " + src);
1797    }
1798    if (blockLocations.isUnderConstruction()) {
1799      throw new IOException("Fail to get checksum, since file " + src
1800          + " is under construction.");
1801    }
1802    List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
1803    final DataOutputBuffer md5out = new DataOutputBuffer();
1804    int bytesPerCRC = -1;
1805    DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
1806    long crcPerBlock = 0;
1807    boolean refetchBlocks = false;
1808    int lastRetriedIndex = -1;
1809
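    // The composite checksum is an MD5 of MD5s: each datanode returns the
    // MD5 of its block's CRCs, the per-block digests are concatenated into
    // md5out, and the file checksum is the MD5 of that concatenation.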
1810    // get block checksum for each block
1811    long remaining = length;
1812    if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
1813      remaining = Math.min(length, blockLocations.getFileLength());
1814    }
1815    for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
1816      if (refetchBlocks) {  // refetch to get fresh tokens
1817        blockLocations = callGetBlockLocations(namenode, src, 0, length);
1818        if (null == blockLocations) {
1819          throw new FileNotFoundException("File does not exist: " + src);
1820        }
1821        if (blockLocations.isUnderConstruction()) {
1822          throw new IOException("Fail to get checksum, since file " + src
1823              + " is under construction.");
1824        }
1825        locatedblocks = blockLocations.getLocatedBlocks();
1826        refetchBlocks = false;
1827      }
1828      LocatedBlock lb = locatedblocks.get(i);
1829      final ExtendedBlock block = lb.getBlock();
1830      if (remaining < block.getNumBytes()) {
1831        block.setNumBytes(remaining);
1832      }
1833      remaining -= block.getNumBytes();
1834      final DatanodeInfo[] datanodes = lb.getLocations();
1835
1836      //try each datanode location of the block
1837      final int timeout = 3000 * datanodes.length +
1838          dfsClientConf.getSocketTimeout();
1839      boolean done = false;
1840      for(int j = 0; !done && j < datanodes.length; j++) {
1841        DataOutputStream out = null;
1842        DataInputStream in = null;
1843
1844        try {
1845          //connect to a datanode
1846          IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
1847          out = new DataOutputStream(new BufferedOutputStream(pair.out,
1848              smallBufferSize));
1849          in = new DataInputStream(pair.in);
1850
1851          LOG.debug("write to {}: {}, block={}",
1852              datanodes[j], Op.BLOCK_CHECKSUM, block);
1853          // get block MD5
1854          new Sender(out).blockChecksum(block, lb.getBlockToken());
1855
1856          final BlockOpResponseProto reply =
1857              BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
1858
1859          String logInfo = "for block " + block + " from datanode " +
1860              datanodes[j];
1861          DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
1862
1863          OpBlockChecksumResponseProto checksumData =
1864              reply.getChecksumResponse();
1865
1866          //read byte-per-checksum
1867          final int bpc = checksumData.getBytesPerCrc();
1868          if (i == 0) { //first block
1869            bytesPerCRC = bpc;
          } else if (bpc != bytesPerCRC) {
1872            throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
1873                + " but bytesPerCRC=" + bytesPerCRC);
1874          }
1875
1876          //read crc-per-block
1877          final long cpb = checksumData.getCrcPerBlock();
1878          if (locatedblocks.size() > 1 && i == 0) {
1879            crcPerBlock = cpb;
1880          }
1881
1882          //read md5
1883          final MD5Hash md5 = new MD5Hash(
1884              checksumData.getMd5().toByteArray());
1885          md5.write(md5out);
1886
1887          // read crc-type
1888          final DataChecksum.Type ct;
1889          if (checksumData.hasCrcType()) {
1890            ct = PBHelperClient.convert(checksumData
1891                .getCrcType());
1892          } else {
1893            LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
1894                "inferring checksum by reading first byte");
1895            ct = inferChecksumTypeByReading(lb, datanodes[j]);
1896          }
1897
1898          if (i == 0) { // first block
1899            crcType = ct;
1900          } else if (crcType != DataChecksum.Type.MIXED
1901              && crcType != ct) {
1902            // if crc types are mixed in a file
1903            crcType = DataChecksum.Type.MIXED;
1904          }
1905
1906          done = true;
1907
1908          if (LOG.isDebugEnabled()) {
1909            if (i == 0) {
1910              LOG.debug("set bytesPerCRC=" + bytesPerCRC
1911                  + ", crcPerBlock=" + crcPerBlock);
1912            }
1913            LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
1914          }
1915        } catch (InvalidBlockTokenException ibte) {
1916          if (i > lastRetriedIndex) {
1917            LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
1918                    + "for file {} for block {} from datanode {}. Will retry "
1919                    + "the block once.",
1920                src, block, datanodes[j]);
1921            lastRetriedIndex = i;
1922            done = true; // actually it's not done; but we'll retry
1923            i--; // repeat at i-th block
1924            refetchBlocks = true;
1925            break;
1926          }
1927        } catch (IOException ie) {
1928          LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
1929        } finally {
1930          IOUtils.closeStream(in);
1931          IOUtils.closeStream(out);
1932        }
1933      }
1934
1935      if (!done) {
1936        throw new IOException("Fail to get block MD5 for " + block);
1937      }
1938    }
1939
1940    //compute file MD5
1941    final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
1942    switch (crcType) {
1943    case CRC32:
1944      return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
1945          crcPerBlock, fileMD5);
1946    case CRC32C:
1947      return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
1948          crcPerBlock, fileMD5);
1949    default:
1950      // If there is no block allocated for the file,
1951      // return one with the magic entry that matches what previous
1952      // hdfs versions return.
1953      if (locatedblocks.size() == 0) {
1954        return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
1955      }
1956
1957      // we should never get here since the validity was checked
1958      // when getCrcType() was called above.
1959      return null;
1960    }
1961  }
1962
1963  /**
   * Connect to the given datanode's data transfer port, and return
1965   * the resulting IOStreamPair. This includes encryption wrapping, etc.
1966   */
1967  private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
1968      LocatedBlock lb) throws IOException {
1969    boolean success = false;
1970    Socket sock = null;
1971    try {
1972      sock = socketFactory.createSocket();
1973      String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
1974      LOG.debug("Connecting to datanode {}", dnAddr);
1975      NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
1976      sock.setTcpNoDelay(dfsClientConf.getDataTransferTcpNoDelay());
1977      sock.setSoTimeout(timeout);
1978
1979      OutputStream unbufOut = NetUtils.getOutputStream(sock);
1980      InputStream unbufIn = NetUtils.getInputStream(sock);
1981      IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
1982          lb.getBlockToken(), dn);
1983      success = true;
1984      return ret;
1985    } finally {
1986      if (!success) {
1987        IOUtils.closeSocket(sock);
1988      }
1989    }
1990  }
1991
1992  /**
1993   * Infer the checksum type for a replica by sending an OP_READ_BLOCK
1994   * for the first byte of that replica. This is used for compatibility
1995   * with older HDFS versions which did not include the checksum type in
1996   * OpBlockChecksumResponseProto.
1997   *
1998   * @param lb the located block
1999   * @param dn the connected datanode
2000   * @return the inferred checksum type
2001   * @throws IOException if an error occurs
2002   */
2003  private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
2004      throws IOException {
2005    IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
2006
2007    try {
2008      DataOutputStream out = new DataOutputStream(
2009          new BufferedOutputStream(pair.out, smallBufferSize));
2010      DataInputStream in = new DataInputStream(pair.in);
2011
2012      new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
2013          0, 1, true, CachingStrategy.newDefaultStrategy());
2014      final BlockOpResponseProto reply =
2015          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
2016      String logInfo = "trying to read " + lb.getBlock() + " from datanode " +
2017          dn;
2018      DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
2019
2020      return PBHelperClient.convert(
2021          reply.getReadOpChecksumInfo().getChecksum().getType());
2022    } finally {
2023      IOUtilsClient.cleanup(null, pair.in, pair.out);
2024    }
2025  }
2026
2027  /**
2028   * Set permissions to a file or directory.
2029   * @param src path name.
   * @param permission the permission to set
2031   *
2032   * @see ClientProtocol#setPermission(String, FsPermission)
2033   */
2034  public void setPermission(String src, FsPermission permission)
2035      throws IOException {
2036    checkOpen();
2037    try (TraceScope ignored = newPathTraceScope("setPermission", src)) {
2038      namenode.setPermission(src, permission);
2039    } catch (RemoteException re) {
2040      throw re.unwrapRemoteException(AccessControlException.class,
2041          FileNotFoundException.class,
2042          SafeModeException.class,
2043          UnresolvedPathException.class,
2044          SnapshotAccessControlException.class);
2045    }
2046  }
2047
2048  /**
2049   * Set file or directory owner.
2050   * @param src path name.
2051   * @param username user id.
2052   * @param groupname user group.
2053   *
2054   * @see ClientProtocol#setOwner(String, String, String)
2055   */
2056  public void setOwner(String src, String username, String groupname)
2057      throws IOException {
2058    checkOpen();
2059    try (TraceScope ignored = newPathTraceScope("setOwner", src)) {
2060      namenode.setOwner(src, username, groupname);
2061    } catch (RemoteException re) {
2062      throw re.unwrapRemoteException(AccessControlException.class,
2063          FileNotFoundException.class,
2064          SafeModeException.class,
2065          UnresolvedPathException.class,
2066          SnapshotAccessControlException.class);
2067    }
2068  }
2069
2070  private long getStateByIndex(int stateIndex) throws IOException {
2071    checkOpen();
2072    try (TraceScope ignored = tracer.newScope("getStats")) {
      long[] states = namenode.getStats();
2074      return states.length > stateIndex ? states[stateIndex] : -1;
2075    }
2076  }
2077
2078  /**
2079   * @see ClientProtocol#getStats()
2080   */
2081  public FsStatus getDiskStatus() throws IOException {
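    // Slots 0, 1 and 2 of the stats array hold capacity, used and remaining
    // bytes respectively (see the ClientProtocol.GET_STATS_*_IDX constants).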
2082    return new FsStatus(getStateByIndex(0),
2083        getStateByIndex(1), getStateByIndex(2));
2084  }
2085
2086  /**
2087   * Returns count of blocks with no good replicas left. Normally should be
2088   * zero.
2089   * @throws IOException
2090   */
2091  public long getMissingBlocksCount() throws IOException {
2092    return getStateByIndex(ClientProtocol.
2093        GET_STATS_MISSING_BLOCKS_IDX);
2094  }
2095
2096  /**
   * Returns count of blocks with replication factor 1 that have
   * lost their only replica.
2099   * @throws IOException
2100   */
2101  public long getMissingReplOneBlocksCount() throws IOException {
2102    return getStateByIndex(ClientProtocol.
2103        GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX);
2104  }
2105
2106  /**
2107   * Returns count of blocks pending on deletion.
2108   * @throws IOException
2109   */
2110  public long getPendingDeletionBlocksCount() throws IOException {
2111    return getStateByIndex(ClientProtocol.
2112        GET_STATS_PENDING_DELETION_BLOCKS_IDX);
2113  }
2114
2115  /**
   * Returns count of blocks with one or more replicas missing.
2117   * @throws IOException
2118   */
2119  public long getUnderReplicatedBlocksCount() throws IOException {
2120    return getStateByIndex(ClientProtocol.
2121        GET_STATS_UNDER_REPLICATED_IDX);
2122  }
2123
2124  /**
2125   * Returns count of blocks with at least one replica marked corrupt.
2126   * @throws IOException
2127   */
2128  public long getCorruptBlocksCount() throws IOException {
2129    return getStateByIndex(ClientProtocol.
2130        GET_STATS_CORRUPT_BLOCKS_IDX);
2131  }
2132
2133  /**
   * Returns the number of bytes that reside in blocks with future generation
   * stamps.
   * @return Bytes in blocks with future generation stamps.
2137   * @throws IOException
2138   */
2139  public long getBytesInFutureBlocks() throws IOException {
2140    return getStateByIndex(ClientProtocol.
2141        GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX);
2142  }
2143
2144  /**
2145   * @return a list in which each entry describes a corrupt file/block
2146   * @throws IOException
2147   */
2148  public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
2149      throws IOException {
2150    checkOpen();
2151    try (TraceScope ignored
2152             = newPathTraceScope("listCorruptFileBlocks", path)) {
2153      return namenode.listCorruptFileBlocks(path, cookie);
2154    }
2155  }
2156
2157  public DatanodeInfo[] datanodeReport(DatanodeReportType type)
2158      throws IOException {
2159    checkOpen();
2160    try (TraceScope ignored = tracer.newScope("datanodeReport")) {
2161      return namenode.getDatanodeReport(type);
2162    }
2163  }
2164
2165  public DatanodeStorageReport[] getDatanodeStorageReport(
2166      DatanodeReportType type) throws IOException {
2167    checkOpen();
2168    try (TraceScope ignored = tracer.newScope("datanodeStorageReport")) {
2169      return namenode.getDatanodeStorageReport(type);
2170    }
2171  }
2172
2173  /**
2174   * Enter, leave or get safe mode.
2175   *
2176   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean)
2177   */
2178  public boolean setSafeMode(SafeModeAction action) throws IOException {
2179    return setSafeMode(action, false);
2180  }
2181
2182  /**
2183   * Enter, leave or get safe mode.
2184   *
2185   * @param action
   *          One of SafeModeAction.SAFEMODE_GET, SafeModeAction.SAFEMODE_ENTER
   *          and SafeModeAction.SAFEMODE_LEAVE
   * @param isChecked
   *          If true, then check only the active namenode's safemode status;
   *          else check the first namenode's status.
2191   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
2192   */
2193  public boolean setSafeMode(SafeModeAction action, boolean isChecked)
2194      throws IOException{
2195    try (TraceScope ignored = tracer.newScope("setSafeMode")) {
2196      return namenode.setSafeMode(action, isChecked);
2197    }
2198  }
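
  // Illustrative checks, using the HdfsConstants.SafeModeAction values:
  //
  //   boolean inSafeMode = client.setSafeMode(SafeModeAction.SAFEMODE_GET);
  //   client.setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);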
2199
2200  /**
2201   * Create one snapshot.
2202   *
2203   * @param snapshotRoot The directory where the snapshot is to be taken
2204   * @param snapshotName Name of the snapshot
2205   * @return the snapshot path.
2206   * @see ClientProtocol#createSnapshot(String, String)
2207   */
2208  public String createSnapshot(String snapshotRoot, String snapshotName)
2209      throws IOException {
2210    checkOpen();
2211    try (TraceScope ignored = tracer.newScope("createSnapshot")) {
2212      return namenode.createSnapshot(snapshotRoot, snapshotName);
2213    } catch (RemoteException re) {
2214      throw re.unwrapRemoteException();
2215    }
2216  }
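
  // Typical flow (paths and names are hypothetical): the directory must
  // first be made snapshottable via allowSnapshot (see below).
  //
  //   client.allowSnapshot("/data");
  //   String snapshotPath = client.createSnapshot("/data", "s1");
  //   // snapshotPath is "/data/.snapshot/s1"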
2217
2218  /**
2219   * Delete a snapshot of a snapshottable directory.
2220   *
2221   * @param snapshotRoot The snapshottable directory that the
2222   *                    to-be-deleted snapshot belongs to
2223   * @param snapshotName The name of the to-be-deleted snapshot
2224   * @throws IOException
2225   * @see ClientProtocol#deleteSnapshot(String, String)
2226   */
2227  public void deleteSnapshot(String snapshotRoot, String snapshotName)
2228      throws IOException {
2229    checkOpen();
2230    try (TraceScope ignored = tracer.newScope("deleteSnapshot")) {
2231      namenode.deleteSnapshot(snapshotRoot, snapshotName);
2232    } catch (RemoteException re) {
2233      throw re.unwrapRemoteException();
2234    }
2235  }
2236
2237  /**
2238   * Rename a snapshot.
2239   * @param snapshotDir The directory path where the snapshot was taken
2240   * @param snapshotOldName Old name of the snapshot
2241   * @param snapshotNewName New name of the snapshot
2242   * @throws IOException
2243   * @see ClientProtocol#renameSnapshot(String, String, String)
2244   */
2245  public void renameSnapshot(String snapshotDir, String snapshotOldName,
2246      String snapshotNewName) throws IOException {
2247    checkOpen();
2248    try (TraceScope ignored = tracer.newScope("renameSnapshot")) {
2249      namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName);
2250    } catch (RemoteException re) {
2251      throw re.unwrapRemoteException();
2252    }
2253  }
2254
2255  /**
2256   * Get all the current snapshottable directories.
2257   * @return All the current snapshottable directories
2258   * @throws IOException
2259   * @see ClientProtocol#getSnapshottableDirListing()
2260   */
2261  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
2262      throws IOException {
2263    checkOpen();
2264    try (TraceScope ignored = tracer.newScope("getSnapshottableDirListing")) {
2265      return namenode.getSnapshottableDirListing();
2266    } catch (RemoteException re) {
2267      throw re.unwrapRemoteException();
2268    }
2269  }
2270
2271  /**
2272   * Allow snapshot on a directory.
2273   *
2274   * @see ClientProtocol#allowSnapshot(String snapshotRoot)
2275   */
2276  public void allowSnapshot(String snapshotRoot) throws IOException {
2277    checkOpen();
2278    try (TraceScope ignored = tracer.newScope("allowSnapshot")) {
2279      namenode.allowSnapshot(snapshotRoot);
2280    } catch (RemoteException re) {
2281      throw re.unwrapRemoteException();
2282    }
2283  }
2284
2285  /**
2286   * Disallow snapshot on a directory.
2287   *
2288   * @see ClientProtocol#disallowSnapshot(String snapshotRoot)
2289   */
2290  public void disallowSnapshot(String snapshotRoot) throws IOException {
2291    checkOpen();
2292    try (TraceScope ignored = tracer.newScope("disallowSnapshot")) {
2293      namenode.disallowSnapshot(snapshotRoot);
2294    } catch (RemoteException re) {
2295      throw re.unwrapRemoteException();
2296    }
2297  }
2298
2299  /**
2300   * Get the difference between two snapshots, or between a snapshot and the
2301   * current tree of a directory.
2302   * @see ClientProtocol#getSnapshotDiffReport(String, String, String)
2303   */
2304  public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir,
2305      String fromSnapshot, String toSnapshot) throws IOException {
2306    checkOpen();
2307    try (TraceScope ignored = tracer.newScope("getSnapshotDiffReport")) {
2308      return namenode.getSnapshotDiffReport(snapshotDir,
2309          fromSnapshot, toSnapshot);
2310    } catch (RemoteException re) {
2311      throw re.unwrapRemoteException();
2312    }
2313  }
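
  // Illustrative diff (directory and snapshot names are hypothetical);
  // passing an empty string for one endpoint compares against the current
  // tree:
  //
  //   SnapshotDiffReport diff =
  //       client.getSnapshotDiffReport("/data", "s1", "s2");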
2314
2315  public long addCacheDirective(
2316      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
2317    checkOpen();
2318    try (TraceScope ignored = tracer.newScope("addCacheDirective")) {
2319      return namenode.addCacheDirective(info, flags);
2320    } catch (RemoteException re) {
2321      throw re.unwrapRemoteException();
2322    }
2323  }
2324
2325  public void modifyCacheDirective(
2326      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
2327    checkOpen();
2328    try (TraceScope ignored = tracer.newScope("modifyCacheDirective")) {
2329      namenode.modifyCacheDirective(info, flags);
2330    } catch (RemoteException re) {
2331      throw re.unwrapRemoteException();
2332    }
2333  }
2334
2335  public void removeCacheDirective(long id)
2336      throws IOException {
2337    checkOpen();
2338    try (TraceScope ignored = tracer.newScope("removeCacheDirective")) {
2339      namenode.removeCacheDirective(id);
2340    } catch (RemoteException re) {
2341      throw re.unwrapRemoteException();
2342    }
2343  }
2344
2345  public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
2346      CacheDirectiveInfo filter) throws IOException {
2347    return new CacheDirectiveIterator(namenode, filter, tracer);
2348  }
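
  // Iteration sketch: an empty filter matches every directive.
  //
  //   RemoteIterator<CacheDirectiveEntry> it = client.listCacheDirectives(
  //       new CacheDirectiveInfo.Builder().build());
  //   while (it.hasNext()) {
  //     long id = it.next().getInfo().getId();
  //   }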
2349
2350  public void addCachePool(CachePoolInfo info) throws IOException {
2351    checkOpen();
2352    try (TraceScope ignored = tracer.newScope("addCachePool")) {
2353      namenode.addCachePool(info);
2354    } catch (RemoteException re) {
2355      throw re.unwrapRemoteException();
2356    }
2357  }
2358
2359  public void modifyCachePool(CachePoolInfo info) throws IOException {
2360    checkOpen();
2361    try (TraceScope ignored = tracer.newScope("modifyCachePool")) {
2362      namenode.modifyCachePool(info);
2363    } catch (RemoteException re) {
2364      throw re.unwrapRemoteException();
2365    }
2366  }
2367
2368  public void removeCachePool(String poolName) throws IOException {
2369    checkOpen();
2370    try (TraceScope ignored = tracer.newScope("removeCachePool")) {
2371      namenode.removeCachePool(poolName);
2372    } catch (RemoteException re) {
2373      throw re.unwrapRemoteException();
2374    }
2375  }
2376
2377  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
2378    return new CachePoolIterator(namenode, tracer);
2379  }
2380
2381  /**
2382   * Save namespace image.
   */
2385  void saveNamespace() throws IOException {
2386    try (TraceScope ignored = tracer.newScope("saveNamespace")) {
2387      namenode.saveNamespace();
2388    } catch (RemoteException re) {
2389      throw re.unwrapRemoteException(AccessControlException.class);
2390    }
2391  }
2392
2393  /**
2394   * Rolls the edit log on the active NameNode.
2395   * @return the txid of the new log segment
2396   *
2397   * @see ClientProtocol#rollEdits()
2398   */
2399  long rollEdits() throws IOException {
2400    try (TraceScope ignored = tracer.newScope("rollEdits")) {
2401      return namenode.rollEdits();
2402    } catch (RemoteException re) {
2403      throw re.unwrapRemoteException(AccessControlException.class);
2404    }
2405  }
2406
2407  @VisibleForTesting
2408  ExtendedBlock getPreviousBlock(long fileId) {
2409    return filesBeingWritten.get(fileId).getBlock();
2410  }
2411
2412  /**
2413   * enable/disable restore failed storage.
2414   *
2415   * @see ClientProtocol#restoreFailedStorage(String arg)
2416   */
2417  boolean restoreFailedStorage(String arg) throws IOException{
2418    try (TraceScope ignored = tracer.newScope("restoreFailedStorage")) {
2419      return namenode.restoreFailedStorage(arg);
2420    }
2421  }
2422
2423  /**
2424   * Refresh the hosts and exclude files.  (Rereads them.)
2425   * See {@link ClientProtocol#refreshNodes()}
2426   * for more details.
2427   *
2428   * @see ClientProtocol#refreshNodes()
2429   */
2430  public void refreshNodes() throws IOException {
2431    try (TraceScope ignored = tracer.newScope("refreshNodes")) {
2432      namenode.refreshNodes();
2433    }
2434  }
2435
2436  /**
2437   * Dumps DFS data structures into specified file.
2438   *
2439   * @see ClientProtocol#metaSave(String)
2440   */
2441  public void metaSave(String pathname) throws IOException {
2442    try (TraceScope ignored = tracer.newScope("metaSave")) {
2443      namenode.metaSave(pathname);
2444    }
2445  }
2446
2447  /**
2448   * Requests the namenode to tell all datanodes to use a new, non-persistent
2449   * bandwidth value for dfs.balance.bandwidthPerSec.
2450   * See {@link ClientProtocol#setBalancerBandwidth(long)}
2451   * for more details.
2452   *
2453   * @see ClientProtocol#setBalancerBandwidth(long)
2454   */
2455  public void setBalancerBandwidth(long bandwidth) throws IOException {
2456    try (TraceScope ignored = tracer.newScope("setBalancerBandwidth")) {
2457      namenode.setBalancerBandwidth(bandwidth);
2458    }
2459  }
2460
2461  /**
2462   * @see ClientProtocol#finalizeUpgrade()
2463   */
2464  public void finalizeUpgrade() throws IOException {
2465    try (TraceScope ignored = tracer.newScope("finalizeUpgrade")) {
2466      namenode.finalizeUpgrade();
2467    }
2468  }
2469
2470  RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
2471      throws IOException {
2472    try (TraceScope ignored = tracer.newScope("rollingUpgrade")) {
2473      return namenode.rollingUpgrade(action);
2474    }
2475  }
2476
  /**
   * @deprecated Use {@link #mkdirs(String, FsPermission, boolean)} instead.
   */
2479  @Deprecated
2480  public boolean mkdirs(String src) throws IOException {
2481    return mkdirs(src, null, true);
2482  }
2483
2484  /**
2485   * Create a directory (or hierarchy of directories) with the given
2486   * name and permission.
2487   *
2488   * @param src The path of the directory being created
2489   * @param permission The permission of the directory being created.
2490   * If permission == null, use {@link FsPermission#getDefault()}.
2491   * @param createParent create missing parent directory if true
2492   *
   * @return True if the operation succeeds.
2494   *
2495   * @see ClientProtocol#mkdirs(String, FsPermission, boolean)
2496   */
2497  public boolean mkdirs(String src, FsPermission permission,
2498      boolean createParent) throws IOException {
2499    final FsPermission masked = applyUMask(permission);
2500    return primitiveMkdir(src, masked, createParent);
2501  }
2502
2503  /**
   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
   * that the permission has already been masked against the umask.
2506   */
2507  public boolean primitiveMkdir(String src, FsPermission absPermission)
2508      throws IOException {
2509    return primitiveMkdir(src, absPermission, true);
2510  }
2511
2512  /**
   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
   * that the permission has already been masked against the umask.
2515   */
2516  public boolean primitiveMkdir(String src, FsPermission absPermission,
2517      boolean createParent) throws IOException {
2518    checkOpen();
2519    if (absPermission == null) {
2520      absPermission = applyUMask(null);
2521    }
2522
2523    LOG.debug("{}: masked={}", src, absPermission);
2524    try (TraceScope ignored = tracer.newScope("mkdir")) {
2525      return namenode.mkdirs(src, absPermission, createParent);
2526    } catch (RemoteException re) {
2527      throw re.unwrapRemoteException(AccessControlException.class,
2528          InvalidPathException.class,
2529          FileAlreadyExistsException.class,
2530          FileNotFoundException.class,
2531          ParentNotDirectoryException.class,
2532          SafeModeException.class,
2533          NSQuotaExceededException.class,
2534          DSQuotaExceededException.class,
2535          QuotaByStorageTypeExceededException.class,
2536          UnresolvedPathException.class,
2537          SnapshotAccessControlException.class);
2538    }
2539  }
2540
2541  /**
2542   * Get {@link ContentSummary} rooted at the specified directory.
2543   * @param src The string representation of the path
2544   *
2545   * @see ClientProtocol#getContentSummary(String)
2546   */
2547  ContentSummary getContentSummary(String src) throws IOException {
2548    try (TraceScope ignored = newPathTraceScope("getContentSummary", src)) {
2549      return namenode.getContentSummary(src);
2550    } catch (RemoteException re) {
2551      throw re.unwrapRemoteException(AccessControlException.class,
2552          FileNotFoundException.class,
2553          UnresolvedPathException.class);
2554    }
2555  }
2556
2557  /**
2558   * Get {@link org.apache.hadoop.fs.QuotaUsage} rooted at the specified directory.
2559   * @param src The string representation of the path
2560   *
2561   * @see ClientProtocol#getQuotaUsage(String)
2562   */
2563  QuotaUsage getQuotaUsage(String src) throws IOException {
2564    checkOpen();
2565    try (TraceScope ignored = newPathTraceScope("getQuotaUsage", src)) {
2566      return namenode.getQuotaUsage(src);
2567    } catch(RemoteException re) {
2568      IOException ioe = re.unwrapRemoteException(AccessControlException.class,
2569          FileNotFoundException.class,
2570          UnresolvedPathException.class,
2571          RpcNoSuchMethodException.class);
2572      if (ioe instanceof RpcNoSuchMethodException) {
        LOG.debug("The NameNode version doesn't support the getQuotaUsage " +
            "API. Falling back to the getContentSummary API.");
2575        return getContentSummary(src);
2576      } else {
2577        throw ioe;
2578      }
2579    }
2580  }
2581
2582  /**
2583   * Sets or resets quotas for a directory.
2584   * @see ClientProtocol#setQuota(String, long, long, StorageType)
2585   */
2586  void setQuota(String src, long namespaceQuota, long storagespaceQuota)
2587      throws IOException {
2588    // sanity check
2589    if ((namespaceQuota <= 0 &&
2590          namespaceQuota != HdfsConstants.QUOTA_DONT_SET &&
2591          namespaceQuota != HdfsConstants.QUOTA_RESET) ||
2592        (storagespaceQuota < 0 &&
2593            storagespaceQuota != HdfsConstants.QUOTA_DONT_SET &&
2594            storagespaceQuota != HdfsConstants.QUOTA_RESET)) {
      throw new IllegalArgumentException("Invalid values for quota: " +
          namespaceQuota + " and " +
          storagespaceQuota);
    }
2600    try (TraceScope ignored = newPathTraceScope("setQuota", src)) {
2601      // Pass null as storage type for traditional namespace/storagespace quota.
2602      namenode.setQuota(src, namespaceQuota, storagespaceQuota, null);
2603    } catch (RemoteException re) {
2604      throw re.unwrapRemoteException(AccessControlException.class,
2605          FileNotFoundException.class,
2606          NSQuotaExceededException.class,
2607          DSQuotaExceededException.class,
2608          QuotaByStorageTypeExceededException.class,
2609          UnresolvedPathException.class,
2610          SnapshotAccessControlException.class);
2611    }
2612  }
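
  // Illustrative calls (path and limits are hypothetical): QUOTA_DONT_SET
  // leaves one dimension unchanged, QUOTA_RESET clears it.
  //
  //   client.setQuota("/projects/x", 100000, HdfsConstants.QUOTA_DONT_SET);
  //   client.setQuota("/projects/x", HdfsConstants.QUOTA_DONT_SET,
  //       10L * 1024 * 1024 * 1024); // 10 GB storage space quota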
2613
2614  /**
2615   * Sets or resets quotas by storage type for a directory.
2616   * @see ClientProtocol#setQuota(String, long, long, StorageType)
2617   */
2618  void setQuotaByStorageType(String src, StorageType type, long quota)
2619      throws IOException {
2620    if (quota <= 0 && quota != HdfsConstants.QUOTA_DONT_SET &&
2621        quota != HdfsConstants.QUOTA_RESET) {
      throw new IllegalArgumentException("Invalid value for quota: " +
          quota);
2624    }
2625    if (type == null) {
2626      throw new IllegalArgumentException("Invalid storage type(null)");
2627    }
2628    if (!type.supportTypeQuota()) {
2629      throw new IllegalArgumentException(
          "Quota is not supported for storage type: " + type.toString());
2631    }
2632    try (TraceScope ignored = newPathTraceScope("setQuotaByStorageType", src)) {
2633      namenode.setQuota(src, HdfsConstants.QUOTA_DONT_SET, quota, type);
2634    } catch (RemoteException re) {
2635      throw re.unwrapRemoteException(AccessControlException.class,
2636          FileNotFoundException.class,
2637          QuotaByStorageTypeExceededException.class,
2638          UnresolvedPathException.class,
2639          SnapshotAccessControlException.class);
2640    }
  }

  /**
   * Set the modification and access time of a file.
2644   *
2645   * @see ClientProtocol#setTimes(String, long, long)
2646   */
2647  public void setTimes(String src, long mtime, long atime) throws IOException {
2648    checkOpen();
2649    try (TraceScope ignored = newPathTraceScope("setTimes", src)) {
2650      namenode.setTimes(src, mtime, atime);
2651    } catch (RemoteException re) {
2652      throw re.unwrapRemoteException(AccessControlException.class,
2653          FileNotFoundException.class,
2654          UnresolvedPathException.class,
2655          SnapshotAccessControlException.class);
2656    }
2657  }
2658
2659  /**
2660   * @deprecated use {@link HdfsDataInputStream} instead.
2661   */
2662  @Deprecated
2663  public static class DFSDataInputStream extends HdfsDataInputStream {
2664
2665    public DFSDataInputStream(DFSInputStream in) throws IOException {
2666      super(in);
2667    }
2668  }
2669
2670  void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
2671    DatanodeInfo [] dnArr = { dn };
2672    LocatedBlock [] lblocks = { new LocatedBlock(blk, dnArr) };
2673    reportChecksumFailure(file, lblocks);
2674  }
2675
2676  // just reports checksum failure and ignores any exception during the report.
2677  void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
2678    try {
2679      reportBadBlocks(lblocks);
2680    } catch (IOException ie) {
2681      LOG.info("Found corruption while reading " + file
2682          + ". Error repairing corrupt blocks. Bad blocks remain.", ie);
2683    }
2684  }
2685
2686  @Override
2687  public String toString() {
2688    return getClass().getSimpleName() + "[clientName=" + clientName
2689        + ", ugi=" + ugi + "]";
2690  }
2691
2692  public CachingStrategy getDefaultReadCachingStrategy() {
2693    return defaultReadCachingStrategy;
2694  }
2695
2696  public CachingStrategy getDefaultWriteCachingStrategy() {
2697    return defaultWriteCachingStrategy;
2698  }
2699
2700  public ClientContext getClientContext() {
2701    return clientContext;
2702  }
2703
2704  public void modifyAclEntries(String src, List<AclEntry> aclSpec)
2705      throws IOException {
2706    checkOpen();
2707    try (TraceScope ignored = newPathTraceScope("modifyAclEntries", src)) {
2708      namenode.modifyAclEntries(src, aclSpec);
2709    } catch (RemoteException re) {
2710      throw re.unwrapRemoteException(AccessControlException.class,
2711          AclException.class,
2712          FileNotFoundException.class,
2713          NSQuotaExceededException.class,
2714          SafeModeException.class,
2715          SnapshotAccessControlException.class,
2716          UnresolvedPathException.class);
2717    }
2718  }
2719
2720  public void removeAclEntries(String src, List<AclEntry> aclSpec)
2721      throws IOException {
2722    checkOpen();
2723    try (TraceScope ignored = tracer.newScope("removeAclEntries")) {
2724      namenode.removeAclEntries(src, aclSpec);
2725    } catch (RemoteException re) {
2726      throw re.unwrapRemoteException(AccessControlException.class,
2727          AclException.class,
2728          FileNotFoundException.class,
2729          NSQuotaExceededException.class,
2730          SafeModeException.class,
2731          SnapshotAccessControlException.class,
2732          UnresolvedPathException.class);
2733    }
2734  }
2735
2736  public void removeDefaultAcl(String src) throws IOException {
2737    checkOpen();
2738    try (TraceScope ignored = tracer.newScope("removeDefaultAcl")) {
2739      namenode.removeDefaultAcl(src);
2740    } catch (RemoteException re) {
2741      throw re.unwrapRemoteException(AccessControlException.class,
2742          AclException.class,
2743          FileNotFoundException.class,
2744          NSQuotaExceededException.class,
2745          SafeModeException.class,
2746          SnapshotAccessControlException.class,
2747          UnresolvedPathException.class);
2748    }
2749  }
2750
2751  public void removeAcl(String src) throws IOException {
2752    checkOpen();
2753    try (TraceScope ignored = tracer.newScope("removeAcl")) {
2754      namenode.removeAcl(src);
2755    } catch (RemoteException re) {
2756      throw re.unwrapRemoteException(AccessControlException.class,
2757          AclException.class,
2758          FileNotFoundException.class,
2759          NSQuotaExceededException.class,
2760          SafeModeException.class,
2761          SnapshotAccessControlException.class,
2762          UnresolvedPathException.class);
2763    }
2764  }
2765
2766  public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
2767    checkOpen();
2768    try (TraceScope ignored = tracer.newScope("setAcl")) {
2769      namenode.setAcl(src, aclSpec);
2770    } catch (RemoteException re) {
2771      throw re.unwrapRemoteException(AccessControlException.class,
2772          AclException.class,
2773          FileNotFoundException.class,
2774          NSQuotaExceededException.class,
2775          SafeModeException.class,
2776          SnapshotAccessControlException.class,
2777          UnresolvedPathException.class);
2778    }
2779  }
2780
2781  public AclStatus getAclStatus(String src) throws IOException {
2782    checkOpen();
2783    try (TraceScope ignored = newPathTraceScope("getAclStatus", src)) {
2784      return namenode.getAclStatus(src);
2785    } catch (RemoteException re) {
2786      throw re.unwrapRemoteException(AccessControlException.class,
2787          AclException.class,
2788          FileNotFoundException.class,
2789          UnresolvedPathException.class);
2790    }
2791  }
2792
2793  public void createEncryptionZone(String src, String keyName)
2794      throws IOException {
2795    checkOpen();
2796    try (TraceScope ignored = newPathTraceScope("createEncryptionZone", src)) {
2797      namenode.createEncryptionZone(src, keyName);
2798    } catch (RemoteException re) {
2799      throw re.unwrapRemoteException(AccessControlException.class,
2800          SafeModeException.class,
2801          UnresolvedPathException.class);
2802    }
2803  }
2804
2805  public EncryptionZone getEZForPath(String src) throws IOException {
2806    checkOpen();
2807    try (TraceScope ignored = newPathTraceScope("getEZForPath", src)) {
2808      return namenode.getEZForPath(src);
2809    } catch (RemoteException re) {
2810      throw re.unwrapRemoteException(AccessControlException.class,
2811          UnresolvedPathException.class);
2812    }
2813  }
2814
2815  public RemoteIterator<EncryptionZone> listEncryptionZones()
2816      throws IOException {
2817    checkOpen();
2818    return new EncryptionZoneIterator(namenode, tracer);
2819  }
2820
  public void setXAttr(String src, String name, byte[] value,
      EnumSet<XAttrSetFlag> flag) throws IOException {
    checkOpen();
    try (TraceScope ignored = newPathTraceScope("setXAttr", src)) {
      namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          NSQuotaExceededException.class,
          SafeModeException.class,
          SnapshotAccessControlException.class,
          UnresolvedPathException.class);
    }
  }

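  /**
   * Get the value of an extended attribute by name.
   * @param src path of the file or directory
   * @param name name of the attribute, including its namespace prefix
   * @return the attribute value, or null if it has no value
   */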
  public byte[] getXAttr(String src, String name) throws IOException {
    checkOpen();
    try (TraceScope ignored = newPathTraceScope("getXAttr", src)) {
      final List<XAttr> xAttrs = XAttrHelper.buildXAttrAsList(name);
      final List<XAttr> result = namenode.getXAttrs(src, xAttrs);
      return XAttrHelper.getFirstXAttrValue(result);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          UnresolvedPathException.class);
    }
  }

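  /**
   * Get all extended attributes of a file or directory that are visible
   * to the caller.
   * @param src path of the file or directory
   * @return map of attribute names to values
   */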
  public Map<String, byte[]> getXAttrs(String src) throws IOException {
    checkOpen();
    try (TraceScope ignored = newPathTraceScope("getXAttrs", src)) {
      return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null));
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          UnresolvedPathException.class);
    }
  }

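  /**
   * Get the values of the named extended attributes.
   * @param src path of the file or directory
   * @param names names of the attributes to fetch
   * @return map of attribute names to values
   */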
  public Map<String, byte[]> getXAttrs(String src, List<String> names)
      throws IOException {
    checkOpen();
    try (TraceScope ignored = newPathTraceScope("getXAttrs", src)) {
      return XAttrHelper.buildXAttrMap(namenode.getXAttrs(
          src, XAttrHelper.buildXAttrs(names)));
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          UnresolvedPathException.class);
    }
  }

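  /**
   * List the names of the extended attributes of a file or directory that
   * are visible to the caller.
   * @param src path of the file or directory
   * @return list of attribute names
   */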
  public List<String> listXAttrs(String src) throws IOException {
    checkOpen();
    try (TraceScope ignored = newPathTraceScope("listXAttrs", src)) {
      final Map<String, byte[]> xattrs =
          XAttrHelper.buildXAttrMap(namenode.listXAttrs(src));
      return Lists.newArrayList(xattrs.keySet());
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          UnresolvedPathException.class);
    }
  }

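  /**
   * Remove an extended attribute from a file or directory.
   * @param src path of the file or directory
   * @param name name of the attribute, including its namespace prefix
   */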
  public void removeXAttr(String src, String name) throws IOException {
    checkOpen();
    try (TraceScope ignored = newPathTraceScope("removeXAttr", src)) {
      namenode.removeXAttr(src, XAttrHelper.buildXAttr(name));
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          NSQuotaExceededException.class,
          SafeModeException.class,
          SnapshotAccessControlException.class,
          UnresolvedPathException.class);
    }
  }

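  /**
   * Check whether the current user may perform the given action on a path,
   * throwing AccessControlException if not.
   * @param src path to check
   * @param mode action (read, write, and/or execute) to check for
   */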
  public void checkAccess(String src, FsAction mode) throws IOException {
    checkOpen();
    try (TraceScope ignored = newPathTraceScope("checkAccess", src)) {
      namenode.checkAccess(src, mode);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
          FileNotFoundException.class,
          UnresolvedPathException.class);
    }
  }

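  /**
   * Get a stream of namespace modification events, starting at the
   * current transaction ID.
   *
   * <p>A minimal consumer sketch (hypothetical; assumes an initialized
   * {@code DFSClient} named {@code client} and the inotify event API of
   * recent Hadoop releases):
   * <pre>{@code
   * DFSInotifyEventInputStream events = client.getInotifyEventStream();
   * EventBatch batch = events.take(); // blocks until events are available
   * for (Event e : batch.getEvents()) {
   *   System.out.println(e.getEventType()); // e.g. CREATE, RENAME, CLOSE
   * }
   * }</pre>
   */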
  public DFSInotifyEventInputStream getInotifyEventStream()
      throws IOException {
    return new DFSInotifyEventInputStream(namenode, tracer);
  }

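  /**
   * Get a stream of namespace modification events, replaying events that
   * occurred after the given transaction ID.
   * @param lastReadTxid transaction ID after which to resume reading
   */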
  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
      throws IOException {
    return new DFSInotifyEventInputStream(namenode, tracer, lastReadTxid);
  }

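  /**
   * Create a Peer connected to the given datanode address. The underlying
   * socket is wrapped with SASL negotiation, and is closed if setup fails
   * at any point.
   */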
  @Override // RemotePeerFactory
  public Peer newConnectedPeer(InetSocketAddress addr,
      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
      throws IOException {
    Peer peer = null;
    boolean success = false;
    Socket sock = null;
    final int socketTimeout = dfsClientConf.getSocketTimeout();
    try {
      sock = socketFactory.createSocket();
      NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(),
          socketTimeout);
      peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this,
          blockToken, datanodeId, socketTimeout);
      success = true;
      return peer;
    } finally {
      if (!success) {
        IOUtilsClient.cleanup(LOG, peer);
        IOUtils.closeSocket(sock);
      }
    }
  }

  /**
   * Create the hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
   * it does not already exist.
   * @param num Number of threads for the hedged reads thread pool.
   * If zero or negative, skip hedged reads thread pool creation.
   */
  private synchronized void initThreadsNumForHedgedReads(int num) {
    if (num <= 0 || HEDGED_READ_THREAD_POOL != null) {
      return;
    }
    HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
        new Daemon.DaemonFactory() {
          private final AtomicInteger threadIndex = new AtomicInteger(0);
          @Override
          public Thread newThread(Runnable r) {
            Thread t = super.newThread(r);
            t.setName("hedgedRead-" + threadIndex.getAndIncrement());
            return t;
          }
        },
        new ThreadPoolExecutor.CallerRunsPolicy() {
          @Override
          public void rejectedExecution(Runnable runnable,
              ThreadPoolExecutor e) {
            LOG.info("Execution rejected; executing in the current thread");
            HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
            // will run in the current thread
            super.rejectedExecution(runnable, e);
          }
        });
    HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
    LOG.debug("Using hedged reads; pool threads={}", num);
  }

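  /** @return the shared hedged reads thread pool, or null if disabled. */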
  ThreadPoolExecutor getHedgedReadsThreadPool() {
    return HEDGED_READ_THREAD_POOL;
  }

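  /** @return true if the hedged reads thread pool has been created. */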
  boolean isHedgedReadsEnabled() {
    return (HEDGED_READ_THREAD_POOL != null) &&
        HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
  }

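  /** @return the shared metrics for hedged read operations. */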
  DFSHedgedReadMetrics getHedgedReadMetrics() {
    return HEDGED_READ_METRIC;
  }

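  /** @return the KeyProvider for this client, from the shared cache. */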
  public KeyProvider getKeyProvider() {
    return clientContext.getKeyProviderCache().get(conf);
  }

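  /** Inject a KeyProvider into the shared cache; visible for tests. */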
  @VisibleForTesting
  public void setKeyProvider(KeyProvider provider) {
    clientContext.getKeyProviderCache().setKeyProvider(conf, provider);
  }

  /**
   * Probe whether encryption is enabled on this filesystem.
   * See {@link DFSUtilClient#isHDFSEncryptionEnabled(Configuration)}.
   * @return true if encryption is enabled
   */
  public boolean isHDFSEncryptionEnabled() {
    return DFSUtilClient.isHDFSEncryptionEnabled(this.conf);
  }

  /**
   * Returns the SaslDataTransferClient configured for this DFSClient.
   *
   * @return SaslDataTransferClient configured for this DFSClient
   */
  public SaslDataTransferClient getSaslDataTransferClient() {
    return saslClient;
  }

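  /** Create a trace scope annotated with the given path, if non-null. */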
  TraceScope newPathTraceScope(String description, String path) {
    TraceScope scope = tracer.newScope(description);
    if (path != null) {
      scope.addKVAnnotation("path", path);
    }
    return scope;
  }

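  /** Create a trace scope annotated with the src and dst paths, if non-null. */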
  TraceScope newSrcDstTraceScope(String description, String src, String dst) {
    TraceScope scope = tracer.newScope(description);
    if (src != null) {
      scope.addKVAnnotation("src", src);
    }
    if (dst != null) {
      scope.addKVAnnotation("dst", dst);
    }
    return scope;
  }

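  /** @return the Tracer used by this client. */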
  Tracer getTracer() {
    return tracer;
  }
}