001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.client.impl;
019
020import com.google.common.annotations.VisibleForTesting;
021import org.apache.hadoop.HadoopIllegalArgumentException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
024import org.apache.hadoop.fs.Options.ChecksumOpt;
025import org.apache.hadoop.fs.permission.FsPermission;
026import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
027import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
028import org.apache.hadoop.hdfs.protocol.HdfsConstants;
029import org.apache.hadoop.hdfs.util.ByteArrayManager;
030import org.apache.hadoop.ipc.Client;
031import org.apache.hadoop.util.DataChecksum;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
036import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
037import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
038import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
039import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
040import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
041import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
042import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
043import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
044import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
045import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
046import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
047import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
048import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
049import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
050import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
051import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
052import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
053import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
054import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
055import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
056import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
057import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
058import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
059import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
060import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
061import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
062import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
063import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
064import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
065import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
066import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
067import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT;
068import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
069import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
070import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
071import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
072import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
073import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
074import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
075import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY;
076import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
077import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
078import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover;
079import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead;
080import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Mmap;
081import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Read;
082import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
083import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
084import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
085
086import java.util.ArrayList;
087import java.util.Collections;
088import java.util.List;
089
090/**
091 * DFSClient configuration.
092 */
093public class DfsClientConf {
094  private static final Logger LOG = LoggerFactory.getLogger(DfsClientConf
095                                                                .class);
096
097  private final int hdfsTimeout;    // timeout value for a DFS operation.
098
099  private final int maxFailoverAttempts;
100  private final int maxRetryAttempts;
101  private final int failoverSleepBaseMillis;
102  private final int failoverSleepMaxMillis;
103  private final int maxBlockAcquireFailures;
104  private final int datanodeSocketWriteTimeout;
105  private final int ioBufferSize;
106  private final ChecksumOpt defaultChecksumOpt;
107  private final int writePacketSize;
108  private final int writeMaxPackets;
109  private final ByteArrayManager.Conf writeByteArrayManagerConf;
110  private final int socketTimeout;
111  private final int socketSendBufferSize;
112  private final long excludedNodesCacheExpiry;
113  /** Wait time window (in msec) if BlockMissingException is caught. */
114  private final int timeWindow;
115  private final int numCachedConnRetry;
116  private final int numBlockWriteRetry;
117  private final int numBlockWriteLocateFollowingRetry;
118  private final int blockWriteLocateFollowingInitialDelayMs;
119  private final long defaultBlockSize;
120  private final long prefetchSize;
121  private final short defaultReplication;
122  private final String taskId;
123  private final FsPermission uMask;
124  private final boolean connectToDnViaHostname;
125  private final boolean hdfsBlocksMetadataEnabled;
126  private final int fileBlockStorageLocationsNumThreads;
127  private final int fileBlockStorageLocationsTimeoutMs;
128  private final int retryTimesForGetLastBlockLength;
129  private final int retryIntervalForGetLastBlockLength;
130  private final long datanodeRestartTimeout;
131  private final long slowIoWarningThresholdMs;
132
133  private final ShortCircuitConf shortCircuitConf;
134
135  private final long hedgedReadThresholdMillis;
136  private final int hedgedReadThreadpoolSize;
137  private final List<Class<? extends ReplicaAccessorBuilder>>
138      replicaAccessorBuilderClasses;
139
140  private final boolean dataTransferTcpNoDelay;
141
142  public DfsClientConf(Configuration conf) {
143    // The hdfsTimeout is currently the same as the ipc timeout
144    hdfsTimeout = Client.getRpcTimeout(conf);
145
146    maxRetryAttempts = conf.getInt(
147        Retry.MAX_ATTEMPTS_KEY,
148        Retry.MAX_ATTEMPTS_DEFAULT);
149    timeWindow = conf.getInt(
150        Retry.WINDOW_BASE_KEY,
151        Retry.WINDOW_BASE_DEFAULT);
152    retryTimesForGetLastBlockLength = conf.getInt(
153        Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
154        Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
155    retryIntervalForGetLastBlockLength = conf.getInt(
156        Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
157        Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
158
159    maxFailoverAttempts = conf.getInt(
160        Failover.MAX_ATTEMPTS_KEY,
161        Failover.MAX_ATTEMPTS_DEFAULT);
162    failoverSleepBaseMillis = conf.getInt(
163        Failover.SLEEPTIME_BASE_KEY,
164        Failover.SLEEPTIME_BASE_DEFAULT);
165    failoverSleepMaxMillis = conf.getInt(
166        Failover.SLEEPTIME_MAX_KEY,
167        Failover.SLEEPTIME_MAX_DEFAULT);
168
169    maxBlockAcquireFailures = conf.getInt(
170        DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
171        DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
172    datanodeSocketWriteTimeout = conf.getInt(
173        DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
174        HdfsConstants.WRITE_TIMEOUT);
175    ioBufferSize = conf.getInt(
176        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
177        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
178    defaultChecksumOpt = getChecksumOptFromConf(conf);
179    dataTransferTcpNoDelay = conf.getBoolean(
180        DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
181        DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
182    socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
183        HdfsConstants.READ_TIMEOUT);
184    socketSendBufferSize = conf.getInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY,
185        DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
186    /** dfs.write.packet.size is an internal config variable */
187    writePacketSize = conf.getInt(
188        DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
189        DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
190    writeMaxPackets = conf.getInt(
191        Write.MAX_PACKETS_IN_FLIGHT_KEY,
192        Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
193
194    final boolean byteArrayManagerEnabled = conf.getBoolean(
195        Write.ByteArrayManager.ENABLED_KEY,
196        Write.ByteArrayManager.ENABLED_DEFAULT);
197    if (!byteArrayManagerEnabled) {
198      writeByteArrayManagerConf = null;
199    } else {
200      final int countThreshold = conf.getInt(
201          Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
202          Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
203      final int countLimit = conf.getInt(
204          Write.ByteArrayManager.COUNT_LIMIT_KEY,
205          Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
206      final long countResetTimePeriodMs = conf.getLong(
207          Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
208          Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
209      writeByteArrayManagerConf = new ByteArrayManager.Conf(
210          countThreshold, countLimit, countResetTimePeriodMs);
211    }
212
213    defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
214        DFS_BLOCK_SIZE_DEFAULT);
215    defaultReplication = (short) conf.getInt(
216        DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
217    taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
218    excludedNodesCacheExpiry = conf.getLong(
219        Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
220        Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
221    prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
222        10 * defaultBlockSize);
223    numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
224        DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
225    numBlockWriteRetry = conf.getInt(
226        BlockWrite.RETRIES_KEY,
227        BlockWrite.RETRIES_DEFAULT);
228    numBlockWriteLocateFollowingRetry = conf.getInt(
229        BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
230        BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
231    blockWriteLocateFollowingInitialDelayMs = conf.getInt(
232        BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
233        BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
234    uMask = FsPermission.getUMask(conf);
235    connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
236        DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
237    hdfsBlocksMetadataEnabled = conf.getBoolean(
238        HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
239        HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
240    fileBlockStorageLocationsNumThreads = conf.getInt(
241        HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
242        HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
243    fileBlockStorageLocationsTimeoutMs = conf.getInt(
244        HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
245        HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
246
247    datanodeRestartTimeout = conf.getLong(
248        DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
249        DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
250    slowIoWarningThresholdMs = conf.getLong(
251        DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
252        DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
253
254    shortCircuitConf = new ShortCircuitConf(conf);
255
256    hedgedReadThresholdMillis = conf.getLong(
257        HedgedRead.THRESHOLD_MILLIS_KEY,
258        HedgedRead.THRESHOLD_MILLIS_DEFAULT);
259    hedgedReadThreadpoolSize = conf.getInt(
260        HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
261        HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
262
263    replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);
264  }
265
266  @SuppressWarnings("unchecked")
267  private List<Class<? extends ReplicaAccessorBuilder>>
268      loadReplicaAccessorBuilderClasses(Configuration conf) {
269    String[] classNames = conf.getTrimmedStrings(
270        HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY);
271    if (classNames.length == 0) {
272      return Collections.emptyList();
273    }
274    ArrayList<Class<? extends ReplicaAccessorBuilder>> classes =
275        new ArrayList<>();
276    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
277    for (String className: classNames) {
278      try {
279        Class<? extends ReplicaAccessorBuilder> cls =
280            (Class<? extends ReplicaAccessorBuilder>)
281                classLoader.loadClass(className);
282        classes.add(cls);
283      } catch (Throwable t) {
284        LOG.warn("Unable to load " + className, t);
285      }
286    }
287    return classes;
288  }
289
290  private static DataChecksum.Type getChecksumType(Configuration conf) {
291    final String checksum = conf.get(
292        DFS_CHECKSUM_TYPE_KEY,
293        DFS_CHECKSUM_TYPE_DEFAULT);
294    try {
295      return DataChecksum.Type.valueOf(checksum);
296    } catch(IllegalArgumentException iae) {
297      LOG.warn("Bad checksum type: {}. Using default {}", checksum,
298               DFS_CHECKSUM_TYPE_DEFAULT);
299      return DataChecksum.Type.valueOf(
300          DFS_CHECKSUM_TYPE_DEFAULT);
301    }
302  }
303
304  // Construct a checksum option from conf
305  public static ChecksumOpt getChecksumOptFromConf(Configuration conf) {
306    DataChecksum.Type type = getChecksumType(conf);
307    int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
308        DFS_BYTES_PER_CHECKSUM_DEFAULT);
309    return new ChecksumOpt(type, bytesPerChecksum);
310  }
311
312  /** create a DataChecksum with the given option. */
313  public DataChecksum createChecksum(ChecksumOpt userOpt) {
314    // Fill in any missing field with the default.
315    ChecksumOpt opt = ChecksumOpt.processChecksumOpt(
316        defaultChecksumOpt, userOpt);
317    DataChecksum dataChecksum = DataChecksum.newDataChecksum(
318        opt.getChecksumType(),
319        opt.getBytesPerChecksum());
320    if (dataChecksum == null) {
321      throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
322          + userOpt + ", default=" + defaultChecksumOpt
323          + ", effective=null");
324    }
325    return dataChecksum;
326  }
327
328  @VisibleForTesting
329  public int getBlockWriteLocateFollowingInitialDelayMs() {
330    return blockWriteLocateFollowingInitialDelayMs;
331  }
332
333  /**
334   * @return the hdfsTimeout
335   */
336  public int getHdfsTimeout() {
337    return hdfsTimeout;
338  }
339
340  /**
341   * @return the maxFailoverAttempts
342   */
343  public int getMaxFailoverAttempts() {
344    return maxFailoverAttempts;
345  }
346
347  /**
348   * @return the maxRetryAttempts
349   */
350  public int getMaxRetryAttempts() {
351    return maxRetryAttempts;
352  }
353
354  /**
355   * @return the failoverSleepBaseMillis
356   */
357  public int getFailoverSleepBaseMillis() {
358    return failoverSleepBaseMillis;
359  }
360
361  /**
362   * @return the failoverSleepMaxMillis
363   */
364  public int getFailoverSleepMaxMillis() {
365    return failoverSleepMaxMillis;
366  }
367
368  /**
369   * @return the maxBlockAcquireFailures
370   */
371  public int getMaxBlockAcquireFailures() {
372    return maxBlockAcquireFailures;
373  }
374
375  /**
376   * @return the datanodeSocketWriteTimeout
377   */
378  public int getDatanodeSocketWriteTimeout() {
379    return datanodeSocketWriteTimeout;
380  }
381
382  /**
383   * @return the ioBufferSize
384   */
385  public int getIoBufferSize() {
386    return ioBufferSize;
387  }
388
389  /**
390   * @return the defaultChecksumOpt
391   */
392  public ChecksumOpt getDefaultChecksumOpt() {
393    return defaultChecksumOpt;
394  }
395
396  /**
397   * @return the writePacketSize
398   */
399  public int getWritePacketSize() {
400    return writePacketSize;
401  }
402
403  /**
404   * @return the writeMaxPackets
405   */
406  public int getWriteMaxPackets() {
407    return writeMaxPackets;
408  }
409
410  /**
411   * @return the writeByteArrayManagerConf
412   */
413  public ByteArrayManager.Conf getWriteByteArrayManagerConf() {
414    return writeByteArrayManagerConf;
415  }
416
417  /**
418   * @return whether TCP_NODELAY should be set on client sockets
419   */
420  public boolean getDataTransferTcpNoDelay() {
421    return dataTransferTcpNoDelay;
422  }
423
424  /**
425   * @return the socketTimeout
426   */
427  public int getSocketTimeout() {
428    return socketTimeout;
429  }
430
431  /**
432   * @return the socketSendBufferSize
433   */
434  public int getSocketSendBufferSize() {
435    return socketSendBufferSize;
436  }
437
438  /**
439   * @return the excludedNodesCacheExpiry
440   */
441  public long getExcludedNodesCacheExpiry() {
442    return excludedNodesCacheExpiry;
443  }
444
445  /**
446   * @return the timeWindow
447   */
448  public int getTimeWindow() {
449    return timeWindow;
450  }
451
452  /**
453   * @return the numCachedConnRetry
454   */
455  public int getNumCachedConnRetry() {
456    return numCachedConnRetry;
457  }
458
459  /**
460   * @return the numBlockWriteRetry
461   */
462  public int getNumBlockWriteRetry() {
463    return numBlockWriteRetry;
464  }
465
466  /**
467   * @return the numBlockWriteLocateFollowingRetry
468   */
469  public int getNumBlockWriteLocateFollowingRetry() {
470    return numBlockWriteLocateFollowingRetry;
471  }
472
473  /**
474   * @return the defaultBlockSize
475   */
476  public long getDefaultBlockSize() {
477    return defaultBlockSize;
478  }
479
480  /**
481   * @return the prefetchSize
482   */
483  public long getPrefetchSize() {
484    return prefetchSize;
485  }
486
487  /**
488   * @return the defaultReplication
489   */
490  public short getDefaultReplication() {
491    return defaultReplication;
492  }
493
494  /**
495   * @return the taskId
496   */
497  public String getTaskId() {
498    return taskId;
499  }
500
501  /**
502   * @return the uMask
503   */
504  public FsPermission getUMask() {
505    return uMask;
506  }
507
508  /**
509   * @return the connectToDnViaHostname
510   */
511  public boolean isConnectToDnViaHostname() {
512    return connectToDnViaHostname;
513  }
514
515  /**
516   * @return the hdfsBlocksMetadataEnabled
517   */
518  public boolean isHdfsBlocksMetadataEnabled() {
519    return hdfsBlocksMetadataEnabled;
520  }
521
522  /**
523   * @return the fileBlockStorageLocationsNumThreads
524   */
525  public int getFileBlockStorageLocationsNumThreads() {
526    return fileBlockStorageLocationsNumThreads;
527  }
528
529  /**
530   * @return the getFileBlockStorageLocationsTimeoutMs
531   */
532  public int getFileBlockStorageLocationsTimeoutMs() {
533    return fileBlockStorageLocationsTimeoutMs;
534  }
535
536  /**
537   * @return the retryTimesForGetLastBlockLength
538   */
539  public int getRetryTimesForGetLastBlockLength() {
540    return retryTimesForGetLastBlockLength;
541  }
542
543  /**
544   * @return the retryIntervalForGetLastBlockLength
545   */
546  public int getRetryIntervalForGetLastBlockLength() {
547    return retryIntervalForGetLastBlockLength;
548  }
549
550  /**
551   * @return the datanodeRestartTimeout
552   */
553  public long getDatanodeRestartTimeout() {
554    return datanodeRestartTimeout;
555  }
556
557  /**
558   * @return the slowIoWarningThresholdMs
559   */
560  public long getSlowIoWarningThresholdMs() {
561    return slowIoWarningThresholdMs;
562  }
563
564  /**
565   * @return the hedgedReadThresholdMillis
566   */
567  public long getHedgedReadThresholdMillis() {
568    return hedgedReadThresholdMillis;
569  }
570
571  /**
572   * @return the hedgedReadThreadpoolSize
573   */
574  public int getHedgedReadThreadpoolSize() {
575    return hedgedReadThreadpoolSize;
576  }
577
578  /**
579   * @return the replicaAccessorBuilderClasses
580   */
581  public List<Class<? extends ReplicaAccessorBuilder>>
582        getReplicaAccessorBuilderClasses() {
583    return replicaAccessorBuilderClasses;
584  }
585
586  /**
587   * @return the shortCircuitConf
588   */
589  public ShortCircuitConf getShortCircuitConf() {
590    return shortCircuitConf;
591  }
592
593  /**
594   * Configuration for short-circuit reads.
595   */
596  public static class ShortCircuitConf {
597    private static final Logger LOG = DfsClientConf.LOG;
598
599    private final int socketCacheCapacity;
600    private final long socketCacheExpiry;
601
602    private final boolean useLegacyBlockReader;
603    private final boolean useLegacyBlockReaderLocal;
604    private final String domainSocketPath;
605    private final boolean skipShortCircuitChecksums;
606
607    private final int shortCircuitBufferSize;
608    private final boolean shortCircuitLocalReads;
609    private final boolean domainSocketDataTraffic;
610    private final int shortCircuitStreamsCacheSize;
611    private final long shortCircuitStreamsCacheExpiryMs;
612    private final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
613
614    private final boolean shortCircuitMmapEnabled;
615    private final int shortCircuitMmapCacheSize;
616    private final long shortCircuitMmapCacheExpiryMs;
617    private final long shortCircuitMmapCacheRetryTimeout;
618    private final long shortCircuitCacheStaleThresholdMs;
619
620    private final long keyProviderCacheExpiryMs;
621
622    public ShortCircuitConf(Configuration conf) {
623      socketCacheCapacity = conf.getInt(
624          DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
625          DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
626      socketCacheExpiry = conf.getLong(
627          DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
628          DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
629
630      useLegacyBlockReader = conf.getBoolean(
631          DFS_CLIENT_USE_LEGACY_BLOCKREADER,
632          DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
633      useLegacyBlockReaderLocal = conf.getBoolean(
634          DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
635          DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
636      shortCircuitLocalReads = conf.getBoolean(
637          Read.ShortCircuit.KEY,
638          Read.ShortCircuit.DEFAULT);
639      domainSocketDataTraffic = conf.getBoolean(
640          DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
641          DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
642      domainSocketPath = conf.getTrimmed(
643          DFS_DOMAIN_SOCKET_PATH_KEY,
644          DFS_DOMAIN_SOCKET_PATH_DEFAULT);
645
646      LOG.debug(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
647                    + " = {}", useLegacyBlockReaderLocal);
648      LOG.debug(Read.ShortCircuit.KEY
649                    + " = {}", shortCircuitLocalReads);
650      LOG.debug(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
651                    + " = {}", domainSocketDataTraffic);
652      LOG.debug(DFS_DOMAIN_SOCKET_PATH_KEY
653                    + " = {}", domainSocketPath);
654
655      skipShortCircuitChecksums = conf.getBoolean(
656          Read.ShortCircuit.SKIP_CHECKSUM_KEY,
657          Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT);
658      shortCircuitBufferSize = conf.getInt(
659          Read.ShortCircuit.BUFFER_SIZE_KEY,
660          Read.ShortCircuit.BUFFER_SIZE_DEFAULT);
661      shortCircuitStreamsCacheSize = conf.getInt(
662          Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
663          Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT);
664      shortCircuitStreamsCacheExpiryMs = conf.getLong(
665          Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
666          Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT);
667      shortCircuitMmapEnabled = conf.getBoolean(
668          Mmap.ENABLED_KEY,
669          Mmap.ENABLED_DEFAULT);
670      shortCircuitMmapCacheSize = conf.getInt(
671          Mmap.CACHE_SIZE_KEY,
672          Mmap.CACHE_SIZE_DEFAULT);
673      shortCircuitMmapCacheExpiryMs = conf.getLong(
674          Mmap.CACHE_TIMEOUT_MS_KEY,
675          Mmap.CACHE_TIMEOUT_MS_DEFAULT);
676      shortCircuitMmapCacheRetryTimeout = conf.getLong(
677          Mmap.RETRY_TIMEOUT_MS_KEY,
678          Mmap.RETRY_TIMEOUT_MS_DEFAULT);
679      shortCircuitCacheStaleThresholdMs = conf.getLong(
680          ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY,
681          ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT);
682      shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
683          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
684          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
685
686      keyProviderCacheExpiryMs = conf.getLong(
687          DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
688          DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
689    }
690
691    /**
692     * @return the socketCacheCapacity
693     */
694    public int getSocketCacheCapacity() {
695      return socketCacheCapacity;
696    }
697
698    /**
699     * @return the socketCacheExpiry
700     */
701    public long getSocketCacheExpiry() {
702      return socketCacheExpiry;
703    }
704
705    public boolean isUseLegacyBlockReaderLocal() {
706      return useLegacyBlockReaderLocal;
707    }
708
709    public String getDomainSocketPath() {
710      return domainSocketPath;
711    }
712
713    public boolean isShortCircuitLocalReads() {
714      return shortCircuitLocalReads;
715    }
716
717    public boolean isDomainSocketDataTraffic() {
718      return domainSocketDataTraffic;
719    }
720    /**
721     * @return the useLegacyBlockReader
722     */
723    public boolean isUseLegacyBlockReader() {
724      return useLegacyBlockReader;
725    }
726
727    /**
728     * @return the skipShortCircuitChecksums
729     */
730    public boolean isSkipShortCircuitChecksums() {
731      return skipShortCircuitChecksums;
732    }
733
734    /**
735     * @return the shortCircuitBufferSize
736     */
737    public int getShortCircuitBufferSize() {
738      return shortCircuitBufferSize;
739    }
740
741    /**
742     * @return the shortCircuitStreamsCacheSize
743     */
744    public int getShortCircuitStreamsCacheSize() {
745      return shortCircuitStreamsCacheSize;
746    }
747
748    /**
749     * @return the shortCircuitStreamsCacheExpiryMs
750     */
751    public long getShortCircuitStreamsCacheExpiryMs() {
752      return shortCircuitStreamsCacheExpiryMs;
753    }
754
755    /**
756     * @return the shortCircuitSharedMemoryWatcherInterruptCheckMs
757     */
758    public int getShortCircuitSharedMemoryWatcherInterruptCheckMs() {
759      return shortCircuitSharedMemoryWatcherInterruptCheckMs;
760    }
761
762    /**
763     * @return the shortCircuitMmapEnabled
764     */
765    public boolean isShortCircuitMmapEnabled() {
766      return shortCircuitMmapEnabled;
767    }
768
769    /**
770     * @return the shortCircuitMmapCacheSize
771     */
772    public int getShortCircuitMmapCacheSize() {
773      return shortCircuitMmapCacheSize;
774    }
775
776    /**
777     * @return the shortCircuitMmapCacheExpiryMs
778     */
779    public long getShortCircuitMmapCacheExpiryMs() {
780      return shortCircuitMmapCacheExpiryMs;
781    }
782
783    /**
784     * @return the shortCircuitMmapCacheRetryTimeout
785     */
786    public long getShortCircuitMmapCacheRetryTimeout() {
787      return shortCircuitMmapCacheRetryTimeout;
788    }
789
790    /**
791     * @return the shortCircuitCacheStaleThresholdMs
792     */
793    public long getShortCircuitCacheStaleThresholdMs() {
794      return shortCircuitCacheStaleThresholdMs;
795    }
796
797    /**
798     * @return the keyProviderCacheExpiryMs
799     */
800    public long getKeyProviderCacheExpiryMs() {
801      return keyProviderCacheExpiryMs;
802    }
803
804    public String confAsString() {
805
806      return "shortCircuitStreamsCacheSize = "
807          + shortCircuitStreamsCacheSize
808          + ", shortCircuitStreamsCacheExpiryMs = "
809          + shortCircuitStreamsCacheExpiryMs
810          + ", shortCircuitMmapCacheSize = "
811          + shortCircuitMmapCacheSize
812          + ", shortCircuitMmapCacheExpiryMs = "
813          + shortCircuitMmapCacheExpiryMs
814          + ", shortCircuitMmapCacheRetryTimeout = "
815          + shortCircuitMmapCacheRetryTimeout
816          + ", shortCircuitCacheStaleThresholdMs = "
817          + shortCircuitCacheStaleThresholdMs
818          + ", socketCacheCapacity = "
819          + socketCacheCapacity
820          + ", socketCacheExpiry = "
821          + socketCacheExpiry
822          + ", shortCircuitLocalReads = "
823          + shortCircuitLocalReads
824          + ", useLegacyBlockReaderLocal = "
825          + useLegacyBlockReaderLocal
826          + ", domainSocketDataTraffic = "
827          + domainSocketDataTraffic
828          + ", shortCircuitSharedMemoryWatcherInterruptCheckMs = "
829          + shortCircuitSharedMemoryWatcherInterruptCheckMs
830          + ", keyProviderCacheExpiryMs = "
831          + keyProviderCacheExpiryMs;
832    }
833  }
834}