001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.client.impl; 019 020import com.google.common.annotations.VisibleForTesting; 021import org.apache.hadoop.HadoopIllegalArgumentException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 024import org.apache.hadoop.fs.Options.ChecksumOpt; 025import org.apache.hadoop.fs.permission.FsPermission; 026import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; 027import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; 028import org.apache.hadoop.hdfs.protocol.HdfsConstants; 029import org.apache.hadoop.hdfs.util.ByteArrayManager; 030import org.apache.hadoop.ipc.Client; 031import org.apache.hadoop.util.DataChecksum; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; 036import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT; 037import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; 038import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; 039import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; 040import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT; 041import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY; 042import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT; 043import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; 044import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; 045import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; 046import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; 047import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT; 048import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; 049import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS; 050import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; 051import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; 052import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT; 053import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY; 054import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; 055import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; 056import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; 057import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; 058import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT; 059import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY; 060import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT; 061import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY; 062import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 063import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; 064import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; 065import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; 066import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL; 067import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT; 068import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT; 069import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; 070import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; 071import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; 072import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT; 073import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; 074import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT; 075import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY; 076import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; 077import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT; 078import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover; 079import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead; 080import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Mmap; 081import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Read; 082import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry; 083import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit; 084import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write; 085 086import java.util.ArrayList; 087import java.util.Collections; 088import java.util.List; 089 090/** 091 * DFSClient configuration. 092 */ 093public class DfsClientConf { 094 private static final Logger LOG = LoggerFactory.getLogger(DfsClientConf 095 .class); 096 097 private final int hdfsTimeout; // timeout value for a DFS operation. 098 099 private final int maxFailoverAttempts; 100 private final int maxRetryAttempts; 101 private final int failoverSleepBaseMillis; 102 private final int failoverSleepMaxMillis; 103 private final int maxBlockAcquireFailures; 104 private final int datanodeSocketWriteTimeout; 105 private final int ioBufferSize; 106 private final ChecksumOpt defaultChecksumOpt; 107 private final int writePacketSize; 108 private final int writeMaxPackets; 109 private final ByteArrayManager.Conf writeByteArrayManagerConf; 110 private final int socketTimeout; 111 private final int socketSendBufferSize; 112 private final long excludedNodesCacheExpiry; 113 /** Wait time window (in msec) if BlockMissingException is caught. */ 114 private final int timeWindow; 115 private final int numCachedConnRetry; 116 private final int numBlockWriteRetry; 117 private final int numBlockWriteLocateFollowingRetry; 118 private final int blockWriteLocateFollowingInitialDelayMs; 119 private final long defaultBlockSize; 120 private final long prefetchSize; 121 private final short defaultReplication; 122 private final String taskId; 123 private final FsPermission uMask; 124 private final boolean connectToDnViaHostname; 125 private final boolean hdfsBlocksMetadataEnabled; 126 private final int fileBlockStorageLocationsNumThreads; 127 private final int fileBlockStorageLocationsTimeoutMs; 128 private final int retryTimesForGetLastBlockLength; 129 private final int retryIntervalForGetLastBlockLength; 130 private final long datanodeRestartTimeout; 131 private final long slowIoWarningThresholdMs; 132 133 private final ShortCircuitConf shortCircuitConf; 134 135 private final long hedgedReadThresholdMillis; 136 private final int hedgedReadThreadpoolSize; 137 private final List<Class<? extends ReplicaAccessorBuilder>> 138 replicaAccessorBuilderClasses; 139 140 private final boolean dataTransferTcpNoDelay; 141 142 public DfsClientConf(Configuration conf) { 143 // The hdfsTimeout is currently the same as the ipc timeout 144 hdfsTimeout = Client.getRpcTimeout(conf); 145 146 maxRetryAttempts = conf.getInt( 147 Retry.MAX_ATTEMPTS_KEY, 148 Retry.MAX_ATTEMPTS_DEFAULT); 149 timeWindow = conf.getInt( 150 Retry.WINDOW_BASE_KEY, 151 Retry.WINDOW_BASE_DEFAULT); 152 retryTimesForGetLastBlockLength = conf.getInt( 153 Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY, 154 Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); 155 retryIntervalForGetLastBlockLength = conf.getInt( 156 Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY, 157 Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT); 158 159 maxFailoverAttempts = conf.getInt( 160 Failover.MAX_ATTEMPTS_KEY, 161 Failover.MAX_ATTEMPTS_DEFAULT); 162 failoverSleepBaseMillis = conf.getInt( 163 Failover.SLEEPTIME_BASE_KEY, 164 Failover.SLEEPTIME_BASE_DEFAULT); 165 failoverSleepMaxMillis = conf.getInt( 166 Failover.SLEEPTIME_MAX_KEY, 167 Failover.SLEEPTIME_MAX_DEFAULT); 168 169 maxBlockAcquireFailures = conf.getInt( 170 DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 171 DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT); 172 datanodeSocketWriteTimeout = conf.getInt( 173 DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 174 HdfsConstants.WRITE_TIMEOUT); 175 ioBufferSize = conf.getInt( 176 CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 177 CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); 178 defaultChecksumOpt = getChecksumOptFromConf(conf); 179 dataTransferTcpNoDelay = conf.getBoolean( 180 DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY, 181 DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT); 182 socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 183 HdfsConstants.READ_TIMEOUT); 184 socketSendBufferSize = conf.getInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 185 DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT); 186 /** dfs.write.packet.size is an internal config variable */ 187 writePacketSize = conf.getInt( 188 DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 189 DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); 190 writeMaxPackets = conf.getInt( 191 Write.MAX_PACKETS_IN_FLIGHT_KEY, 192 Write.MAX_PACKETS_IN_FLIGHT_DEFAULT); 193 194 final boolean byteArrayManagerEnabled = conf.getBoolean( 195 Write.ByteArrayManager.ENABLED_KEY, 196 Write.ByteArrayManager.ENABLED_DEFAULT); 197 if (!byteArrayManagerEnabled) { 198 writeByteArrayManagerConf = null; 199 } else { 200 final int countThreshold = conf.getInt( 201 Write.ByteArrayManager.COUNT_THRESHOLD_KEY, 202 Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT); 203 final int countLimit = conf.getInt( 204 Write.ByteArrayManager.COUNT_LIMIT_KEY, 205 Write.ByteArrayManager.COUNT_LIMIT_DEFAULT); 206 final long countResetTimePeriodMs = conf.getLong( 207 Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY, 208 Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT); 209 writeByteArrayManagerConf = new ByteArrayManager.Conf( 210 countThreshold, countLimit, countResetTimePeriodMs); 211 } 212 213 defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, 214 DFS_BLOCK_SIZE_DEFAULT); 215 defaultReplication = (short) conf.getInt( 216 DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT); 217 taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE"); 218 excludedNodesCacheExpiry = conf.getLong( 219 Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY, 220 Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); 221 prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY, 222 10 * defaultBlockSize); 223 numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, 224 DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); 225 numBlockWriteRetry = conf.getInt( 226 BlockWrite.RETRIES_KEY, 227 BlockWrite.RETRIES_DEFAULT); 228 numBlockWriteLocateFollowingRetry = conf.getInt( 229 BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 230 BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); 231 blockWriteLocateFollowingInitialDelayMs = conf.getInt( 232 BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY, 233 BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT); 234 uMask = FsPermission.getUMask(conf); 235 connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, 236 DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); 237 hdfsBlocksMetadataEnabled = conf.getBoolean( 238 HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 239 HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); 240 fileBlockStorageLocationsNumThreads = conf.getInt( 241 HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, 242 HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT); 243 fileBlockStorageLocationsTimeoutMs = conf.getInt( 244 HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS, 245 HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT); 246 247 datanodeRestartTimeout = conf.getLong( 248 DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, 249 DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; 250 slowIoWarningThresholdMs = conf.getLong( 251 DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, 252 DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); 253 254 shortCircuitConf = new ShortCircuitConf(conf); 255 256 hedgedReadThresholdMillis = conf.getLong( 257 HedgedRead.THRESHOLD_MILLIS_KEY, 258 HedgedRead.THRESHOLD_MILLIS_DEFAULT); 259 hedgedReadThreadpoolSize = conf.getInt( 260 HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 261 HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); 262 263 replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf); 264 } 265 266 @SuppressWarnings("unchecked") 267 private List<Class<? extends ReplicaAccessorBuilder>> 268 loadReplicaAccessorBuilderClasses(Configuration conf) { 269 String[] classNames = conf.getTrimmedStrings( 270 HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY); 271 if (classNames.length == 0) { 272 return Collections.emptyList(); 273 } 274 ArrayList<Class<? extends ReplicaAccessorBuilder>> classes = 275 new ArrayList<>(); 276 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 277 for (String className: classNames) { 278 try { 279 Class<? extends ReplicaAccessorBuilder> cls = 280 (Class<? extends ReplicaAccessorBuilder>) 281 classLoader.loadClass(className); 282 classes.add(cls); 283 } catch (Throwable t) { 284 LOG.warn("Unable to load " + className, t); 285 } 286 } 287 return classes; 288 } 289 290 private static DataChecksum.Type getChecksumType(Configuration conf) { 291 final String checksum = conf.get( 292 DFS_CHECKSUM_TYPE_KEY, 293 DFS_CHECKSUM_TYPE_DEFAULT); 294 try { 295 return DataChecksum.Type.valueOf(checksum); 296 } catch(IllegalArgumentException iae) { 297 LOG.warn("Bad checksum type: {}. Using default {}", checksum, 298 DFS_CHECKSUM_TYPE_DEFAULT); 299 return DataChecksum.Type.valueOf( 300 DFS_CHECKSUM_TYPE_DEFAULT); 301 } 302 } 303 304 // Construct a checksum option from conf 305 public static ChecksumOpt getChecksumOptFromConf(Configuration conf) { 306 DataChecksum.Type type = getChecksumType(conf); 307 int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, 308 DFS_BYTES_PER_CHECKSUM_DEFAULT); 309 return new ChecksumOpt(type, bytesPerChecksum); 310 } 311 312 /** create a DataChecksum with the given option. */ 313 public DataChecksum createChecksum(ChecksumOpt userOpt) { 314 // Fill in any missing field with the default. 315 ChecksumOpt opt = ChecksumOpt.processChecksumOpt( 316 defaultChecksumOpt, userOpt); 317 DataChecksum dataChecksum = DataChecksum.newDataChecksum( 318 opt.getChecksumType(), 319 opt.getBytesPerChecksum()); 320 if (dataChecksum == null) { 321 throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt=" 322 + userOpt + ", default=" + defaultChecksumOpt 323 + ", effective=null"); 324 } 325 return dataChecksum; 326 } 327 328 @VisibleForTesting 329 public int getBlockWriteLocateFollowingInitialDelayMs() { 330 return blockWriteLocateFollowingInitialDelayMs; 331 } 332 333 /** 334 * @return the hdfsTimeout 335 */ 336 public int getHdfsTimeout() { 337 return hdfsTimeout; 338 } 339 340 /** 341 * @return the maxFailoverAttempts 342 */ 343 public int getMaxFailoverAttempts() { 344 return maxFailoverAttempts; 345 } 346 347 /** 348 * @return the maxRetryAttempts 349 */ 350 public int getMaxRetryAttempts() { 351 return maxRetryAttempts; 352 } 353 354 /** 355 * @return the failoverSleepBaseMillis 356 */ 357 public int getFailoverSleepBaseMillis() { 358 return failoverSleepBaseMillis; 359 } 360 361 /** 362 * @return the failoverSleepMaxMillis 363 */ 364 public int getFailoverSleepMaxMillis() { 365 return failoverSleepMaxMillis; 366 } 367 368 /** 369 * @return the maxBlockAcquireFailures 370 */ 371 public int getMaxBlockAcquireFailures() { 372 return maxBlockAcquireFailures; 373 } 374 375 /** 376 * @return the datanodeSocketWriteTimeout 377 */ 378 public int getDatanodeSocketWriteTimeout() { 379 return datanodeSocketWriteTimeout; 380 } 381 382 /** 383 * @return the ioBufferSize 384 */ 385 public int getIoBufferSize() { 386 return ioBufferSize; 387 } 388 389 /** 390 * @return the defaultChecksumOpt 391 */ 392 public ChecksumOpt getDefaultChecksumOpt() { 393 return defaultChecksumOpt; 394 } 395 396 /** 397 * @return the writePacketSize 398 */ 399 public int getWritePacketSize() { 400 return writePacketSize; 401 } 402 403 /** 404 * @return the writeMaxPackets 405 */ 406 public int getWriteMaxPackets() { 407 return writeMaxPackets; 408 } 409 410 /** 411 * @return the writeByteArrayManagerConf 412 */ 413 public ByteArrayManager.Conf getWriteByteArrayManagerConf() { 414 return writeByteArrayManagerConf; 415 } 416 417 /** 418 * @return whether TCP_NODELAY should be set on client sockets 419 */ 420 public boolean getDataTransferTcpNoDelay() { 421 return dataTransferTcpNoDelay; 422 } 423 424 /** 425 * @return the socketTimeout 426 */ 427 public int getSocketTimeout() { 428 return socketTimeout; 429 } 430 431 /** 432 * @return the socketSendBufferSize 433 */ 434 public int getSocketSendBufferSize() { 435 return socketSendBufferSize; 436 } 437 438 /** 439 * @return the excludedNodesCacheExpiry 440 */ 441 public long getExcludedNodesCacheExpiry() { 442 return excludedNodesCacheExpiry; 443 } 444 445 /** 446 * @return the timeWindow 447 */ 448 public int getTimeWindow() { 449 return timeWindow; 450 } 451 452 /** 453 * @return the numCachedConnRetry 454 */ 455 public int getNumCachedConnRetry() { 456 return numCachedConnRetry; 457 } 458 459 /** 460 * @return the numBlockWriteRetry 461 */ 462 public int getNumBlockWriteRetry() { 463 return numBlockWriteRetry; 464 } 465 466 /** 467 * @return the numBlockWriteLocateFollowingRetry 468 */ 469 public int getNumBlockWriteLocateFollowingRetry() { 470 return numBlockWriteLocateFollowingRetry; 471 } 472 473 /** 474 * @return the defaultBlockSize 475 */ 476 public long getDefaultBlockSize() { 477 return defaultBlockSize; 478 } 479 480 /** 481 * @return the prefetchSize 482 */ 483 public long getPrefetchSize() { 484 return prefetchSize; 485 } 486 487 /** 488 * @return the defaultReplication 489 */ 490 public short getDefaultReplication() { 491 return defaultReplication; 492 } 493 494 /** 495 * @return the taskId 496 */ 497 public String getTaskId() { 498 return taskId; 499 } 500 501 /** 502 * @return the uMask 503 */ 504 public FsPermission getUMask() { 505 return uMask; 506 } 507 508 /** 509 * @return the connectToDnViaHostname 510 */ 511 public boolean isConnectToDnViaHostname() { 512 return connectToDnViaHostname; 513 } 514 515 /** 516 * @return the hdfsBlocksMetadataEnabled 517 */ 518 public boolean isHdfsBlocksMetadataEnabled() { 519 return hdfsBlocksMetadataEnabled; 520 } 521 522 /** 523 * @return the fileBlockStorageLocationsNumThreads 524 */ 525 public int getFileBlockStorageLocationsNumThreads() { 526 return fileBlockStorageLocationsNumThreads; 527 } 528 529 /** 530 * @return the getFileBlockStorageLocationsTimeoutMs 531 */ 532 public int getFileBlockStorageLocationsTimeoutMs() { 533 return fileBlockStorageLocationsTimeoutMs; 534 } 535 536 /** 537 * @return the retryTimesForGetLastBlockLength 538 */ 539 public int getRetryTimesForGetLastBlockLength() { 540 return retryTimesForGetLastBlockLength; 541 } 542 543 /** 544 * @return the retryIntervalForGetLastBlockLength 545 */ 546 public int getRetryIntervalForGetLastBlockLength() { 547 return retryIntervalForGetLastBlockLength; 548 } 549 550 /** 551 * @return the datanodeRestartTimeout 552 */ 553 public long getDatanodeRestartTimeout() { 554 return datanodeRestartTimeout; 555 } 556 557 /** 558 * @return the slowIoWarningThresholdMs 559 */ 560 public long getSlowIoWarningThresholdMs() { 561 return slowIoWarningThresholdMs; 562 } 563 564 /** 565 * @return the hedgedReadThresholdMillis 566 */ 567 public long getHedgedReadThresholdMillis() { 568 return hedgedReadThresholdMillis; 569 } 570 571 /** 572 * @return the hedgedReadThreadpoolSize 573 */ 574 public int getHedgedReadThreadpoolSize() { 575 return hedgedReadThreadpoolSize; 576 } 577 578 /** 579 * @return the replicaAccessorBuilderClasses 580 */ 581 public List<Class<? extends ReplicaAccessorBuilder>> 582 getReplicaAccessorBuilderClasses() { 583 return replicaAccessorBuilderClasses; 584 } 585 586 /** 587 * @return the shortCircuitConf 588 */ 589 public ShortCircuitConf getShortCircuitConf() { 590 return shortCircuitConf; 591 } 592 593 /** 594 * Configuration for short-circuit reads. 595 */ 596 public static class ShortCircuitConf { 597 private static final Logger LOG = DfsClientConf.LOG; 598 599 private final int socketCacheCapacity; 600 private final long socketCacheExpiry; 601 602 private final boolean useLegacyBlockReader; 603 private final boolean useLegacyBlockReaderLocal; 604 private final String domainSocketPath; 605 private final boolean skipShortCircuitChecksums; 606 607 private final int shortCircuitBufferSize; 608 private final boolean shortCircuitLocalReads; 609 private final boolean domainSocketDataTraffic; 610 private final int shortCircuitStreamsCacheSize; 611 private final long shortCircuitStreamsCacheExpiryMs; 612 private final int shortCircuitSharedMemoryWatcherInterruptCheckMs; 613 614 private final boolean shortCircuitMmapEnabled; 615 private final int shortCircuitMmapCacheSize; 616 private final long shortCircuitMmapCacheExpiryMs; 617 private final long shortCircuitMmapCacheRetryTimeout; 618 private final long shortCircuitCacheStaleThresholdMs; 619 620 private final long keyProviderCacheExpiryMs; 621 622 public ShortCircuitConf(Configuration conf) { 623 socketCacheCapacity = conf.getInt( 624 DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 625 DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); 626 socketCacheExpiry = conf.getLong( 627 DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 628 DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); 629 630 useLegacyBlockReader = conf.getBoolean( 631 DFS_CLIENT_USE_LEGACY_BLOCKREADER, 632 DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); 633 useLegacyBlockReaderLocal = conf.getBoolean( 634 DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, 635 DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); 636 shortCircuitLocalReads = conf.getBoolean( 637 Read.ShortCircuit.KEY, 638 Read.ShortCircuit.DEFAULT); 639 domainSocketDataTraffic = conf.getBoolean( 640 DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, 641 DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); 642 domainSocketPath = conf.getTrimmed( 643 DFS_DOMAIN_SOCKET_PATH_KEY, 644 DFS_DOMAIN_SOCKET_PATH_DEFAULT); 645 646 LOG.debug(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL 647 + " = {}", useLegacyBlockReaderLocal); 648 LOG.debug(Read.ShortCircuit.KEY 649 + " = {}", shortCircuitLocalReads); 650 LOG.debug(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC 651 + " = {}", domainSocketDataTraffic); 652 LOG.debug(DFS_DOMAIN_SOCKET_PATH_KEY 653 + " = {}", domainSocketPath); 654 655 skipShortCircuitChecksums = conf.getBoolean( 656 Read.ShortCircuit.SKIP_CHECKSUM_KEY, 657 Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT); 658 shortCircuitBufferSize = conf.getInt( 659 Read.ShortCircuit.BUFFER_SIZE_KEY, 660 Read.ShortCircuit.BUFFER_SIZE_DEFAULT); 661 shortCircuitStreamsCacheSize = conf.getInt( 662 Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY, 663 Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT); 664 shortCircuitStreamsCacheExpiryMs = conf.getLong( 665 Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, 666 Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT); 667 shortCircuitMmapEnabled = conf.getBoolean( 668 Mmap.ENABLED_KEY, 669 Mmap.ENABLED_DEFAULT); 670 shortCircuitMmapCacheSize = conf.getInt( 671 Mmap.CACHE_SIZE_KEY, 672 Mmap.CACHE_SIZE_DEFAULT); 673 shortCircuitMmapCacheExpiryMs = conf.getLong( 674 Mmap.CACHE_TIMEOUT_MS_KEY, 675 Mmap.CACHE_TIMEOUT_MS_DEFAULT); 676 shortCircuitMmapCacheRetryTimeout = conf.getLong( 677 Mmap.RETRY_TIMEOUT_MS_KEY, 678 Mmap.RETRY_TIMEOUT_MS_DEFAULT); 679 shortCircuitCacheStaleThresholdMs = conf.getLong( 680 ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY, 681 ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT); 682 shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt( 683 DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 684 DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT); 685 686 keyProviderCacheExpiryMs = conf.getLong( 687 DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS, 688 DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT); 689 } 690 691 /** 692 * @return the socketCacheCapacity 693 */ 694 public int getSocketCacheCapacity() { 695 return socketCacheCapacity; 696 } 697 698 /** 699 * @return the socketCacheExpiry 700 */ 701 public long getSocketCacheExpiry() { 702 return socketCacheExpiry; 703 } 704 705 public boolean isUseLegacyBlockReaderLocal() { 706 return useLegacyBlockReaderLocal; 707 } 708 709 public String getDomainSocketPath() { 710 return domainSocketPath; 711 } 712 713 public boolean isShortCircuitLocalReads() { 714 return shortCircuitLocalReads; 715 } 716 717 public boolean isDomainSocketDataTraffic() { 718 return domainSocketDataTraffic; 719 } 720 /** 721 * @return the useLegacyBlockReader 722 */ 723 public boolean isUseLegacyBlockReader() { 724 return useLegacyBlockReader; 725 } 726 727 /** 728 * @return the skipShortCircuitChecksums 729 */ 730 public boolean isSkipShortCircuitChecksums() { 731 return skipShortCircuitChecksums; 732 } 733 734 /** 735 * @return the shortCircuitBufferSize 736 */ 737 public int getShortCircuitBufferSize() { 738 return shortCircuitBufferSize; 739 } 740 741 /** 742 * @return the shortCircuitStreamsCacheSize 743 */ 744 public int getShortCircuitStreamsCacheSize() { 745 return shortCircuitStreamsCacheSize; 746 } 747 748 /** 749 * @return the shortCircuitStreamsCacheExpiryMs 750 */ 751 public long getShortCircuitStreamsCacheExpiryMs() { 752 return shortCircuitStreamsCacheExpiryMs; 753 } 754 755 /** 756 * @return the shortCircuitSharedMemoryWatcherInterruptCheckMs 757 */ 758 public int getShortCircuitSharedMemoryWatcherInterruptCheckMs() { 759 return shortCircuitSharedMemoryWatcherInterruptCheckMs; 760 } 761 762 /** 763 * @return the shortCircuitMmapEnabled 764 */ 765 public boolean isShortCircuitMmapEnabled() { 766 return shortCircuitMmapEnabled; 767 } 768 769 /** 770 * @return the shortCircuitMmapCacheSize 771 */ 772 public int getShortCircuitMmapCacheSize() { 773 return shortCircuitMmapCacheSize; 774 } 775 776 /** 777 * @return the shortCircuitMmapCacheExpiryMs 778 */ 779 public long getShortCircuitMmapCacheExpiryMs() { 780 return shortCircuitMmapCacheExpiryMs; 781 } 782 783 /** 784 * @return the shortCircuitMmapCacheRetryTimeout 785 */ 786 public long getShortCircuitMmapCacheRetryTimeout() { 787 return shortCircuitMmapCacheRetryTimeout; 788 } 789 790 /** 791 * @return the shortCircuitCacheStaleThresholdMs 792 */ 793 public long getShortCircuitCacheStaleThresholdMs() { 794 return shortCircuitCacheStaleThresholdMs; 795 } 796 797 /** 798 * @return the keyProviderCacheExpiryMs 799 */ 800 public long getKeyProviderCacheExpiryMs() { 801 return keyProviderCacheExpiryMs; 802 } 803 804 public String confAsString() { 805 806 return "shortCircuitStreamsCacheSize = " 807 + shortCircuitStreamsCacheSize 808 + ", shortCircuitStreamsCacheExpiryMs = " 809 + shortCircuitStreamsCacheExpiryMs 810 + ", shortCircuitMmapCacheSize = " 811 + shortCircuitMmapCacheSize 812 + ", shortCircuitMmapCacheExpiryMs = " 813 + shortCircuitMmapCacheExpiryMs 814 + ", shortCircuitMmapCacheRetryTimeout = " 815 + shortCircuitMmapCacheRetryTimeout 816 + ", shortCircuitCacheStaleThresholdMs = " 817 + shortCircuitCacheStaleThresholdMs 818 + ", socketCacheCapacity = " 819 + socketCacheCapacity 820 + ", socketCacheExpiry = " 821 + socketCacheExpiry 822 + ", shortCircuitLocalReads = " 823 + shortCircuitLocalReads 824 + ", useLegacyBlockReaderLocal = " 825 + useLegacyBlockReaderLocal 826 + ", domainSocketDataTraffic = " 827 + domainSocketDataTraffic 828 + ", shortCircuitSharedMemoryWatcherInterruptCheckMs = " 829 + shortCircuitSharedMemoryWatcherInterruptCheckMs 830 + ", keyProviderCacheExpiryMs = " 831 + keyProviderCacheExpiryMs; 832 } 833 } 834}