001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; 021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; 022import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; 023import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD; 024import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; 025import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; 026import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES; 027import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT; 028import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY; 029 030import java.io.BufferedOutputStream; 031import java.io.DataInputStream; 032import java.io.DataOutputStream; 033import java.io.FileNotFoundException; 034import java.io.IOException; 035import java.io.InputStream; 036import java.io.OutputStream; 037import java.net.InetAddress; 038import java.net.InetSocketAddress; 039import java.net.Socket; 040import java.net.SocketAddress; 041import java.net.URI; 042import java.net.UnknownHostException; 043import java.security.GeneralSecurityException; 044import java.util.ArrayList; 045import java.util.EnumSet; 046import java.util.HashMap; 047import java.util.LinkedHashMap; 048import java.util.List; 049import java.util.Map; 050import java.util.Random; 051import java.util.concurrent.SynchronousQueue; 052import java.util.concurrent.ThreadLocalRandom; 053import java.util.concurrent.ThreadPoolExecutor; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.atomic.AtomicBoolean; 056import java.util.concurrent.atomic.AtomicInteger; 057 058import javax.net.SocketFactory; 059 060import org.apache.hadoop.HadoopIllegalArgumentException; 061import org.apache.hadoop.classification.InterfaceAudience; 062import org.apache.hadoop.conf.Configuration; 063import org.apache.hadoop.crypto.CipherSuite; 064import org.apache.hadoop.crypto.CryptoCodec; 065import org.apache.hadoop.crypto.CryptoInputStream; 066import org.apache.hadoop.crypto.CryptoOutputStream; 067import org.apache.hadoop.crypto.CryptoProtocolVersion; 068import org.apache.hadoop.crypto.key.KeyProvider; 069import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 070import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; 071import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; 072import org.apache.hadoop.fs.BlockLocation; 073import org.apache.hadoop.fs.BlockStorageLocation; 074import org.apache.hadoop.fs.CacheFlag; 075import org.apache.hadoop.fs.ContentSummary; 076import org.apache.hadoop.fs.CreateFlag; 077import org.apache.hadoop.fs.FileAlreadyExistsException; 078import org.apache.hadoop.fs.FileEncryptionInfo; 079import org.apache.hadoop.fs.FileSystem; 080import org.apache.hadoop.fs.FsServerDefaults; 081import org.apache.hadoop.fs.FsStatus; 082import org.apache.hadoop.fs.FsTracer; 083import org.apache.hadoop.fs.HdfsBlockLocation; 084import org.apache.hadoop.fs.InvalidPathException; 085import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; 086import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; 087import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; 088import org.apache.hadoop.fs.Options; 089import org.apache.hadoop.fs.Options.ChecksumOpt; 090import org.apache.hadoop.fs.ParentNotDirectoryException; 091import org.apache.hadoop.fs.Path; 092import org.apache.hadoop.fs.QuotaUsage; 093import org.apache.hadoop.fs.RemoteIterator; 094import org.apache.hadoop.fs.StorageType; 095import org.apache.hadoop.fs.VolumeId; 096import org.apache.hadoop.fs.XAttr; 097import org.apache.hadoop.fs.XAttrSetFlag; 098import org.apache.hadoop.fs.permission.AclEntry; 099import org.apache.hadoop.fs.permission.AclStatus; 100import org.apache.hadoop.fs.permission.FsAction; 101import org.apache.hadoop.fs.permission.FsPermission; 102import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; 103import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; 104import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 105import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; 106import org.apache.hadoop.hdfs.client.impl.DfsClientConf; 107import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; 108import org.apache.hadoop.hdfs.net.Peer; 109import org.apache.hadoop.hdfs.protocol.AclException; 110import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 111import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; 112import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; 113import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; 114import org.apache.hadoop.hdfs.protocol.CachePoolEntry; 115import org.apache.hadoop.hdfs.protocol.CachePoolInfo; 116import org.apache.hadoop.hdfs.protocol.CachePoolIterator; 117import org.apache.hadoop.hdfs.protocol.ClientProtocol; 118import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; 119import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; 120import org.apache.hadoop.hdfs.protocol.DatanodeID; 121import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 122import org.apache.hadoop.hdfs.protocol.DirectoryListing; 123import org.apache.hadoop.hdfs.protocol.EncryptionZone; 124import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator; 125import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 126import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; 127import org.apache.hadoop.hdfs.protocol.HdfsConstants; 128import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; 129import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; 130import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; 131import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 132import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; 133import org.apache.hadoop.hdfs.protocol.LocatedBlock; 134import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 135import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; 136import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; 137import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; 138import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; 139import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; 140import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; 141import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; 142import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; 143import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; 144import org.apache.hadoop.hdfs.protocol.datatransfer.Op; 145import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; 146import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; 147import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 148import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; 149import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; 150import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 151import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 152import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; 153import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 154import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 155import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 156import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; 157import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; 158import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 159import org.apache.hadoop.hdfs.server.namenode.SafeModeException; 160import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; 161import org.apache.hadoop.hdfs.util.IOUtilsClient; 162import org.apache.hadoop.io.DataOutputBuffer; 163import org.apache.hadoop.io.EnumSetWritable; 164import org.apache.hadoop.io.IOUtils; 165import org.apache.hadoop.io.MD5Hash; 166import org.apache.hadoop.io.Text; 167import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; 168import org.apache.hadoop.ipc.RPC; 169import org.apache.hadoop.ipc.RemoteException; 170import org.apache.hadoop.ipc.RetriableException; 171import org.apache.hadoop.ipc.RpcNoSuchMethodException; 172import org.apache.hadoop.net.DNS; 173import org.apache.hadoop.net.NetUtils; 174import org.apache.hadoop.security.AccessControlException; 175import org.apache.hadoop.security.UserGroupInformation; 176import org.apache.hadoop.security.token.SecretManager.InvalidToken; 177import org.apache.hadoop.security.token.Token; 178import org.apache.hadoop.security.token.TokenRenewer; 179import org.apache.hadoop.util.Daemon; 180import org.apache.hadoop.util.DataChecksum; 181import org.apache.hadoop.util.DataChecksum.Type; 182import org.apache.hadoop.util.Progressable; 183import org.apache.hadoop.util.Time; 184import org.apache.htrace.core.TraceScope; 185import org.apache.htrace.core.Tracer; 186import org.slf4j.Logger; 187import org.slf4j.LoggerFactory; 188 189import com.google.common.annotations.VisibleForTesting; 190import com.google.common.base.Joiner; 191import com.google.common.base.Preconditions; 192import com.google.common.collect.Lists; 193import com.google.common.net.InetAddresses; 194 195/******************************************************** 196 * DFSClient can connect to a Hadoop Filesystem and 197 * perform basic file tasks. It uses the ClientProtocol 198 * to communicate with a NameNode daemon, and connects 199 * directly to DataNodes to read/write block data. 200 * 201 * Hadoop DFS users should obtain an instance of 202 * DistributedFileSystem, which uses DFSClient to handle 203 * filesystem tasks. 204 * 205 ********************************************************/ 206@InterfaceAudience.Private 207public class DFSClient implements java.io.Closeable, RemotePeerFactory, 208 DataEncryptionKeyFactory { 209 public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class); 210 // 1 hour 211 public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; 212 213 private final Configuration conf; 214 private final Tracer tracer; 215 private final DfsClientConf dfsClientConf; 216 final ClientProtocol namenode; 217 /* The service used for delegation tokens */ 218 private Text dtService; 219 220 final UserGroupInformation ugi; 221 volatile boolean clientRunning = true; 222 volatile long lastLeaseRenewal; 223 private volatile FsServerDefaults serverDefaults; 224 private volatile long serverDefaultsLastUpdate; 225 final String clientName; 226 final SocketFactory socketFactory; 227 final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure; 228 final FileSystem.Statistics stats; 229 private final String authority; 230 private final Random r = new Random(); 231 private SocketAddress[] localInterfaceAddrs; 232 private DataEncryptionKey encryptionKey; 233 final SaslDataTransferClient saslClient; 234 private final CachingStrategy defaultReadCachingStrategy; 235 private final CachingStrategy defaultWriteCachingStrategy; 236 private final ClientContext clientContext; 237 238 private static final DFSHedgedReadMetrics HEDGED_READ_METRIC = 239 new DFSHedgedReadMetrics(); 240 private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; 241 private final int smallBufferSize; 242 243 public DfsClientConf getConf() { 244 return dfsClientConf; 245 } 246 247 Configuration getConfiguration() { 248 return conf; 249 } 250 251 /** 252 * A map from file names to {@link DFSOutputStream} objects 253 * that are currently being written by this client. 254 * Note that a file can only be written by a single client. 255 */ 256 private final Map<Long, DFSOutputStream> filesBeingWritten = new HashMap<>(); 257 258 /** 259 * Same as this(NameNode.getNNAddress(conf), conf); 260 * @see #DFSClient(InetSocketAddress, Configuration) 261 * @deprecated Deprecated at 0.21 262 */ 263 @Deprecated 264 public DFSClient(Configuration conf) throws IOException { 265 this(DFSUtilClient.getNNAddress(conf), conf); 266 } 267 268 public DFSClient(InetSocketAddress address, Configuration conf) 269 throws IOException { 270 this(DFSUtilClient.getNNUri(address), conf); 271 } 272 273 /** 274 * Same as this(nameNodeUri, conf, null); 275 * @see #DFSClient(URI, Configuration, FileSystem.Statistics) 276 */ 277 public DFSClient(URI nameNodeUri, Configuration conf) throws IOException { 278 this(nameNodeUri, conf, null); 279 } 280 281 /** 282 * Same as this(nameNodeUri, null, conf, stats); 283 * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics) 284 */ 285 public DFSClient(URI nameNodeUri, Configuration conf, 286 FileSystem.Statistics stats) throws IOException { 287 this(nameNodeUri, null, conf, stats); 288 } 289 290 /** 291 * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode. 292 * If HA is enabled and a positive value is set for 293 * {@link HdfsClientConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} 294 * in the configuration, the DFSClient will use 295 * {@link LossyRetryInvocationHandler} as its RetryInvocationHandler. 296 * Otherwise one of nameNodeUri or rpcNamenode must be null. 297 */ 298 @VisibleForTesting 299 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, 300 Configuration conf, FileSystem.Statistics stats) throws IOException { 301 // Copy only the required DFSClient configuration 302 this.tracer = FsTracer.get(conf); 303 this.dfsClientConf = new DfsClientConf(conf); 304 this.conf = conf; 305 this.stats = stats; 306 this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); 307 this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); 308 this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); 309 310 this.ugi = UserGroupInformation.getCurrentUser(); 311 312 this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); 313 this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" + 314 ThreadLocalRandom.current().nextInt() + "_" + 315 Thread.currentThread().getId(); 316 int numResponseToDrop = conf.getInt( 317 DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 318 DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT); 319 ProxyAndInfo<ClientProtocol> proxyInfo = null; 320 AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false); 321 322 if (numResponseToDrop > 0) { 323 // This case is used for testing. 324 LOG.warn(DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY 325 + " is set to " + numResponseToDrop 326 + ", this hacked client will proactively drop responses"); 327 proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf, 328 nameNodeUri, ClientProtocol.class, numResponseToDrop, 329 nnFallbackToSimpleAuth); 330 } 331 332 if (proxyInfo != null) { 333 this.dtService = proxyInfo.getDelegationTokenService(); 334 this.namenode = proxyInfo.getProxy(); 335 } else if (rpcNamenode != null) { 336 // This case is used for testing. 337 Preconditions.checkArgument(nameNodeUri == null); 338 this.namenode = rpcNamenode; 339 dtService = null; 340 } else { 341 Preconditions.checkArgument(nameNodeUri != null, 342 "null URI"); 343 proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, 344 nameNodeUri, nnFallbackToSimpleAuth); 345 this.dtService = proxyInfo.getDelegationTokenService(); 346 this.namenode = proxyInfo.getProxy(); 347 } 348 349 String localInterfaces[] = 350 conf.getTrimmedStrings(DFS_CLIENT_LOCAL_INTERFACES); 351 localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces); 352 if (LOG.isDebugEnabled() && 0 != localInterfaces.length) { 353 LOG.debug("Using local interfaces [" + 354 Joiner.on(',').join(localInterfaces)+ "] with addresses [" + 355 Joiner.on(',').join(localInterfaceAddrs) + "]"); 356 } 357 358 Boolean readDropBehind = 359 (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ? 360 null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false); 361 Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ? 362 null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0); 363 Boolean writeDropBehind = 364 (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ? 365 null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false); 366 this.defaultReadCachingStrategy = 367 new CachingStrategy(readDropBehind, readahead); 368 this.defaultWriteCachingStrategy = 369 new CachingStrategy(writeDropBehind, readahead); 370 this.clientContext = ClientContext.get( 371 conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT), 372 dfsClientConf); 373 374 if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) { 375 this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize()); 376 } 377 this.saslClient = new SaslDataTransferClient( 378 conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), 379 TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); 380 } 381 382 /** 383 * Return the socket addresses to use with each configured 384 * local interface. Local interfaces may be specified by IP 385 * address, IP address range using CIDR notation, interface 386 * name (e.g. eth0) or sub-interface name (e.g. eth0:0). 387 * The socket addresses consist of the IPs for the interfaces 388 * and the ephemeral port (port 0). If an IP, IP range, or 389 * interface name matches an interface with sub-interfaces 390 * only the IP of the interface is used. Sub-interfaces can 391 * be used by specifying them explicitly (by IP or name). 392 * 393 * @return SocketAddresses for the configured local interfaces, 394 * or an empty array if none are configured 395 * @throws UnknownHostException if a given interface name is invalid 396 */ 397 private static SocketAddress[] getLocalInterfaceAddrs( 398 String interfaceNames[]) throws UnknownHostException { 399 List<SocketAddress> localAddrs = new ArrayList<>(); 400 for (String interfaceName : interfaceNames) { 401 if (InetAddresses.isInetAddress(interfaceName)) { 402 localAddrs.add(new InetSocketAddress(interfaceName, 0)); 403 } else if (NetUtils.isValidSubnet(interfaceName)) { 404 for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) { 405 localAddrs.add(new InetSocketAddress(addr, 0)); 406 } 407 } else { 408 for (String ip : DNS.getIPs(interfaceName, false)) { 409 localAddrs.add(new InetSocketAddress(ip, 0)); 410 } 411 } 412 } 413 return localAddrs.toArray(new SocketAddress[localAddrs.size()]); 414 } 415 416 /** 417 * Select one of the configured local interfaces at random. We use a random 418 * interface because other policies like round-robin are less effective 419 * given that we cache connections to datanodes. 420 * 421 * @return one of the local interface addresses at random, or null if no 422 * local interfaces are configured 423 */ 424 SocketAddress getRandomLocalInterfaceAddr() { 425 if (localInterfaceAddrs.length == 0) { 426 return null; 427 } 428 final int idx = r.nextInt(localInterfaceAddrs.length); 429 final SocketAddress addr = localInterfaceAddrs[idx]; 430 LOG.debug("Using local interface {}", addr); 431 return addr; 432 } 433 434 /** 435 * Return the timeout that clients should use when writing to datanodes. 436 * @param numNodes the number of nodes in the pipeline. 437 */ 438 int getDatanodeWriteTimeout(int numNodes) { 439 final int t = dfsClientConf.getDatanodeSocketWriteTimeout(); 440 return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0; 441 } 442 443 int getDatanodeReadTimeout(int numNodes) { 444 final int t = dfsClientConf.getSocketTimeout(); 445 return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0; 446 } 447 448 @VisibleForTesting 449 public String getClientName() { 450 return clientName; 451 } 452 453 void checkOpen() throws IOException { 454 if (!clientRunning) { 455 throw new IOException("Filesystem closed"); 456 } 457 } 458 459 /** Return the lease renewer instance. The renewer thread won't start 460 * until the first output stream is created. The same instance will 461 * be returned until all output streams are closed. 462 */ 463 public LeaseRenewer getLeaseRenewer() { 464 return LeaseRenewer.getInstance(authority, ugi, this); 465 } 466 467 /** Get a lease and start automatic renewal */ 468 private void beginFileLease(final long inodeId, final DFSOutputStream out) 469 throws IOException { 470 getLeaseRenewer().put(inodeId, out, this); 471 } 472 473 /** Stop renewal of lease for the file. */ 474 void endFileLease(final long inodeId) { 475 getLeaseRenewer().closeFile(inodeId, this); 476 } 477 478 479 /** Put a file. Only called from LeaseRenewer, where proper locking is 480 * enforced to consistently update its local dfsclients array and 481 * client's filesBeingWritten map. 482 */ 483 public void putFileBeingWritten(final long inodeId, 484 final DFSOutputStream out) { 485 synchronized(filesBeingWritten) { 486 filesBeingWritten.put(inodeId, out); 487 // update the last lease renewal time only when there was no 488 // writes. once there is one write stream open, the lease renewer 489 // thread keeps it updated well with in anyone's expiration time. 490 if (lastLeaseRenewal == 0) { 491 updateLastLeaseRenewal(); 492 } 493 } 494 } 495 496 /** Remove a file. Only called from LeaseRenewer. */ 497 public void removeFileBeingWritten(final long inodeId) { 498 synchronized(filesBeingWritten) { 499 filesBeingWritten.remove(inodeId); 500 if (filesBeingWritten.isEmpty()) { 501 lastLeaseRenewal = 0; 502 } 503 } 504 } 505 506 /** Is file-being-written map empty? */ 507 public boolean isFilesBeingWrittenEmpty() { 508 synchronized(filesBeingWritten) { 509 return filesBeingWritten.isEmpty(); 510 } 511 } 512 513 /** @return true if the client is running */ 514 public boolean isClientRunning() { 515 return clientRunning; 516 } 517 518 long getLastLeaseRenewal() { 519 return lastLeaseRenewal; 520 } 521 522 void updateLastLeaseRenewal() { 523 synchronized(filesBeingWritten) { 524 if (filesBeingWritten.isEmpty()) { 525 return; 526 } 527 lastLeaseRenewal = Time.monotonicNow(); 528 } 529 } 530 531 /** 532 * Renew leases. 533 * @return true if lease was renewed. May return false if this 534 * client has been closed or has no files open. 535 **/ 536 public boolean renewLease() throws IOException { 537 if (clientRunning && !isFilesBeingWrittenEmpty()) { 538 try { 539 namenode.renewLease(clientName); 540 updateLastLeaseRenewal(); 541 return true; 542 } catch (IOException e) { 543 // Abort if the lease has already expired. 544 final long elapsed = Time.monotonicNow() - getLastLeaseRenewal(); 545 if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) { 546 LOG.warn("Failed to renew lease for " + clientName + " for " 547 + (elapsed/1000) + " seconds (>= hard-limit =" 548 + (HdfsConstants.LEASE_HARDLIMIT_PERIOD / 1000) + " seconds.) " 549 + "Closing all files being written ...", e); 550 closeAllFilesBeingWritten(true); 551 } else { 552 // Let the lease renewer handle it and retry. 553 throw e; 554 } 555 } 556 } 557 return false; 558 } 559 560 /** 561 * Close connections the Namenode. 562 */ 563 void closeConnectionToNamenode() { 564 RPC.stopProxy(namenode); 565 } 566 567 /** Close/abort all files being written. */ 568 public void closeAllFilesBeingWritten(final boolean abort) { 569 for(;;) { 570 final long inodeId; 571 final DFSOutputStream out; 572 synchronized(filesBeingWritten) { 573 if (filesBeingWritten.isEmpty()) { 574 return; 575 } 576 inodeId = filesBeingWritten.keySet().iterator().next(); 577 out = filesBeingWritten.remove(inodeId); 578 } 579 if (out != null) { 580 try { 581 if (abort) { 582 out.abort(); 583 } else { 584 out.close(); 585 } 586 } catch(IOException ie) { 587 LOG.error("Failed to " + (abort ? "abort" : "close") + " file: " 588 + out.getSrc() + " with inode: " + inodeId, ie); 589 } 590 } 591 } 592 } 593 594 /** 595 * Close the file system, abandoning all of the leases and files being 596 * created and close connections to the namenode. 597 */ 598 @Override 599 public synchronized void close() throws IOException { 600 if(clientRunning) { 601 closeAllFilesBeingWritten(false); 602 clientRunning = false; 603 getLeaseRenewer().closeClient(this); 604 // close connections to the namenode 605 closeConnectionToNamenode(); 606 } 607 } 608 609 /** 610 * Close all open streams, abandoning all of the leases and files being 611 * created. 612 * @param abort whether streams should be gracefully closed 613 */ 614 public void closeOutputStreams(boolean abort) { 615 if (clientRunning) { 616 closeAllFilesBeingWritten(abort); 617 } 618 } 619 620 /** 621 * @see ClientProtocol#getPreferredBlockSize(String) 622 */ 623 public long getBlockSize(String f) throws IOException { 624 try (TraceScope ignored = newPathTraceScope("getBlockSize", f)) { 625 return namenode.getPreferredBlockSize(f); 626 } catch (IOException ie) { 627 LOG.warn("Problem getting block size", ie); 628 throw ie; 629 } 630 } 631 632 /** 633 * Get server default values for a number of configuration params. 634 * @see ClientProtocol#getServerDefaults() 635 */ 636 public FsServerDefaults getServerDefaults() throws IOException { 637 long now = Time.monotonicNow(); 638 if ((serverDefaults == null) || 639 (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) { 640 serverDefaults = namenode.getServerDefaults(); 641 serverDefaultsLastUpdate = now; 642 } 643 assert serverDefaults != null; 644 return serverDefaults; 645 } 646 647 /** 648 * Get a canonical token service name for this client's tokens. Null should 649 * be returned if the client is not using tokens. 650 * @return the token service for the client 651 */ 652 @InterfaceAudience.LimitedPrivate( { "HDFS" }) 653 public String getCanonicalServiceName() { 654 return (dtService != null) ? dtService.toString() : null; 655 } 656 657 /** 658 * @see ClientProtocol#getDelegationToken(Text) 659 */ 660 public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) 661 throws IOException { 662 assert dtService != null; 663 try (TraceScope ignored = tracer.newScope("getDelegationToken")) { 664 Token<DelegationTokenIdentifier> token = 665 namenode.getDelegationToken(renewer); 666 if (token != null) { 667 token.setService(this.dtService); 668 LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); 669 } else { 670 LOG.info("Cannot get delegation token from " + renewer); 671 } 672 return token; 673 } 674 } 675 676 /** 677 * Renew a delegation token 678 * @param token the token to renew 679 * @return the new expiration time 680 * @throws IOException 681 * @deprecated Use Token.renew instead. 682 */ 683 @Deprecated 684 public long renewDelegationToken(Token<DelegationTokenIdentifier> token) 685 throws IOException { 686 LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token)); 687 try { 688 return token.renew(conf); 689 } catch (InterruptedException ie) { 690 throw new RuntimeException("caught interrupted", ie); 691 } catch (RemoteException re) { 692 throw re.unwrapRemoteException(InvalidToken.class, 693 AccessControlException.class); 694 } 695 } 696 697 /** 698 * Cancel a delegation token 699 * @param token the token to cancel 700 * @throws IOException 701 * @deprecated Use Token.cancel instead. 702 */ 703 @Deprecated 704 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) 705 throws IOException { 706 LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token)); 707 try { 708 token.cancel(conf); 709 } catch (InterruptedException ie) { 710 throw new RuntimeException("caught interrupted", ie); 711 } catch (RemoteException re) { 712 throw re.unwrapRemoteException(InvalidToken.class, 713 AccessControlException.class); 714 } 715 } 716 717 @InterfaceAudience.Private 718 public static class Renewer extends TokenRenewer { 719 720 static { 721 //Ensure that HDFS Configuration files are loaded before trying to use 722 // the renewer. 723 HdfsConfiguration.init(); 724 } 725 726 @Override 727 public boolean handleKind(Text kind) { 728 return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind); 729 } 730 731 @SuppressWarnings("unchecked") 732 @Override 733 public long renew(Token<?> token, Configuration conf) throws IOException { 734 Token<DelegationTokenIdentifier> delToken = 735 (Token<DelegationTokenIdentifier>) token; 736 ClientProtocol nn = getNNProxy(delToken, conf); 737 try { 738 return nn.renewDelegationToken(delToken); 739 } catch (RemoteException re) { 740 throw re.unwrapRemoteException(InvalidToken.class, 741 AccessControlException.class); 742 } 743 } 744 745 @SuppressWarnings("unchecked") 746 @Override 747 public void cancel(Token<?> token, Configuration conf) throws IOException { 748 Token<DelegationTokenIdentifier> delToken = 749 (Token<DelegationTokenIdentifier>) token; 750 LOG.info("Cancelling " + 751 DelegationTokenIdentifier.stringifyToken(delToken)); 752 ClientProtocol nn = getNNProxy(delToken, conf); 753 try { 754 nn.cancelDelegationToken(delToken); 755 } catch (RemoteException re) { 756 throw re.unwrapRemoteException(InvalidToken.class, 757 AccessControlException.class); 758 } 759 } 760 761 private static ClientProtocol getNNProxy( 762 Token<DelegationTokenIdentifier> token, Configuration conf) 763 throws IOException { 764 URI uri = HAUtilClient.getServiceUriFromToken( 765 HdfsConstants.HDFS_URI_SCHEME, token); 766 if (HAUtilClient.isTokenForLogicalUri(token) && 767 !HAUtilClient.isLogicalUri(conf, uri)) { 768 // If the token is for a logical nameservice, but the configuration 769 // we have disagrees about that, we can't actually renew it. 770 // This can be the case in MR, for example, if the RM doesn't 771 // have all of the HA clusters configured in its configuration. 772 throw new IOException("Unable to map logical nameservice URI '" + 773 uri + "' to a NameNode. Local configuration does not have " + 774 "a failover proxy provider configured."); 775 } 776 777 ProxyAndInfo<ClientProtocol> info = 778 NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null); 779 assert info.getDelegationTokenService().equals(token.getService()) : 780 "Returned service '" + info.getDelegationTokenService().toString() + 781 "' doesn't match expected service '" + 782 token.getService().toString() + "'"; 783 784 return info.getProxy(); 785 } 786 787 @Override 788 public boolean isManaged(Token<?> token) throws IOException { 789 return true; 790 } 791 792 } 793 794 /** 795 * Report corrupt blocks that were discovered by the client. 796 * @see ClientProtocol#reportBadBlocks(LocatedBlock[]) 797 */ 798 public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { 799 namenode.reportBadBlocks(blocks); 800 } 801 802 public LocatedBlocks getLocatedBlocks(String src, long start) 803 throws IOException { 804 return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize()); 805 } 806 807 /* 808 * This is just a wrapper around callGetBlockLocations, but non-static so that 809 * we can stub it out for tests. 810 */ 811 @VisibleForTesting 812 public LocatedBlocks getLocatedBlocks(String src, long start, long length) 813 throws IOException { 814 try (TraceScope ignored = newPathTraceScope("getBlockLocations", src)) { 815 return callGetBlockLocations(namenode, src, start, length); 816 } 817 } 818 819 /** 820 * @see ClientProtocol#getBlockLocations(String, long, long) 821 */ 822 static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, 823 String src, long start, long length) 824 throws IOException { 825 try { 826 return namenode.getBlockLocations(src, start, length); 827 } catch(RemoteException re) { 828 throw re.unwrapRemoteException(AccessControlException.class, 829 FileNotFoundException.class, 830 UnresolvedPathException.class); 831 } 832 } 833 834 /** 835 * Recover a file's lease 836 * @param src a file's path 837 * @return true if the file is already closed 838 * @throws IOException 839 */ 840 boolean recoverLease(String src) throws IOException { 841 checkOpen(); 842 843 try (TraceScope ignored = newPathTraceScope("recoverLease", src)) { 844 return namenode.recoverLease(src, clientName); 845 } catch (RemoteException re) { 846 throw re.unwrapRemoteException(FileNotFoundException.class, 847 AccessControlException.class, 848 UnresolvedPathException.class); 849 } 850 } 851 852 /** 853 * Get block location info about file 854 * 855 * getBlockLocations() returns a list of hostnames that store 856 * data for a specific file region. It returns a set of hostnames 857 * for every block within the indicated region. 858 * 859 * This function is very useful when writing code that considers 860 * data-placement when performing operations. For example, the 861 * MapReduce system tries to schedule tasks on the same machines 862 * as the data-block the task processes. 863 */ 864 public BlockLocation[] getBlockLocations(String src, long start, 865 long length) throws IOException { 866 try (TraceScope ignored = newPathTraceScope("getBlockLocations", src)) { 867 LocatedBlocks blocks = getLocatedBlocks(src, start, length); 868 BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks); 869 HdfsBlockLocation[] hdfsLocations = 870 new HdfsBlockLocation[locations.length]; 871 for (int i = 0; i < locations.length; i++) { 872 hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i)); 873 } 874 return hdfsLocations; 875 } 876 } 877 878 /** 879 * Get block location information about a list of {@link HdfsBlockLocation}. 880 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to 881 * get {@link BlockStorageLocation}s for blocks returned by 882 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} 883 * . 884 * 885 * This is done by making a round of RPCs to the associated datanodes, asking 886 * the volume of each block replica. The returned array of 887 * {@link BlockStorageLocation} expose this information as a 888 * {@link VolumeId}. 889 * 890 * @param blockLocations 891 * target blocks on which to query volume location information 892 * @return volumeBlockLocations original block array augmented with additional 893 * volume location information for each replica. 894 */ 895 public BlockStorageLocation[] getBlockStorageLocations( 896 List<BlockLocation> blockLocations) throws IOException, 897 UnsupportedOperationException, InvalidBlockTokenException { 898 if (!getConf().isHdfsBlocksMetadataEnabled()) { 899 throw new UnsupportedOperationException("Datanode-side support for " + 900 "getVolumeBlockLocations() must also be enabled in the client " + 901 "configuration."); 902 } 903 // Downcast blockLocations and fetch out required LocatedBlock(s) 904 List<LocatedBlock> blocks = new ArrayList<LocatedBlock>(); 905 for (BlockLocation loc : blockLocations) { 906 if (!(loc instanceof HdfsBlockLocation)) { 907 throw new ClassCastException("DFSClient#getVolumeBlockLocations " + 908 "expected to be passed HdfsBlockLocations"); 909 } 910 HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; 911 blocks.add(hdfsLoc.getLocatedBlock()); 912 } 913 914 // Re-group the LocatedBlocks to be grouped by datanodes, with the values 915 // a list of the LocatedBlocks on the datanode. 916 Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 917 new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>(); 918 for (LocatedBlock b : blocks) { 919 for (DatanodeInfo info : b.getLocations()) { 920 if (!datanodeBlocks.containsKey(info)) { 921 datanodeBlocks.put(info, new ArrayList<LocatedBlock>()); 922 } 923 List<LocatedBlock> l = datanodeBlocks.get(info); 924 l.add(b); 925 } 926 } 927 928 // Make RPCs to the datanodes to get volume locations for its replicas 929 TraceScope scope = 930 tracer.newScope("getBlockStorageLocations"); 931 Map<DatanodeInfo, HdfsBlocksMetadata> metadatas; 932 try { 933 metadatas = BlockStorageLocationUtil. 934 queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, 935 getConf().getFileBlockStorageLocationsNumThreads(), 936 getConf().getFileBlockStorageLocationsTimeoutMs(), 937 getConf().isConnectToDnViaHostname(), tracer, scope.getSpanId()); 938 if (LOG.isTraceEnabled()) { 939 LOG.trace("metadata returned: " 940 + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); 941 } 942 } finally { 943 scope.close(); 944 } 945 946 // Regroup the returned VolumeId metadata to again be grouped by 947 // LocatedBlock rather than by datanode 948 Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil 949 .associateVolumeIdsWithBlocks(blocks, metadatas); 950 951 // Combine original BlockLocations with new VolumeId information 952 BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil 953 .convertToVolumeBlockLocations(blocks, blockVolumeIds); 954 955 return volumeBlockLocations; 956 } 957 958 /** 959 * Decrypts a EDEK by consulting the KeyProvider. 960 */ 961 private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo 962 feInfo) throws IOException { 963 try (TraceScope ignored = tracer.newScope("decryptEDEK")) { 964 KeyProvider provider = getKeyProvider(); 965 if (provider == null) { 966 throw new IOException("No KeyProvider is configured, cannot access" + 967 " an encrypted file"); 968 } 969 EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption( 970 feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(), 971 feInfo.getEncryptedDataEncryptionKey()); 972 try { 973 KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension 974 .createKeyProviderCryptoExtension(provider); 975 return cryptoProvider.decryptEncryptedKey(ekv); 976 } catch (GeneralSecurityException e) { 977 throw new IOException(e); 978 } 979 } 980 } 981 982 /** 983 * Obtain the crypto protocol version from the provided FileEncryptionInfo, 984 * checking to see if this version is supported by. 985 * 986 * @param feInfo FileEncryptionInfo 987 * @return CryptoProtocolVersion from the feInfo 988 * @throws IOException if the protocol version is unsupported. 989 */ 990 private static CryptoProtocolVersion getCryptoProtocolVersion( 991 FileEncryptionInfo feInfo) throws IOException { 992 final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion(); 993 if (!CryptoProtocolVersion.supports(version)) { 994 throw new IOException("Client does not support specified " + 995 "CryptoProtocolVersion " + version.getDescription() + " version " + 996 "number" + version.getVersion()); 997 } 998 return version; 999 } 1000 1001 /** 1002 * Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo 1003 * and the available CryptoCodecs configured in the Configuration. 1004 * 1005 * @param conf Configuration 1006 * @param feInfo FileEncryptionInfo 1007 * @return CryptoCodec 1008 * @throws IOException if no suitable CryptoCodec for the CipherSuite is 1009 * available. 1010 */ 1011 private static CryptoCodec getCryptoCodec(Configuration conf, 1012 FileEncryptionInfo feInfo) throws IOException { 1013 final CipherSuite suite = feInfo.getCipherSuite(); 1014 if (suite.equals(CipherSuite.UNKNOWN)) { 1015 throw new IOException("NameNode specified unknown CipherSuite with ID " 1016 + suite.getUnknownValue() + ", cannot instantiate CryptoCodec."); 1017 } 1018 final CryptoCodec codec = CryptoCodec.getInstance(conf, suite); 1019 if (codec == null) { 1020 throw new UnknownCipherSuiteException( 1021 "No configuration found for the cipher suite " 1022 + suite.getConfigSuffix() + " prefixed with " 1023 + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX 1024 + ". Please see the example configuration " 1025 + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE " 1026 + "at core-default.xml for details."); 1027 } 1028 return codec; 1029 } 1030 1031 /** 1032 * Wraps the stream in a CryptoInputStream if the underlying file is 1033 * encrypted. 1034 */ 1035 public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis) 1036 throws IOException { 1037 final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo(); 1038 if (feInfo != null) { 1039 // File is encrypted, wrap the stream in a crypto stream. 1040 // Currently only one version, so no special logic based on the version # 1041 getCryptoProtocolVersion(feInfo); 1042 final CryptoCodec codec = getCryptoCodec(conf, feInfo); 1043 final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); 1044 final CryptoInputStream cryptoIn = 1045 new CryptoInputStream(dfsis, codec, decrypted.getMaterial(), 1046 feInfo.getIV()); 1047 return new HdfsDataInputStream(cryptoIn); 1048 } else { 1049 // No FileEncryptionInfo so no encryption. 1050 return new HdfsDataInputStream(dfsis); 1051 } 1052 } 1053 1054 /** 1055 * Wraps the stream in a CryptoOutputStream if the underlying file is 1056 * encrypted. 1057 */ 1058 public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, 1059 FileSystem.Statistics statistics) throws IOException { 1060 return createWrappedOutputStream(dfsos, statistics, 0); 1061 } 1062 1063 /** 1064 * Wraps the stream in a CryptoOutputStream if the underlying file is 1065 * encrypted. 1066 */ 1067 public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, 1068 FileSystem.Statistics statistics, long startPos) throws IOException { 1069 final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo(); 1070 if (feInfo != null) { 1071 // File is encrypted, wrap the stream in a crypto stream. 1072 // Currently only one version, so no special logic based on the version # 1073 getCryptoProtocolVersion(feInfo); 1074 final CryptoCodec codec = getCryptoCodec(conf, feInfo); 1075 KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); 1076 final CryptoOutputStream cryptoOut = 1077 new CryptoOutputStream(dfsos, codec, 1078 decrypted.getMaterial(), feInfo.getIV(), startPos); 1079 return new HdfsDataOutputStream(cryptoOut, statistics, startPos); 1080 } else { 1081 // No FileEncryptionInfo present so no encryption. 1082 return new HdfsDataOutputStream(dfsos, statistics, startPos); 1083 } 1084 } 1085 1086 public DFSInputStream open(String src) throws IOException { 1087 return open(src, dfsClientConf.getIoBufferSize(), true); 1088 } 1089 1090 /** 1091 * Create an input stream that obtains a nodelist from the 1092 * namenode, and then reads from all the right places. Creates 1093 * inner subclass of InputStream that does the right out-of-band 1094 * work. 1095 * @deprecated Use {@link #open(String, int, boolean)} instead. 1096 */ 1097 @Deprecated 1098 public DFSInputStream open(String src, int buffersize, boolean verifyChecksum, 1099 FileSystem.Statistics stats) throws IOException { 1100 return open(src, buffersize, verifyChecksum); 1101 } 1102 1103 1104 /** 1105 * Create an input stream that obtains a nodelist from the 1106 * namenode, and then reads from all the right places. Creates 1107 * inner subclass of InputStream that does the right out-of-band 1108 * work. 1109 */ 1110 public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) 1111 throws IOException { 1112 checkOpen(); 1113 // Get block info from namenode 1114 try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) { 1115 return new DFSInputStream(this, src, verifyChecksum, null); 1116 } 1117 } 1118 1119 /** 1120 * Get the namenode associated with this DFSClient object 1121 * @return the namenode associated with this DFSClient object 1122 */ 1123 public ClientProtocol getNamenode() { 1124 return namenode; 1125 } 1126 1127 /** 1128 * Call {@link #create(String, boolean, short, long, Progressable)} with 1129 * default <code>replication</code> and <code>blockSize<code> and null <code> 1130 * progress</code>. 1131 */ 1132 public OutputStream create(String src, boolean overwrite) 1133 throws IOException { 1134 return create(src, overwrite, dfsClientConf.getDefaultReplication(), 1135 dfsClientConf.getDefaultBlockSize(), null); 1136 } 1137 1138 /** 1139 * Call {@link #create(String, boolean, short, long, Progressable)} with 1140 * default <code>replication</code> and <code>blockSize<code>. 1141 */ 1142 public OutputStream create(String src, 1143 boolean overwrite, Progressable progress) throws IOException { 1144 return create(src, overwrite, dfsClientConf.getDefaultReplication(), 1145 dfsClientConf.getDefaultBlockSize(), progress); 1146 } 1147 1148 /** 1149 * Call {@link #create(String, boolean, short, long, Progressable)} with 1150 * null <code>progress</code>. 1151 */ 1152 public OutputStream create(String src, boolean overwrite, short replication, 1153 long blockSize) throws IOException { 1154 return create(src, overwrite, replication, blockSize, null); 1155 } 1156 1157 /** 1158 * Call {@link #create(String, boolean, short, long, Progressable, int)} 1159 * with default bufferSize. 1160 */ 1161 public OutputStream create(String src, boolean overwrite, short replication, 1162 long blockSize, Progressable progress) throws IOException { 1163 return create(src, overwrite, replication, blockSize, progress, 1164 dfsClientConf.getIoBufferSize()); 1165 } 1166 1167 /** 1168 * Call {@link #create(String, FsPermission, EnumSet, short, long, 1169 * Progressable, int, ChecksumOpt)} with default <code>permission</code> 1170 * {@link FsPermission#getFileDefault()}. 1171 * 1172 * @param src File name 1173 * @param overwrite overwrite an existing file if true 1174 * @param replication replication factor for the file 1175 * @param blockSize maximum block size 1176 * @param progress interface for reporting client progress 1177 * @param buffersize underlying buffersize 1178 * 1179 * @return output stream 1180 */ 1181 public OutputStream create(String src, boolean overwrite, short replication, 1182 long blockSize, Progressable progress, int buffersize) 1183 throws IOException { 1184 return create(src, FsPermission.getFileDefault(), 1185 overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) 1186 : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress, 1187 buffersize, null); 1188 } 1189 1190 /** 1191 * Call {@link #create(String, FsPermission, EnumSet, boolean, short, 1192 * long, Progressable, int, ChecksumOpt)} with <code>createParent</code> 1193 * set to true. 1194 */ 1195 public DFSOutputStream create(String src, FsPermission permission, 1196 EnumSet<CreateFlag> flag, short replication, long blockSize, 1197 Progressable progress, int buffersize, ChecksumOpt checksumOpt) 1198 throws IOException { 1199 return create(src, permission, flag, true, 1200 replication, blockSize, progress, buffersize, checksumOpt, null); 1201 } 1202 1203 /** 1204 * Create a new dfs file with the specified block replication 1205 * with write-progress reporting and return an output stream for writing 1206 * into the file. 1207 * 1208 * @param src File name 1209 * @param permission The permission of the directory being created. 1210 * If null, use default permission 1211 * {@link FsPermission#getFileDefault()} 1212 * @param flag indicates create a new file or create/overwrite an 1213 * existing file or append to an existing file 1214 * @param createParent create missing parent directory if true 1215 * @param replication block replication 1216 * @param blockSize maximum block size 1217 * @param progress interface for reporting client progress 1218 * @param buffersize underlying buffer size 1219 * @param checksumOpt checksum options 1220 * 1221 * @return output stream 1222 * 1223 * @see ClientProtocol#create for detailed description of exceptions thrown 1224 */ 1225 public DFSOutputStream create(String src, FsPermission permission, 1226 EnumSet<CreateFlag> flag, boolean createParent, short replication, 1227 long blockSize, Progressable progress, int buffersize, 1228 ChecksumOpt checksumOpt) throws IOException { 1229 return create(src, permission, flag, createParent, replication, blockSize, 1230 progress, buffersize, checksumOpt, null); 1231 } 1232 1233 private FsPermission applyUMask(FsPermission permission) { 1234 if (permission == null) { 1235 permission = FsPermission.getFileDefault(); 1236 } 1237 return permission.applyUMask(dfsClientConf.getUMask()); 1238 } 1239 1240 /** 1241 * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, 1242 * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is 1243 * a hint to where the namenode should place the file blocks. 1244 * The favored nodes hint is not persisted in HDFS. Hence it may be honored 1245 * at the creation time only. HDFS could move the blocks during balancing or 1246 * replication, to move the blocks from favored nodes. A value of null means 1247 * no favored nodes for this create 1248 */ 1249 public DFSOutputStream create(String src, FsPermission permission, 1250 EnumSet<CreateFlag> flag, boolean createParent, short replication, 1251 long blockSize, Progressable progress, int buffersize, 1252 ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) 1253 throws IOException { 1254 checkOpen(); 1255 final FsPermission masked = applyUMask(permission); 1256 LOG.debug("{}: masked={}", src, masked); 1257 final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, 1258 src, masked, flag, createParent, replication, blockSize, progress, 1259 dfsClientConf.createChecksum(checksumOpt), 1260 getFavoredNodesStr(favoredNodes)); 1261 beginFileLease(result.getFileId(), result); 1262 return result; 1263 } 1264 1265 private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) { 1266 String[] favoredNodeStrs = null; 1267 if (favoredNodes != null) { 1268 favoredNodeStrs = new String[favoredNodes.length]; 1269 for (int i = 0; i < favoredNodes.length; i++) { 1270 favoredNodeStrs[i] = 1271 favoredNodes[i].getHostName() + ":" + favoredNodes[i].getPort(); 1272 } 1273 } 1274 return favoredNodeStrs; 1275 } 1276 1277 /** 1278 * Append to an existing file if {@link CreateFlag#APPEND} is present 1279 */ 1280 private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag, 1281 Progressable progress) throws IOException { 1282 if (flag.contains(CreateFlag.APPEND)) { 1283 HdfsFileStatus stat = getFileInfo(src); 1284 if (stat == null) { // No file to append to 1285 // New file needs to be created if create option is present 1286 if (!flag.contains(CreateFlag.CREATE)) { 1287 throw new FileNotFoundException( 1288 "failed to append to non-existent file " + src + " on client " 1289 + clientName); 1290 } 1291 return null; 1292 } 1293 return callAppend(src, flag, progress, null); 1294 } 1295 return null; 1296 } 1297 1298 /** 1299 * Same as {{@link #create(String, FsPermission, EnumSet, short, long, 1300 * Progressable, int, ChecksumOpt)} except that the permission 1301 * is absolute (ie has already been masked with umask. 1302 */ 1303 public DFSOutputStream primitiveCreate(String src, FsPermission absPermission, 1304 EnumSet<CreateFlag> flag, boolean createParent, short replication, 1305 long blockSize, Progressable progress, int buffersize, 1306 ChecksumOpt checksumOpt) throws IOException { 1307 checkOpen(); 1308 CreateFlag.validate(flag); 1309 DFSOutputStream result = primitiveAppend(src, flag, progress); 1310 if (result == null) { 1311 DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); 1312 result = DFSOutputStream.newStreamForCreate(this, src, absPermission, 1313 flag, createParent, replication, blockSize, progress, checksum, null); 1314 } 1315 beginFileLease(result.getFileId(), result); 1316 return result; 1317 } 1318 1319 /** 1320 * Creates a symbolic link. 1321 * 1322 * @see ClientProtocol#createSymlink(String, String,FsPermission, boolean) 1323 */ 1324 public void createSymlink(String target, String link, boolean createParent) 1325 throws IOException { 1326 try (TraceScope ignored = newPathTraceScope("createSymlink", target)) { 1327 final FsPermission dirPerm = applyUMask(null); 1328 namenode.createSymlink(target, link, dirPerm, createParent); 1329 } catch (RemoteException re) { 1330 throw re.unwrapRemoteException(AccessControlException.class, 1331 FileAlreadyExistsException.class, 1332 FileNotFoundException.class, 1333 ParentNotDirectoryException.class, 1334 NSQuotaExceededException.class, 1335 DSQuotaExceededException.class, 1336 QuotaByStorageTypeExceededException.class, 1337 UnresolvedPathException.class, 1338 SnapshotAccessControlException.class); 1339 } 1340 } 1341 1342 /** 1343 * Resolve the *first* symlink, if any, in the path. 1344 * 1345 * @see ClientProtocol#getLinkTarget(String) 1346 */ 1347 public String getLinkTarget(String path) throws IOException { 1348 checkOpen(); 1349 try (TraceScope ignored = newPathTraceScope("getLinkTarget", path)) { 1350 return namenode.getLinkTarget(path); 1351 } catch (RemoteException re) { 1352 throw re.unwrapRemoteException(AccessControlException.class, 1353 FileNotFoundException.class); 1354 } 1355 } 1356 1357 /** 1358 * Invoke namenode append RPC. 1359 * It retries in case of {@link BlockNotYetCompleteException}. 1360 */ 1361 private LastBlockWithStatus callAppend(String src, 1362 EnumSetWritable<CreateFlag> flag) throws IOException { 1363 final long startTime = Time.monotonicNow(); 1364 for(;;) { 1365 try { 1366 return namenode.append(src, clientName, flag); 1367 } catch(RemoteException re) { 1368 if (Time.monotonicNow() - startTime > 5000 1369 || !RetriableException.class.getName().equals( 1370 re.getClassName())) { 1371 throw re; 1372 } 1373 1374 try { // sleep and retry 1375 Thread.sleep(500); 1376 } catch (InterruptedException e) { 1377 throw DFSUtilClient.toInterruptedIOException("callAppend", e); 1378 } 1379 } 1380 } 1381 } 1382 1383 /** Method to get stream returned by append call */ 1384 private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag, 1385 Progressable progress, String[] favoredNodes) throws IOException { 1386 CreateFlag.validateForAppend(flag); 1387 try { 1388 final LastBlockWithStatus blkWithStatus = callAppend(src, 1389 new EnumSetWritable<>(flag, CreateFlag.class)); 1390 HdfsFileStatus status = blkWithStatus.getFileStatus(); 1391 if (status == null) { 1392 LOG.debug("NameNode is on an older version, request file " + 1393 "info with additional RPC call for file: {}", src); 1394 status = getFileInfo(src); 1395 } 1396 return DFSOutputStream.newStreamForAppend(this, src, flag, progress, 1397 blkWithStatus.getLastBlock(), status, 1398 dfsClientConf.createChecksum(null), favoredNodes); 1399 } catch(RemoteException re) { 1400 throw re.unwrapRemoteException(AccessControlException.class, 1401 FileNotFoundException.class, 1402 SafeModeException.class, 1403 DSQuotaExceededException.class, 1404 QuotaByStorageTypeExceededException.class, 1405 UnsupportedOperationException.class, 1406 UnresolvedPathException.class, 1407 SnapshotAccessControlException.class); 1408 } 1409 } 1410 1411 /** 1412 * Append to an existing HDFS file. 1413 * 1414 * @param src file name 1415 * @param buffersize buffer size 1416 * @param flag indicates whether to append data to a new block instead of 1417 * the last block 1418 * @param progress for reporting write-progress; null is acceptable. 1419 * @param statistics file system statistics; null is acceptable. 1420 * @return an output stream for writing into the file 1421 * 1422 * @see ClientProtocol#append(String, String, EnumSetWritable) 1423 */ 1424 public HdfsDataOutputStream append(final String src, final int buffersize, 1425 EnumSet<CreateFlag> flag, final Progressable progress, 1426 final FileSystem.Statistics statistics) throws IOException { 1427 final DFSOutputStream out = append(src, buffersize, flag, null, progress); 1428 return createWrappedOutputStream(out, statistics, out.getInitialLen()); 1429 } 1430 1431 /** 1432 * Append to an existing HDFS file. 1433 * 1434 * @param src file name 1435 * @param buffersize buffer size 1436 * @param flag indicates whether to append data to a new block instead of the 1437 * last block 1438 * @param progress for reporting write-progress; null is acceptable. 1439 * @param statistics file system statistics; null is acceptable. 1440 * @param favoredNodes FavoredNodes for new blocks 1441 * @return an output stream for writing into the file 1442 * @see ClientProtocol#append(String, String, EnumSetWritable) 1443 */ 1444 public HdfsDataOutputStream append(final String src, final int buffersize, 1445 EnumSet<CreateFlag> flag, final Progressable progress, 1446 final FileSystem.Statistics statistics, 1447 final InetSocketAddress[] favoredNodes) throws IOException { 1448 final DFSOutputStream out = append(src, buffersize, flag, 1449 getFavoredNodesStr(favoredNodes), progress); 1450 return createWrappedOutputStream(out, statistics, out.getInitialLen()); 1451 } 1452 1453 private DFSOutputStream append(String src, int buffersize, 1454 EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress) 1455 throws IOException { 1456 checkOpen(); 1457 final DFSOutputStream result = callAppend(src, flag, progress, 1458 favoredNodes); 1459 beginFileLease(result.getFileId(), result); 1460 return result; 1461 } 1462 1463 /** 1464 * Set replication for an existing file. 1465 * @param src file name 1466 * @param replication replication to set the file to 1467 * 1468 * @see ClientProtocol#setReplication(String, short) 1469 */ 1470 public boolean setReplication(String src, short replication) 1471 throws IOException { 1472 try (TraceScope ignored = newPathTraceScope("setReplication", src)) { 1473 return namenode.setReplication(src, replication); 1474 } catch (RemoteException re) { 1475 throw re.unwrapRemoteException(AccessControlException.class, 1476 FileNotFoundException.class, 1477 SafeModeException.class, 1478 DSQuotaExceededException.class, 1479 QuotaByStorageTypeExceededException.class, 1480 UnresolvedPathException.class, 1481 SnapshotAccessControlException.class); 1482 } 1483 } 1484 1485 /** 1486 * Set storage policy for an existing file/directory 1487 * @param src file/directory name 1488 * @param policyName name of the storage policy 1489 */ 1490 public void setStoragePolicy(String src, String policyName) 1491 throws IOException { 1492 try (TraceScope ignored = newPathTraceScope("setStoragePolicy", src)) { 1493 namenode.setStoragePolicy(src, policyName); 1494 } catch (RemoteException e) { 1495 throw e.unwrapRemoteException(AccessControlException.class, 1496 FileNotFoundException.class, 1497 SafeModeException.class, 1498 NSQuotaExceededException.class, 1499 UnresolvedPathException.class, 1500 SnapshotAccessControlException.class); 1501 } 1502 } 1503 1504 /** 1505 * Unset storage policy set for a given file/directory. 1506 * @param src file/directory name 1507 */ 1508 public void unsetStoragePolicy(String src) throws IOException { 1509 checkOpen(); 1510 try (TraceScope ignored = newPathTraceScope("unsetStoragePolicy", src)) { 1511 namenode.unsetStoragePolicy(src); 1512 } catch (RemoteException e) { 1513 throw e.unwrapRemoteException(AccessControlException.class, 1514 FileNotFoundException.class, 1515 SafeModeException.class, 1516 NSQuotaExceededException.class, 1517 UnresolvedPathException.class, 1518 SnapshotAccessControlException.class); 1519 } 1520 } 1521 1522 /** 1523 * @param path file/directory name 1524 * @return Get the storage policy for specified path 1525 */ 1526 public BlockStoragePolicy getStoragePolicy(String path) throws IOException { 1527 checkOpen(); 1528 try (TraceScope ignored = newPathTraceScope("getStoragePolicy", path)) { 1529 return namenode.getStoragePolicy(path); 1530 } catch (RemoteException e) { 1531 throw e.unwrapRemoteException(AccessControlException.class, 1532 FileNotFoundException.class, 1533 SafeModeException.class, 1534 UnresolvedPathException.class); 1535 } 1536 } 1537 1538 /** 1539 * @return All the existing storage policies 1540 */ 1541 public BlockStoragePolicy[] getStoragePolicies() throws IOException { 1542 try (TraceScope ignored = tracer.newScope("getStoragePolicies")) { 1543 return namenode.getStoragePolicies(); 1544 } 1545 } 1546 1547 /** 1548 * Rename file or directory. 1549 * @see ClientProtocol#rename(String, String) 1550 * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead. 1551 */ 1552 @Deprecated 1553 public boolean rename(String src, String dst) throws IOException { 1554 checkOpen(); 1555 try (TraceScope ignored = newSrcDstTraceScope("rename", src, dst)) { 1556 return namenode.rename(src, dst); 1557 } catch (RemoteException re) { 1558 throw re.unwrapRemoteException(AccessControlException.class, 1559 NSQuotaExceededException.class, 1560 DSQuotaExceededException.class, 1561 QuotaByStorageTypeExceededException.class, 1562 UnresolvedPathException.class, 1563 SnapshotAccessControlException.class); 1564 } 1565 } 1566 1567 /** 1568 * Move blocks from src to trg and delete src 1569 * See {@link ClientProtocol#concat}. 1570 */ 1571 public void concat(String trg, String [] srcs) throws IOException { 1572 checkOpen(); 1573 try (TraceScope ignored = tracer.newScope("concat")) { 1574 namenode.concat(trg, srcs); 1575 } catch (RemoteException re) { 1576 throw re.unwrapRemoteException(AccessControlException.class, 1577 UnresolvedPathException.class, 1578 SnapshotAccessControlException.class); 1579 } 1580 } 1581 /** 1582 * Rename file or directory. 1583 * @see ClientProtocol#rename2(String, String, Options.Rename...) 1584 */ 1585 public void rename(String src, String dst, Options.Rename... options) 1586 throws IOException { 1587 checkOpen(); 1588 try (TraceScope ignored = newSrcDstTraceScope("rename2", src, dst)) { 1589 namenode.rename2(src, dst, options); 1590 } catch (RemoteException re) { 1591 throw re.unwrapRemoteException(AccessControlException.class, 1592 DSQuotaExceededException.class, 1593 QuotaByStorageTypeExceededException.class, 1594 FileAlreadyExistsException.class, 1595 FileNotFoundException.class, 1596 ParentNotDirectoryException.class, 1597 SafeModeException.class, 1598 NSQuotaExceededException.class, 1599 UnresolvedPathException.class, 1600 SnapshotAccessControlException.class); 1601 } 1602 } 1603 1604 /** 1605 * Truncate a file to an indicated size 1606 * See {@link ClientProtocol#truncate}. 1607 */ 1608 public boolean truncate(String src, long newLength) throws IOException { 1609 checkOpen(); 1610 if (newLength < 0) { 1611 throw new HadoopIllegalArgumentException( 1612 "Cannot truncate to a negative file size: " + newLength + "."); 1613 } 1614 try (TraceScope ignored = newPathTraceScope("truncate", src)) { 1615 return namenode.truncate(src, newLength, clientName); 1616 } catch (RemoteException re) { 1617 throw re.unwrapRemoteException(AccessControlException.class, 1618 UnresolvedPathException.class); 1619 } 1620 } 1621 1622 /** 1623 * Delete file or directory. 1624 * See {@link ClientProtocol#delete(String, boolean)}. 1625 */ 1626 @Deprecated 1627 public boolean delete(String src) throws IOException { 1628 checkOpen(); 1629 return delete(src, true); 1630 } 1631 1632 /** 1633 * delete file or directory. 1634 * delete contents of the directory if non empty and recursive 1635 * set to true 1636 * 1637 * @see ClientProtocol#delete(String, boolean) 1638 */ 1639 public boolean delete(String src, boolean recursive) throws IOException { 1640 checkOpen(); 1641 try (TraceScope ignored = newPathTraceScope("delete", src)) { 1642 return namenode.delete(src, recursive); 1643 } catch (RemoteException re) { 1644 throw re.unwrapRemoteException(AccessControlException.class, 1645 FileNotFoundException.class, 1646 SafeModeException.class, 1647 UnresolvedPathException.class, 1648 SnapshotAccessControlException.class); 1649 } 1650 } 1651 1652 /** Implemented using getFileInfo(src) 1653 */ 1654 public boolean exists(String src) throws IOException { 1655 checkOpen(); 1656 return getFileInfo(src) != null; 1657 } 1658 1659 /** 1660 * Get a partial listing of the indicated directory 1661 * No block locations need to be fetched 1662 */ 1663 public DirectoryListing listPaths(String src, byte[] startAfter) 1664 throws IOException { 1665 return listPaths(src, startAfter, false); 1666 } 1667 1668 /** 1669 * Get a partial listing of the indicated directory 1670 * 1671 * Recommend to use HdfsFileStatus.EMPTY_NAME as startAfter 1672 * if the application wants to fetch a listing starting from 1673 * the first entry in the directory 1674 * 1675 * @see ClientProtocol#getListing(String, byte[], boolean) 1676 */ 1677 public DirectoryListing listPaths(String src, byte[] startAfter, 1678 boolean needLocation) throws IOException { 1679 checkOpen(); 1680 try (TraceScope ignored = newPathTraceScope("listPaths", src)) { 1681 return namenode.getListing(src, startAfter, needLocation); 1682 } catch (RemoteException re) { 1683 throw re.unwrapRemoteException(AccessControlException.class, 1684 FileNotFoundException.class, 1685 UnresolvedPathException.class); 1686 } 1687 } 1688 1689 /** 1690 * Get the file info for a specific file or directory. 1691 * @param src The string representation of the path to the file 1692 * @return object containing information regarding the file 1693 * or null if file not found 1694 * 1695 * @see ClientProtocol#getFileInfo(String) for description of exceptions 1696 */ 1697 public HdfsFileStatus getFileInfo(String src) throws IOException { 1698 checkOpen(); 1699 try (TraceScope ignored = newPathTraceScope("getFileInfo", src)) { 1700 return namenode.getFileInfo(src); 1701 } catch (RemoteException re) { 1702 throw re.unwrapRemoteException(AccessControlException.class, 1703 FileNotFoundException.class, 1704 UnresolvedPathException.class); 1705 } 1706 } 1707 1708 /** 1709 * Close status of a file 1710 * @return true if file is already closed 1711 */ 1712 public boolean isFileClosed(String src) throws IOException{ 1713 checkOpen(); 1714 try (TraceScope ignored = newPathTraceScope("isFileClosed", src)) { 1715 return namenode.isFileClosed(src); 1716 } catch (RemoteException re) { 1717 throw re.unwrapRemoteException(AccessControlException.class, 1718 FileNotFoundException.class, 1719 UnresolvedPathException.class); 1720 } 1721 } 1722 1723 /** 1724 * Get the file info for a specific file or directory. If src 1725 * refers to a symlink then the FileStatus of the link is returned. 1726 * @param src path to a file or directory. 1727 * 1728 * For description of exceptions thrown 1729 * @see ClientProtocol#getFileLinkInfo(String) 1730 */ 1731 public HdfsFileStatus getFileLinkInfo(String src) throws IOException { 1732 checkOpen(); 1733 try (TraceScope ignored = newPathTraceScope("getFileLinkInfo", src)) { 1734 return namenode.getFileLinkInfo(src); 1735 } catch (RemoteException re) { 1736 throw re.unwrapRemoteException(AccessControlException.class, 1737 UnresolvedPathException.class); 1738 } 1739 } 1740 1741 @InterfaceAudience.Private 1742 public void clearDataEncryptionKey() { 1743 LOG.debug("Clearing encryption key"); 1744 synchronized (this) { 1745 encryptionKey = null; 1746 } 1747 } 1748 1749 /** 1750 * @return true if data sent between this client and DNs should be encrypted, 1751 * false otherwise. 1752 * @throws IOException in the event of error communicating with the NN 1753 */ 1754 boolean shouldEncryptData() throws IOException { 1755 FsServerDefaults d = getServerDefaults(); 1756 return d != null && d.getEncryptDataTransfer(); 1757 } 1758 1759 @Override 1760 public DataEncryptionKey newDataEncryptionKey() throws IOException { 1761 if (shouldEncryptData()) { 1762 synchronized (this) { 1763 if (encryptionKey == null || 1764 encryptionKey.expiryDate < Time.now()) { 1765 LOG.debug("Getting new encryption token from NN"); 1766 encryptionKey = namenode.getDataEncryptionKey(); 1767 } 1768 return encryptionKey; 1769 } 1770 } else { 1771 return null; 1772 } 1773 } 1774 1775 @VisibleForTesting 1776 public DataEncryptionKey getEncryptionKey() { 1777 return encryptionKey; 1778 } 1779 1780 /** 1781 * Get the checksum of the whole file of a range of the file. Note that the 1782 * range always starts from the beginning of the file. 1783 * @param src The file path 1784 * @param length the length of the range, i.e., the range is [0, length] 1785 * @return The checksum 1786 * @see DistributedFileSystem#getFileChecksum(Path) 1787 */ 1788 public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) 1789 throws IOException { 1790 checkOpen(); 1791 Preconditions.checkArgument(length >= 0); 1792 //get block locations for the file range 1793 LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, 1794 length); 1795 if (null == blockLocations) { 1796 throw new FileNotFoundException("File does not exist: " + src); 1797 } 1798 if (blockLocations.isUnderConstruction()) { 1799 throw new IOException("Fail to get checksum, since file " + src 1800 + " is under construction."); 1801 } 1802 List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks(); 1803 final DataOutputBuffer md5out = new DataOutputBuffer(); 1804 int bytesPerCRC = -1; 1805 DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; 1806 long crcPerBlock = 0; 1807 boolean refetchBlocks = false; 1808 int lastRetriedIndex = -1; 1809 1810 // get block checksum for each block 1811 long remaining = length; 1812 if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) { 1813 remaining = Math.min(length, blockLocations.getFileLength()); 1814 } 1815 for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) { 1816 if (refetchBlocks) { // refetch to get fresh tokens 1817 blockLocations = callGetBlockLocations(namenode, src, 0, length); 1818 if (null == blockLocations) { 1819 throw new FileNotFoundException("File does not exist: " + src); 1820 } 1821 if (blockLocations.isUnderConstruction()) { 1822 throw new IOException("Fail to get checksum, since file " + src 1823 + " is under construction."); 1824 } 1825 locatedblocks = blockLocations.getLocatedBlocks(); 1826 refetchBlocks = false; 1827 } 1828 LocatedBlock lb = locatedblocks.get(i); 1829 final ExtendedBlock block = lb.getBlock(); 1830 if (remaining < block.getNumBytes()) { 1831 block.setNumBytes(remaining); 1832 } 1833 remaining -= block.getNumBytes(); 1834 final DatanodeInfo[] datanodes = lb.getLocations(); 1835 1836 //try each datanode location of the block 1837 final int timeout = 3000 * datanodes.length + 1838 dfsClientConf.getSocketTimeout(); 1839 boolean done = false; 1840 for(int j = 0; !done && j < datanodes.length; j++) { 1841 DataOutputStream out = null; 1842 DataInputStream in = null; 1843 1844 try { 1845 //connect to a datanode 1846 IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); 1847 out = new DataOutputStream(new BufferedOutputStream(pair.out, 1848 smallBufferSize)); 1849 in = new DataInputStream(pair.in); 1850 1851 LOG.debug("write to {}: {}, block={}", 1852 datanodes[j], Op.BLOCK_CHECKSUM, block); 1853 // get block MD5 1854 new Sender(out).blockChecksum(block, lb.getBlockToken()); 1855 1856 final BlockOpResponseProto reply = 1857 BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); 1858 1859 String logInfo = "for block " + block + " from datanode " + 1860 datanodes[j]; 1861 DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); 1862 1863 OpBlockChecksumResponseProto checksumData = 1864 reply.getChecksumResponse(); 1865 1866 //read byte-per-checksum 1867 final int bpc = checksumData.getBytesPerCrc(); 1868 if (i == 0) { //first block 1869 bytesPerCRC = bpc; 1870 } 1871 else if (bpc != bytesPerCRC) { 1872 throw new IOException("Byte-per-checksum not matched: bpc=" + bpc 1873 + " but bytesPerCRC=" + bytesPerCRC); 1874 } 1875 1876 //read crc-per-block 1877 final long cpb = checksumData.getCrcPerBlock(); 1878 if (locatedblocks.size() > 1 && i == 0) { 1879 crcPerBlock = cpb; 1880 } 1881 1882 //read md5 1883 final MD5Hash md5 = new MD5Hash( 1884 checksumData.getMd5().toByteArray()); 1885 md5.write(md5out); 1886 1887 // read crc-type 1888 final DataChecksum.Type ct; 1889 if (checksumData.hasCrcType()) { 1890 ct = PBHelperClient.convert(checksumData 1891 .getCrcType()); 1892 } else { 1893 LOG.debug("Retrieving checksum from an earlier-version DataNode: " + 1894 "inferring checksum by reading first byte"); 1895 ct = inferChecksumTypeByReading(lb, datanodes[j]); 1896 } 1897 1898 if (i == 0) { // first block 1899 crcType = ct; 1900 } else if (crcType != DataChecksum.Type.MIXED 1901 && crcType != ct) { 1902 // if crc types are mixed in a file 1903 crcType = DataChecksum.Type.MIXED; 1904 } 1905 1906 done = true; 1907 1908 if (LOG.isDebugEnabled()) { 1909 if (i == 0) { 1910 LOG.debug("set bytesPerCRC=" + bytesPerCRC 1911 + ", crcPerBlock=" + crcPerBlock); 1912 } 1913 LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5); 1914 } 1915 } catch (InvalidBlockTokenException ibte) { 1916 if (i > lastRetriedIndex) { 1917 LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " 1918 + "for file {} for block {} from datanode {}. Will retry " 1919 + "the block once.", 1920 src, block, datanodes[j]); 1921 lastRetriedIndex = i; 1922 done = true; // actually it's not done; but we'll retry 1923 i--; // repeat at i-th block 1924 refetchBlocks = true; 1925 break; 1926 } 1927 } catch (IOException ie) { 1928 LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie); 1929 } finally { 1930 IOUtils.closeStream(in); 1931 IOUtils.closeStream(out); 1932 } 1933 } 1934 1935 if (!done) { 1936 throw new IOException("Fail to get block MD5 for " + block); 1937 } 1938 } 1939 1940 //compute file MD5 1941 final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); 1942 switch (crcType) { 1943 case CRC32: 1944 return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, 1945 crcPerBlock, fileMD5); 1946 case CRC32C: 1947 return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, 1948 crcPerBlock, fileMD5); 1949 default: 1950 // If there is no block allocated for the file, 1951 // return one with the magic entry that matches what previous 1952 // hdfs versions return. 1953 if (locatedblocks.size() == 0) { 1954 return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); 1955 } 1956 1957 // we should never get here since the validity was checked 1958 // when getCrcType() was called above. 1959 return null; 1960 } 1961 } 1962 1963 /** 1964 * Connect to the given datanode's datantrasfer port, and return 1965 * the resulting IOStreamPair. This includes encryption wrapping, etc. 1966 */ 1967 private IOStreamPair connectToDN(DatanodeInfo dn, int timeout, 1968 LocatedBlock lb) throws IOException { 1969 boolean success = false; 1970 Socket sock = null; 1971 try { 1972 sock = socketFactory.createSocket(); 1973 String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname()); 1974 LOG.debug("Connecting to datanode {}", dnAddr); 1975 NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); 1976 sock.setTcpNoDelay(dfsClientConf.getDataTransferTcpNoDelay()); 1977 sock.setSoTimeout(timeout); 1978 1979 OutputStream unbufOut = NetUtils.getOutputStream(sock); 1980 InputStream unbufIn = NetUtils.getInputStream(sock); 1981 IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this, 1982 lb.getBlockToken(), dn); 1983 success = true; 1984 return ret; 1985 } finally { 1986 if (!success) { 1987 IOUtils.closeSocket(sock); 1988 } 1989 } 1990 } 1991 1992 /** 1993 * Infer the checksum type for a replica by sending an OP_READ_BLOCK 1994 * for the first byte of that replica. This is used for compatibility 1995 * with older HDFS versions which did not include the checksum type in 1996 * OpBlockChecksumResponseProto. 1997 * 1998 * @param lb the located block 1999 * @param dn the connected datanode 2000 * @return the inferred checksum type 2001 * @throws IOException if an error occurs 2002 */ 2003 private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) 2004 throws IOException { 2005 IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb); 2006 2007 try { 2008 DataOutputStream out = new DataOutputStream( 2009 new BufferedOutputStream(pair.out, smallBufferSize)); 2010 DataInputStream in = new DataInputStream(pair.in); 2011 2012 new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 2013 0, 1, true, CachingStrategy.newDefaultStrategy()); 2014 final BlockOpResponseProto reply = 2015 BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); 2016 String logInfo = "trying to read " + lb.getBlock() + " from datanode " + 2017 dn; 2018 DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); 2019 2020 return PBHelperClient.convert( 2021 reply.getReadOpChecksumInfo().getChecksum().getType()); 2022 } finally { 2023 IOUtilsClient.cleanup(null, pair.in, pair.out); 2024 } 2025 } 2026 2027 /** 2028 * Set permissions to a file or directory. 2029 * @param src path name. 2030 * @param permission permission to set to 2031 * 2032 * @see ClientProtocol#setPermission(String, FsPermission) 2033 */ 2034 public void setPermission(String src, FsPermission permission) 2035 throws IOException { 2036 checkOpen(); 2037 try (TraceScope ignored = newPathTraceScope("setPermission", src)) { 2038 namenode.setPermission(src, permission); 2039 } catch (RemoteException re) { 2040 throw re.unwrapRemoteException(AccessControlException.class, 2041 FileNotFoundException.class, 2042 SafeModeException.class, 2043 UnresolvedPathException.class, 2044 SnapshotAccessControlException.class); 2045 } 2046 } 2047 2048 /** 2049 * Set file or directory owner. 2050 * @param src path name. 2051 * @param username user id. 2052 * @param groupname user group. 2053 * 2054 * @see ClientProtocol#setOwner(String, String, String) 2055 */ 2056 public void setOwner(String src, String username, String groupname) 2057 throws IOException { 2058 checkOpen(); 2059 try (TraceScope ignored = newPathTraceScope("setOwner", src)) { 2060 namenode.setOwner(src, username, groupname); 2061 } catch (RemoteException re) { 2062 throw re.unwrapRemoteException(AccessControlException.class, 2063 FileNotFoundException.class, 2064 SafeModeException.class, 2065 UnresolvedPathException.class, 2066 SnapshotAccessControlException.class); 2067 } 2068 } 2069 2070 private long getStateByIndex(int stateIndex) throws IOException { 2071 checkOpen(); 2072 try (TraceScope ignored = tracer.newScope("getStats")) { 2073 long[] states = namenode.getStats(); 2074 return states.length > stateIndex ? states[stateIndex] : -1; 2075 } 2076 } 2077 2078 /** 2079 * @see ClientProtocol#getStats() 2080 */ 2081 public FsStatus getDiskStatus() throws IOException { 2082 return new FsStatus(getStateByIndex(0), 2083 getStateByIndex(1), getStateByIndex(2)); 2084 } 2085 2086 /** 2087 * Returns count of blocks with no good replicas left. Normally should be 2088 * zero. 2089 * @throws IOException 2090 */ 2091 public long getMissingBlocksCount() throws IOException { 2092 return getStateByIndex(ClientProtocol. 2093 GET_STATS_MISSING_BLOCKS_IDX); 2094 } 2095 2096 /** 2097 * Returns count of blocks with replication factor 1 and have 2098 * lost the only replica. 2099 * @throws IOException 2100 */ 2101 public long getMissingReplOneBlocksCount() throws IOException { 2102 return getStateByIndex(ClientProtocol. 2103 GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX); 2104 } 2105 2106 /** 2107 * Returns count of blocks pending on deletion. 2108 * @throws IOException 2109 */ 2110 public long getPendingDeletionBlocksCount() throws IOException { 2111 return getStateByIndex(ClientProtocol. 2112 GET_STATS_PENDING_DELETION_BLOCKS_IDX); 2113 } 2114 2115 /** 2116 * Returns count of blocks with one of more replica missing. 2117 * @throws IOException 2118 */ 2119 public long getUnderReplicatedBlocksCount() throws IOException { 2120 return getStateByIndex(ClientProtocol. 2121 GET_STATS_UNDER_REPLICATED_IDX); 2122 } 2123 2124 /** 2125 * Returns count of blocks with at least one replica marked corrupt. 2126 * @throws IOException 2127 */ 2128 public long getCorruptBlocksCount() throws IOException { 2129 return getStateByIndex(ClientProtocol. 2130 GET_STATS_CORRUPT_BLOCKS_IDX); 2131 } 2132 2133 /** 2134 * Returns number of bytes that reside in Blocks with future generation 2135 * stamps. 2136 * @return Bytes in Blocks with future generation stamps. 2137 * @throws IOException 2138 */ 2139 public long getBytesInFutureBlocks() throws IOException { 2140 return getStateByIndex(ClientProtocol. 2141 GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX); 2142 } 2143 2144 /** 2145 * @return a list in which each entry describes a corrupt file/block 2146 * @throws IOException 2147 */ 2148 public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) 2149 throws IOException { 2150 checkOpen(); 2151 try (TraceScope ignored 2152 = newPathTraceScope("listCorruptFileBlocks", path)) { 2153 return namenode.listCorruptFileBlocks(path, cookie); 2154 } 2155 } 2156 2157 public DatanodeInfo[] datanodeReport(DatanodeReportType type) 2158 throws IOException { 2159 checkOpen(); 2160 try (TraceScope ignored = tracer.newScope("datanodeReport")) { 2161 return namenode.getDatanodeReport(type); 2162 } 2163 } 2164 2165 public DatanodeStorageReport[] getDatanodeStorageReport( 2166 DatanodeReportType type) throws IOException { 2167 checkOpen(); 2168 try (TraceScope ignored = tracer.newScope("datanodeStorageReport")) { 2169 return namenode.getDatanodeStorageReport(type); 2170 } 2171 } 2172 2173 /** 2174 * Enter, leave or get safe mode. 2175 * 2176 * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean) 2177 */ 2178 public boolean setSafeMode(SafeModeAction action) throws IOException { 2179 return setSafeMode(action, false); 2180 } 2181 2182 /** 2183 * Enter, leave or get safe mode. 2184 * 2185 * @param action 2186 * One of SafeModeAction.GET, SafeModeAction.ENTER and 2187 * SafeModeActiob.LEAVE 2188 * @param isChecked 2189 * If true, then check only active namenode's safemode status, else 2190 * check first namenode's status. 2191 * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean) 2192 */ 2193 public boolean setSafeMode(SafeModeAction action, boolean isChecked) 2194 throws IOException{ 2195 try (TraceScope ignored = tracer.newScope("setSafeMode")) { 2196 return namenode.setSafeMode(action, isChecked); 2197 } 2198 } 2199 2200 /** 2201 * Create one snapshot. 2202 * 2203 * @param snapshotRoot The directory where the snapshot is to be taken 2204 * @param snapshotName Name of the snapshot 2205 * @return the snapshot path. 2206 * @see ClientProtocol#createSnapshot(String, String) 2207 */ 2208 public String createSnapshot(String snapshotRoot, String snapshotName) 2209 throws IOException { 2210 checkOpen(); 2211 try (TraceScope ignored = tracer.newScope("createSnapshot")) { 2212 return namenode.createSnapshot(snapshotRoot, snapshotName); 2213 } catch (RemoteException re) { 2214 throw re.unwrapRemoteException(); 2215 } 2216 } 2217 2218 /** 2219 * Delete a snapshot of a snapshottable directory. 2220 * 2221 * @param snapshotRoot The snapshottable directory that the 2222 * to-be-deleted snapshot belongs to 2223 * @param snapshotName The name of the to-be-deleted snapshot 2224 * @throws IOException 2225 * @see ClientProtocol#deleteSnapshot(String, String) 2226 */ 2227 public void deleteSnapshot(String snapshotRoot, String snapshotName) 2228 throws IOException { 2229 checkOpen(); 2230 try (TraceScope ignored = tracer.newScope("deleteSnapshot")) { 2231 namenode.deleteSnapshot(snapshotRoot, snapshotName); 2232 } catch (RemoteException re) { 2233 throw re.unwrapRemoteException(); 2234 } 2235 } 2236 2237 /** 2238 * Rename a snapshot. 2239 * @param snapshotDir The directory path where the snapshot was taken 2240 * @param snapshotOldName Old name of the snapshot 2241 * @param snapshotNewName New name of the snapshot 2242 * @throws IOException 2243 * @see ClientProtocol#renameSnapshot(String, String, String) 2244 */ 2245 public void renameSnapshot(String snapshotDir, String snapshotOldName, 2246 String snapshotNewName) throws IOException { 2247 checkOpen(); 2248 try (TraceScope ignored = tracer.newScope("renameSnapshot")) { 2249 namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName); 2250 } catch (RemoteException re) { 2251 throw re.unwrapRemoteException(); 2252 } 2253 } 2254 2255 /** 2256 * Get all the current snapshottable directories. 2257 * @return All the current snapshottable directories 2258 * @throws IOException 2259 * @see ClientProtocol#getSnapshottableDirListing() 2260 */ 2261 public SnapshottableDirectoryStatus[] getSnapshottableDirListing() 2262 throws IOException { 2263 checkOpen(); 2264 try (TraceScope ignored = tracer.newScope("getSnapshottableDirListing")) { 2265 return namenode.getSnapshottableDirListing(); 2266 } catch (RemoteException re) { 2267 throw re.unwrapRemoteException(); 2268 } 2269 } 2270 2271 /** 2272 * Allow snapshot on a directory. 2273 * 2274 * @see ClientProtocol#allowSnapshot(String snapshotRoot) 2275 */ 2276 public void allowSnapshot(String snapshotRoot) throws IOException { 2277 checkOpen(); 2278 try (TraceScope ignored = tracer.newScope("allowSnapshot")) { 2279 namenode.allowSnapshot(snapshotRoot); 2280 } catch (RemoteException re) { 2281 throw re.unwrapRemoteException(); 2282 } 2283 } 2284 2285 /** 2286 * Disallow snapshot on a directory. 2287 * 2288 * @see ClientProtocol#disallowSnapshot(String snapshotRoot) 2289 */ 2290 public void disallowSnapshot(String snapshotRoot) throws IOException { 2291 checkOpen(); 2292 try (TraceScope ignored = tracer.newScope("disallowSnapshot")) { 2293 namenode.disallowSnapshot(snapshotRoot); 2294 } catch (RemoteException re) { 2295 throw re.unwrapRemoteException(); 2296 } 2297 } 2298 2299 /** 2300 * Get the difference between two snapshots, or between a snapshot and the 2301 * current tree of a directory. 2302 * @see ClientProtocol#getSnapshotDiffReport(String, String, String) 2303 */ 2304 public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir, 2305 String fromSnapshot, String toSnapshot) throws IOException { 2306 checkOpen(); 2307 try (TraceScope ignored = tracer.newScope("getSnapshotDiffReport")) { 2308 return namenode.getSnapshotDiffReport(snapshotDir, 2309 fromSnapshot, toSnapshot); 2310 } catch (RemoteException re) { 2311 throw re.unwrapRemoteException(); 2312 } 2313 } 2314 2315 public long addCacheDirective( 2316 CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException { 2317 checkOpen(); 2318 try (TraceScope ignored = tracer.newScope("addCacheDirective")) { 2319 return namenode.addCacheDirective(info, flags); 2320 } catch (RemoteException re) { 2321 throw re.unwrapRemoteException(); 2322 } 2323 } 2324 2325 public void modifyCacheDirective( 2326 CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException { 2327 checkOpen(); 2328 try (TraceScope ignored = tracer.newScope("modifyCacheDirective")) { 2329 namenode.modifyCacheDirective(info, flags); 2330 } catch (RemoteException re) { 2331 throw re.unwrapRemoteException(); 2332 } 2333 } 2334 2335 public void removeCacheDirective(long id) 2336 throws IOException { 2337 checkOpen(); 2338 try (TraceScope ignored = tracer.newScope("removeCacheDirective")) { 2339 namenode.removeCacheDirective(id); 2340 } catch (RemoteException re) { 2341 throw re.unwrapRemoteException(); 2342 } 2343 } 2344 2345 public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( 2346 CacheDirectiveInfo filter) throws IOException { 2347 return new CacheDirectiveIterator(namenode, filter, tracer); 2348 } 2349 2350 public void addCachePool(CachePoolInfo info) throws IOException { 2351 checkOpen(); 2352 try (TraceScope ignored = tracer.newScope("addCachePool")) { 2353 namenode.addCachePool(info); 2354 } catch (RemoteException re) { 2355 throw re.unwrapRemoteException(); 2356 } 2357 } 2358 2359 public void modifyCachePool(CachePoolInfo info) throws IOException { 2360 checkOpen(); 2361 try (TraceScope ignored = tracer.newScope("modifyCachePool")) { 2362 namenode.modifyCachePool(info); 2363 } catch (RemoteException re) { 2364 throw re.unwrapRemoteException(); 2365 } 2366 } 2367 2368 public void removeCachePool(String poolName) throws IOException { 2369 checkOpen(); 2370 try (TraceScope ignored = tracer.newScope("removeCachePool")) { 2371 namenode.removeCachePool(poolName); 2372 } catch (RemoteException re) { 2373 throw re.unwrapRemoteException(); 2374 } 2375 } 2376 2377 public RemoteIterator<CachePoolEntry> listCachePools() throws IOException { 2378 return new CachePoolIterator(namenode, tracer); 2379 } 2380 2381 /** 2382 * Save namespace image. 2383 * 2384 */ 2385 void saveNamespace() throws IOException { 2386 try (TraceScope ignored = tracer.newScope("saveNamespace")) { 2387 namenode.saveNamespace(); 2388 } catch (RemoteException re) { 2389 throw re.unwrapRemoteException(AccessControlException.class); 2390 } 2391 } 2392 2393 /** 2394 * Rolls the edit log on the active NameNode. 2395 * @return the txid of the new log segment 2396 * 2397 * @see ClientProtocol#rollEdits() 2398 */ 2399 long rollEdits() throws IOException { 2400 try (TraceScope ignored = tracer.newScope("rollEdits")) { 2401 return namenode.rollEdits(); 2402 } catch (RemoteException re) { 2403 throw re.unwrapRemoteException(AccessControlException.class); 2404 } 2405 } 2406 2407 @VisibleForTesting 2408 ExtendedBlock getPreviousBlock(long fileId) { 2409 return filesBeingWritten.get(fileId).getBlock(); 2410 } 2411 2412 /** 2413 * enable/disable restore failed storage. 2414 * 2415 * @see ClientProtocol#restoreFailedStorage(String arg) 2416 */ 2417 boolean restoreFailedStorage(String arg) throws IOException{ 2418 try (TraceScope ignored = tracer.newScope("restoreFailedStorage")) { 2419 return namenode.restoreFailedStorage(arg); 2420 } 2421 } 2422 2423 /** 2424 * Refresh the hosts and exclude files. (Rereads them.) 2425 * See {@link ClientProtocol#refreshNodes()} 2426 * for more details. 2427 * 2428 * @see ClientProtocol#refreshNodes() 2429 */ 2430 public void refreshNodes() throws IOException { 2431 try (TraceScope ignored = tracer.newScope("refreshNodes")) { 2432 namenode.refreshNodes(); 2433 } 2434 } 2435 2436 /** 2437 * Dumps DFS data structures into specified file. 2438 * 2439 * @see ClientProtocol#metaSave(String) 2440 */ 2441 public void metaSave(String pathname) throws IOException { 2442 try (TraceScope ignored = tracer.newScope("metaSave")) { 2443 namenode.metaSave(pathname); 2444 } 2445 } 2446 2447 /** 2448 * Requests the namenode to tell all datanodes to use a new, non-persistent 2449 * bandwidth value for dfs.balance.bandwidthPerSec. 2450 * See {@link ClientProtocol#setBalancerBandwidth(long)} 2451 * for more details. 2452 * 2453 * @see ClientProtocol#setBalancerBandwidth(long) 2454 */ 2455 public void setBalancerBandwidth(long bandwidth) throws IOException { 2456 try (TraceScope ignored = tracer.newScope("setBalancerBandwidth")) { 2457 namenode.setBalancerBandwidth(bandwidth); 2458 } 2459 } 2460 2461 /** 2462 * @see ClientProtocol#finalizeUpgrade() 2463 */ 2464 public void finalizeUpgrade() throws IOException { 2465 try (TraceScope ignored = tracer.newScope("finalizeUpgrade")) { 2466 namenode.finalizeUpgrade(); 2467 } 2468 } 2469 2470 RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) 2471 throws IOException { 2472 try (TraceScope ignored = tracer.newScope("rollingUpgrade")) { 2473 return namenode.rollingUpgrade(action); 2474 } 2475 } 2476 2477 /** 2478 */ 2479 @Deprecated 2480 public boolean mkdirs(String src) throws IOException { 2481 return mkdirs(src, null, true); 2482 } 2483 2484 /** 2485 * Create a directory (or hierarchy of directories) with the given 2486 * name and permission. 2487 * 2488 * @param src The path of the directory being created 2489 * @param permission The permission of the directory being created. 2490 * If permission == null, use {@link FsPermission#getDefault()}. 2491 * @param createParent create missing parent directory if true 2492 * 2493 * @return True if the operation success. 2494 * 2495 * @see ClientProtocol#mkdirs(String, FsPermission, boolean) 2496 */ 2497 public boolean mkdirs(String src, FsPermission permission, 2498 boolean createParent) throws IOException { 2499 final FsPermission masked = applyUMask(permission); 2500 return primitiveMkdir(src, masked, createParent); 2501 } 2502 2503 /** 2504 * Same {{@link #mkdirs(String, FsPermission, boolean)} except 2505 * that the permissions has already been masked against umask. 2506 */ 2507 public boolean primitiveMkdir(String src, FsPermission absPermission) 2508 throws IOException { 2509 return primitiveMkdir(src, absPermission, true); 2510 } 2511 2512 /** 2513 * Same {{@link #mkdirs(String, FsPermission, boolean)} except 2514 * that the permissions has already been masked against umask. 2515 */ 2516 public boolean primitiveMkdir(String src, FsPermission absPermission, 2517 boolean createParent) throws IOException { 2518 checkOpen(); 2519 if (absPermission == null) { 2520 absPermission = applyUMask(null); 2521 } 2522 2523 LOG.debug("{}: masked={}", src, absPermission); 2524 try (TraceScope ignored = tracer.newScope("mkdir")) { 2525 return namenode.mkdirs(src, absPermission, createParent); 2526 } catch (RemoteException re) { 2527 throw re.unwrapRemoteException(AccessControlException.class, 2528 InvalidPathException.class, 2529 FileAlreadyExistsException.class, 2530 FileNotFoundException.class, 2531 ParentNotDirectoryException.class, 2532 SafeModeException.class, 2533 NSQuotaExceededException.class, 2534 DSQuotaExceededException.class, 2535 QuotaByStorageTypeExceededException.class, 2536 UnresolvedPathException.class, 2537 SnapshotAccessControlException.class); 2538 } 2539 } 2540 2541 /** 2542 * Get {@link ContentSummary} rooted at the specified directory. 2543 * @param src The string representation of the path 2544 * 2545 * @see ClientProtocol#getContentSummary(String) 2546 */ 2547 ContentSummary getContentSummary(String src) throws IOException { 2548 try (TraceScope ignored = newPathTraceScope("getContentSummary", src)) { 2549 return namenode.getContentSummary(src); 2550 } catch (RemoteException re) { 2551 throw re.unwrapRemoteException(AccessControlException.class, 2552 FileNotFoundException.class, 2553 UnresolvedPathException.class); 2554 } 2555 } 2556 2557 /** 2558 * Get {@link org.apache.hadoop.fs.QuotaUsage} rooted at the specified directory. 2559 * @param src The string representation of the path 2560 * 2561 * @see ClientProtocol#getQuotaUsage(String) 2562 */ 2563 QuotaUsage getQuotaUsage(String src) throws IOException { 2564 checkOpen(); 2565 try (TraceScope ignored = newPathTraceScope("getQuotaUsage", src)) { 2566 return namenode.getQuotaUsage(src); 2567 } catch(RemoteException re) { 2568 IOException ioe = re.unwrapRemoteException(AccessControlException.class, 2569 FileNotFoundException.class, 2570 UnresolvedPathException.class, 2571 RpcNoSuchMethodException.class); 2572 if (ioe instanceof RpcNoSuchMethodException) { 2573 LOG.debug("The version of namenode doesn't support getQuotaUsage API." + 2574 " Fall back to use getContentSummary API."); 2575 return getContentSummary(src); 2576 } else { 2577 throw ioe; 2578 } 2579 } 2580 } 2581 2582 /** 2583 * Sets or resets quotas for a directory. 2584 * @see ClientProtocol#setQuota(String, long, long, StorageType) 2585 */ 2586 void setQuota(String src, long namespaceQuota, long storagespaceQuota) 2587 throws IOException { 2588 // sanity check 2589 if ((namespaceQuota <= 0 && 2590 namespaceQuota != HdfsConstants.QUOTA_DONT_SET && 2591 namespaceQuota != HdfsConstants.QUOTA_RESET) || 2592 (storagespaceQuota < 0 && 2593 storagespaceQuota != HdfsConstants.QUOTA_DONT_SET && 2594 storagespaceQuota != HdfsConstants.QUOTA_RESET)) { 2595 throw new IllegalArgumentException("Invalid values for quota : " + 2596 namespaceQuota + " and " + 2597 storagespaceQuota); 2598 2599 } 2600 try (TraceScope ignored = newPathTraceScope("setQuota", src)) { 2601 // Pass null as storage type for traditional namespace/storagespace quota. 2602 namenode.setQuota(src, namespaceQuota, storagespaceQuota, null); 2603 } catch (RemoteException re) { 2604 throw re.unwrapRemoteException(AccessControlException.class, 2605 FileNotFoundException.class, 2606 NSQuotaExceededException.class, 2607 DSQuotaExceededException.class, 2608 QuotaByStorageTypeExceededException.class, 2609 UnresolvedPathException.class, 2610 SnapshotAccessControlException.class); 2611 } 2612 } 2613 2614 /** 2615 * Sets or resets quotas by storage type for a directory. 2616 * @see ClientProtocol#setQuota(String, long, long, StorageType) 2617 */ 2618 void setQuotaByStorageType(String src, StorageType type, long quota) 2619 throws IOException { 2620 if (quota <= 0 && quota != HdfsConstants.QUOTA_DONT_SET && 2621 quota != HdfsConstants.QUOTA_RESET) { 2622 throw new IllegalArgumentException("Invalid values for quota :" + 2623 quota); 2624 } 2625 if (type == null) { 2626 throw new IllegalArgumentException("Invalid storage type(null)"); 2627 } 2628 if (!type.supportTypeQuota()) { 2629 throw new IllegalArgumentException( 2630 "Don't support Quota for storage type : " + type.toString()); 2631 } 2632 try (TraceScope ignored = newPathTraceScope("setQuotaByStorageType", src)) { 2633 namenode.setQuota(src, HdfsConstants.QUOTA_DONT_SET, quota, type); 2634 } catch (RemoteException re) { 2635 throw re.unwrapRemoteException(AccessControlException.class, 2636 FileNotFoundException.class, 2637 QuotaByStorageTypeExceededException.class, 2638 UnresolvedPathException.class, 2639 SnapshotAccessControlException.class); 2640 } 2641 } 2642 /** 2643 * set the modification and access time of a file 2644 * 2645 * @see ClientProtocol#setTimes(String, long, long) 2646 */ 2647 public void setTimes(String src, long mtime, long atime) throws IOException { 2648 checkOpen(); 2649 try (TraceScope ignored = newPathTraceScope("setTimes", src)) { 2650 namenode.setTimes(src, mtime, atime); 2651 } catch (RemoteException re) { 2652 throw re.unwrapRemoteException(AccessControlException.class, 2653 FileNotFoundException.class, 2654 UnresolvedPathException.class, 2655 SnapshotAccessControlException.class); 2656 } 2657 } 2658 2659 /** 2660 * @deprecated use {@link HdfsDataInputStream} instead. 2661 */ 2662 @Deprecated 2663 public static class DFSDataInputStream extends HdfsDataInputStream { 2664 2665 public DFSDataInputStream(DFSInputStream in) throws IOException { 2666 super(in); 2667 } 2668 } 2669 2670 void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) { 2671 DatanodeInfo [] dnArr = { dn }; 2672 LocatedBlock [] lblocks = { new LocatedBlock(blk, dnArr) }; 2673 reportChecksumFailure(file, lblocks); 2674 } 2675 2676 // just reports checksum failure and ignores any exception during the report. 2677 void reportChecksumFailure(String file, LocatedBlock lblocks[]) { 2678 try { 2679 reportBadBlocks(lblocks); 2680 } catch (IOException ie) { 2681 LOG.info("Found corruption while reading " + file 2682 + ". Error repairing corrupt blocks. Bad blocks remain.", ie); 2683 } 2684 } 2685 2686 @Override 2687 public String toString() { 2688 return getClass().getSimpleName() + "[clientName=" + clientName 2689 + ", ugi=" + ugi + "]"; 2690 } 2691 2692 public CachingStrategy getDefaultReadCachingStrategy() { 2693 return defaultReadCachingStrategy; 2694 } 2695 2696 public CachingStrategy getDefaultWriteCachingStrategy() { 2697 return defaultWriteCachingStrategy; 2698 } 2699 2700 public ClientContext getClientContext() { 2701 return clientContext; 2702 } 2703 2704 public void modifyAclEntries(String src, List<AclEntry> aclSpec) 2705 throws IOException { 2706 checkOpen(); 2707 try (TraceScope ignored = newPathTraceScope("modifyAclEntries", src)) { 2708 namenode.modifyAclEntries(src, aclSpec); 2709 } catch (RemoteException re) { 2710 throw re.unwrapRemoteException(AccessControlException.class, 2711 AclException.class, 2712 FileNotFoundException.class, 2713 NSQuotaExceededException.class, 2714 SafeModeException.class, 2715 SnapshotAccessControlException.class, 2716 UnresolvedPathException.class); 2717 } 2718 } 2719 2720 public void removeAclEntries(String src, List<AclEntry> aclSpec) 2721 throws IOException { 2722 checkOpen(); 2723 try (TraceScope ignored = tracer.newScope("removeAclEntries")) { 2724 namenode.removeAclEntries(src, aclSpec); 2725 } catch (RemoteException re) { 2726 throw re.unwrapRemoteException(AccessControlException.class, 2727 AclException.class, 2728 FileNotFoundException.class, 2729 NSQuotaExceededException.class, 2730 SafeModeException.class, 2731 SnapshotAccessControlException.class, 2732 UnresolvedPathException.class); 2733 } 2734 } 2735 2736 public void removeDefaultAcl(String src) throws IOException { 2737 checkOpen(); 2738 try (TraceScope ignored = tracer.newScope("removeDefaultAcl")) { 2739 namenode.removeDefaultAcl(src); 2740 } catch (RemoteException re) { 2741 throw re.unwrapRemoteException(AccessControlException.class, 2742 AclException.class, 2743 FileNotFoundException.class, 2744 NSQuotaExceededException.class, 2745 SafeModeException.class, 2746 SnapshotAccessControlException.class, 2747 UnresolvedPathException.class); 2748 } 2749 } 2750 2751 public void removeAcl(String src) throws IOException { 2752 checkOpen(); 2753 try (TraceScope ignored = tracer.newScope("removeAcl")) { 2754 namenode.removeAcl(src); 2755 } catch (RemoteException re) { 2756 throw re.unwrapRemoteException(AccessControlException.class, 2757 AclException.class, 2758 FileNotFoundException.class, 2759 NSQuotaExceededException.class, 2760 SafeModeException.class, 2761 SnapshotAccessControlException.class, 2762 UnresolvedPathException.class); 2763 } 2764 } 2765 2766 public void setAcl(String src, List<AclEntry> aclSpec) throws IOException { 2767 checkOpen(); 2768 try (TraceScope ignored = tracer.newScope("setAcl")) { 2769 namenode.setAcl(src, aclSpec); 2770 } catch (RemoteException re) { 2771 throw re.unwrapRemoteException(AccessControlException.class, 2772 AclException.class, 2773 FileNotFoundException.class, 2774 NSQuotaExceededException.class, 2775 SafeModeException.class, 2776 SnapshotAccessControlException.class, 2777 UnresolvedPathException.class); 2778 } 2779 } 2780 2781 public AclStatus getAclStatus(String src) throws IOException { 2782 checkOpen(); 2783 try (TraceScope ignored = newPathTraceScope("getAclStatus", src)) { 2784 return namenode.getAclStatus(src); 2785 } catch (RemoteException re) { 2786 throw re.unwrapRemoteException(AccessControlException.class, 2787 AclException.class, 2788 FileNotFoundException.class, 2789 UnresolvedPathException.class); 2790 } 2791 } 2792 2793 public void createEncryptionZone(String src, String keyName) 2794 throws IOException { 2795 checkOpen(); 2796 try (TraceScope ignored = newPathTraceScope("createEncryptionZone", src)) { 2797 namenode.createEncryptionZone(src, keyName); 2798 } catch (RemoteException re) { 2799 throw re.unwrapRemoteException(AccessControlException.class, 2800 SafeModeException.class, 2801 UnresolvedPathException.class); 2802 } 2803 } 2804 2805 public EncryptionZone getEZForPath(String src) throws IOException { 2806 checkOpen(); 2807 try (TraceScope ignored = newPathTraceScope("getEZForPath", src)) { 2808 return namenode.getEZForPath(src); 2809 } catch (RemoteException re) { 2810 throw re.unwrapRemoteException(AccessControlException.class, 2811 UnresolvedPathException.class); 2812 } 2813 } 2814 2815 public RemoteIterator<EncryptionZone> listEncryptionZones() 2816 throws IOException { 2817 checkOpen(); 2818 return new EncryptionZoneIterator(namenode, tracer); 2819 } 2820 2821 public void setXAttr(String src, String name, byte[] value, 2822 EnumSet<XAttrSetFlag> flag) throws IOException { 2823 checkOpen(); 2824 try (TraceScope ignored = newPathTraceScope("setXAttr", src)) { 2825 namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag); 2826 } catch (RemoteException re) { 2827 throw re.unwrapRemoteException(AccessControlException.class, 2828 FileNotFoundException.class, 2829 NSQuotaExceededException.class, 2830 SafeModeException.class, 2831 SnapshotAccessControlException.class, 2832 UnresolvedPathException.class); 2833 } 2834 } 2835 2836 public byte[] getXAttr(String src, String name) throws IOException { 2837 checkOpen(); 2838 try (TraceScope ignored = newPathTraceScope("getXAttr", src)) { 2839 final List<XAttr> xAttrs = XAttrHelper.buildXAttrAsList(name); 2840 final List<XAttr> result = namenode.getXAttrs(src, xAttrs); 2841 return XAttrHelper.getFirstXAttrValue(result); 2842 } catch (RemoteException re) { 2843 throw re.unwrapRemoteException(AccessControlException.class, 2844 FileNotFoundException.class, 2845 UnresolvedPathException.class); 2846 } 2847 } 2848 2849 public Map<String, byte[]> getXAttrs(String src) throws IOException { 2850 checkOpen(); 2851 try (TraceScope ignored = newPathTraceScope("getXAttrs", src)) { 2852 return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null)); 2853 } catch (RemoteException re) { 2854 throw re.unwrapRemoteException(AccessControlException.class, 2855 FileNotFoundException.class, 2856 UnresolvedPathException.class); 2857 } 2858 } 2859 2860 public Map<String, byte[]> getXAttrs(String src, List<String> names) 2861 throws IOException { 2862 checkOpen(); 2863 try (TraceScope ignored = newPathTraceScope("getXAttrs", src)) { 2864 return XAttrHelper.buildXAttrMap(namenode.getXAttrs( 2865 src, XAttrHelper.buildXAttrs(names))); 2866 } catch (RemoteException re) { 2867 throw re.unwrapRemoteException(AccessControlException.class, 2868 FileNotFoundException.class, 2869 UnresolvedPathException.class); 2870 } 2871 } 2872 2873 public List<String> listXAttrs(String src) throws IOException { 2874 checkOpen(); 2875 try (TraceScope ignored = newPathTraceScope("listXAttrs", src)) { 2876 final Map<String, byte[]> xattrs = 2877 XAttrHelper.buildXAttrMap(namenode.listXAttrs(src)); 2878 return Lists.newArrayList(xattrs.keySet()); 2879 } catch (RemoteException re) { 2880 throw re.unwrapRemoteException(AccessControlException.class, 2881 FileNotFoundException.class, 2882 UnresolvedPathException.class); 2883 } 2884 } 2885 2886 public void removeXAttr(String src, String name) throws IOException { 2887 checkOpen(); 2888 try (TraceScope ignored = newPathTraceScope("removeXAttr", src)) { 2889 namenode.removeXAttr(src, XAttrHelper.buildXAttr(name)); 2890 } catch (RemoteException re) { 2891 throw re.unwrapRemoteException(AccessControlException.class, 2892 FileNotFoundException.class, 2893 NSQuotaExceededException.class, 2894 SafeModeException.class, 2895 SnapshotAccessControlException.class, 2896 UnresolvedPathException.class); 2897 } 2898 } 2899 2900 public void checkAccess(String src, FsAction mode) throws IOException { 2901 checkOpen(); 2902 try (TraceScope ignored = newPathTraceScope("checkAccess", src)) { 2903 namenode.checkAccess(src, mode); 2904 } catch (RemoteException re) { 2905 throw re.unwrapRemoteException(AccessControlException.class, 2906 FileNotFoundException.class, 2907 UnresolvedPathException.class); 2908 } 2909 } 2910 2911 public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { 2912 return new DFSInotifyEventInputStream(namenode, tracer); 2913 } 2914 2915 public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) 2916 throws IOException { 2917 return new DFSInotifyEventInputStream(namenode, tracer, 2918 lastReadTxid); 2919 } 2920 2921 @Override // RemotePeerFactory 2922 public Peer newConnectedPeer(InetSocketAddress addr, 2923 Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) 2924 throws IOException { 2925 Peer peer = null; 2926 boolean success = false; 2927 Socket sock = null; 2928 final int socketTimeout = dfsClientConf.getSocketTimeout(); 2929 try { 2930 sock = socketFactory.createSocket(); 2931 NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), 2932 socketTimeout); 2933 peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this, 2934 blockToken, datanodeId, socketTimeout); 2935 success = true; 2936 return peer; 2937 } finally { 2938 if (!success) { 2939 IOUtilsClient.cleanup(LOG, peer); 2940 IOUtils.closeSocket(sock); 2941 } 2942 } 2943 } 2944 2945 /** 2946 * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if 2947 * it does not already exist. 2948 * @param num Number of threads for hedged reads thread pool. 2949 * If zero, skip hedged reads thread pool creation. 2950 */ 2951 private synchronized void initThreadsNumForHedgedReads(int num) { 2952 if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return; 2953 HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, 2954 TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 2955 new Daemon.DaemonFactory() { 2956 private final AtomicInteger threadIndex = new AtomicInteger(0); 2957 @Override 2958 public Thread newThread(Runnable r) { 2959 Thread t = super.newThread(r); 2960 t.setName("hedgedRead-" + threadIndex.getAndIncrement()); 2961 return t; 2962 } 2963 }, 2964 new ThreadPoolExecutor.CallerRunsPolicy() { 2965 @Override 2966 public void rejectedExecution(Runnable runnable, 2967 ThreadPoolExecutor e) { 2968 LOG.info("Execution rejected, Executing in current thread"); 2969 HEDGED_READ_METRIC.incHedgedReadOpsInCurThread(); 2970 // will run in the current thread 2971 super.rejectedExecution(runnable, e); 2972 } 2973 }); 2974 HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); 2975 LOG.debug("Using hedged reads; pool threads={}", num); 2976 } 2977 2978 ThreadPoolExecutor getHedgedReadsThreadPool() { 2979 return HEDGED_READ_THREAD_POOL; 2980 } 2981 2982 boolean isHedgedReadsEnabled() { 2983 return (HEDGED_READ_THREAD_POOL != null) && 2984 HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0; 2985 } 2986 2987 DFSHedgedReadMetrics getHedgedReadMetrics() { 2988 return HEDGED_READ_METRIC; 2989 } 2990 2991 public KeyProvider getKeyProvider() { 2992 return clientContext.getKeyProviderCache().get(conf); 2993 } 2994 2995 @VisibleForTesting 2996 public void setKeyProvider(KeyProvider provider) { 2997 clientContext.getKeyProviderCache().setKeyProvider(conf, provider); 2998 } 2999 3000 /** 3001 * Probe for encryption enabled on this filesystem. 3002 * See {@link DFSUtilClient#isHDFSEncryptionEnabled(Configuration)} 3003 * @return true if encryption is enabled 3004 */ 3005 public boolean isHDFSEncryptionEnabled() { 3006 return DFSUtilClient.isHDFSEncryptionEnabled(this.conf); 3007 } 3008 3009 /** 3010 * Returns the SaslDataTransferClient configured for this DFSClient. 3011 * 3012 * @return SaslDataTransferClient configured for this DFSClient 3013 */ 3014 public SaslDataTransferClient getSaslDataTransferClient() { 3015 return saslClient; 3016 } 3017 3018 TraceScope newPathTraceScope(String description, String path) { 3019 TraceScope scope = tracer.newScope(description); 3020 if (path != null) { 3021 scope.addKVAnnotation("path", path); 3022 } 3023 return scope; 3024 } 3025 3026 TraceScope newSrcDstTraceScope(String description, String src, String dst) { 3027 TraceScope scope = tracer.newScope(description); 3028 if (src != null) { 3029 scope.addKVAnnotation("src", src); 3030 } 3031 if (dst != null) { 3032 scope.addKVAnnotation("dst", dst); 3033 } 3034 return scope; 3035 } 3036 3037 Tracer getTracer() { 3038 return tracer; 3039 } 3040}