001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hdfs;
020
021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
024import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
025import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
026import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT;
027import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
028import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
029import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
030import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
031import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
032import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
033import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
034import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
035import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY;
036import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
037import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
038
039import java.io.IOException;
040import java.io.PrintStream;
041import java.net.InetAddress;
042import java.net.InetSocketAddress;
043import java.net.URI;
044import java.net.URISyntaxException;
045import java.security.SecureRandom;
046import java.util.Arrays;
047import java.util.Collection;
048import java.util.Comparator;
049import java.util.Date;
050import java.util.HashSet;
051import java.util.List;
052import java.util.Map;
053import java.util.Set;
054import java.util.concurrent.ThreadLocalRandom;
055
056import org.apache.commons.cli.CommandLine;
057import org.apache.commons.cli.CommandLineParser;
058import org.apache.commons.cli.Option;
059import org.apache.commons.cli.Options;
060import org.apache.commons.cli.ParseException;
061import org.apache.commons.cli.PosixParser;
062import org.apache.commons.logging.Log;
063import org.apache.commons.logging.LogFactory;
064import org.apache.hadoop.HadoopIllegalArgumentException;
065import org.apache.hadoop.classification.InterfaceAudience;
066import org.apache.hadoop.conf.Configuration;
067import org.apache.hadoop.crypto.key.KeyProvider;
068import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
069import org.apache.hadoop.fs.CommonConfigurationKeys;
070import org.apache.hadoop.fs.FileSystem;
071import org.apache.hadoop.fs.Path;
072import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
073import org.apache.hadoop.hdfs.protocol.HdfsConstants;
074import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
075import org.apache.hadoop.hdfs.server.namenode.NameNode;
076import org.apache.hadoop.http.HttpConfig;
077import org.apache.hadoop.http.HttpServer2;
078import org.apache.hadoop.ipc.ProtobufRpcEngine;
079import org.apache.hadoop.ipc.RPC;
080import org.apache.hadoop.net.NetUtils;
081import org.apache.hadoop.security.SecurityUtil;
082import org.apache.hadoop.security.UserGroupInformation;
083import org.apache.hadoop.security.authorize.AccessControlList;
084import org.apache.hadoop.util.ToolRunner;
085
086import com.google.common.annotations.VisibleForTesting;
087import com.google.common.base.Joiner;
088import com.google.common.base.Preconditions;
089import com.google.common.collect.Lists;
090import com.google.common.collect.Sets;
091import com.google.protobuf.BlockingService;
092
093@InterfaceAudience.Private
094public class DFSUtil {
095  public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
096  
097  private DFSUtil() { /* Hidden constructor */ }
098  
099  private static final ThreadLocal<SecureRandom> SECURE_RANDOM = new ThreadLocal<SecureRandom>() {
100    @Override
101    protected SecureRandom initialValue() {
102      return new SecureRandom();
103    }
104  };
105
  /** @return a secure pseudo-random number generator. */
107  public static SecureRandom getSecureRandom() {
108    return SECURE_RANDOM.get();
109  }
110
111  /** Shuffle the elements in the given array. */
112  public static <T> T[] shuffle(final T[] array) {
113    if (array != null && array.length > 0) {
114      for (int n = array.length; n > 1; ) {
115        final int randomIndex = ThreadLocalRandom.current().nextInt(n);
116        n--;
117        if (n != randomIndex) {
118          final T tmp = array[randomIndex];
119          array[randomIndex] = array[n];
120          array[n] = tmp;
121        }
122      }
123    }
124    return array;
125  }
126
127  /**
   * Comparator for sorting DatanodeInfo[] based on decommissioned states.
   * Decommissioned nodes are moved to the end of the array on sorting with
   * this comparator.
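   * <p>
   * Typical usage is a plain array sort, for example (illustrative sketch;
   * {@code nodes} is any DatanodeInfo[]):
   * <pre>
   * // decommissioned nodes end up at the tail of the array
   * Arrays.sort(nodes, DFSUtil.DECOM_COMPARATOR);
   * </pre>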
131   */
132  public static final Comparator<DatanodeInfo> DECOM_COMPARATOR = 
133    new Comparator<DatanodeInfo>() {
134      @Override
135      public int compare(DatanodeInfo a, DatanodeInfo b) {
136        return a.isDecommissioned() == b.isDecommissioned() ? 0 : 
137          a.isDecommissioned() ? 1 : -1;
138      }
139    };
140
141
142  /**
   * Comparator for sorting DatanodeInfo[] based on decommissioned/stale states.
144   * Decommissioned/stale nodes are moved to the end of the array on sorting
145   * with this comparator.
146   */ 
147  @InterfaceAudience.Private 
148  public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
149    private final long staleInterval;
150
151    /**
152     * Constructor of DecomStaleComparator
153     * 
154     * @param interval
155     *          The time interval for marking datanodes as stale is passed from
156     *          outside, since the interval may be changed dynamically
157     */
158    public DecomStaleComparator(long interval) {
159      this.staleInterval = interval;
160    }
161
162    @Override
163    public int compare(DatanodeInfo a, DatanodeInfo b) {
164      // Decommissioned nodes will still be moved to the end of the list
165      if (a.isDecommissioned()) {
166        return b.isDecommissioned() ? 0 : 1;
167      } else if (b.isDecommissioned()) {
168        return -1;
169      }
170      // Stale nodes will be moved behind the normal nodes
171      boolean aStale = a.isStale(staleInterval);
172      boolean bStale = b.isStale(staleInterval);
173      return aStale == bStale ? 0 : (aStale ? 1 : -1);
174    }
175  }    
176    
177  /**
178   * Address matcher for matching an address to local address
179   */
180  static final AddressMatcher LOCAL_ADDRESS_MATCHER = new AddressMatcher() {
181    @Override
182    public boolean match(InetSocketAddress s) {
183      return NetUtils.isLocalAddress(s.getAddress());
184    };
185  };
186  
187  /**
188   * Whether the pathname is valid.  Currently prohibits relative paths, 
189   * names which contain a ":" or "//", or other non-canonical paths.
190   */
191  public static boolean isValidName(String src) {
192    return DFSUtilClient.isValidName(src);
193  }
194
195  /**
196   * Checks if a string is a valid path component. For instance, components
197   * cannot contain a ":" or "/", and cannot be equal to a reserved component
198   * like ".snapshot".
199   * <p>
200   * The primary use of this method is for validating paths when loading the
201   * FSImage. During normal NN operation, paths are sometimes allowed to
202   * contain reserved components.
203   * 
   * @return true if the component is valid
205   */
206  public static boolean isValidNameForComponent(String component) {
207    if (component.equals(".") ||
208        component.equals("..") ||
209        component.indexOf(":") >= 0 ||
210        component.indexOf("/") >= 0) {
211      return false;
212    }
213    return !isReservedPathComponent(component);
214  }
215
216
217  /**
218   * Returns if the component is reserved.
219   * 
220   * <p>
221   * Note that some components are only reserved under certain directories, e.g.
222   * "/.reserved" is reserved, while "/hadoop/.reserved" is not.
223   * @return true, if the component is reserved
224   */
225  public static boolean isReservedPathComponent(String component) {
226    for (String reserved : HdfsServerConstants.RESERVED_PATH_COMPONENTS) {
227      if (component.equals(reserved)) {
228        return true;
229      }
230    }
231    return false;
232  }
233
234  /**
235   * Converts a byte array to a string using UTF8 encoding.
236   */
237  public static String bytes2String(byte[] bytes) {
238    return bytes2String(bytes, 0, bytes.length);
239  }
240  
241  /**
242   * Decode a specific range of bytes of the given byte array to a string
243   * using UTF8.
244   * 
245   * @param bytes The bytes to be decoded into characters
246   * @param offset The index of the first byte to decode
247   * @param length The number of bytes to decode
248   * @return The decoded string
249   */
250  public static String bytes2String(byte[] bytes, int offset, int length) {
    return DFSUtilClient.bytes2String(bytes, offset, length);
252  }
253
254  /**
255   * Converts a string to a byte array using UTF8 encoding.
256   */
257  public static byte[] string2Bytes(String str) {
258    return DFSUtilClient.string2Bytes(str);
259  }
260
261  /**
   * Given a list of path components, returns a path as a UTF-8 string.
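   * <p>
   * For example (illustrative):
   * <pre>
   * byte[][] components = DFSUtil.getPathComponents("/foo/bar");
   * DFSUtil.byteArray2PathString(components); // "/foo/bar"
   * </pre>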
263   */
264  public static String byteArray2PathString(final byte[][] components,
265      final int offset, final int length) {
266    // specifically not using StringBuilder to more efficiently build
267    // string w/o excessive byte[] copies and charset conversions.
268    final int range = offset + length;
269    Preconditions.checkPositionIndexes(offset, range, components.length);
270    if (length == 0) {
271      return "";
272    }
273    // absolute paths start with either null or empty byte[]
274    byte[] firstComponent = components[offset];
275    boolean isAbsolute = (offset == 0 &&
276        (firstComponent == null || firstComponent.length == 0));
277    if (offset == 0 && length == 1) {
278      return isAbsolute ? Path.SEPARATOR : bytes2String(firstComponent);
279    }
280    // compute length of full byte[], seed with 1st component and delimiters
281    int pos = isAbsolute ? 0 : firstComponent.length;
282    int size = pos + length - 1;
283    for (int i=offset + 1; i < range; i++) {
284      size += components[i].length;
285    }
286    final byte[] result = new byte[size];
287    if (!isAbsolute) {
288      System.arraycopy(firstComponent, 0, result, 0, firstComponent.length);
289    }
290    // append remaining components as "/component".
291    for (int i=offset + 1; i < range; i++) {
292      result[pos++] = (byte)Path.SEPARATOR_CHAR;
293      int len = components[i].length;
294      System.arraycopy(components[i], 0, result, pos, len);
295      pos += len;
296    }
297    return bytes2String(result);
298  }
299
300  public static String byteArray2PathString(byte[][] pathComponents) {
301    return byteArray2PathString(pathComponents, 0, pathComponents.length);
302  }
303
304  /**
305   * Converts a list of path components into a path using Path.SEPARATOR.
306   * 
307   * @param components Path components
308   * @return Combined path as a UTF-8 string
309   */
310  public static String strings2PathString(String[] components) {
311    if (components.length == 0) {
312      return "";
313    }
314    if (components.length == 1) {
315      if (components[0] == null || components[0].isEmpty()) {
316        return Path.SEPARATOR;
317      }
318    }
319    return Joiner.on(Path.SEPARATOR).join(components);
320  }
321
322  /** Convert an object representing a path to a string. */
323  public static String path2String(final Object path) {
324    return path == null? null
325        : path instanceof String? (String)path
326        : path instanceof byte[][]? byteArray2PathString((byte[][])path)
327        : path.toString();
328  }
329
330  /**
331   * Convert a UTF8 string to an array of byte arrays.
332   */
333  public static byte[][] getPathComponents(String path) {
334    // avoid intermediate split to String[]
335    final byte[] bytes = string2Bytes(path);
336    return bytes2byteArray(bytes, bytes.length, (byte)Path.SEPARATOR_CHAR);
337  }
338
339  /**
340   * Splits the array of bytes into array of arrays of bytes
341   * on byte separator
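   * <p>
   * For example (illustrative):
   * <pre>
   * // splits "/foo/bar" into the root (empty) component, "foo" and "bar"
   * byte[][] parts = DFSUtil.bytes2byteArray(
   *     DFSUtil.string2Bytes("/foo/bar"), (byte) '/');
   * </pre>
   *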
342   * @param bytes the array of bytes to split
343   * @param separator the delimiting byte
344   */
345  public static byte[][] bytes2byteArray(byte[] bytes, byte separator) {
346    return bytes2byteArray(bytes, bytes.length, separator);
347  }
348
349  /**
350   * Splits first len bytes in bytes to array of arrays of bytes
351   * on byte separator
352   * @param bytes the byte array to split
353   * @param len the number of bytes to split
354   * @param separator the delimiting byte
355   */
356  public static byte[][] bytes2byteArray(byte[] bytes,
357                                         int len,
358                                         byte separator) {
359    Preconditions.checkPositionIndex(len, bytes.length);
360    if (len == 0) {
361      return new byte[][]{null};
362    }
363    // Count the splits. Omit multiple separators and the last one by
364    // peeking at prior byte.
365    int splits = 0;
366    for (int i = 1; i < len; i++) {
367      if (bytes[i-1] == separator && bytes[i] != separator) {
368        splits++;
369      }
370    }
371    if (splits == 0 && bytes[0] == separator) {
372      return new byte[][]{null};
373    }
374    splits++;
375    byte[][] result = new byte[splits][];
376    int nextIndex = 0;
377    // Build the splits.
378    for (int i = 0; i < splits; i++) {
379      int startIndex = nextIndex;
380      // find next separator in the bytes.
381      while (nextIndex < len && bytes[nextIndex] != separator) {
382        nextIndex++;
383      }
384      result[i] = (nextIndex > 0)
385          ? Arrays.copyOfRange(bytes, startIndex, nextIndex)
386          : DFSUtilClient.EMPTY_BYTES; // reuse empty bytes for root.
387      do { // skip over separators.
388        nextIndex++;
389      } while (nextIndex < len && bytes[nextIndex] == separator);
390    }
391    return result;
392  }
393
394  /**
395   * Return configuration key of format key.suffix1.suffix2...suffixN
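   * <p>
   * For example (illustrative nameservice/namenode IDs):
   * <pre>
   * // yields "dfs.namenode.rpc-address.ns1.nn1"
   * DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1");
   * </pre>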
396   */
397  public static String addKeySuffixes(String key, String... suffixes) {
398    String keySuffix = DFSUtilClient.concatSuffixes(suffixes);
399    return DFSUtilClient.addSuffix(key, keySuffix);
400  }
401
402  /**
403   * Get all of the RPC addresses of the individual NNs in a given nameservice.
404   * 
405   * @param conf Configuration
406   * @param nsId the nameservice whose NNs addresses we want.
407   * @param defaultValue default address to return in case key is not found.
408   * @return A map from nnId -> RPC address of each NN in the nameservice.
409   */
410  public static Map<String, InetSocketAddress> getRpcAddressesForNameserviceId(
411      Configuration conf, String nsId, String defaultValue) {
412    return DFSUtilClient.getAddressesForNameserviceId(conf, nsId, defaultValue,
413                                                      DFS_NAMENODE_RPC_ADDRESS_KEY);
414  }
415
416  /**
417   * @return a collection of all configured NN Kerberos principals.
418   */
419  public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
420    Set<String> principals = new HashSet<String>();
421    for (String nsId : DFSUtilClient.getNameServiceIds(conf)) {
422      if (HAUtil.isHAEnabled(conf, nsId)) {
423        for (String nnId : DFSUtilClient.getNameNodeIds(conf, nsId)) {
424          Configuration confForNn = new Configuration(conf);
425          NameNode.initializeGenericKeys(confForNn, nsId, nnId);
426          String principal = SecurityUtil.getServerPrincipal(confForNn
427              .get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY),
428              DFSUtilClient.getNNAddress(confForNn).getHostName());
429          principals.add(principal);
430        }
431      } else {
432        Configuration confForNn = new Configuration(conf);
433        NameNode.initializeGenericKeys(confForNn, nsId, null);
434        String principal = SecurityUtil.getServerPrincipal(confForNn
435            .get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY),
436            DFSUtilClient.getNNAddress(confForNn).getHostName());
437        principals.add(principal);
438      }
439    }
440
441    return principals;
442  }
443
444  /**
   * Returns the HA NN RPC addresses from the configuration, keyed by
   * nameservice ID and namenode ID.
   * 
   * @param conf configuration
   * @return map of nameservice ID to map of namenode ID to InetSocketAddress
450   */
451  public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
452      Configuration conf) {
453    return DFSUtilClient.getAddresses(conf, null,
454                                      DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
455  }
456
457  /**
   * Returns the backup node RPC addresses from the configuration, keyed by
   * nameservice ID and namenode ID.
   * 
   * @param conf configuration
   * @return map of nameservice ID to map of namenode ID to InetSocketAddress
463   * @throws IOException on error
464   */
465  public static Map<String, Map<String, InetSocketAddress>> getBackupNodeAddresses(
466      Configuration conf) throws IOException {
467    Map<String, Map<String, InetSocketAddress>> addressList = DFSUtilClient.getAddresses(
468        conf, null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
469    if (addressList.isEmpty()) {
470      throw new IOException("Incorrect configuration: backup node address "
471          + DFS_NAMENODE_BACKUP_ADDRESS_KEY + " is not configured.");
472    }
473    return addressList;
474  }
475
476  /**
   * Returns the secondary namenode HTTP addresses from the configuration,
   * keyed by nameservice ID and namenode ID.
   * 
   * @param conf configuration
   * @return map of nameservice ID to map of namenode ID to InetSocketAddress
482   * @throws IOException on error
483   */
484  public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAddresses(
485      Configuration conf) throws IOException {
486    Map<String, Map<String, InetSocketAddress>> addressList = DFSUtilClient.getAddresses(
487        conf, null, DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
488    if (addressList.isEmpty()) {
489      throw new IOException("Incorrect configuration: secondary namenode address "
490          + DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY + " is not configured.");
491    }
492    return addressList;
493  }
494
495  /**
   * Returns the namenode RPC addresses from the configuration, keyed by
   * nameservice ID and namenode ID.
498   * 
499   * Returns namenode address specifically configured for datanodes (using
500   * service ports), if found. If not, regular RPC address configured for other
501   * clients is returned.
502   * 
503   * @param conf configuration
   * @return map of nameservice ID to map of namenode ID to InetSocketAddress
505   * @throws IOException on error
506   */
507  public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddresses(
508      Configuration conf) throws IOException {
509    // Use default address as fall back
510    String defaultAddress;
511    try {
512      defaultAddress = NetUtils.getHostPortString(
513          DFSUtilClient.getNNAddress(conf));
514    } catch (IllegalArgumentException e) {
515      defaultAddress = null;
516    }
517    
518    Map<String, Map<String, InetSocketAddress>> addressList =
519      DFSUtilClient.getAddresses(conf, defaultAddress,
520                                 DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
521                                 DFS_NAMENODE_RPC_ADDRESS_KEY);
522    if (addressList.isEmpty()) {
523      throw new IOException("Incorrect configuration: namenode address "
524          + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "  
525          + DFS_NAMENODE_RPC_ADDRESS_KEY
526          + " is not configured.");
527    }
528    return addressList;
529  }
530
531  /**
   * Returns the RPC addresses of the namenodes that manage this cluster,
   * keyed by nameservice ID and namenode ID. Note this is to be used by
   * datanodes to get the list of namenode addresses to talk to.
535   *
536   * Returns namenode address specifically configured for datanodes (using
537   * service ports), if found. If not, regular RPC address configured for other
538   * clients is returned.
539   *
540   * @param conf configuration
   * @return map of nameservice ID to map of namenode ID to InetSocketAddress
542   * @throws IOException on error
543   */
544  public static Map<String, Map<String, InetSocketAddress>>
545    getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException {
546    // Use default address as fall back
547    String defaultAddress;
548    try {
549      defaultAddress = NetUtils.getHostPortString(
550          DFSUtilClient.getNNAddress(conf));
551    } catch (IllegalArgumentException e) {
552      defaultAddress = null;
553    }
554
555    Collection<String> parentNameServices = conf.getTrimmedStringCollection
556            (DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
557
558    if (parentNameServices.isEmpty()) {
559      parentNameServices = conf.getTrimmedStringCollection
560              (DFSConfigKeys.DFS_NAMESERVICES);
561    } else {
      // Ensure that the internal service is indeed in the list of all available
563      // nameservices.
564      Set<String> availableNameServices = Sets.newHashSet(conf
565              .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
566      for (String nsId : parentNameServices) {
567        if (!availableNameServices.contains(nsId)) {
568          throw new IOException("Unknown nameservice: " + nsId);
569        }
570      }
571    }
572
573    Map<String, Map<String, InetSocketAddress>> addressList =
574            DFSUtilClient.getAddressesForNsIds(conf, parentNameServices,
575                                               defaultAddress,
576                                               DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
577                                               DFS_NAMENODE_RPC_ADDRESS_KEY);
578    if (addressList.isEmpty()) {
579      throw new IOException("Incorrect configuration: namenode address "
580              + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
581              + DFS_NAMENODE_RPC_ADDRESS_KEY
582              + " is not configured.");
583    }
584    return addressList;
585  }
586
587  /**
   * Returns the lifeline RPC server addresses of the namenodes from the
   * configuration, keyed by nameservice ID and namenode ID.
590   *
591   * @param conf configuration
   * @return map of nameservice ID to map of namenode ID to InetSocketAddress
593   * @throws IOException on error
594   */
595  public static Map<String, Map<String, InetSocketAddress>>
596      getNNLifelineRpcAddressesForCluster(Configuration conf)
597      throws IOException {
598
599    Collection<String> parentNameServices = conf.getTrimmedStringCollection(
600        DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
601
602    if (parentNameServices.isEmpty()) {
603      parentNameServices = conf.getTrimmedStringCollection(
604          DFSConfigKeys.DFS_NAMESERVICES);
605    } else {
606      // Ensure that the internal service is indeed in the list of all available
607      // nameservices.
608      Set<String> availableNameServices = Sets.newHashSet(conf
609          .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
610      for (String nsId : parentNameServices) {
611        if (!availableNameServices.contains(nsId)) {
612          throw new IOException("Unknown nameservice: " + nsId);
613        }
614      }
615    }
616
617    return DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, null,
618        DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
619  }
620
621  /**
622   * Map a logical namenode ID to its lifeline address.  Use the given
623   * nameservice if specified, or the configured one if none is given.
624   *
625   * @param conf Configuration
626   * @param nsId which nameservice nnId is a part of, optional
627   * @param nnId the namenode ID to get the service addr for
628   * @return the lifeline addr, null if it could not be determined
629   */
630  public static String getNamenodeLifelineAddr(final Configuration conf,
631      String nsId, String nnId) {
632
633    if (nsId == null) {
634      nsId = getOnlyNameServiceIdOrNull(conf);
635    }
636
637    String lifelineAddrKey = DFSUtilClient.concatSuffixes(
638        DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, nsId, nnId);
639
640    return conf.get(lifelineAddrKey);
641  }
642
643  /**
644   * Flatten the given map, as returned by other functions in this class,
645   * into a flat list of {@link ConfiguredNNAddress} instances.
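   * <p>
   * For example (illustrative sketch; {@code conf} is any Configuration):
   * <pre>
   * for (ConfiguredNNAddress cnn :
   *     DFSUtil.flattenAddressMap(DFSUtil.getHaNnRpcAddresses(conf))) {
   *   LOG.info(cnn.getNameserviceId() + " " + cnn.getAddress());
   * }
   * </pre>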
646   */
647  public static List<ConfiguredNNAddress> flattenAddressMap(
648      Map<String, Map<String, InetSocketAddress>> map) {
649    List<ConfiguredNNAddress> ret = Lists.newArrayList();
650    
651    for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
652      map.entrySet()) {
653      String nsId = entry.getKey();
654      Map<String, InetSocketAddress> nnMap = entry.getValue();
655      for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
656        String nnId = e2.getKey();
657        InetSocketAddress addr = e2.getValue();
658        
659        ret.add(new ConfiguredNNAddress(nsId, nnId, addr));
660      }
661    }
662    return ret;
663  }
664
665  /**
666   * Format the given map, as returned by other functions in this class,
667   * into a string suitable for debugging display. The format of this string
668   * should not be considered an interface, and is liable to change.
669   */
670  public static String addressMapToString(
671      Map<String, Map<String, InetSocketAddress>> map) {
672    StringBuilder b = new StringBuilder();
673    for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
674         map.entrySet()) {
675      String nsId = entry.getKey();
676      Map<String, InetSocketAddress> nnMap = entry.getValue();
677      b.append("Nameservice <").append(nsId).append(">:").append("\n");
678      for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
679        b.append("  NN ID ").append(e2.getKey())
680          .append(" => ").append(e2.getValue()).append("\n");
681      }
682    }
683    return b.toString();
684  }
685  
686  public static String nnAddressesAsString(Configuration conf) {
687    Map<String, Map<String, InetSocketAddress>> addresses =
688      getHaNnRpcAddresses(conf);
689    return addressMapToString(addresses);
690  }
691
692  /**
693   * Represent one of the NameNodes configured in the cluster.
694   */
695  public static class ConfiguredNNAddress {
696    private final String nameserviceId;
697    private final String namenodeId;
698    private final InetSocketAddress addr;
699
700    private ConfiguredNNAddress(String nameserviceId, String namenodeId,
701        InetSocketAddress addr) {
702      this.nameserviceId = nameserviceId;
703      this.namenodeId = namenodeId;
704      this.addr = addr;
705    }
706
707    public String getNameserviceId() {
708      return nameserviceId;
709    }
710
711    public String getNamenodeId() {
712      return namenodeId;
713    }
714
715    public InetSocketAddress getAddress() {
716      return addr;
717    }
718    
719    @Override
720    public String toString() {
721      return "ConfiguredNNAddress[nsId=" + nameserviceId + ";" +
722        "nnId=" + namenodeId + ";addr=" + addr + "]";
723    }
724  }
725
726  /** @return Internal name services specified in the conf. */
727  static Collection<String> getInternalNameServices(Configuration conf) {
728    final Collection<String> ids = conf.getTrimmedStringCollection(
729        DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
730    return !ids.isEmpty()? ids: DFSUtilClient.getNameServiceIds(conf);
731  }
732
733  /**
734   * Get a URI for each internal nameservice. If a nameservice is
735   * HA-enabled, and the configured failover proxy provider supports logical
736   * URIs, then the logical URI of the nameservice is returned.
737   * Otherwise, a URI corresponding to an RPC address of the single NN for that
738   * nameservice is returned, preferring the service RPC address over the
739   * client RPC address.
740   * 
741   * @param conf configuration
742   * @return a collection of all configured NN URIs, preferring service
743   *         addresses
744   */
745  public static Collection<URI> getInternalNsRpcUris(Configuration conf) {
746    return getNameServiceUris(conf, getInternalNameServices(conf),
747        DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
748        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
749  }
750
751  /**
752   * Get a URI for each configured nameservice. If a nameservice is
753   * HA-enabled, and the configured failover proxy provider supports logical
754   * URIs, then the logical URI of the nameservice is returned.
755   * Otherwise, a URI corresponding to the address of the single NN for that
756   * nameservice is returned.
757   * 
758   * @param conf configuration
759   * @param keys configuration keys to try in order to get the URI for non-HA
760   *        nameservices
761   * @return a collection of all configured NN URIs
762   */
763  static Collection<URI> getNameServiceUris(Configuration conf,
764      Collection<String> nameServices, String... keys) {
765    Set<URI> ret = new HashSet<URI>();
766    
767    // We're passed multiple possible configuration keys for any given NN or HA
768    // nameservice, and search the config in order of these keys. In order to
769    // make sure that a later config lookup (e.g. fs.defaultFS) doesn't add a
770    // URI for a config key for which we've already found a preferred entry, we
771    // keep track of non-preferred keys here.
772    Set<URI> nonPreferredUris = new HashSet<URI>();
773    
774    for (String nsId : nameServices) {
775      URI nsUri = createUri(HdfsConstants.HDFS_URI_SCHEME, nsId, -1);
776      /**
777       * Determine whether the logical URI of the name service can be resolved
778       * by the configured failover proxy provider. If not, we should try to
779       * resolve the URI here
780       */
781      boolean useLogicalUri = false;
782      try {
783        useLogicalUri = HAUtil.useLogicalUri(conf, nsUri);
784      } catch (IOException e){
785        LOG.warn("Getting exception  while trying to determine if nameservice "
786            + nsId + " can use logical URI: " + e);
787      }
788      if (HAUtil.isHAEnabled(conf, nsId) && useLogicalUri) {
789        // Add the logical URI of the nameservice.
790        ret.add(nsUri);
791      } else {
792        // Add the URI corresponding to the address of the NN.
793        boolean uriFound = false;
794        for (String key : keys) {
795          String addr = conf.get(DFSUtilClient.concatSuffixes(key, nsId));
796          if (addr != null) {
797            URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME,
798                NetUtils.createSocketAddr(addr));
799            if (!uriFound) {
800              uriFound = true;
801              ret.add(uri);
802            } else {
803              nonPreferredUris.add(uri);
804            }
805          }
806        }
807      }
808    }
809    
810    // Add the generic configuration keys.
811    boolean uriFound = false;
812    for (String key : keys) {
813      String addr = conf.get(key);
814      if (addr != null) {
815        URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME,
816            NetUtils.createSocketAddr(addr));
817        if (!uriFound) {
818          uriFound = true;
819          ret.add(uri);
820        } else {
821          nonPreferredUris.add(uri);
822        }
823      }
824    }
825
826    // Add the default URI if it is an HDFS URI and we haven't come up with a
827    // valid non-nameservice NN address yet.  Consider the servicerpc-address
828    // and rpc-address to be the "unnamed" nameservice.  defaultFS is our
829    // fallback when rpc-address isn't given.  We therefore only want to add
830    // the defaultFS when neither the servicerpc-address (which is preferred)
831    // nor the rpc-address (which overrides defaultFS) is given.
832    if (!uriFound) {
833      URI defaultUri = FileSystem.getDefaultUri(conf);
834      if (defaultUri != null) {
835        // checks if defaultUri is ip:port format
836        // and convert it to hostname:port format
837        if (defaultUri.getPort() != -1) {
838          defaultUri = createUri(defaultUri.getScheme(),
839              NetUtils.createSocketAddr(defaultUri.getHost(),
840                  defaultUri.getPort()));
841        }
842
843        defaultUri = trimUri(defaultUri);
844
845        if (HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
846            !nonPreferredUris.contains(defaultUri)) {
847          ret.add(defaultUri);
848        }
849      }
850    }
851    
852    return ret;
853  }
854
855  /**
856   * Given the InetSocketAddress this method returns the nameservice Id
857   * corresponding to the key with matching address, by doing a reverse 
858   * lookup on the list of nameservices until it finds a match.
859   * 
860   * Since the process of resolving URIs to Addresses is slightly expensive,
861   * this utility method should not be used in performance-critical routines.
862   * 
863   * @param conf - configuration
864   * @param address - InetSocketAddress for configured communication with NN.
865   *     Configured addresses are typically given as URIs, but we may have to
866   *     compare against a URI typed in by a human, or the server name may be
867   *     aliased, so we compare unambiguous InetSocketAddresses instead of just
868   *     comparing URI substrings.
869   * @param keys - list of configured communication parameters that should
870   *     be checked for matches.  For example, to compare against RPC addresses,
871   *     provide the list DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
872   *     DFS_NAMENODE_RPC_ADDRESS_KEY.  Use the generic parameter keys,
873   *     not the NameServiceId-suffixed keys.
874   * @return nameserviceId, or null if no match found
875   */
876  public static String getNameServiceIdFromAddress(final Configuration conf, 
877      final InetSocketAddress address, String... keys) {
878    // Configuration with a single namenode and no nameserviceId
879    String[] ids = getSuffixIDs(conf, address, keys);
880    return (ids != null) ? ids[0] : null;
881  }
882  
883  /**
884   * return server http or https address from the configuration for a
885   * given namenode rpc address.
886   * @param namenodeAddr - namenode RPC address
887   * @param conf configuration
888   * @param scheme - the scheme (http / https)
889   * @return server http or https address
890   * @throws IOException 
891   */
892  public static URI getInfoServer(InetSocketAddress namenodeAddr,
893      Configuration conf, String scheme) throws IOException {
894    String[] suffixes = null;
895    if (namenodeAddr != null) {
896      // if non-default namenode, try reverse look up 
897      // the nameServiceID if it is available
898      suffixes = getSuffixIDs(conf, namenodeAddr,
899          DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
900          DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
901    }
902
903    String authority;
904    if ("http".equals(scheme)) {
905      authority = getSuffixedConf(conf, DFS_NAMENODE_HTTP_ADDRESS_KEY,
906          DFS_NAMENODE_HTTP_ADDRESS_DEFAULT, suffixes);
907    } else if ("https".equals(scheme)) {
908      authority = getSuffixedConf(conf, DFS_NAMENODE_HTTPS_ADDRESS_KEY,
909          DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT, suffixes);
910    } else {
911      throw new IllegalArgumentException("Invalid scheme:" + scheme);
912    }
913
914    if (namenodeAddr != null) {
915      authority = substituteForWildcardAddress(authority,
916          namenodeAddr.getHostName());
917    }
918    return URI.create(scheme + "://" + authority);
919  }
920
921  /**
   * Look up the HTTP / HTTPS address of the namenode, and replace its hostname
   * with defaultHost when the configured address is a wildcard / local
   * address.
925   *
926   * @param defaultHost
927   *          The default host name of the namenode.
928   * @param conf
929   *          The configuration
930   * @param scheme
931   *          HTTP or HTTPS
932   * @throws IOException
933   */
934  public static URI getInfoServerWithDefaultHost(String defaultHost,
935      Configuration conf, final String scheme) throws IOException {
936    URI configuredAddr = getInfoServer(null, conf, scheme);
937    String authority = substituteForWildcardAddress(
938        configuredAddr.getAuthority(), defaultHost);
939    return URI.create(scheme + "://" + authority);
940  }
941
942  /**
943   * Determine whether HTTP or HTTPS should be used to connect to the remote
944   * server. Currently the client only connects to the server via HTTPS if the
945   * policy is set to HTTPS_ONLY.
946   *
947   * @return the scheme (HTTP / HTTPS)
948   */
949  public static String getHttpClientScheme(Configuration conf) {
950    HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
951    return policy == HttpConfig.Policy.HTTPS_ONLY ? "https" : "http";
952  }
953
954  /**
955   * Substitute a default host in the case that an address has been configured
956   * with a wildcard. This is used, for example, when determining the HTTP
957   * address of the NN -- if it's configured to bind to 0.0.0.0, we want to
958   * substitute the hostname from the filesystem URI rather than trying to
959   * connect to 0.0.0.0.
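   * <p>
   * For example (illustrative host and port):
   * <pre>
   * // returns "nn1.example.com:50070"
   * substituteForWildcardAddress("0.0.0.0:50070", "nn1.example.com");
   * </pre>
   *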
960   * @param configuredAddress the address found in the configuration
961   * @param defaultHost the host to substitute with, if configuredAddress
962   * is a local/wildcard address.
963   * @return the substituted address
965   */
966  @VisibleForTesting
967  static String substituteForWildcardAddress(String configuredAddress,
968    String defaultHost) {
969    InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
970    final InetAddress addr = sockAddr.getAddress();
971    if (addr != null && addr.isAnyLocalAddress()) {
972      return defaultHost + ":" + sockAddr.getPort();
973    } else {
974      return configuredAddress;
975    }
976  }
977  
978  private static String getSuffixedConf(Configuration conf,
979      String key, String defaultVal, String[] suffixes) {
980    String ret = conf.get(DFSUtil.addKeySuffixes(key, suffixes));
981    if (ret != null) {
982      return ret;
983    }
984    return conf.get(key, defaultVal);
985  }
986  
987  /**
988   * Sets the node specific setting into generic configuration key. Looks up
989   * value of "key.nameserviceId.namenodeId" and if found sets that value into 
990   * generic key in the conf. If this is not found, falls back to
991   * "key.nameserviceId" and then the unmodified key.
992   *
993   * Note that this only modifies the runtime conf.
994   * 
995   * @param conf
996   *          Configuration object to lookup specific key and to set the value
997   *          to the key passed. Note the conf object is modified.
998   * @param nameserviceId
999   *          nameservice Id to construct the node specific key. Pass null if
   *          federation is not configured.
1001   * @param nnId
1002   *          namenode Id to construct the node specific key. Pass null if
1003   *          HA is not configured.
1004   * @param keys
1005   *          The key for which node specific value is looked up
1006   */
1007  public static void setGenericConf(Configuration conf,
1008      String nameserviceId, String nnId, String... keys) {
1009    for (String key : keys) {
1010      String value = conf.get(addKeySuffixes(key, nameserviceId, nnId));
1011      if (value != null) {
1012        conf.set(key, value);
1013        continue;
1014      }
1015      value = conf.get(addKeySuffixes(key, nameserviceId));
1016      if (value != null) {
1017        conf.set(key, value);
1018      }
1019    }
1020  }
1021
1022  /**
1023   * Round bytes to GiB (gibibyte)
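   * <p>
   * For example (illustrative): {@code roundBytesToGB(1610612736L)} returns
   * {@code 2}, since 1.5 GiB rounds up.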
1024   * @param bytes number of bytes
1025   * @return number of GiB
1026   */
1027  public static int roundBytesToGB(long bytes) {
1028    return Math.round((float)bytes/ 1024 / 1024 / 1024);
1029  }
1030
1031  /**
1032   * Get nameservice Id for the {@link NameNode} based on namenode RPC address
1033   * matching the local node address.
1034   */
1035  public static String getNamenodeNameServiceId(Configuration conf) {
1036    return getNameServiceId(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
1037  }
1038  
1039  /**
1040   * Get nameservice Id for the BackupNode based on backup node RPC address
1041   * matching the local node address.
1042   */
1043  public static String getBackupNameServiceId(Configuration conf) {
1044    return getNameServiceId(conf, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
1045  }
1046  
1047  /**
1048   * Get nameservice Id for the secondary node based on secondary http address
1049   * matching the local node address.
1050   */
1051  public static String getSecondaryNameServiceId(Configuration conf) {
1052    return getNameServiceId(conf, DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
1053  }
1054  
1055  /**
1056   * Get the nameservice Id by matching the {@code addressKey} with the
   * address of the local node.
1058   * 
1059   * If {@link DFSConfigKeys#DFS_NAMESERVICE_ID} is not specifically
1060   * configured, and more than one nameservice Id is configured, this method 
1061   * determines the nameservice Id by matching the local node's address with the
1062   * configured addresses. When a match is found, it returns the nameservice Id
1063   * from the corresponding configuration key.
1064   * 
1065   * @param conf Configuration
1066   * @param addressKey configuration key to get the address.
1067   * @return nameservice Id on success, null if federation is not configured.
1068   * @throws HadoopIllegalArgumentException on error
1069   */
1070  private static String getNameServiceId(Configuration conf, String addressKey) {
1071    String nameserviceId = conf.get(DFS_NAMESERVICE_ID);
1072    if (nameserviceId != null) {
1073      return nameserviceId;
1074    }
1075    Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
1076    if (1 == nsIds.size()) {
1077      return nsIds.toArray(new String[1])[0];
1078    }
1079    String nnId = conf.get(DFS_HA_NAMENODE_ID_KEY);
1080    
1081    return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
1082  }
1083  
1084  /**
1085   * Returns nameservice Id and namenode Id when the local host matches the
   * configuration parameter {@code addressKey.<nameservice Id>.<namenode Id>}
1087   * 
1088   * @param conf Configuration
1089   * @param addressKey configuration key corresponding to the address.
1090   * @param knownNsId only look at configs for the given nameservice, if not-null
1091   * @param knownNNId only look at configs for the given namenode, if not null
1092   * @param matcher matching criteria for matching the address
1093   * @return Array with nameservice Id and namenode Id on success. First element
1094   *         in the array is nameservice Id and second element is namenode Id.
   *         Null value indicates that the configuration does not have the
1096   *         Id.
1097   * @throws HadoopIllegalArgumentException on error
1098   */
1099  static String[] getSuffixIDs(final Configuration conf, final String addressKey,
1100      String knownNsId, String knownNNId,
1101      final AddressMatcher matcher) {
1102    String nameserviceId = null;
1103    String namenodeId = null;
1104    int found = 0;
1105    
1106    Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
1107    for (String nsId : DFSUtilClient.emptyAsSingletonNull(nsIds)) {
1108      if (knownNsId != null && !knownNsId.equals(nsId)) {
1109        continue;
1110      }
1111      
1112      Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
1113      for (String nnId : DFSUtilClient.emptyAsSingletonNull(nnIds)) {
1114        if (LOG.isTraceEnabled()) {
1115          LOG.trace(String.format("addressKey: %s nsId: %s nnId: %s",
1116              addressKey, nsId, nnId));
1117        }
1118        if (knownNNId != null && !knownNNId.equals(nnId)) {
1119          continue;
1120        }
1121        String key = addKeySuffixes(addressKey, nsId, nnId);
1122        String addr = conf.get(key);
1123        if (addr == null) {
1124          continue;
1125        }
1126        InetSocketAddress s = null;
1127        try {
1128          s = NetUtils.createSocketAddr(addr);
1129        } catch (Exception e) {
1130          LOG.warn("Exception in creating socket address " + addr, e);
1131          continue;
1132        }
1133        if (!s.isUnresolved() && matcher.match(s)) {
1134          nameserviceId = nsId;
1135          namenodeId = nnId;
1136          found++;
1137        }
1138      }
1139    }
1140    if (found > 1) { // Only one address must match the local address
1141      String msg = "Configuration has multiple addresses that match "
1142          + "local node's address. Please configure the system with "
1143          + DFS_NAMESERVICE_ID + " and "
1144          + DFS_HA_NAMENODE_ID_KEY;
1145      throw new HadoopIllegalArgumentException(msg);
1146    }
1147    return new String[] { nameserviceId, namenodeId };
1148  }
1149  
1150  /**
   * For a given set of {@code keys}, adds nameservice Id and/or namenode Id
1152   * and returns {nameserviceId, namenodeId} when address match is found.
1153   * @see #getSuffixIDs(Configuration, String, String, String, AddressMatcher)
1154   */
1155  static String[] getSuffixIDs(final Configuration conf,
1156      final InetSocketAddress address, final String... keys) {
1157    AddressMatcher matcher = new AddressMatcher() {
1158     @Override
1159      public boolean match(InetSocketAddress s) {
1160        return address.equals(s);
1161      } 
1162    };
1163    
1164    for (String key : keys) {
1165      String[] ids = getSuffixIDs(conf, key, null, null, matcher);
      if (ids != null && (ids[0] != null || ids[1] != null)) {
1167        return ids;
1168      }
1169    }
1170    return null;
1171  }
1172  
1173  private interface AddressMatcher {
1174    public boolean match(InetSocketAddress s);
1175  }
1176
1177  /** Create a URI from the scheme and address */
1178  public static URI createUri(String scheme, InetSocketAddress address) {
1179    return DFSUtilClient.createUri(scheme, address);
1180  }
1181
  /** Create a URI from scheme, host, and port. */
1183  public static URI createUri(String scheme, String host, int port) {
1184    try {
1185      return new URI(scheme, null, host, port, null, null, null);
1186    } catch (URISyntaxException x) {
1187      throw new IllegalArgumentException(x.getMessage(), x);
1188    }
1189  }
1190
1191  /** Remove unnecessary path from HDFS URI. */
1192  static URI trimUri(URI uri) {
1193    String path = uri.getPath();
1194    if (HdfsConstants.HDFS_URI_SCHEME.equals(uri.getScheme()) &&
1195        path != null && !path.isEmpty()) {
1196      uri = createUri(uri.getScheme(), uri.getHost(), uri.getPort());
1197    }
1198    return uri;
1199  }
1200
1201  /**
1202   * Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}
1203   * @param conf configuration
1204   * @param protocol Protocol interface
1205   * @param service service that implements the protocol
   * @param server RPC server to which the protocol and implementation are added
1207   * @throws IOException
1208   */
1209  public static void addPBProtocol(Configuration conf, Class<?> protocol,
1210      BlockingService service, RPC.Server server) throws IOException {
1211    RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
1212    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
1213  }
1214
1215  /**
1216   * Map a logical namenode ID to its service address. Use the given
1217   * nameservice if specified, or the configured one if none is given.
1218   *
1219   * @param conf Configuration
1220   * @param nsId which nameservice nnId is a part of, optional
1221   * @param nnId the namenode ID to get the service addr for
1222   * @return the service addr, null if it could not be determined
1223   */
1224  public static String getNamenodeServiceAddr(final Configuration conf,
1225      String nsId, String nnId) {
1226
1227    if (nsId == null) {
1228      nsId = getOnlyNameServiceIdOrNull(conf);
1229    }
1230
1231    String serviceAddrKey = DFSUtilClient.concatSuffixes(
1232        DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsId, nnId);
1233
1234    String addrKey = DFSUtilClient.concatSuffixes(
1235        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId);
1236
1237    String serviceRpcAddr = conf.get(serviceAddrKey);
1238    if (serviceRpcAddr == null) {
1239      serviceRpcAddr = conf.get(addrKey);
1240    }
1241    return serviceRpcAddr;
1242  }
1243
1244  /**
1245   * If the configuration refers to only a single nameservice, return the
1246   * name of that nameservice. If it refers to 0 or more than 1, return null.
1247   */
1248  public static String getOnlyNameServiceIdOrNull(Configuration conf) {
1249    Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
1250    if (1 == nsIds.size()) {
1251      return nsIds.toArray(new String[1])[0];
1252    } else {
1253      // No nameservice ID was given and more than one is configured
1254      return null;
1255    }
1256  }
1257  
1258  public static final Options helpOptions = new Options();
1259  public static final Option helpOpt = new Option("h", "help", false,
1260      "get help information");
1261
1262  static {
1263    helpOptions.addOption(helpOpt);
1264  }
1265
1266  /**
1267   * Parse the arguments for commands
1268   * 
1269   * @param args the argument to be parsed
1270   * @param helpDescription help information to be printed out
1271   * @param out Printer
1272   * @param printGenericCommandUsage whether to print the 
1273   *              generic command usage defined in ToolRunner
1274   * @return true when the argument matches help option, false if not
1275   */
1276  public static boolean parseHelpArgument(String[] args,
1277      String helpDescription, PrintStream out, boolean printGenericCommandUsage) {
1278    if (args.length == 1) {
1279      try {
1280        CommandLineParser parser = new PosixParser();
1281        CommandLine cmdLine = parser.parse(helpOptions, args);
1282        if (cmdLine.hasOption(helpOpt.getOpt())
1283            || cmdLine.hasOption(helpOpt.getLongOpt())) {
1284          // should print out the help information
1285          out.println(helpDescription + "\n");
1286          if (printGenericCommandUsage) {
1287            ToolRunner.printGenericCommandUsage(out);
1288          }
1289          return true;
1290        }
1291      } catch (ParseException pe) {
1292        return false;
1293      }
1294    }
1295    return false;
1296  }
1297  
1298  /**
1299   * Get DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION from configuration.
1300   * 
1301   * @param conf Configuration
1302   * @return Value of DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION
1303   */
1304  public static float getInvalidateWorkPctPerIteration(Configuration conf) {
1305    float blocksInvalidateWorkPct = conf.getFloat(
1306        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
1307        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT);
1308    Preconditions.checkArgument(
1309        (blocksInvalidateWorkPct > 0 && blocksInvalidateWorkPct <= 1.0f),
1310        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION +
1311        " = '" + blocksInvalidateWorkPct + "' is invalid. " +
1312        "It should be a positive, non-zero float value, not greater than 1.0f, " +
1313        "to indicate a percentage.");
1314    return blocksInvalidateWorkPct;
1315  }
1316
1317  /**
1318   * Get DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION from
1319   * configuration.
1320   * 
1321   * @param conf Configuration
1322   * @return Value of DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
1323   */
1324  public static int getReplWorkMultiplier(Configuration conf) {
1325    int blocksReplWorkMultiplier = conf.getInt(
1326            DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
1327            DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
1328    Preconditions.checkArgument(
1329        (blocksReplWorkMultiplier > 0),
1330        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION +
1331        " = '" + blocksReplWorkMultiplier + "' is invalid. " +
1332        "It should be a positive, non-zero integer value.");
1333    return blocksReplWorkMultiplier;
1334  }
1335  
1336  /**
1337   * Get SPNEGO keytab Key from configuration
1338   * 
1339   * @param conf Configuration
1340   * @param defaultKey default key to be used for config lookup
   * @return DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY if that key's value is
   *         set and non-empty, otherwise defaultKey
1343   */
1344  public static String getSpnegoKeytabKey(Configuration conf, String defaultKey) {
1345    String value = 
1346        conf.get(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
1347    return (value == null || value.isEmpty()) ?
1348        defaultKey : DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY;
1349  }
1350
1351  /**
1352   * Get http policy. Http Policy is chosen as follows:
1353   * <ol>
1354   * <li>If hadoop.ssl.enabled is set, http endpoints are not started. Only
1355   * https endpoints are started on configured https ports</li>
1356   * <li>This configuration is overridden by dfs.https.enable configuration, if
   * it is set to true. In that case, both http and https endpoints are started.</li>
1358   * <li>All the above configurations are overridden by dfs.http.policy
1359   * configuration. With this configuration you can set http-only, https-only
1360   * and http-and-https endpoints.</li>
1361   * </ol>
1362   * See hdfs-default.xml documentation for more details on each of the above
1363   * configuration settings.
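   * <p>
   * For example (illustrative):
   * <pre>
   * conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, "HTTPS_ONLY");
   * DFSUtil.getHttpPolicy(conf); // HttpConfig.Policy.HTTPS_ONLY
   * </pre>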
1364   */
1365  public static HttpConfig.Policy getHttpPolicy(Configuration conf) {
1366    String policyStr = conf.get(DFSConfigKeys.DFS_HTTP_POLICY_KEY);
1367    if (policyStr == null) {
1368      boolean https = conf.getBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY,
1369          DFSConfigKeys.DFS_HTTPS_ENABLE_DEFAULT);
1370
1371      boolean hadoopSsl = conf.getBoolean(
1372          CommonConfigurationKeys.HADOOP_SSL_ENABLED_KEY,
1373          CommonConfigurationKeys.HADOOP_SSL_ENABLED_DEFAULT);
1374
1375      if (hadoopSsl) {
1376        LOG.warn(CommonConfigurationKeys.HADOOP_SSL_ENABLED_KEY
1377            + " is deprecated. Please use " + DFSConfigKeys.DFS_HTTP_POLICY_KEY
1378            + ".");
1379      }
1380      if (https) {
1381        LOG.warn(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY
1382            + " is deprecated. Please use " + DFSConfigKeys.DFS_HTTP_POLICY_KEY
1383            + ".");
1384      }
1385
1386      return (hadoopSsl || https) ? HttpConfig.Policy.HTTP_AND_HTTPS
1387          : HttpConfig.Policy.HTTP_ONLY;
1388    }
1389
1390    HttpConfig.Policy policy = HttpConfig.Policy.fromString(policyStr);
1391    if (policy == null) {
1392      throw new HadoopIllegalArgumentException("Unregonized value '"
1393          + policyStr + "' for " + DFSConfigKeys.DFS_HTTP_POLICY_KEY);
1394    }
1395
1396    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, policy.name());
1397    return policy;
1398  }
1399
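  /**
   * Apply SSL server configuration (client-auth requirement, keystore and
   * truststore locations, types and passwords, and the excluded cipher list)
   * from the given SSL configuration to an HttpServer2.Builder.
   *
   * @param builder the builder to configure
   * @param sslConf SSL configuration, typically loaded via
   *          {@link #loadSslConfiguration(Configuration)}
   * @return the same builder, for call chaining
   */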
1400  public static HttpServer2.Builder loadSslConfToHttpServerBuilder(HttpServer2.Builder builder,
1401      Configuration sslConf) {
1402    return builder
1403        .needsClientAuth(
1404            sslConf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
1405                DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT))
1406        .keyPassword(getPassword(sslConf, DFS_SERVER_HTTPS_KEYPASSWORD_KEY))
1407        .keyStore(sslConf.get("ssl.server.keystore.location"),
1408            getPassword(sslConf, DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY),
1409            sslConf.get("ssl.server.keystore.type", "jks"))
1410        .trustStore(sslConf.get("ssl.server.truststore.location"),
1411            getPassword(sslConf, DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY),
1412            sslConf.get("ssl.server.truststore.type", "jks"))
1413        .excludeCiphers(
1414            sslConf.get("ssl.server.exclude.cipher.list"));
1415  }
1416
1417  /**
1418   * Load HTTPS-related configuration.
1419   */
1420  public static Configuration loadSslConfiguration(Configuration conf) {
1421    Configuration sslConf = new Configuration(false);
1422
1423    sslConf.addResource(conf.get(
1424        DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
1425        DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
1426
1427    final String[] reqSslProps = {
1428        DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_LOCATION_KEY,
1429        DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_LOCATION_KEY,
1430        DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY,
1431        DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY
1432    };
1433
1434    // Check if the required properties are included
1435    for (String sslProp : reqSslProps) {
1436      if (sslConf.get(sslProp) == null) {
1437        LOG.warn("SSL config " + sslProp + " is missing. If " +
1438            DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY +
1439            " is specified, make sure it is a relative path");
1440      }
1441    }
1442
1443    boolean requireClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
1444        DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
1445    sslConf.setBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, requireClientAuth);
1446    return sslConf;
1447  }
1448
1449  /**
1450   * Return an HttpServer2.Builder that the JournalNode, NameNode, and
1451   * SecondaryNameNode can use to initialize their HTTP / HTTPS server.
1452   *
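   * <p>Illustrative usage (the web app name, ports, and configuration keys
   * below are examples only):
   * <pre>{@code
   * InetSocketAddress httpAddr = new InetSocketAddress("0.0.0.0", 50070);
   * InetSocketAddress httpsAddr = new InetSocketAddress("0.0.0.0", 50470);
   * HttpServer2.Builder builder = httpServerTemplateForNNAndJN(conf,
   *     httpAddr, httpsAddr, "hdfs",
   *     DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
   *     DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
   * HttpServer2 httpServer = builder.build();
   * httpServer.start();
   * }</pre>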
1453   */
1454  public static HttpServer2.Builder httpServerTemplateForNNAndJN(
1455      Configuration conf, final InetSocketAddress httpAddr,
1456      final InetSocketAddress httpsAddr, String name, String spnegoUserNameKey,
1457      String spnegoKeytabFileKey) throws IOException {
1458    HttpConfig.Policy policy = getHttpPolicy(conf);
1459
1460    HttpServer2.Builder builder = new HttpServer2.Builder().setName(name)
1461        .setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
1462        .setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
1463        .setUsernameConfKey(spnegoUserNameKey)
1464        .setKeytabConfKey(getSpnegoKeytabKey(conf, spnegoKeytabFileKey));
1465
1466    // initialize the webserver for uploading/downloading files.
1467    if (UserGroupInformation.isSecurityEnabled()) {
1468      LOG.info("Starting web server as: "
1469          + SecurityUtil.getServerPrincipal(conf.get(spnegoUserNameKey),
1470              httpAddr.getHostName()));
1471    }
1472
1473    if (policy.isHttpEnabled()) {
1474      if (httpAddr.getPort() == 0) {
1475        builder.setFindPort(true);
1476      }
1477
1478      URI uri = URI.create("http://" + NetUtils.getHostPortString(httpAddr));
1479      builder.addEndpoint(uri);
1480      LOG.info("Starting Web-server for " + name + " at: " + uri);
1481    }
1482
1483    if (policy.isHttpsEnabled() && httpsAddr != null) {
1484      Configuration sslConf = loadSslConfiguration(conf);
1485      loadSslConfToHttpServerBuilder(builder, sslConf);
1486
1487      if (httpsAddr.getPort() == 0) {
1488        builder.setFindPort(true);
1489      }
1490
1491      URI uri = URI.create("https://" + NetUtils.getHostPortString(httpsAddr));
1492      builder.addEndpoint(uri);
1493      LOG.info("Starting Web-server for " + name + " at: " + uri);
1494    }
1495    return builder;
1496  }
1497
1498  /**
1499   * Leverages the Configuration.getPassword method to attempt to get
1500   * passwords from the CredentialProvider API before falling back to
1501   * clear text in config, if falling back is allowed.
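   * <p>Illustrative example (the provider path below is hypothetical and uses
   * the standard hadoop.security.credential.provider.path property):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.set("hadoop.security.credential.provider.path",
   *     "jceks://file/etc/hadoop/conf/ssl.jceks");
   * String keyPass = getPassword(conf, DFS_SERVER_HTTPS_KEYPASSWORD_KEY);
   * }</pre>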
1502   * @param conf Configuration instance
1503   * @param alias name of the credential to retrieve
1504   * @return String credential value or null
1505   */
1506  static String getPassword(Configuration conf, String alias) {
1507    String password = null;
1508    try {
1509      char[] passchars = conf.getPassword(alias);
1510      if (passchars != null) {
1511        password = new String(passchars);
1512      }
1513    }
1514    catch (IOException ioe) {
1515      LOG.warn("Setting password to null since an IOException was caught"
1516          + " while getting the password", ioe);
1517      password = null;
1518    }
1519    return password;
1520  }
1521
1522  /**
1523   * Converts a Date into an ISO-8601 formatted datetime string.
1524   */
1525  public static String dateToIso8601String(Date date) {
1526    return DFSUtilClient.dateToIso8601String(date);
1527  }
1528
1529  /**
1530   * Converts a time duration in milliseconds into DDD:HH:MM:SS format.
1531   */
1532  public static String durationToString(long durationMs) {
1533    return DFSUtilClient.durationToString(durationMs);
1534  }
1535
1536  /**
1537   * Converts a relative time string into a duration in milliseconds.
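   * <p>The value must be a number followed by a single unit suffix:
   * {@code s}, {@code m}, {@code h}, or {@code d}. For example:
   * <pre>{@code
   * parseRelativeTime("30m"); // returns 1800000 (30 minutes in ms)
   * parseRelativeTime("2d");  // returns 172800000 (2 days in ms)
   * }</pre>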
1538   */
1539  public static long parseRelativeTime(String relTime) throws IOException {
1540    if (relTime.length() < 2) {
1541      throw new IOException("Unable to parse relative time value of " + relTime
1542          + ": too short");
1543    }
1544    String ttlString = relTime.substring(0, relTime.length()-1);
1545    long ttl;
1546    try {
1547      ttl = Long.parseLong(ttlString);
1548    } catch (NumberFormatException e) {
1549      throw new IOException("Unable to parse relative time value of " + relTime
1550          + ": " + ttlString + " is not a number");
1551    }
1552    if (relTime.endsWith("s")) {
1553      // pass
1554    } else if (relTime.endsWith("m")) {
1555      ttl *= 60;
1556    } else if (relTime.endsWith("h")) {
1557      ttl *= 60*60;
1558    } else if (relTime.endsWith("d")) {
1559      ttl *= 60*60*24;
1560    } else {
1561      throw new IOException("Unable to parse relative time value of " + relTime
1562          + ": unknown time unit " + relTime.charAt(relTime.length() - 1));
1563    }
1564    return ttl*1000;
1565  }
1566
1567  /**
1568   * Assert that all objects in the collection are equal. Returns silently if
1569   * so, throws an AssertionError if any object is not equal. All null values
1570   * are considered equal.
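   * <p>For example, {@code assertAllResultsEqual(Arrays.asList("a", "a"))}
   * returns silently, while {@code assertAllResultsEqual(Arrays.asList("a", "b"))}
   * throws an AssertionError.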
1571   * 
1572   * @param objects the collection of objects to check for equality.
1573   */
1574  public static void assertAllResultsEqual(Collection<?> objects)
1575      throws AssertionError {
1576    if (objects.size() == 0 || objects.size() == 1)
1577      return;
1578    
1579    Object[] resultsArray = objects.toArray();
1580    for (int i = 1; i < resultsArray.length; i++) {
1581      Object currElement = resultsArray[i];
1582      Object lastElement = resultsArray[i - 1];
1583      if ((currElement == null && currElement != lastElement) ||
1584          (currElement != null && !currElement.equals(lastElement))) {
1585        throw new AssertionError("Not all elements match in results: " +
1586          Arrays.toString(resultsArray));
1587      }
1588    }
1589  }
1590
1591  /**
1592   * Creates a new KeyProviderCryptoExtension by wrapping the
1593   * KeyProvider specified in the given Configuration.
1594   *
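   * <p>Illustrative usage (a sketch; the encryption key name is hypothetical):
   * <pre>{@code
   * KeyProviderCryptoExtension kp = createKeyProviderCryptoExtension(conf);
   * if (kp != null) {
   *   KeyProviderCryptoExtension.EncryptedKeyVersion edek =
   *       kp.generateEncryptedKey("myEncryptionZoneKey");
   * }
   * }</pre>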
1595   * @param conf Configuration
1596   * @return new KeyProviderCryptoExtension, or null if no provider was found.
1597   * @throws IOException if the KeyProvider is improperly specified in
1598   *                             the Configuration
1599   */
1600  public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
1601      final Configuration conf) throws IOException {
1602    KeyProvider keyProvider = DFSUtilClient.createKeyProvider(conf);
1603    if (keyProvider == null) {
1604      return null;
1605    }
1606    KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
1607        .createKeyProviderCryptoExtension(keyProvider);
1608    return cryptoProvider;
1609  }
1610
1611}