001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs;
019
020import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.io.UnsupportedEncodingException;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.net.Socket;
029import java.net.URI;
030import java.net.URISyntaxException;
031import java.nio.channels.SocketChannel;
032import java.nio.charset.StandardCharsets;
033import java.text.SimpleDateFormat;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.Date;
037import java.util.HashMap;
038import java.util.List;
039import java.util.Locale;
040import java.util.Map;
041
042import javax.net.SocketFactory;
043
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.crypto.key.KeyProvider;
046import org.apache.hadoop.fs.BlockLocation;
047import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
051import org.apache.hadoop.hdfs.net.BasicInetPeer;
052import org.apache.hadoop.hdfs.net.NioInetPeer;
053import org.apache.hadoop.hdfs.net.Peer;
054import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
055import org.apache.hadoop.hdfs.protocol.DatanodeID;
056import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
057import org.apache.hadoop.hdfs.protocol.HdfsConstants;
058import org.apache.hadoop.hdfs.protocol.LocatedBlock;
059import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
060import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
061import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
062import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
063import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
064import org.apache.hadoop.hdfs.util.IOUtilsClient;
065import org.apache.hadoop.hdfs.web.WebHdfsConstants;
066import org.apache.hadoop.net.NetUtils;
067import org.apache.hadoop.net.NodeBase;
068import org.apache.hadoop.security.UserGroupInformation;
069import org.apache.hadoop.security.token.Token;
070import org.apache.hadoop.util.KMSUtil;
071import org.apache.hadoop.util.StringUtils;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075import com.google.common.base.Joiner;
076import com.google.common.collect.Maps;
077import com.google.common.primitives.SignedBytes;
078
079public class DFSUtilClient {
080  public static final byte[] EMPTY_BYTES = {};
081  private static final Logger LOG = LoggerFactory.getLogger(
082      DFSUtilClient.class);
083
084  // Using the charset canonical name for String/byte[] conversions is much
085  // more efficient due to use of cached encoders/decoders.
086  private static final String UTF8_CSN = StandardCharsets.UTF_8.name();
087
088  /**
089   * Converts a string to a byte array using UTF8 encoding.
090   */
091  public static byte[] string2Bytes(String str) {
092    try {
093      return str.getBytes(UTF8_CSN);
094    } catch (UnsupportedEncodingException e) {
095      // should never happen!
096      throw new IllegalArgumentException("UTF8 decoding is not supported", e);
097    }
098  }
099
100  /**
101   * Converts a byte array to a string using UTF8 encoding.
102   */
103  public static String bytes2String(byte[] bytes) {
104    return bytes2String(bytes, 0, bytes.length);
105  }
106
107  /** Return used as percentage of capacity */
108  public static float getPercentUsed(long used, long capacity) {
109    return capacity <= 0 ? 100 : (used * 100.0f)/capacity;
110  }
111
112  /** Return remaining as percentage of capacity */
113  public static float getPercentRemaining(long remaining, long capacity) {
114    return capacity <= 0 ? 0 : (remaining * 100.0f)/capacity;
115  }
116
117  /** Convert percentage to a string. */
118  public static String percent2String(double percentage) {
119    return StringUtils.format("%.2f%%", percentage);
120  }
121
122  /**
123   * Returns collection of nameservice Ids from the configuration.
124   * @param conf configuration
125   * @return collection of nameservice Ids, or null if not specified
126   */
127  public static Collection<String> getNameServiceIds(Configuration conf) {
128    return conf.getTrimmedStringCollection(DFS_NAMESERVICES);
129  }
130
131  /**
132   * Namenode HighAvailability related configuration.
133   * Returns collection of namenode Ids from the configuration. One logical id
134   * for each namenode in the in the HA setup.
135   *
136   * @param conf configuration
137   * @param nsId the nameservice ID to look at, or null for non-federated
138   * @return collection of namenode Ids
139   */
140  public static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
141    String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
142    return conf.getTrimmedStringCollection(key);
143  }
144
145  /** Add non empty and non null suffix to a key */
146  static String addSuffix(String key, String suffix) {
147    if (suffix == null || suffix.isEmpty()) {
148      return key;
149    }
150    assert !suffix.startsWith(".") :
151      "suffix '" + suffix + "' should not already have '.' prepended.";
152    return key + "." + suffix;
153  }
154
155  /**
156   * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
157   * the configuration.
158   *
159   * @return list of InetSocketAddresses
160   */
161  public static Map<String, Map<String, InetSocketAddress>> getHaNnWebHdfsAddresses(
162      Configuration conf, String scheme) {
163    if (WebHdfsConstants.WEBHDFS_SCHEME.equals(scheme)) {
164      return getAddresses(conf, null,
165          HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
166    } else if (WebHdfsConstants.SWEBHDFS_SCHEME.equals(scheme)) {
167      return getAddresses(conf, null,
168          HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY);
169    } else {
170      throw new IllegalArgumentException("Unsupported scheme: " + scheme);
171    }
172  }
173
174  /**
175   * Convert a LocatedBlocks to BlockLocations[]
176   * @param blocks a LocatedBlocks
177   * @return an array of BlockLocations
178   */
179  public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
180    if (blocks == null) {
181      return new BlockLocation[0];
182    }
183    return locatedBlocks2Locations(blocks.getLocatedBlocks());
184  }
185
186  /**
187   * Convert a List<LocatedBlock> to BlockLocation[]
188   * @param blocks A List<LocatedBlock> to be converted
189   * @return converted array of BlockLocation
190   */
191  public static BlockLocation[] locatedBlocks2Locations(
192      List<LocatedBlock> blocks) {
193    if (blocks == null) {
194      return new BlockLocation[0];
195    }
196    int nrBlocks = blocks.size();
197    BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
198    if (nrBlocks == 0) {
199      return blkLocations;
200    }
201    int idx = 0;
202    for (LocatedBlock blk : blocks) {
203      assert idx < nrBlocks : "Incorrect index";
204      DatanodeInfo[] locations = blk.getLocations();
205      String[] hosts = new String[locations.length];
206      String[] xferAddrs = new String[locations.length];
207      String[] racks = new String[locations.length];
208      for (int hCnt = 0; hCnt < locations.length; hCnt++) {
209        hosts[hCnt] = locations[hCnt].getHostName();
210        xferAddrs[hCnt] = locations[hCnt].getXferAddr();
211        NodeBase node = new NodeBase(xferAddrs[hCnt],
212                                     locations[hCnt].getNetworkLocation());
213        racks[hCnt] = node.toString();
214      }
215      DatanodeInfo[] cachedLocations = blk.getCachedLocations();
216      String[] cachedHosts = new String[cachedLocations.length];
217      for (int i=0; i<cachedLocations.length; i++) {
218        cachedHosts[i] = cachedLocations[i].getHostName();
219      }
220      blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
221                                            racks,
222                                            blk.getStorageIDs(),
223                                            blk.getStorageTypes(),
224                                            blk.getStartOffset(),
225                                            blk.getBlockSize(),
226                                            blk.isCorrupt());
227      idx++;
228    }
229    return blkLocations;
230  }
231
232  /** Compare two byte arrays by lexicographical order. */
233  public static int compareBytes(byte[] left, byte[] right) {
234    if (left == null) {
235      left = EMPTY_BYTES;
236    }
237    if (right == null) {
238      right = EMPTY_BYTES;
239    }
240    return SignedBytes.lexicographicalComparator().compare(left, right);
241  }
242
243  /**
244   * Given a list of path components returns a byte array
245   */
246  public static byte[] byteArray2bytes(byte[][] pathComponents) {
247    if (pathComponents.length == 0) {
248      return EMPTY_BYTES;
249    } else if (pathComponents.length == 1
250        && (pathComponents[0] == null || pathComponents[0].length == 0)) {
251      return new byte[]{(byte) Path.SEPARATOR_CHAR};
252    }
253    int length = 0;
254    for (int i = 0; i < pathComponents.length; i++) {
255      length += pathComponents[i].length;
256      if (i < pathComponents.length - 1) {
257        length++; // for SEPARATOR
258      }
259    }
260    byte[] path = new byte[length];
261    int index = 0;
262    for (int i = 0; i < pathComponents.length; i++) {
263      System.arraycopy(pathComponents[i], 0, path, index,
264          pathComponents[i].length);
265      index += pathComponents[i].length;
266      if (i < pathComponents.length - 1) {
267        path[index] = (byte) Path.SEPARATOR_CHAR;
268        index++;
269      }
270    }
271    return path;
272  }
273
274  /**
275   * Decode a specific range of bytes of the given byte array to a string
276   * using UTF8.
277   *
278   * @param bytes The bytes to be decoded into characters
279   * @param offset The index of the first byte to decode
280   * @param length The number of bytes to decode
281   * @return The decoded string
282   */
283  static String bytes2String(byte[] bytes, int offset, int length) {
284    try {
285      return new String(bytes, offset, length, UTF8_CSN);
286    } catch (UnsupportedEncodingException e) {
287      // should never happen!
288      throw new IllegalArgumentException("UTF8 encoding is not supported", e);
289    }
290  }
291
292  /**
293   * @return <code>coll</code> if it is non-null and non-empty. Otherwise,
294   * returns a list with a single null value.
295   */
296  static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
297    if (coll == null || coll.isEmpty()) {
298      return Collections.singletonList(null);
299    } else {
300      return coll;
301    }
302  }
303
304  /** Concatenate list of suffix strings '.' separated */
305  static String concatSuffixes(String... suffixes) {
306    if (suffixes == null) {
307      return null;
308    }
309    return Joiner.on(".").skipNulls().join(suffixes);
310  }
311
312  /**
313   * Returns the configured address for all NameNodes in the cluster.
314   * @param conf configuration
315   * @param defaultAddress default address to return in case key is not found.
316   * @param keys Set of keys to look for in the order of preference
317   * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
318   */
319  static Map<String, Map<String, InetSocketAddress>> getAddresses(
320      Configuration conf, String defaultAddress, String... keys) {
321    Collection<String> nameserviceIds = getNameServiceIds(conf);
322    return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
323  }
324
325  /**
326   * Returns the configured address for all NameNodes in the cluster.
327   * @param conf configuration
328   * @param defaultAddress default address to return in case key is not found.
329   * @param keys Set of keys to look for in the order of preference
330   *
331   * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
332   */
333  static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
334      Configuration conf, Collection<String> nsIds, String defaultAddress,
335      String... keys) {
336    // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
337    // across all of the configured nameservices and namenodes.
338    Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
339    for (String nsId : emptyAsSingletonNull(nsIds)) {
340      Map<String, InetSocketAddress> isas =
341          getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
342      if (!isas.isEmpty()) {
343        ret.put(nsId, isas);
344      }
345    }
346    return ret;
347  }
348
349  static Map<String, InetSocketAddress> getAddressesForNameserviceId(
350      Configuration conf, String nsId, String defaultValue, String... keys) {
351    Collection<String> nnIds = getNameNodeIds(conf, nsId);
352    Map<String, InetSocketAddress> ret = Maps.newHashMap();
353    for (String nnId : emptyAsSingletonNull(nnIds)) {
354      String suffix = concatSuffixes(nsId, nnId);
355      String address = getConfValue(defaultValue, suffix, conf, keys);
356      if (address != null) {
357        InetSocketAddress isa = NetUtils.createSocketAddr(address);
358        if (isa.isUnresolved()) {
359          LOG.warn("Namenode for {} remains unresolved for ID {}. Check your "
360              + "hdfs-site.xml file to ensure namenodes are configured "
361              + "properly.", nsId, nnId);
362        }
363        ret.put(nnId, isa);
364      }
365    }
366    return ret;
367  }
368
369  /**
370   * Given a list of keys in the order of preference, returns a value
371   * for the key in the given order from the configuration.
372   * @param defaultValue default value to return, when key was not found
373   * @param keySuffix suffix to add to the key, if it is not null
374   * @param conf Configuration
375   * @param keys list of keys in the order of preference
376   * @return value of the key or default if a key was not found in configuration
377   */
378  private static String getConfValue(String defaultValue, String keySuffix,
379      Configuration conf, String... keys) {
380    String value = null;
381    for (String key : keys) {
382      key = addSuffix(key, keySuffix);
383      value = conf.get(key);
384      if (value != null) {
385        break;
386      }
387    }
388    if (value == null) {
389      value = defaultValue;
390    }
391    return value;
392  }
393
394  /**
395   * Whether the pathname is valid.  Currently prohibits relative paths,
396   * names which contain a ":" or "//", or other non-canonical paths.
397   */
398  public static boolean isValidName(String src) {
399    // Path must be absolute.
400    if (!src.startsWith(Path.SEPARATOR)) {
401      return false;
402    }
403
404    // Check for ".." "." ":" "/"
405    String[] components = StringUtils.split(src, '/');
406    for (int i = 0; i < components.length; i++) {
407      String element = components[i];
408      if (element.equals(".")  ||
409          (element.contains(":"))  ||
410          (element.contains("/"))) {
411        return false;
412      }
413      // ".." is allowed in path starting with /.reserved/.inodes
414      if (element.equals("..")) {
415        if (components.length > 4
416            && components[1].equals(".reserved")
417            && components[2].equals(".inodes")) {
418          continue;
419        }
420        return false;
421      }
422      // The string may start or end with a /, but not have
423      // "//" in the middle.
424      if (element.isEmpty() && i != components.length - 1 &&
425          i != 0) {
426        return false;
427      }
428    }
429    return true;
430  }
431
432  /**
433   * Converts a time duration in milliseconds into DDD:HH:MM:SS format.
434   */
435  public static String durationToString(long durationMs) {
436    boolean negative = false;
437    if (durationMs < 0) {
438      negative = true;
439      durationMs = -durationMs;
440    }
441    // Chop off the milliseconds
442    long durationSec = durationMs / 1000;
443    final int secondsPerMinute = 60;
444    final int secondsPerHour = 60*60;
445    final int secondsPerDay = 60*60*24;
446    final long days = durationSec / secondsPerDay;
447    durationSec -= days * secondsPerDay;
448    final long hours = durationSec / secondsPerHour;
449    durationSec -= hours * secondsPerHour;
450    final long minutes = durationSec / secondsPerMinute;
451    durationSec -= minutes * secondsPerMinute;
452    final long seconds = durationSec;
453    final long milliseconds = durationMs % 1000;
454    String format = "%03d:%02d:%02d:%02d.%03d";
455    if (negative)  {
456      format = "-" + format;
457    }
458    return String.format(format, days, hours, minutes, seconds, milliseconds);
459  }
460
461  /**
462   * Converts a Date into an ISO-8601 formatted datetime string.
463   */
464  public static String dateToIso8601String(Date date) {
465    SimpleDateFormat df =
466        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH);
467    return df.format(date);
468  }
469
470  private static final Map<String, Boolean> localAddrMap = Collections
471      .synchronizedMap(new HashMap<String, Boolean>());
472
473  public static boolean isLocalAddress(InetSocketAddress targetAddr) {
474    InetAddress addr = targetAddr.getAddress();
475    Boolean cached = localAddrMap.get(addr.getHostAddress());
476    if (cached != null) {
477      LOG.trace("Address {} is {} local", targetAddr, (cached ? "" : "not"));
478      return cached;
479    }
480
481    boolean local = NetUtils.isLocalAddress(addr);
482
483    LOG.trace("Address {} is {} local", targetAddr, (local ? "" : "not"));
484    localAddrMap.put(addr.getHostAddress(), local);
485    return local;
486  }
487
488  /** Create a {@link ClientDatanodeProtocol} proxy */
489  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
490      DatanodeID datanodeid, Configuration conf, int socketTimeout,
491      boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
492    return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
493        connectToDnViaHostname, locatedBlock);
494  }
495
496  /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
497  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
498      DatanodeID datanodeid, Configuration conf, int socketTimeout,
499      boolean connectToDnViaHostname) throws IOException {
500    return new ClientDatanodeProtocolTranslatorPB(
501        datanodeid, conf, socketTimeout, connectToDnViaHostname);
502  }
503
504  /** Create a {@link ClientDatanodeProtocol} proxy */
505  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
506      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
507      SocketFactory factory) throws IOException {
508    return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
509  }
510
511  private static String keyProviderUriKeyName =
512      CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
513
514  /**
515   * Set the key provider uri configuration key name for creating key providers.
516   * @param keyName The configuration key name.
517   */
518  public static void setKeyProviderUriKeyName(final String keyName) {
519    keyProviderUriKeyName = keyName;
520  }
521
522  /**
523   * Creates a new KeyProvider from the given Configuration.
524   *
525   * @param conf Configuration
526   * @return new KeyProvider, or null if no provider was found.
527   * @throws IOException if the KeyProvider is improperly specified in
528   *                             the Configuration
529   */
530  public static KeyProvider createKeyProvider(
531      final Configuration conf) throws IOException {
532    return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName);
533  }
534
535  public static Peer peerFromSocket(Socket socket)
536      throws IOException {
537    Peer peer;
538    boolean success = false;
539    try {
540      // TCP_NODELAY is crucial here because of bad interactions between
541      // Nagle's Algorithm and Delayed ACKs. With connection keepalive
542      // between the client and DN, the conversation looks like:
543      //   1. Client -> DN: Read block X
544      //   2. DN -> Client: data for block X
545      //   3. Client -> DN: Status OK (successful read)
546      //   4. Client -> DN: Read block Y
547      // The fact that step #3 and #4 are both in the client->DN direction
548      // triggers Nagling. If the DN is using delayed ACKs, this results
549      // in a delay of 40ms or more.
550      //
551      // TCP_NODELAY disables nagling and thus avoids this performance
552      // disaster.
553      socket.setTcpNoDelay(true);
554      SocketChannel channel = socket.getChannel();
555      if (channel == null) {
556        peer = new BasicInetPeer(socket);
557      } else {
558        peer = new NioInetPeer(socket);
559      }
560      success = true;
561      return peer;
562    } finally {
563      if (!success) {
564        // peer is always null so no need to call peer.close().
565        socket.close();
566      }
567    }
568  }
569
570  public static Peer peerFromSocketAndKey(
571        SaslDataTransferClient saslClient, Socket s,
572        DataEncryptionKeyFactory keyFactory,
573        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId,
574        int socketTimeoutMs) throws IOException {
575    Peer peer = null;
576    boolean success = false;
577    try {
578      peer = peerFromSocket(s);
579      peer.setReadTimeout(socketTimeoutMs);
580      peer.setWriteTimeout(socketTimeoutMs);
581      peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
582      success = true;
583      return peer;
584    } finally {
585      if (!success) {
586        IOUtilsClient.cleanup(null, peer);
587      }
588    }
589  }
590
591  public static int getIoFileBufferSize(Configuration conf) {
592    return conf.getInt(
593        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
594        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
595  }
596
597  public static int getSmallBufferSize(Configuration conf) {
598    return Math.min(getIoFileBufferSize(conf) / 2, 512);
599  }
600
601  /**
602   * Probe for HDFS Encryption being enabled; this uses the value of the option
603   * {@link CommonConfigurationKeysPublic#HADOOP_SECURITY_KEY_PROVIDER_PATH}
604   * , returning true if that property contains a non-empty, non-whitespace
605   * string.
606   * @param conf configuration to probe
607   * @return true if encryption is considered enabled.
608   */
609  public static boolean isHDFSEncryptionEnabled(Configuration conf) {
610    return !(conf.getTrimmed(
611        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, "")
612        .isEmpty());
613  }
614
615  public static InetSocketAddress getNNAddress(String address) {
616    return NetUtils.createSocketAddr(address,
617        HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
618  }
619
620  public static InetSocketAddress getNNAddress(Configuration conf) {
621    URI filesystemURI = FileSystem.getDefaultUri(conf);
622    return getNNAddressCheckLogical(conf, filesystemURI);
623  }
624
625  /**
626   * @return address of file system
627   */
628  public static InetSocketAddress getNNAddress(URI filesystemURI) {
629    String authority = filesystemURI.getAuthority();
630    if (authority == null) {
631      throw new IllegalArgumentException(String.format(
632          "Invalid URI for NameNode address (check %s): %s has no authority.",
633          FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString()));
634    }
635    if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
636        filesystemURI.getScheme())) {
637      throw new IllegalArgumentException(String.format(
638          "Invalid URI for NameNode address (check %s): " +
639          "%s is not of scheme '%s'.", FileSystem.FS_DEFAULT_NAME_KEY,
640          filesystemURI.toString(), HdfsConstants.HDFS_URI_SCHEME));
641    }
642    return getNNAddress(authority);
643  }
644
645  /**
646   * Get the NN address from the URI. If the uri is logical, default address is
647   * returned. Otherwise return the DNS-resolved address of the URI.
648   *
649   * @param conf configuration
650   * @param filesystemURI URI of the file system
651   * @return address of file system
652   */
653  public static InetSocketAddress getNNAddressCheckLogical(Configuration conf,
654      URI filesystemURI) {
655    InetSocketAddress retAddr;
656    if (HAUtilClient.isLogicalUri(conf, filesystemURI)) {
657      retAddr = InetSocketAddress.createUnresolved(filesystemURI.getAuthority(),
658          HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
659    } else {
660      retAddr = getNNAddress(filesystemURI);
661    }
662    return retAddr;
663  }
664
665  public static URI getNNUri(InetSocketAddress namenode) {
666    int port = namenode.getPort();
667    String portString =
668        (port == HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) ?
669        "" : (":" + port);
670    return URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
671        + namenode.getHostName() + portString);
672  }
673
674  /** Create a URI from the scheme and address */
675  public static URI createUri(String scheme, InetSocketAddress address) {
676    try {
677      return new URI(scheme, null, address.getHostName(), address.getPort(),
678          null, null, null);
679    } catch (URISyntaxException ue) {
680      throw new IllegalArgumentException(ue);
681    }
682  }
683
684  public static InterruptedIOException toInterruptedIOException(String message,
685      InterruptedException e) {
686    final InterruptedIOException iioe = new InterruptedIOException(message);
687    iioe.initCause(e);
688    return iioe;
689  }
690}