001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.security.PrivilegedExceptionAction;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.EnumSet;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.IdentityHashMap;
033import java.util.Iterator;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.NoSuchElementException;
038import java.util.ServiceLoader;
039import java.util.Set;
040import java.util.Stack;
041import java.util.TreeSet;
042import java.util.concurrent.atomic.AtomicLong;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.apache.hadoop.classification.InterfaceAudience;
047import org.apache.hadoop.classification.InterfaceStability;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.conf.Configured;
050import org.apache.hadoop.fs.Options.ChecksumOpt;
051import org.apache.hadoop.fs.Options.Rename;
052import org.apache.hadoop.fs.permission.AclEntry;
053import org.apache.hadoop.fs.permission.AclStatus;
054import org.apache.hadoop.fs.permission.FsAction;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.io.MultipleIOException;
057import org.apache.hadoop.io.Text;
058import org.apache.hadoop.net.NetUtils;
059import org.apache.hadoop.security.AccessControlException;
060import org.apache.hadoop.security.Credentials;
061import org.apache.hadoop.security.SecurityUtil;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.token.Token;
064import org.apache.hadoop.util.DataChecksum;
065import org.apache.hadoop.util.Progressable;
066import org.apache.hadoop.util.ReflectionUtils;
067import org.apache.hadoop.util.ShutdownHookManager;
068import org.apache.hadoop.util.StringUtils;
069
070import com.google.common.annotations.VisibleForTesting;
071
072/****************************************************************
073 * An abstract base class for a fairly generic filesystem.  It
074 * may be implemented as a distributed filesystem, or as a "local"
075 * one that reflects the locally-connected disk.  The local version
076 * exists for small Hadoop instances and for testing.
077 *
078 * <p>
079 *
080 * All user code that may potentially use the Hadoop Distributed
081 * File System should be written to use a FileSystem object.  The
082 * Hadoop DFS is a multi-machine system that appears as a single
083 * disk.  It's useful because of its fault tolerance and potentially
084 * very large capacity.
085 * 
086 * <p>
087 * The local implementation is {@link LocalFileSystem} and distributed
088 * implementation is DistributedFileSystem.
089 *****************************************************************/
090@InterfaceAudience.Public
091@InterfaceStability.Stable
092public abstract class FileSystem extends Configured implements Closeable {
093  public static final String FS_DEFAULT_NAME_KEY = 
094                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
095  public static final String DEFAULT_FS = 
096                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
097
098  public static final Log LOG = LogFactory.getLog(FileSystem.class);
099
100  /**
101   * Priority of the FileSystem shutdown hook.
102   */
103  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
104
105  /** FileSystem cache */
106  static final Cache CACHE = new Cache();
107
108  /** The key this instance is stored under in the cache. */
109  private Cache.Key key;
110
111  /** Recording statistics per a FileSystem class */
112  private static final Map<Class<? extends FileSystem>, Statistics> 
113    statisticsTable =
114      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
115  
116  /**
117   * The statistics for this file system.
118   */
119  protected Statistics statistics;
120
121  /**
122   * A cache of files that should be deleted when filsystem is closed
123   * or the JVM is exited.
124   */
125  private Set<Path> deleteOnExit = new TreeSet<Path>();
126  
127  boolean resolveSymlinks;
128  /**
129   * This method adds a file system for testing so that we can find it later. It
130   * is only for testing.
131   * @param uri the uri to store it under
132   * @param conf the configuration to store it under
133   * @param fs the file system to store
134   * @throws IOException
135   */
136  static void addFileSystemForTesting(URI uri, Configuration conf,
137      FileSystem fs) throws IOException {
138    CACHE.map.put(new Cache.Key(uri, conf), fs);
139  }
140
141  /**
142   * Get a filesystem instance based on the uri, the passed
143   * configuration and the user
144   * @param uri of the filesystem
145   * @param conf the configuration to use
146   * @param user to perform the get as
147   * @return the filesystem instance
148   * @throws IOException
149   * @throws InterruptedException
150   */
151  public static FileSystem get(final URI uri, final Configuration conf,
152        final String user) throws IOException, InterruptedException {
153    String ticketCachePath =
154      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
155    UserGroupInformation ugi =
156        UserGroupInformation.getBestUGI(ticketCachePath, user);
157    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
158      @Override
159      public FileSystem run() throws IOException {
160        return get(uri, conf);
161      }
162    });
163  }
164
165  /**
166   * Returns the configured filesystem implementation.
167   * @param conf the configuration to use
168   */
169  public static FileSystem get(Configuration conf) throws IOException {
170    return get(getDefaultUri(conf), conf);
171  }
172  
173  /** Get the default filesystem URI from a configuration.
174   * @param conf the configuration to use
175   * @return the uri of the default filesystem
176   */
177  public static URI getDefaultUri(Configuration conf) {
178    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
179  }
180
181  /** Set the default filesystem URI in a configuration.
182   * @param conf the configuration to alter
183   * @param uri the new default filesystem uri
184   */
185  public static void setDefaultUri(Configuration conf, URI uri) {
186    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
187  }
188
189  /** Set the default filesystem URI in a configuration.
190   * @param conf the configuration to alter
191   * @param uri the new default filesystem uri
192   */
193  public static void setDefaultUri(Configuration conf, String uri) {
194    setDefaultUri(conf, URI.create(fixName(uri)));
195  }
196
197  /** Called after a new FileSystem instance is constructed.
198   * @param name a uri whose authority section names the host, port, etc.
199   *   for this FileSystem
200   * @param conf the configuration
201   */
202  public void initialize(URI name, Configuration conf) throws IOException {
203    statistics = getStatistics(name.getScheme(), getClass());    
204    resolveSymlinks = conf.getBoolean(
205        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
206        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
207  }
208
209  /**
210   * Return the protocol scheme for the FileSystem.
211   * <p/>
212   * This implementation throws an <code>UnsupportedOperationException</code>.
213   *
214   * @return the protocol scheme for the FileSystem.
215   */
216  public String getScheme() {
217    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
218  }
219
220  /** Returns a URI whose scheme and authority identify this FileSystem.*/
221  public abstract URI getUri();
222  
223  /**
224   * Return a canonicalized form of this FileSystem's URI.
225   * 
226   * The default implementation simply calls {@link #canonicalizeUri(URI)}
227   * on the filesystem's own URI, so subclasses typically only need to
228   * implement that method.
229   *
230   * @see #canonicalizeUri(URI)
231   */
232  protected URI getCanonicalUri() {
233    return canonicalizeUri(getUri());
234  }
235  
236  /**
237   * Canonicalize the given URI.
238   * 
239   * This is filesystem-dependent, but may for example consist of
240   * canonicalizing the hostname using DNS and adding the default
241   * port if not specified.
242   * 
243   * The default implementation simply fills in the default port if
244   * not specified and if the filesystem has a default port.
245   *
246   * @return URI
247   * @see NetUtils#getCanonicalUri(URI, int)
248   */
249  protected URI canonicalizeUri(URI uri) {
250    if (uri.getPort() == -1 && getDefaultPort() > 0) {
251      // reconstruct the uri with the default port set
252      try {
253        uri = new URI(uri.getScheme(), uri.getUserInfo(),
254            uri.getHost(), getDefaultPort(),
255            uri.getPath(), uri.getQuery(), uri.getFragment());
256      } catch (URISyntaxException e) {
257        // Should never happen!
258        throw new AssertionError("Valid URI became unparseable: " +
259            uri);
260      }
261    }
262    
263    return uri;
264  }
265  
266  /**
267   * Get the default port for this file system.
268   * @return the default port or 0 if there isn't one
269   */
270  protected int getDefaultPort() {
271    return 0;
272  }
273
274  protected static FileSystem getFSofPath(final Path absOrFqPath,
275      final Configuration conf)
276      throws UnsupportedFileSystemException, IOException {
277    absOrFqPath.checkNotSchemeWithRelative();
278    absOrFqPath.checkNotRelative();
279
280    // Uses the default file system if not fully qualified
281    return get(absOrFqPath.toUri(), conf);
282  }
283
284  /**
285   * Get a canonical service name for this file system.  The token cache is
286   * the only user of the canonical service name, and uses it to lookup this
287   * filesystem's service tokens.
288   * If file system provides a token of its own then it must have a canonical
289   * name, otherwise canonical name can be null.
290   * 
291   * Default Impl: If the file system has child file systems 
292   * (such as an embedded file system) then it is assumed that the fs has no
293   * tokens of its own and hence returns a null name; otherwise a service
294   * name is built using Uri and port.
295   * 
296   * @return a service string that uniquely identifies this file system, null
297   *         if the filesystem does not implement tokens
298   * @see SecurityUtil#buildDTServiceName(URI, int) 
299   */
300  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
301  public String getCanonicalServiceName() {
302    return (getChildFileSystems() == null)
303      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
304      : null;
305  }
306
307  /** @deprecated call #getUri() instead.*/
308  @Deprecated
309  public String getName() { return getUri().toString(); }
310
311  /** @deprecated call #get(URI,Configuration) instead. */
312  @Deprecated
313  public static FileSystem getNamed(String name, Configuration conf)
314    throws IOException {
315    return get(URI.create(fixName(name)), conf);
316  }
317  
318  /** Update old-format filesystem names, for back-compatibility.  This should
319   * eventually be replaced with a checkName() method that throws an exception
320   * for old-format names. */ 
321  private static String fixName(String name) {
322    // convert old-format name to new-format name
323    if (name.equals("local")) {         // "local" is now "file:///".
324      LOG.warn("\"local\" is a deprecated filesystem name."
325               +" Use \"file:///\" instead.");
326      name = "file:///";
327    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
328      LOG.warn("\""+name+"\" is a deprecated filesystem name."
329               +" Use \"hdfs://"+name+"/\" instead.");
330      name = "hdfs://"+name;
331    }
332    return name;
333  }
334
335  /**
336   * Get the local file system.
337   * @param conf the configuration to configure the file system with
338   * @return a LocalFileSystem
339   */
340  public static LocalFileSystem getLocal(Configuration conf)
341    throws IOException {
342    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
343  }
344
345  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
346   * of the URI determines a configuration property name,
347   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
348   * The entire URI is passed to the FileSystem instance's initialize method.
349   */
350  public static FileSystem get(URI uri, Configuration conf) throws IOException {
351    String scheme = uri.getScheme();
352    String authority = uri.getAuthority();
353
354    if (scheme == null && authority == null) {     // use default FS
355      return get(conf);
356    }
357
358    if (scheme != null && authority == null) {     // no authority
359      URI defaultUri = getDefaultUri(conf);
360      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
361          && defaultUri.getAuthority() != null) {  // & default has authority
362        return get(defaultUri, conf);              // return default
363      }
364    }
365    
366    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
367    if (conf.getBoolean(disableCacheName, false)) {
368      return createFileSystem(uri, conf);
369    }
370
371    return CACHE.get(uri, conf);
372  }
373
374  /**
375   * Returns the FileSystem for this URI's scheme and authority and the 
376   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
377   * @param uri of the filesystem
378   * @param conf the configuration to use
379   * @param user to perform the get as
380   * @return filesystem instance
381   * @throws IOException
382   * @throws InterruptedException
383   */
384  public static FileSystem newInstance(final URI uri, final Configuration conf,
385      final String user) throws IOException, InterruptedException {
386    String ticketCachePath =
387      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
388    UserGroupInformation ugi =
389        UserGroupInformation.getBestUGI(ticketCachePath, user);
390    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
391      @Override
392      public FileSystem run() throws IOException {
393        return newInstance(uri,conf); 
394      }
395    });
396  }
397  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
398   * of the URI determines a configuration property name,
399   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
400   * The entire URI is passed to the FileSystem instance's initialize method.
401   * This always returns a new FileSystem object.
402   */
403  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
404    String scheme = uri.getScheme();
405    String authority = uri.getAuthority();
406
407    if (scheme == null) {                       // no scheme: use default FS
408      return newInstance(conf);
409    }
410
411    if (authority == null) {                       // no authority
412      URI defaultUri = getDefaultUri(conf);
413      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
414          && defaultUri.getAuthority() != null) {  // & default has authority
415        return newInstance(defaultUri, conf);              // return default
416      }
417    }
418    return CACHE.getUnique(uri, conf);
419  }
420
421  /** Returns a unique configured filesystem implementation.
422   * This always returns a new FileSystem object.
423   * @param conf the configuration to use
424   */
425  public static FileSystem newInstance(Configuration conf) throws IOException {
426    return newInstance(getDefaultUri(conf), conf);
427  }
428
429  /**
430   * Get a unique local file system object
431   * @param conf the configuration to configure the file system with
432   * @return a LocalFileSystem
433   * This always returns a new FileSystem object.
434   */
435  public static LocalFileSystem newInstanceLocal(Configuration conf)
436    throws IOException {
437    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
438  }
439
440  /**
441   * Close all cached filesystems. Be sure those filesystems are not
442   * used anymore.
443   * 
444   * @throws IOException
445   */
446  public static void closeAll() throws IOException {
447    CACHE.closeAll();
448  }
449
450  /**
451   * Close all cached filesystems for a given UGI. Be sure those filesystems 
452   * are not used anymore.
453   * @param ugi user group info to close
454   * @throws IOException
455   */
456  public static void closeAllForUGI(UserGroupInformation ugi) 
457  throws IOException {
458    CACHE.closeAll(ugi);
459  }
460
461  /** 
462   * Make sure that a path specifies a FileSystem.
463   * @param path to use
464   */
465  public Path makeQualified(Path path) {
466    checkPath(path);
467    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
468  }
469    
470  /**
471   * Get a new delegation token for this file system.
472   * This is an internal method that should have been declared protected
473   * but wasn't historically.
474   * Callers should use {@link #addDelegationTokens(String, Credentials)}
475   * 
476   * @param renewer the account name that is allowed to renew the token.
477   * @return a new delegation token
478   * @throws IOException
479   */
480  @InterfaceAudience.Private()
481  public Token<?> getDelegationToken(String renewer) throws IOException {
482    return null;
483  }
484  
485  /**
486   * Obtain all delegation tokens used by this FileSystem that are not
487   * already present in the given Credentials.  Existing tokens will neither
488   * be verified as valid nor having the given renewer.  Missing tokens will
489   * be acquired and added to the given Credentials.
490   * 
491   * Default Impl: works for simple fs with its own token
492   * and also for an embedded fs whose tokens are those of its
493   * children file system (i.e. the embedded fs has not tokens of its
494   * own).
495   * 
496   * @param renewer the user allowed to renew the delegation tokens
497   * @param credentials cache in which to add new delegation tokens
498   * @return list of new delegation tokens
499   * @throws IOException
500   */
501  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
502  public Token<?>[] addDelegationTokens(
503      final String renewer, Credentials credentials) throws IOException {
504    if (credentials == null) {
505      credentials = new Credentials();
506    }
507    final List<Token<?>> tokens = new ArrayList<Token<?>>();
508    collectDelegationTokens(renewer, credentials, tokens);
509    return tokens.toArray(new Token<?>[tokens.size()]);
510  }
511  
512  /**
513   * Recursively obtain the tokens for this FileSystem and all descended
514   * FileSystems as determined by getChildFileSystems().
515   * @param renewer the user allowed to renew the delegation tokens
516   * @param credentials cache in which to add the new delegation tokens
517   * @param tokens list in which to add acquired tokens
518   * @throws IOException
519   */
520  private void collectDelegationTokens(final String renewer,
521                                       final Credentials credentials,
522                                       final List<Token<?>> tokens)
523                                           throws IOException {
524    final String serviceName = getCanonicalServiceName();
525    // Collect token of the this filesystem and then of its embedded children
526    if (serviceName != null) { // fs has token, grab it
527      final Text service = new Text(serviceName);
528      Token<?> token = credentials.getToken(service);
529      if (token == null) {
530        token = getDelegationToken(renewer);
531        if (token != null) {
532          tokens.add(token);
533          credentials.addToken(service, token);
534        }
535      }
536    }
537    // Now collect the tokens from the children
538    final FileSystem[] children = getChildFileSystems();
539    if (children != null) {
540      for (final FileSystem fs : children) {
541        fs.collectDelegationTokens(renewer, credentials, tokens);
542      }
543    }
544  }
545
546  /**
547   * Get all the immediate child FileSystems embedded in this FileSystem.
548   * It does not recurse and get grand children.  If a FileSystem
549   * has multiple child FileSystems, then it should return a unique list
550   * of those FileSystems.  Default is to return null to signify no children.
551   * 
552   * @return FileSystems used by this FileSystem
553   */
554  @InterfaceAudience.LimitedPrivate({ "HDFS" })
555  @VisibleForTesting
556  public FileSystem[] getChildFileSystems() {
557    return null;
558  }
559  
560  /** create a file with the provided permission
561   * The permission of the file is set to be the provided permission as in
562   * setPermission, not permission&~umask
563   * 
564   * It is implemented using two RPCs. It is understood that it is inefficient,
565   * but the implementation is thread-safe. The other option is to change the
566   * value of umask in configuration to be 0, but it is not thread-safe.
567   * 
568   * @param fs file system handle
569   * @param file the name of the file to be created
570   * @param permission the permission of the file
571   * @return an output stream
572   * @throws IOException
573   */
574  public static FSDataOutputStream create(FileSystem fs,
575      Path file, FsPermission permission) throws IOException {
576    // create the file with default permission
577    FSDataOutputStream out = fs.create(file);
578    // set its permission to the supplied one
579    fs.setPermission(file, permission);
580    return out;
581  }
582
583  /** create a directory with the provided permission
584   * The permission of the directory is set to be the provided permission as in
585   * setPermission, not permission&~umask
586   * 
587   * @see #create(FileSystem, Path, FsPermission)
588   * 
589   * @param fs file system handle
590   * @param dir the name of the directory to be created
591   * @param permission the permission of the directory
592   * @return true if the directory creation succeeds; false otherwise
593   * @throws IOException
594   */
595  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
596  throws IOException {
597    // create the directory using the default permission
598    boolean result = fs.mkdirs(dir);
599    // set its permission to be the supplied one
600    fs.setPermission(dir, permission);
601    return result;
602  }
603
604  ///////////////////////////////////////////////////////////////
605  // FileSystem
606  ///////////////////////////////////////////////////////////////
607
608  protected FileSystem() {
609    super(null);
610  }
611
612  /** 
613   * Check that a Path belongs to this FileSystem.
614   * @param path to check
615   */
616  protected void checkPath(Path path) {
617    URI uri = path.toUri();
618    String thatScheme = uri.getScheme();
619    if (thatScheme == null)                // fs is relative
620      return;
621    URI thisUri = getCanonicalUri();
622    String thisScheme = thisUri.getScheme();
623    //authority and scheme are not case sensitive
624    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
625      String thisAuthority = thisUri.getAuthority();
626      String thatAuthority = uri.getAuthority();
627      if (thatAuthority == null &&                // path's authority is null
628          thisAuthority != null) {                // fs has an authority
629        URI defaultUri = getDefaultUri(getConf());
630        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
631          uri = defaultUri; // schemes match, so use this uri instead
632        } else {
633          uri = null; // can't determine auth of the path
634        }
635      }
636      if (uri != null) {
637        // canonicalize uri before comparing with this fs
638        uri = canonicalizeUri(uri);
639        thatAuthority = uri.getAuthority();
640        if (thisAuthority == thatAuthority ||       // authorities match
641            (thisAuthority != null &&
642             thisAuthority.equalsIgnoreCase(thatAuthority)))
643          return;
644      }
645    }
646    throw new IllegalArgumentException("Wrong FS: "+path+
647                                       ", expected: "+this.getUri());
648  }
649
650  /**
651   * Return an array containing hostnames, offset and size of 
652   * portions of the given file.  For a nonexistent 
653   * file or regions, null will be returned.
654   *
655   * This call is most helpful with DFS, where it returns 
656   * hostnames of machines that contain the given file.
657   *
658   * The FileSystem will simply return an elt containing 'localhost'.
659   *
660   * @param file FilesStatus to get data from
661   * @param start offset into the given file
662   * @param len length for which to get locations for
663   */
664  public BlockLocation[] getFileBlockLocations(FileStatus file, 
665      long start, long len) throws IOException {
666    if (file == null) {
667      return null;
668    }
669
670    if (start < 0 || len < 0) {
671      throw new IllegalArgumentException("Invalid start or len parameter");
672    }
673
674    if (file.getLen() <= start) {
675      return new BlockLocation[0];
676
677    }
678    String[] name = { "localhost:50010" };
679    String[] host = { "localhost" };
680    return new BlockLocation[] {
681      new BlockLocation(name, host, 0, file.getLen()) };
682  }
683 
684
685  /**
686   * Return an array containing hostnames, offset and size of 
687   * portions of the given file.  For a nonexistent 
688   * file or regions, null will be returned.
689   *
690   * This call is most helpful with DFS, where it returns 
691   * hostnames of machines that contain the given file.
692   *
693   * The FileSystem will simply return an elt containing 'localhost'.
694   *
695   * @param p path is used to identify an FS since an FS could have
696   *          another FS that it could be delegating the call to
697   * @param start offset into the given file
698   * @param len length for which to get locations for
699   */
700  public BlockLocation[] getFileBlockLocations(Path p, 
701      long start, long len) throws IOException {
702    if (p == null) {
703      throw new NullPointerException();
704    }
705    FileStatus file = getFileStatus(p);
706    return getFileBlockLocations(file, start, len);
707  }
708  
709  /**
710   * Return a set of server default configuration values
711   * @return server default configuration values
712   * @throws IOException
713   * @deprecated use {@link #getServerDefaults(Path)} instead
714   */
715  @Deprecated
716  public FsServerDefaults getServerDefaults() throws IOException {
717    Configuration conf = getConf();
718    // CRC32 is chosen as default as it is available in all 
719    // releases that support checksum.
720    // The client trash configuration is ignored.
721    return new FsServerDefaults(getDefaultBlockSize(), 
722        conf.getInt("io.bytes.per.checksum", 512), 
723        64 * 1024, 
724        getDefaultReplication(),
725        conf.getInt("io.file.buffer.size", 4096),
726        false,
727        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
728        DataChecksum.Type.CRC32);
729  }
730
731  /**
732   * Return a set of server default configuration values
733   * @param p path is used to identify an FS since an FS could have
734   *          another FS that it could be delegating the call to
735   * @return server default configuration values
736   * @throws IOException
737   */
738  public FsServerDefaults getServerDefaults(Path p) throws IOException {
739    return getServerDefaults();
740  }
741
742  /**
743   * Return the fully-qualified path of path f resolving the path
744   * through any symlinks or mount point
745   * @param p path to be resolved
746   * @return fully qualified path 
747   * @throws FileNotFoundException
748   */
749   public Path resolvePath(final Path p) throws IOException {
750     checkPath(p);
751     return getFileStatus(p).getPath();
752   }
753
754  /**
755   * Opens an FSDataInputStream at the indicated Path.
756   * @param f the file name to open
757   * @param bufferSize the size of the buffer to be used.
758   */
759  public abstract FSDataInputStream open(Path f, int bufferSize)
760    throws IOException;
761    
762  /**
763   * Opens an FSDataInputStream at the indicated Path.
764   * @param f the file to open
765   */
766  public FSDataInputStream open(Path f) throws IOException {
767    return open(f, getConf().getInt("io.file.buffer.size", 4096));
768  }
769
770  /**
771   * Create an FSDataOutputStream at the indicated Path.
772   * Files are overwritten by default.
773   * @param f the file to create
774   */
775  public FSDataOutputStream create(Path f) throws IOException {
776    return create(f, true);
777  }
778
779  /**
780   * Create an FSDataOutputStream at the indicated Path.
781   * @param f the file to create
782   * @param overwrite if a file with this name already exists, then if true,
783   *   the file will be overwritten, and if false an exception will be thrown.
784   */
785  public FSDataOutputStream create(Path f, boolean overwrite)
786      throws IOException {
787    return create(f, overwrite, 
788                  getConf().getInt("io.file.buffer.size", 4096),
789                  getDefaultReplication(f),
790                  getDefaultBlockSize(f));
791  }
792
793  /**
794   * Create an FSDataOutputStream at the indicated Path with write-progress
795   * reporting.
796   * Files are overwritten by default.
797   * @param f the file to create
798   * @param progress to report progress
799   */
800  public FSDataOutputStream create(Path f, Progressable progress) 
801      throws IOException {
802    return create(f, true, 
803                  getConf().getInt("io.file.buffer.size", 4096),
804                  getDefaultReplication(f),
805                  getDefaultBlockSize(f), progress);
806  }
807
808  /**
809   * Create an FSDataOutputStream at the indicated Path.
810   * Files are overwritten by default.
811   * @param f the file to create
812   * @param replication the replication factor
813   */
814  public FSDataOutputStream create(Path f, short replication)
815      throws IOException {
816    return create(f, true, 
817                  getConf().getInt("io.file.buffer.size", 4096),
818                  replication,
819                  getDefaultBlockSize(f));
820  }
821
822  /**
823   * Create an FSDataOutputStream at the indicated Path with write-progress
824   * reporting.
825   * Files are overwritten by default.
826   * @param f the file to create
827   * @param replication the replication factor
828   * @param progress to report progress
829   */
830  public FSDataOutputStream create(Path f, short replication, 
831      Progressable progress) throws IOException {
832    return create(f, true, 
833                  getConf().getInt(
834                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
835                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
836                  replication,
837                  getDefaultBlockSize(f), progress);
838  }
839
840    
841  /**
842   * Create an FSDataOutputStream at the indicated Path.
843   * @param f the file name to create
844   * @param overwrite if a file with this name already exists, then if true,
845   *   the file will be overwritten, and if false an error will be thrown.
846   * @param bufferSize the size of the buffer to be used.
847   */
848  public FSDataOutputStream create(Path f, 
849                                   boolean overwrite,
850                                   int bufferSize
851                                   ) throws IOException {
852    return create(f, overwrite, bufferSize, 
853                  getDefaultReplication(f),
854                  getDefaultBlockSize(f));
855  }
856    
857  /**
858   * Create an FSDataOutputStream at the indicated Path with write-progress
859   * reporting.
860   * @param f the path of the file to open
861   * @param overwrite if a file with this name already exists, then if true,
862   *   the file will be overwritten, and if false an error will be thrown.
863   * @param bufferSize the size of the buffer to be used.
864   */
865  public FSDataOutputStream create(Path f, 
866                                   boolean overwrite,
867                                   int bufferSize,
868                                   Progressable progress
869                                   ) throws IOException {
870    return create(f, overwrite, bufferSize, 
871                  getDefaultReplication(f),
872                  getDefaultBlockSize(f), progress);
873  }
874    
875    
876  /**
877   * Create an FSDataOutputStream at the indicated Path.
878   * @param f the file name to open
879   * @param overwrite if a file with this name already exists, then if true,
880   *   the file will be overwritten, and if false an error will be thrown.
881   * @param bufferSize the size of the buffer to be used.
882   * @param replication required block replication for the file. 
883   */
884  public FSDataOutputStream create(Path f, 
885                                   boolean overwrite,
886                                   int bufferSize,
887                                   short replication,
888                                   long blockSize
889                                   ) throws IOException {
890    return create(f, overwrite, bufferSize, replication, blockSize, null);
891  }
892
893  /**
894   * Create an FSDataOutputStream at the indicated Path with write-progress
895   * reporting.
896   * @param f the file name to open
897   * @param overwrite if a file with this name already exists, then if true,
898   *   the file will be overwritten, and if false an error will be thrown.
899   * @param bufferSize the size of the buffer to be used.
900   * @param replication required block replication for the file. 
901   */
902  public FSDataOutputStream create(Path f,
903                                            boolean overwrite,
904                                            int bufferSize,
905                                            short replication,
906                                            long blockSize,
907                                            Progressable progress
908                                            ) throws IOException {
909    return this.create(f, FsPermission.getFileDefault().applyUMask(
910        FsPermission.getUMask(getConf())), overwrite, bufferSize,
911        replication, blockSize, progress);
912  }
913
914  /**
915   * Create an FSDataOutputStream at the indicated Path with write-progress
916   * reporting.
917   * @param f the file name to open
918   * @param permission
919   * @param overwrite if a file with this name already exists, then if true,
920   *   the file will be overwritten, and if false an error will be thrown.
921   * @param bufferSize the size of the buffer to be used.
922   * @param replication required block replication for the file.
923   * @param blockSize
924   * @param progress
925   * @throws IOException
926   * @see #setPermission(Path, FsPermission)
927   */
928  public abstract FSDataOutputStream create(Path f,
929      FsPermission permission,
930      boolean overwrite,
931      int bufferSize,
932      short replication,
933      long blockSize,
934      Progressable progress) throws IOException;
935  
936  /**
937   * Create an FSDataOutputStream at the indicated Path with write-progress
938   * reporting.
939   * @param f the file name to open
940   * @param permission
941   * @param flags {@link CreateFlag}s to use for this stream.
942   * @param bufferSize the size of the buffer to be used.
943   * @param replication required block replication for the file.
944   * @param blockSize
945   * @param progress
946   * @throws IOException
947   * @see #setPermission(Path, FsPermission)
948   */
949  public FSDataOutputStream create(Path f,
950      FsPermission permission,
951      EnumSet<CreateFlag> flags,
952      int bufferSize,
953      short replication,
954      long blockSize,
955      Progressable progress) throws IOException {
956    return create(f, permission, flags, bufferSize, replication,
957        blockSize, progress, null);
958  }
959  
960  /**
961   * Create an FSDataOutputStream at the indicated Path with a custom
962   * checksum option
963   * @param f the file name to open
964   * @param permission
965   * @param flags {@link CreateFlag}s to use for this stream.
966   * @param bufferSize the size of the buffer to be used.
967   * @param replication required block replication for the file.
968   * @param blockSize
969   * @param progress
970   * @param checksumOpt checksum parameter. If null, the values
971   *        found in conf will be used.
972   * @throws IOException
973   * @see #setPermission(Path, FsPermission)
974   */
975  public FSDataOutputStream create(Path f,
976      FsPermission permission,
977      EnumSet<CreateFlag> flags,
978      int bufferSize,
979      short replication,
980      long blockSize,
981      Progressable progress,
982      ChecksumOpt checksumOpt) throws IOException {
983    // Checksum options are ignored by default. The file systems that
984    // implement checksum need to override this method. The full
985    // support is currently only available in DFS.
986    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
987        bufferSize, replication, blockSize, progress);
988  }
989
990  /*.
991   * This create has been added to support the FileContext that processes
992   * the permission
993   * with umask before calling this method.
994   * This a temporary method added to support the transition from FileSystem
995   * to FileContext for user applications.
996   */
997  @Deprecated
998  protected FSDataOutputStream primitiveCreate(Path f,
999     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1000     short replication, long blockSize, Progressable progress,
1001     ChecksumOpt checksumOpt) throws IOException {
1002
1003    boolean pathExists = exists(f);
1004    CreateFlag.validate(f, pathExists, flag);
1005    
1006    // Default impl  assumes that permissions do not matter and 
1007    // nor does the bytesPerChecksum  hence
1008    // calling the regular create is good enough.
1009    // FSs that implement permissions should override this.
1010
1011    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1012      return append(f, bufferSize, progress);
1013    }
1014    
1015    return this.create(f, absolutePermission,
1016        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1017        blockSize, progress);
1018  }
1019  
1020  /**
1021   * This version of the mkdirs method assumes that the permission is absolute.
1022   * It has been added to support the FileContext that processes the permission
1023   * with umask before calling this method.
1024   * This a temporary method added to support the transition from FileSystem
1025   * to FileContext for user applications.
1026   */
1027  @Deprecated
1028  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1029    throws IOException {
1030    // Default impl is to assume that permissions do not matter and hence
1031    // calling the regular mkdirs is good enough.
1032    // FSs that implement permissions should override this.
1033   return this.mkdirs(f, absolutePermission);
1034  }
1035
1036
1037  /**
1038   * This version of the mkdirs method assumes that the permission is absolute.
1039   * It has been added to support the FileContext that processes the permission
1040   * with umask before calling this method.
1041   * This a temporary method added to support the transition from FileSystem
1042   * to FileContext for user applications.
1043   */
1044  @Deprecated
1045  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1046                    boolean createParent)
1047    throws IOException {
1048    
1049    if (!createParent) { // parent must exist.
1050      // since the this.mkdirs makes parent dirs automatically
1051      // we must throw exception if parent does not exist.
1052      final FileStatus stat = getFileStatus(f.getParent());
1053      if (stat == null) {
1054        throw new FileNotFoundException("Missing parent:" + f);
1055      }
1056      if (!stat.isDirectory()) {
1057        throw new ParentNotDirectoryException("parent is not a dir");
1058      }
1059      // parent does exist - go ahead with mkdir of leaf
1060    }
1061    // Default impl is to assume that permissions do not matter and hence
1062    // calling the regular mkdirs is good enough.
1063    // FSs that implement permissions should override this.
1064    if (!this.mkdirs(f, absolutePermission)) {
1065      throw new IOException("mkdir of "+ f + " failed");
1066    }
1067  }
1068
1069  /**
1070   * Opens an FSDataOutputStream at the indicated Path with write-progress
1071   * reporting. Same as create(), except fails if parent directory doesn't
1072   * already exist.
1073   * @param f the file name to open
1074   * @param overwrite if a file with this name already exists, then if true,
1075   * the file will be overwritten, and if false an error will be thrown.
1076   * @param bufferSize the size of the buffer to be used.
1077   * @param replication required block replication for the file.
1078   * @param blockSize
1079   * @param progress
1080   * @throws IOException
1081   * @see #setPermission(Path, FsPermission)
1082   * @deprecated API only for 0.20-append
1083   */
1084  @Deprecated
1085  public FSDataOutputStream createNonRecursive(Path f,
1086      boolean overwrite,
1087      int bufferSize, short replication, long blockSize,
1088      Progressable progress) throws IOException {
1089    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1090        overwrite, bufferSize, replication, blockSize, progress);
1091  }
1092
1093  /**
1094   * Opens an FSDataOutputStream at the indicated Path with write-progress
1095   * reporting. Same as create(), except fails if parent directory doesn't
1096   * already exist.
1097   * @param f the file name to open
1098   * @param permission
1099   * @param overwrite if a file with this name already exists, then if true,
1100   * the file will be overwritten, and if false an error will be thrown.
1101   * @param bufferSize the size of the buffer to be used.
1102   * @param replication required block replication for the file.
1103   * @param blockSize
1104   * @param progress
1105   * @throws IOException
1106   * @see #setPermission(Path, FsPermission)
1107   * @deprecated API only for 0.20-append
1108   */
1109   @Deprecated
1110   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1111       boolean overwrite, int bufferSize, short replication, long blockSize,
1112       Progressable progress) throws IOException {
1113     return createNonRecursive(f, permission,
1114         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1115             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1116             replication, blockSize, progress);
1117   }
1118
1119   /**
1120    * Opens an FSDataOutputStream at the indicated Path with write-progress
1121    * reporting. Same as create(), except fails if parent directory doesn't
1122    * already exist.
1123    * @param f the file name to open
1124    * @param permission
1125    * @param flags {@link CreateFlag}s to use for this stream.
1126    * @param bufferSize the size of the buffer to be used.
1127    * @param replication required block replication for the file.
1128    * @param blockSize
1129    * @param progress
1130    * @throws IOException
1131    * @see #setPermission(Path, FsPermission)
1132    * @deprecated API only for 0.20-append
1133    */
1134    @Deprecated
1135    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1136        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1137        Progressable progress) throws IOException {
1138      throw new IOException("createNonRecursive unsupported for this filesystem "
1139          + this.getClass());
1140    }
1141
1142  /**
1143   * Creates the given Path as a brand-new zero-length file.  If
1144   * create fails, or if it already existed, return false.
1145   *
1146   * @param f path to use for create
1147   */
1148  public boolean createNewFile(Path f) throws IOException {
1149    if (exists(f)) {
1150      return false;
1151    } else {
1152      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1153      return true;
1154    }
1155  }
1156
1157  /**
1158   * Append to an existing file (optional operation).
1159   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1160   * @param f the existing file to be appended.
1161   * @throws IOException
1162   */
1163  public FSDataOutputStream append(Path f) throws IOException {
1164    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1165  }
1166  /**
1167   * Append to an existing file (optional operation).
1168   * Same as append(f, bufferSize, null).
1169   * @param f the existing file to be appended.
1170   * @param bufferSize the size of the buffer to be used.
1171   * @throws IOException
1172   */
1173  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1174    return append(f, bufferSize, null);
1175  }
1176
1177  /**
1178   * Append to an existing file (optional operation).
1179   * @param f the existing file to be appended.
1180   * @param bufferSize the size of the buffer to be used.
1181   * @param progress for reporting progress if it is not null.
1182   * @throws IOException
1183   */
1184  public abstract FSDataOutputStream append(Path f, int bufferSize,
1185      Progressable progress) throws IOException;
1186
1187  /**
1188   * Concat existing files together.
1189   * @param trg the path to the target destination.
1190   * @param psrcs the paths to the sources to use for the concatenation.
1191   * @throws IOException
1192   */
1193  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1194    throw new UnsupportedOperationException("Not implemented by the " + 
1195        getClass().getSimpleName() + " FileSystem implementation");
1196  }
1197
1198 /**
1199   * Get replication.
1200   * 
1201   * @deprecated Use getFileStatus() instead
1202   * @param src file name
1203   * @return file replication
1204   * @throws IOException
1205   */ 
1206  @Deprecated
1207  public short getReplication(Path src) throws IOException {
1208    return getFileStatus(src).getReplication();
1209  }
1210
1211  /**
1212   * Set replication for an existing file.
1213   * 
1214   * @param src file name
1215   * @param replication new replication
1216   * @throws IOException
1217   * @return true if successful;
1218   *         false if file does not exist or is a directory
1219   */
1220  public boolean setReplication(Path src, short replication)
1221    throws IOException {
1222    return true;
1223  }
1224
1225  /**
1226   * Renames Path src to Path dst.  Can take place on local fs
1227   * or remote DFS.
1228   * @param src path to be renamed
1229   * @param dst new path after rename
1230   * @throws IOException on failure
1231   * @return true if rename is successful
1232   */
1233  public abstract boolean rename(Path src, Path dst) throws IOException;
1234
1235  /**
1236   * Renames Path src to Path dst
1237   * <ul>
1238   * <li
1239   * <li>Fails if src is a file and dst is a directory.
1240   * <li>Fails if src is a directory and dst is a file.
1241   * <li>Fails if the parent of dst does not exist or is a file.
1242   * </ul>
1243   * <p>
1244   * If OVERWRITE option is not passed as an argument, rename fails
1245   * if the dst already exists.
1246   * <p>
1247   * If OVERWRITE option is passed as an argument, rename overwrites
1248   * the dst if it is a file or an empty directory. Rename fails if dst is
1249   * a non-empty directory.
1250   * <p>
1251   * Note that atomicity of rename is dependent on the file system
1252   * implementation. Please refer to the file system documentation for
1253   * details. This default implementation is non atomic.
1254   * <p>
1255   * This method is deprecated since it is a temporary method added to 
1256   * support the transition from FileSystem to FileContext for user 
1257   * applications.
1258   * 
1259   * @param src path to be renamed
1260   * @param dst new path after rename
1261   * @throws IOException on failure
1262   */
1263  @Deprecated
1264  protected void rename(final Path src, final Path dst,
1265      final Rename... options) throws IOException {
1266    // Default implementation
1267    final FileStatus srcStatus = getFileLinkStatus(src);
1268    if (srcStatus == null) {
1269      throw new FileNotFoundException("rename source " + src + " not found.");
1270    }
1271
1272    boolean overwrite = false;
1273    if (null != options) {
1274      for (Rename option : options) {
1275        if (option == Rename.OVERWRITE) {
1276          overwrite = true;
1277        }
1278      }
1279    }
1280
1281    FileStatus dstStatus;
1282    try {
1283      dstStatus = getFileLinkStatus(dst);
1284    } catch (IOException e) {
1285      dstStatus = null;
1286    }
1287    if (dstStatus != null) {
1288      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1289        throw new IOException("Source " + src + " Destination " + dst
1290            + " both should be either file or directory");
1291      }
1292      if (!overwrite) {
1293        throw new FileAlreadyExistsException("rename destination " + dst
1294            + " already exists.");
1295      }
1296      // Delete the destination that is a file or an empty directory
1297      if (dstStatus.isDirectory()) {
1298        FileStatus[] list = listStatus(dst);
1299        if (list != null && list.length != 0) {
1300          throw new IOException(
1301              "rename cannot overwrite non empty destination directory " + dst);
1302        }
1303      }
1304      delete(dst, false);
1305    } else {
1306      final Path parent = dst.getParent();
1307      final FileStatus parentStatus = getFileStatus(parent);
1308      if (parentStatus == null) {
1309        throw new FileNotFoundException("rename destination parent " + parent
1310            + " not found.");
1311      }
1312      if (!parentStatus.isDirectory()) {
1313        throw new ParentNotDirectoryException("rename destination parent " + parent
1314            + " is a file.");
1315      }
1316    }
1317    if (!rename(src, dst)) {
1318      throw new IOException("rename from " + src + " to " + dst + " failed.");
1319    }
1320  }
1321
1322  /**
1323   * Truncate the file in the indicated path to the indicated size.
1324   * <ul>
1325   * <li>Fails if path is a directory.
1326   * <li>Fails if path does not exist.
1327   * <li>Fails if path is not closed.
1328   * <li>Fails if new size is greater than current size.
1329   * </ul>
1330   * @param f The path to the file to be truncated
1331   * @param newLength The size the file is to be truncated to
1332   *
1333   * @return <code>true</code> if the file has been truncated to the desired
1334   * <code>newLength</code> and is immediately available to be reused for
1335   * write operations such as <code>append</code>, or
1336   * <code>false</code> if a background process of adjusting the length of
1337   * the last block has been started, and clients should wait for it to
1338   * complete before proceeding with further file updates.
1339   */
1340  public boolean truncate(Path f, long newLength) throws IOException {
1341    throw new UnsupportedOperationException("Not implemented by the " +
1342        getClass().getSimpleName() + " FileSystem implementation");
1343  }
1344  
1345  /**
1346   * Delete a file 
1347   * @deprecated Use {@link #delete(Path, boolean)} instead.
1348   */
1349  @Deprecated
1350  public boolean delete(Path f) throws IOException {
1351    return delete(f, true);
1352  }
1353  
1354  /** Delete a file.
1355   *
1356   * @param f the path to delete.
1357   * @param recursive if path is a directory and set to 
1358   * true, the directory is deleted else throws an exception. In
1359   * case of a file the recursive can be set to either true or false. 
1360   * @return  true if delete is successful else false. 
1361   * @throws IOException
1362   */
1363  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1364
1365  /**
1366   * Mark a path to be deleted when FileSystem is closed.
1367   * When the JVM shuts down,
1368   * all FileSystem objects will be closed automatically.
1369   * Then,
1370   * the marked path will be deleted as a result of closing the FileSystem.
1371   *
1372   * The path has to exist in the file system.
1373   * 
1374   * @param f the path to delete.
1375   * @return  true if deleteOnExit is successful, otherwise false.
1376   * @throws IOException
1377   */
1378  public boolean deleteOnExit(Path f) throws IOException {
1379    if (!exists(f)) {
1380      return false;
1381    }
1382    synchronized (deleteOnExit) {
1383      deleteOnExit.add(f);
1384    }
1385    return true;
1386  }
1387  
1388  /**
1389   * Cancel the deletion of the path when the FileSystem is closed
1390   * @param f the path to cancel deletion
1391   */
1392  public boolean cancelDeleteOnExit(Path f) {
1393    synchronized (deleteOnExit) {
1394      return deleteOnExit.remove(f);
1395    }
1396  }
1397
1398  /**
1399   * Delete all files that were marked as delete-on-exit. This recursively
1400   * deletes all files in the specified paths.
1401   */
1402  protected void processDeleteOnExit() {
1403    synchronized (deleteOnExit) {
1404      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1405        Path path = iter.next();
1406        try {
1407          if (exists(path)) {
1408            delete(path, true);
1409          }
1410        }
1411        catch (IOException e) {
1412          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1413        }
1414        iter.remove();
1415      }
1416    }
1417  }
1418  
1419  /** Check if exists.
1420   * @param f source file
1421   */
1422  public boolean exists(Path f) throws IOException {
1423    try {
1424      return getFileStatus(f) != null;
1425    } catch (FileNotFoundException e) {
1426      return false;
1427    }
1428  }
1429
1430  /** True iff the named path is a directory.
1431   * Note: Avoid using this method. Instead reuse the FileStatus 
1432   * returned by getFileStatus() or listStatus() methods.
1433   * @param f path to check
1434   */
1435  public boolean isDirectory(Path f) throws IOException {
1436    try {
1437      return getFileStatus(f).isDirectory();
1438    } catch (FileNotFoundException e) {
1439      return false;               // f does not exist
1440    }
1441  }
1442
1443  /** True iff the named path is a regular file.
1444   * Note: Avoid using this method. Instead reuse the FileStatus 
1445   * returned by getFileStatus() or listStatus() methods.
1446   * @param f path to check
1447   */
1448  public boolean isFile(Path f) throws IOException {
1449    try {
1450      return getFileStatus(f).isFile();
1451    } catch (FileNotFoundException e) {
1452      return false;               // f does not exist
1453    }
1454  }
1455  
1456  /** The number of bytes in a file. */
1457  /** @deprecated Use getFileStatus() instead */
1458  @Deprecated
1459  public long getLength(Path f) throws IOException {
1460    return getFileStatus(f).getLen();
1461  }
1462    
1463  /** Return the {@link ContentSummary} of a given {@link Path}.
1464  * @param f path to use
1465  */
1466  public ContentSummary getContentSummary(Path f) throws IOException {
1467    FileStatus status = getFileStatus(f);
1468    if (status.isFile()) {
1469      // f is a file
1470      long length = status.getLen();
1471      return new ContentSummary.Builder().length(length).
1472          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1473    }
1474    // f is a directory
1475    long[] summary = {0, 0, 1};
1476    for(FileStatus s : listStatus(f)) {
1477      long length = s.getLen();
1478      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1479          new ContentSummary.Builder().length(length).
1480          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1481      summary[0] += c.getLength();
1482      summary[1] += c.getFileCount();
1483      summary[2] += c.getDirectoryCount();
1484    }
1485    return new ContentSummary.Builder().length(summary[0]).
1486        fileCount(summary[1]).directoryCount(summary[2]).
1487        spaceConsumed(summary[0]).build();
1488  }
1489
1490  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1491      @Override
1492      public boolean accept(Path file) {
1493        return true;
1494      }     
1495    };
1496    
1497  /**
1498   * List the statuses of the files/directories in the given path if the path is
1499   * a directory.
1500   * 
1501   * @param f given path
1502   * @return the statuses of the files/directories in the given patch
1503   * @throws FileNotFoundException when the path does not exist;
1504   *         IOException see specific implementation
1505   */
1506  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1507                                                         IOException;
1508    
1509  /*
1510   * Filter files/directories in the given path using the user-supplied path
1511   * filter. Results are added to the given array <code>results</code>.
1512   */
1513  private void listStatus(ArrayList<FileStatus> results, Path f,
1514      PathFilter filter) throws FileNotFoundException, IOException {
1515    FileStatus listing[] = listStatus(f);
1516    if (listing == null) {
1517      throw new IOException("Error accessing " + f);
1518    }
1519
1520    for (int i = 0; i < listing.length; i++) {
1521      if (filter.accept(listing[i].getPath())) {
1522        results.add(listing[i]);
1523      }
1524    }
1525  }
1526
1527  /**
1528   * @return an iterator over the corrupt files under the given path
1529   * (may contain duplicates if a file has more than one corrupt block)
1530   * @throws IOException
1531   */
1532  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1533    throws IOException {
1534    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1535                                            " does not support" +
1536                                            " listCorruptFileBlocks");
1537  }
1538
1539  /**
1540   * Filter files/directories in the given path using the user-supplied path
1541   * filter.
1542   * 
1543   * @param f
1544   *          a path name
1545   * @param filter
1546   *          the user-supplied path filter
1547   * @return an array of FileStatus objects for the files under the given path
1548   *         after applying the filter
1549   * @throws FileNotFoundException when the path does not exist;
1550   *         IOException see specific implementation   
1551   */
1552  public FileStatus[] listStatus(Path f, PathFilter filter) 
1553                                   throws FileNotFoundException, IOException {
1554    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1555    listStatus(results, f, filter);
1556    return results.toArray(new FileStatus[results.size()]);
1557  }
1558
1559  /**
1560   * Filter files/directories in the given list of paths using default
1561   * path filter.
1562   * 
1563   * @param files
1564   *          a list of paths
1565   * @return a list of statuses for the files under the given paths after
1566   *         applying the filter default Path filter
1567   * @throws FileNotFoundException when the path does not exist;
1568   *         IOException see specific implementation
1569   */
1570  public FileStatus[] listStatus(Path[] files)
1571      throws FileNotFoundException, IOException {
1572    return listStatus(files, DEFAULT_FILTER);
1573  }
1574
1575  /**
1576   * Filter files/directories in the given list of paths using user-supplied
1577   * path filter.
1578   * 
1579   * @param files
1580   *          a list of paths
1581   * @param filter
1582   *          the user-supplied path filter
1583   * @return a list of statuses for the files under the given paths after
1584   *         applying the filter
1585   * @throws FileNotFoundException when the path does not exist;
1586   *         IOException see specific implementation
1587   */
1588  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1589      throws FileNotFoundException, IOException {
1590    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1591    for (int i = 0; i < files.length; i++) {
1592      listStatus(results, files[i], filter);
1593    }
1594    return results.toArray(new FileStatus[results.size()]);
1595  }
1596
1597  /**
1598   * <p>Return all the files that match filePattern and are not checksum
1599   * files. Results are sorted by their names.
1600   * 
1601   * <p>
1602   * A filename pattern is composed of <i>regular</i> characters and
1603   * <i>special pattern matching</i> characters, which are:
1604   *
1605   * <dl>
1606   *  <dd>
1607   *   <dl>
1608   *    <p>
1609   *    <dt> <tt> ? </tt>
1610   *    <dd> Matches any single character.
1611   *
1612   *    <p>
1613   *    <dt> <tt> * </tt>
1614   *    <dd> Matches zero or more characters.
1615   *
1616   *    <p>
1617   *    <dt> <tt> [<i>abc</i>] </tt>
1618   *    <dd> Matches a single character from character set
1619   *     <tt>{<i>a,b,c</i>}</tt>.
1620   *
1621   *    <p>
1622   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1623   *    <dd> Matches a single character from the character range
1624   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1625   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1626   *
1627   *    <p>
1628   *    <dt> <tt> [^<i>a</i>] </tt>
1629   *    <dd> Matches a single character that is not from character set or range
1630   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1631   *     immediately to the right of the opening bracket.
1632   *
1633   *    <p>
1634   *    <dt> <tt> \<i>c</i> </tt>
1635   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1636   *
1637   *    <p>
1638   *    <dt> <tt> {ab,cd} </tt>
1639   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1640   *    
1641   *    <p>
1642   *    <dt> <tt> {ab,c{de,fh}} </tt>
1643   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1644   *
1645   *   </dl>
1646   *  </dd>
1647   * </dl>
1648   *
1649   * @param pathPattern a regular expression specifying a pth pattern
1650
1651   * @return an array of paths that match the path pattern
1652   * @throws IOException
1653   */
1654  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1655    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1656  }
1657  
1658  /**
1659   * Return an array of FileStatus objects whose path names match pathPattern
1660   * and is accepted by the user-supplied path filter. Results are sorted by
1661   * their path names.
1662   * Return null if pathPattern has no glob and the path does not exist.
1663   * Return an empty array if pathPattern has a glob and no path matches it. 
1664   * 
1665   * @param pathPattern
1666   *          a regular expression specifying the path pattern
1667   * @param filter
1668   *          a user-supplied path filter
1669   * @return an array of FileStatus objects
1670   * @throws IOException if any I/O error occurs when fetching file status
1671   */
1672  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1673      throws IOException {
1674    return new Globber(this, pathPattern, filter).glob();
1675  }
1676  
1677  /**
1678   * List the statuses of the files/directories in the given path if the path is
1679   * a directory. 
1680   * Return the file's status and block locations If the path is a file.
1681   * 
1682   * If a returned status is a file, it contains the file's block locations.
1683   * 
1684   * @param f is the path
1685   *
1686   * @return an iterator that traverses statuses of the files/directories 
1687   *         in the given path
1688   *
1689   * @throws FileNotFoundException If <code>f</code> does not exist
1690   * @throws IOException If an I/O error occurred
1691   */
1692  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1693  throws FileNotFoundException, IOException {
1694    return listLocatedStatus(f, DEFAULT_FILTER);
1695  }
1696
1697  /**
1698   * Listing a directory
1699   * The returned results include its block location if it is a file
1700   * The results are filtered by the given path filter
1701   * @param f a path
1702   * @param filter a path filter
1703   * @return an iterator that traverses statuses of the files/directories 
1704   *         in the given path
1705   * @throws FileNotFoundException if <code>f</code> does not exist
1706   * @throws IOException if any I/O error occurred
1707   */
1708  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1709      final PathFilter filter)
1710  throws FileNotFoundException, IOException {
1711    return new RemoteIterator<LocatedFileStatus>() {
1712      private final FileStatus[] stats = listStatus(f, filter);
1713      private int i = 0;
1714
1715      @Override
1716      public boolean hasNext() {
1717        return i<stats.length;
1718      }
1719
1720      @Override
1721      public LocatedFileStatus next() throws IOException {
1722        if (!hasNext()) {
1723          throw new NoSuchElementException("No more entry in " + f);
1724        }
1725        FileStatus result = stats[i++];
1726        BlockLocation[] locs = result.isFile() ?
1727            getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1728            null;
1729        return new LocatedFileStatus(result, locs);
1730      }
1731    };
1732  }
1733
1734  /**
1735   * Returns a remote iterator so that followup calls are made on demand
1736   * while consuming the entries. Each file system implementation should
1737   * override this method and provide a more efficient implementation, if
1738   * possible. 
1739   *
1740   * @param p target path
1741   * @return remote iterator
1742   */
1743  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1744  throws FileNotFoundException, IOException {
1745    return new RemoteIterator<FileStatus>() {
1746      private final FileStatus[] stats = listStatus(p);
1747      private int i = 0;
1748
1749      @Override
1750      public boolean hasNext() {
1751        return i<stats.length;
1752      }
1753
1754      @Override
1755      public FileStatus next() throws IOException {
1756        if (!hasNext()) {
1757          throw new NoSuchElementException("No more entry in " + p);
1758        }
1759        return stats[i++];
1760      }
1761    };
1762  }
1763
1764  /**
1765   * List the statuses and block locations of the files in the given path.
1766   * 
1767   * If the path is a directory, 
1768   *   if recursive is false, returns files in the directory;
1769   *   if recursive is true, return files in the subtree rooted at the path.
1770   * If the path is a file, return the file's status and block locations.
1771   * 
1772   * @param f is the path
1773   * @param recursive if the subdirectories need to be traversed recursively
1774   *
1775   * @return an iterator that traverses statuses of the files
1776   *
1777   * @throws FileNotFoundException when the path does not exist;
1778   *         IOException see specific implementation
1779   */
1780  public RemoteIterator<LocatedFileStatus> listFiles(
1781      final Path f, final boolean recursive)
1782  throws FileNotFoundException, IOException {
1783    return new RemoteIterator<LocatedFileStatus>() {
1784      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1785        new Stack<RemoteIterator<LocatedFileStatus>>();
1786      private RemoteIterator<LocatedFileStatus> curItor =
1787        listLocatedStatus(f);
1788      private LocatedFileStatus curFile;
1789     
1790      @Override
1791      public boolean hasNext() throws IOException {
1792        while (curFile == null) {
1793          if (curItor.hasNext()) {
1794            handleFileStat(curItor.next());
1795          } else if (!itors.empty()) {
1796            curItor = itors.pop();
1797          } else {
1798            return false;
1799          }
1800        }
1801        return true;
1802      }
1803
1804      /**
1805       * Process the input stat.
1806       * If it is a file, return the file stat.
1807       * If it is a directory, traverse the directory if recursive is true;
1808       * ignore it if recursive is false.
1809       * @param stat input status
1810       * @throws IOException if any IO error occurs
1811       */
1812      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1813        if (stat.isFile()) { // file
1814          curFile = stat;
1815        } else if (recursive) { // directory
1816          itors.push(curItor);
1817          curItor = listLocatedStatus(stat.getPath());
1818        }
1819      }
1820
1821      @Override
1822      public LocatedFileStatus next() throws IOException {
1823        if (hasNext()) {
1824          LocatedFileStatus result = curFile;
1825          curFile = null;
1826          return result;
1827        } 
1828        throw new java.util.NoSuchElementException("No more entry in " + f);
1829      }
1830    };
1831  }
1832  
1833  /** Return the current user's home directory in this filesystem.
1834   * The default implementation returns "/user/$USER/".
1835   */
1836  public Path getHomeDirectory() {
1837    return this.makeQualified(
1838        new Path("/user/"+System.getProperty("user.name")));
1839  }
1840
1841
1842  /**
1843   * Set the current working directory for the given file system. All relative
1844   * paths will be resolved relative to it.
1845   * 
1846   * @param new_dir
1847   */
1848  public abstract void setWorkingDirectory(Path new_dir);
1849    
1850  /**
1851   * Get the current working directory for the given file system
1852   * @return the directory pathname
1853   */
1854  public abstract Path getWorkingDirectory();
1855  
1856  
1857  /**
1858   * Note: with the new FilesContext class, getWorkingDirectory()
1859   * will be removed. 
1860   * The working directory is implemented in FilesContext.
1861   * 
1862   * Some file systems like LocalFileSystem have an initial workingDir
1863   * that we use as the starting workingDir. For other file systems
1864   * like HDFS there is no built in notion of an initial workingDir.
1865   * 
1866   * @return if there is built in notion of workingDir then it
1867   * is returned; else a null is returned.
1868   */
1869  protected Path getInitialWorkingDirectory() {
1870    return null;
1871  }
1872
1873  /**
1874   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1875   */
1876  public boolean mkdirs(Path f) throws IOException {
1877    return mkdirs(f, FsPermission.getDirDefault());
1878  }
1879
1880  /**
1881   * Make the given file and all non-existent parents into
1882   * directories. Has the semantics of Unix 'mkdir -p'.
1883   * Existence of the directory hierarchy is not an error.
1884   * @param f path to create
1885   * @param permission to apply to f
1886   */
1887  public abstract boolean mkdirs(Path f, FsPermission permission
1888      ) throws IOException;
1889
1890  /**
1891   * The src file is on the local disk.  Add it to FS at
1892   * the given dst name and the source is kept intact afterwards
1893   * @param src path
1894   * @param dst path
1895   */
1896  public void copyFromLocalFile(Path src, Path dst)
1897    throws IOException {
1898    copyFromLocalFile(false, src, dst);
1899  }
1900
1901  /**
1902   * The src files is on the local disk.  Add it to FS at
1903   * the given dst name, removing the source afterwards.
1904   * @param srcs path
1905   * @param dst path
1906   */
1907  public void moveFromLocalFile(Path[] srcs, Path dst)
1908    throws IOException {
1909    copyFromLocalFile(true, true, srcs, dst);
1910  }
1911
1912  /**
1913   * The src file is on the local disk.  Add it to FS at
1914   * the given dst name, removing the source afterwards.
1915   * @param src path
1916   * @param dst path
1917   */
1918  public void moveFromLocalFile(Path src, Path dst)
1919    throws IOException {
1920    copyFromLocalFile(true, src, dst);
1921  }
1922
1923  /**
1924   * The src file is on the local disk.  Add it to FS at
1925   * the given dst name.
1926   * delSrc indicates if the source should be removed
1927   * @param delSrc whether to delete the src
1928   * @param src path
1929   * @param dst path
1930   */
1931  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1932    throws IOException {
1933    copyFromLocalFile(delSrc, true, src, dst);
1934  }
1935  
1936  /**
1937   * The src files are on the local disk.  Add it to FS at
1938   * the given dst name.
1939   * delSrc indicates if the source should be removed
1940   * @param delSrc whether to delete the src
1941   * @param overwrite whether to overwrite an existing file
1942   * @param srcs array of paths which are source
1943   * @param dst path
1944   */
1945  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1946                                Path[] srcs, Path dst)
1947    throws IOException {
1948    Configuration conf = getConf();
1949    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1950  }
1951  
1952  /**
1953   * The src file is on the local disk.  Add it to FS at
1954   * the given dst name.
1955   * delSrc indicates if the source should be removed
1956   * @param delSrc whether to delete the src
1957   * @param overwrite whether to overwrite an existing file
1958   * @param src path
1959   * @param dst path
1960   */
1961  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1962                                Path src, Path dst)
1963    throws IOException {
1964    Configuration conf = getConf();
1965    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1966  }
1967    
1968  /**
1969   * The src file is under FS, and the dst is on the local disk.
1970   * Copy it from FS control to the local dst name.
1971   * @param src path
1972   * @param dst path
1973   */
1974  public void copyToLocalFile(Path src, Path dst) throws IOException {
1975    copyToLocalFile(false, src, dst);
1976  }
1977    
1978  /**
1979   * The src file is under FS, and the dst is on the local disk.
1980   * Copy it from FS control to the local dst name.
1981   * Remove the source afterwards
1982   * @param src path
1983   * @param dst path
1984   */
1985  public void moveToLocalFile(Path src, Path dst) throws IOException {
1986    copyToLocalFile(true, src, dst);
1987  }
1988
1989  /**
1990   * The src file is under FS, and the dst is on the local disk.
1991   * Copy it from FS control to the local dst name.
1992   * delSrc indicates if the src will be removed or not.
1993   * @param delSrc whether to delete the src
1994   * @param src path
1995   * @param dst path
1996   */   
1997  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1998    throws IOException {
1999    copyToLocalFile(delSrc, src, dst, false);
2000  }
2001  
2002    /**
2003   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2004   * control to the local dst name. delSrc indicates if the src will be removed
2005   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2006   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2007   * It will not create any crc files at local.
2008   * 
2009   * @param delSrc
2010   *          whether to delete the src
2011   * @param src
2012   *          path
2013   * @param dst
2014   *          path
2015   * @param useRawLocalFileSystem
2016   *          whether to use RawLocalFileSystem as local file system or not.
2017   * 
2018   * @throws IOException
2019   *           - if any IO error
2020   */
2021  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2022      boolean useRawLocalFileSystem) throws IOException {
2023    Configuration conf = getConf();
2024    FileSystem local = null;
2025    if (useRawLocalFileSystem) {
2026      local = getLocal(conf).getRawFileSystem();
2027    } else {
2028      local = getLocal(conf);
2029    }
2030    FileUtil.copy(this, src, local, dst, delSrc, conf);
2031  }
2032
2033  /**
2034   * Returns a local File that the user can write output to.  The caller
2035   * provides both the eventual FS target name and the local working
2036   * file.  If the FS is local, we write directly into the target.  If
2037   * the FS is remote, we write into the tmp local area.
2038   * @param fsOutputFile path of output file
2039   * @param tmpLocalFile path of local tmp file
2040   */
2041  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2042    throws IOException {
2043    return tmpLocalFile;
2044  }
2045
2046  /**
2047   * Called when we're all done writing to the target.  A local FS will
2048   * do nothing, because we've written to exactly the right place.  A remote
2049   * FS will copy the contents of tmpLocalFile to the correct target at
2050   * fsOutputFile.
2051   * @param fsOutputFile path of output file
2052   * @param tmpLocalFile path to local tmp file
2053   */
2054  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2055    throws IOException {
2056    moveFromLocalFile(tmpLocalFile, fsOutputFile);
2057  }
2058
2059  /**
2060   * No more filesystem operations are needed.  Will
2061   * release any held locks.
2062   */
2063  @Override
2064  public void close() throws IOException {
2065    // delete all files that were marked as delete-on-exit.
2066    processDeleteOnExit();
2067    CACHE.remove(this.key, this);
2068  }
2069
2070  /** Return the total size of all files in the filesystem.*/
2071  public long getUsed() throws IOException{
2072    long used = 0;
2073    FileStatus[] files = listStatus(new Path("/"));
2074    for(FileStatus file:files){
2075      used += file.getLen();
2076    }
2077    return used;
2078  }
2079  
2080  /**
2081   * Get the block size for a particular file.
2082   * @param f the filename
2083   * @return the number of bytes in a block
2084   */
2085  /** @deprecated Use getFileStatus() instead */
2086  @Deprecated
2087  public long getBlockSize(Path f) throws IOException {
2088    return getFileStatus(f).getBlockSize();
2089  }
2090
2091  /**
2092   * Return the number of bytes that large input files should be optimally
2093   * be split into to minimize i/o time.
2094   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2095   */
2096  @Deprecated
2097  public long getDefaultBlockSize() {
2098    // default to 32MB: large enough to minimize the impact of seeks
2099    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2100  }
2101    
2102  /** Return the number of bytes that large input files should be optimally
2103   * be split into to minimize i/o time.  The given path will be used to
2104   * locate the actual filesystem.  The full path does not have to exist.
2105   * @param f path of file
2106   * @return the default block size for the path's filesystem
2107   */
2108  public long getDefaultBlockSize(Path f) {
2109    return getDefaultBlockSize();
2110  }
2111
2112  /**
2113   * Get the default replication.
2114   * @deprecated use {@link #getDefaultReplication(Path)} instead
2115   */
2116  @Deprecated
2117  public short getDefaultReplication() { return 1; }
2118
2119  /**
2120   * Get the default replication for a path.   The given path will be used to
2121   * locate the actual filesystem.  The full path does not have to exist.
2122   * @param path of the file
2123   * @return default replication for the path's filesystem 
2124   */
2125  public short getDefaultReplication(Path path) {
2126    return getDefaultReplication();
2127  }
2128  
2129  /**
2130   * Return a file status object that represents the path.
2131   * @param f The path we want information from
2132   * @return a FileStatus object
2133   * @throws FileNotFoundException when the path does not exist;
2134   *         IOException see specific implementation
2135   */
2136  public abstract FileStatus getFileStatus(Path f) throws IOException;
2137
2138  /**
2139   * Checks if the user can access a path.  The mode specifies which access
2140   * checks to perform.  If the requested permissions are granted, then the
2141   * method returns normally.  If access is denied, then the method throws an
2142   * {@link AccessControlException}.
2143   * <p/>
2144   * The default implementation of this method calls {@link #getFileStatus(Path)}
2145   * and checks the returned permissions against the requested permissions.
2146   * Note that the getFileStatus call will be subject to authorization checks.
2147   * Typically, this requires search (execute) permissions on each directory in
2148   * the path's prefix, but this is implementation-defined.  Any file system
2149   * that provides a richer authorization model (such as ACLs) may override the
2150   * default implementation so that it checks against that model instead.
2151   * <p>
2152   * In general, applications should avoid using this method, due to the risk of
2153   * time-of-check/time-of-use race conditions.  The permissions on a file may
2154   * change immediately after the access call returns.  Most applications should
2155   * prefer running specific file system actions as the desired user represented
2156   * by a {@link UserGroupInformation}.
2157   *
2158   * @param path Path to check
2159   * @param mode type of access to check
2160   * @throws AccessControlException if access is denied
2161   * @throws FileNotFoundException if the path does not exist
2162   * @throws IOException see specific implementation
2163   */
2164  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2165  public void access(Path path, FsAction mode) throws AccessControlException,
2166      FileNotFoundException, IOException {
2167    checkAccessPermissions(this.getFileStatus(path), mode);
2168  }
2169
2170  /**
2171   * This method provides the default implementation of
2172   * {@link #access(Path, FsAction)}.
2173   *
2174   * @param stat FileStatus to check
2175   * @param mode type of access to check
2176   * @throws IOException for any error
2177   */
2178  @InterfaceAudience.Private
2179  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2180      throws IOException {
2181    FsPermission perm = stat.getPermission();
2182    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2183    String user = ugi.getShortUserName();
2184    List<String> groups = Arrays.asList(ugi.getGroupNames());
2185    if (user.equals(stat.getOwner())) {
2186      if (perm.getUserAction().implies(mode)) {
2187        return;
2188      }
2189    } else if (groups.contains(stat.getGroup())) {
2190      if (perm.getGroupAction().implies(mode)) {
2191        return;
2192      }
2193    } else {
2194      if (perm.getOtherAction().implies(mode)) {
2195        return;
2196      }
2197    }
2198    throw new AccessControlException(String.format(
2199      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2200      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2201  }
2202
2203  /**
2204   * See {@link FileContext#fixRelativePart}
2205   */
2206  protected Path fixRelativePart(Path p) {
2207    if (p.isUriPathAbsolute()) {
2208      return p;
2209    } else {
2210      return new Path(getWorkingDirectory(), p);
2211    }
2212  }
2213
2214  /**
2215   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2216   */
2217  public void createSymlink(final Path target, final Path link,
2218      final boolean createParent) throws AccessControlException,
2219      FileAlreadyExistsException, FileNotFoundException,
2220      ParentNotDirectoryException, UnsupportedFileSystemException, 
2221      IOException {
2222    // Supporting filesystems should override this method
2223    throw new UnsupportedOperationException(
2224        "Filesystem does not support symlinks!");
2225  }
2226
2227  /**
2228   * See {@link FileContext#getFileLinkStatus(Path)}
2229   */
2230  public FileStatus getFileLinkStatus(final Path f)
2231      throws AccessControlException, FileNotFoundException,
2232      UnsupportedFileSystemException, IOException {
2233    // Supporting filesystems should override this method
2234    return getFileStatus(f);
2235  }
2236
2237  /**
2238   * See {@link AbstractFileSystem#supportsSymlinks()}
2239   */
2240  public boolean supportsSymlinks() {
2241    return false;
2242  }
2243
2244  /**
2245   * See {@link FileContext#getLinkTarget(Path)}
2246   */
2247  public Path getLinkTarget(Path f) throws IOException {
2248    // Supporting filesystems should override this method
2249    throw new UnsupportedOperationException(
2250        "Filesystem does not support symlinks!");
2251  }
2252
2253  /**
2254   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2255   */
2256  protected Path resolveLink(Path f) throws IOException {
2257    // Supporting filesystems should override this method
2258    throw new UnsupportedOperationException(
2259        "Filesystem does not support symlinks!");
2260  }
2261
2262  /**
2263   * Get the checksum of a file.
2264   *
2265   * @param f The file path
2266   * @return The file checksum.  The default return value is null,
2267   *  which indicates that no checksum algorithm is implemented
2268   *  in the corresponding FileSystem.
2269   */
2270  public FileChecksum getFileChecksum(Path f) throws IOException {
2271    return getFileChecksum(f, Long.MAX_VALUE);
2272  }
2273
2274  /**
2275   * Get the checksum of a file, from the beginning of the file till the
2276   * specific length.
2277   * @param f The file path
2278   * @param length The length of the file range for checksum calculation
2279   * @return The file checksum.
2280   */
2281  public FileChecksum getFileChecksum(Path f, final long length)
2282      throws IOException {
2283    return null;
2284  }
2285
2286  /**
2287   * Set the verify checksum flag. This is only applicable if the 
2288   * corresponding FileSystem supports checksum. By default doesn't do anything.
2289   * @param verifyChecksum
2290   */
2291  public void setVerifyChecksum(boolean verifyChecksum) {
2292    //doesn't do anything
2293  }
2294
2295  /**
2296   * Set the write checksum flag. This is only applicable if the 
2297   * corresponding FileSystem supports checksum. By default doesn't do anything.
2298   * @param writeChecksum
2299   */
2300  public void setWriteChecksum(boolean writeChecksum) {
2301    //doesn't do anything
2302  }
2303
2304  /**
2305   * Returns a status object describing the use and capacity of the
2306   * file system. If the file system has multiple partitions, the
2307   * use and capacity of the root partition is reflected.
2308   * 
2309   * @return a FsStatus object
2310   * @throws IOException
2311   *           see specific implementation
2312   */
2313  public FsStatus getStatus() throws IOException {
2314    return getStatus(null);
2315  }
2316
2317  /**
2318   * Returns a status object describing the use and capacity of the
2319   * file system. If the file system has multiple partitions, the
2320   * use and capacity of the partition pointed to by the specified
2321   * path is reflected.
2322   * @param p Path for which status should be obtained. null means
2323   * the default partition. 
2324   * @return a FsStatus object
2325   * @throws IOException
2326   *           see specific implementation
2327   */
2328  public FsStatus getStatus(Path p) throws IOException {
2329    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2330  }
2331
2332  /**
2333   * Set permission of a path.
2334   * @param p
2335   * @param permission
2336   */
2337  public void setPermission(Path p, FsPermission permission
2338      ) throws IOException {
2339  }
2340
2341  /**
2342   * Set owner of a path (i.e. a file or a directory).
2343   * The parameters username and groupname cannot both be null.
2344   * @param p The path
2345   * @param username If it is null, the original username remains unchanged.
2346   * @param groupname If it is null, the original groupname remains unchanged.
2347   */
2348  public void setOwner(Path p, String username, String groupname
2349      ) throws IOException {
2350  }
2351
2352  /**
2353   * Set access time of a file
2354   * @param p The path
2355   * @param mtime Set the modification time of this file.
2356   *              The number of milliseconds since Jan 1, 1970. 
2357   *              A value of -1 means that this call should not set modification time.
2358   * @param atime Set the access time of this file.
2359   *              The number of milliseconds since Jan 1, 1970. 
2360   *              A value of -1 means that this call should not set access time.
2361   */
2362  public void setTimes(Path p, long mtime, long atime
2363      ) throws IOException {
2364  }
2365
2366  /**
2367   * Create a snapshot with a default name.
2368   * @param path The directory where snapshots will be taken.
2369   * @return the snapshot path.
2370   */
2371  public final Path createSnapshot(Path path) throws IOException {
2372    return createSnapshot(path, null);
2373  }
2374
2375  /**
2376   * Create a snapshot
2377   * @param path The directory where snapshots will be taken.
2378   * @param snapshotName The name of the snapshot
2379   * @return the snapshot path.
2380   */
2381  public Path createSnapshot(Path path, String snapshotName)
2382      throws IOException {
2383    throw new UnsupportedOperationException(getClass().getSimpleName()
2384        + " doesn't support createSnapshot");
2385  }
2386  
2387  /**
2388   * Rename a snapshot
2389   * @param path The directory path where the snapshot was taken
2390   * @param snapshotOldName Old name of the snapshot
2391   * @param snapshotNewName New name of the snapshot
2392   * @throws IOException
2393   */
2394  public void renameSnapshot(Path path, String snapshotOldName,
2395      String snapshotNewName) throws IOException {
2396    throw new UnsupportedOperationException(getClass().getSimpleName()
2397        + " doesn't support renameSnapshot");
2398  }
2399  
2400  /**
2401   * Delete a snapshot of a directory
2402   * @param path  The directory that the to-be-deleted snapshot belongs to
2403   * @param snapshotName The name of the snapshot
2404   */
2405  public void deleteSnapshot(Path path, String snapshotName)
2406      throws IOException {
2407    throw new UnsupportedOperationException(getClass().getSimpleName()
2408        + " doesn't support deleteSnapshot");
2409  }
2410  
2411  /**
2412   * Modifies ACL entries of files and directories.  This method can add new ACL
2413   * entries or modify the permissions on existing ACL entries.  All existing
2414   * ACL entries that are not specified in this call are retained without
2415   * changes.  (Modifications are merged into the current ACL.)
2416   *
2417   * @param path Path to modify
2418   * @param aclSpec List<AclEntry> describing modifications
2419   * @throws IOException if an ACL could not be modified
2420   */
2421  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2422      throws IOException {
2423    throw new UnsupportedOperationException(getClass().getSimpleName()
2424        + " doesn't support modifyAclEntries");
2425  }
2426
2427  /**
2428   * Removes ACL entries from files and directories.  Other ACL entries are
2429   * retained.
2430   *
2431   * @param path Path to modify
2432   * @param aclSpec List<AclEntry> describing entries to remove
2433   * @throws IOException if an ACL could not be modified
2434   */
2435  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2436      throws IOException {
2437    throw new UnsupportedOperationException(getClass().getSimpleName()
2438        + " doesn't support removeAclEntries");
2439  }
2440
2441  /**
2442   * Removes all default ACL entries from files and directories.
2443   *
2444   * @param path Path to modify
2445   * @throws IOException if an ACL could not be modified
2446   */
2447  public void removeDefaultAcl(Path path)
2448      throws IOException {
2449    throw new UnsupportedOperationException(getClass().getSimpleName()
2450        + " doesn't support removeDefaultAcl");
2451  }
2452
2453  /**
2454   * Removes all but the base ACL entries of files and directories.  The entries
2455   * for user, group, and others are retained for compatibility with permission
2456   * bits.
2457   *
2458   * @param path Path to modify
2459   * @throws IOException if an ACL could not be removed
2460   */
2461  public void removeAcl(Path path)
2462      throws IOException {
2463    throw new UnsupportedOperationException(getClass().getSimpleName()
2464        + " doesn't support removeAcl");
2465  }
2466
2467  /**
2468   * Fully replaces ACL of files and directories, discarding all existing
2469   * entries.
2470   *
2471   * @param path Path to modify
2472   * @param aclSpec List<AclEntry> describing modifications, must include entries
2473   *   for user, group, and others for compatibility with permission bits.
2474   * @throws IOException if an ACL could not be modified
2475   */
2476  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2477    throw new UnsupportedOperationException(getClass().getSimpleName()
2478        + " doesn't support setAcl");
2479  }
2480
2481  /**
2482   * Gets the ACL of a file or directory.
2483   *
2484   * @param path Path to get
2485   * @return AclStatus describing the ACL of the file or directory
2486   * @throws IOException if an ACL could not be read
2487   */
2488  public AclStatus getAclStatus(Path path) throws IOException {
2489    throw new UnsupportedOperationException(getClass().getSimpleName()
2490        + " doesn't support getAclStatus");
2491  }
2492
2493  /**
2494   * Set an xattr of a file or directory.
2495   * The name must be prefixed with the namespace followed by ".". For example,
2496   * "user.attr".
2497   * <p/>
2498   * Refer to the HDFS extended attributes user documentation for details.
2499   *
2500   * @param path Path to modify
2501   * @param name xattr name.
2502   * @param value xattr value.
2503   * @throws IOException
2504   */
2505  public void setXAttr(Path path, String name, byte[] value)
2506      throws IOException {
2507    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2508        XAttrSetFlag.REPLACE));
2509  }
2510
2511  /**
2512   * Set an xattr of a file or directory.
2513   * The name must be prefixed with the namespace followed by ".". For example,
2514   * "user.attr".
2515   * <p/>
2516   * Refer to the HDFS extended attributes user documentation for details.
2517   *
2518   * @param path Path to modify
2519   * @param name xattr name.
2520   * @param value xattr value.
2521   * @param flag xattr set flag
2522   * @throws IOException
2523   */
2524  public void setXAttr(Path path, String name, byte[] value,
2525      EnumSet<XAttrSetFlag> flag) throws IOException {
2526    throw new UnsupportedOperationException(getClass().getSimpleName()
2527        + " doesn't support setXAttr");
2528  }
2529
2530  /**
2531   * Get an xattr name and value for a file or directory.
2532   * The name must be prefixed with the namespace followed by ".". For example,
2533   * "user.attr".
2534   * <p/>
2535   * Refer to the HDFS extended attributes user documentation for details.
2536   *
2537   * @param path Path to get extended attribute
2538   * @param name xattr name.
2539   * @return byte[] xattr value.
2540   * @throws IOException
2541   */
2542  public byte[] getXAttr(Path path, String name) throws IOException {
2543    throw new UnsupportedOperationException(getClass().getSimpleName()
2544        + " doesn't support getXAttr");
2545  }
2546
2547  /**
2548   * Get all of the xattr name/value pairs for a file or directory.
2549   * Only those xattrs which the logged-in user has permissions to view
2550   * are returned.
2551   * <p/>
2552   * Refer to the HDFS extended attributes user documentation for details.
2553   *
2554   * @param path Path to get extended attributes
2555   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2556   * @throws IOException
2557   */
2558  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2559    throw new UnsupportedOperationException(getClass().getSimpleName()
2560        + " doesn't support getXAttrs");
2561  }
2562
2563  /**
2564   * Get all of the xattrs name/value pairs for a file or directory.
2565   * Only those xattrs which the logged-in user has permissions to view
2566   * are returned.
2567   * <p/>
2568   * Refer to the HDFS extended attributes user documentation for details.
2569   *
2570   * @param path Path to get extended attributes
2571   * @param names XAttr names.
2572   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2573   * @throws IOException
2574   */
2575  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2576      throws IOException {
2577    throw new UnsupportedOperationException(getClass().getSimpleName()
2578        + " doesn't support getXAttrs");
2579  }
2580
2581  /**
2582   * Get all of the xattr names for a file or directory.
2583   * Only those xattr names which the logged-in user has permissions to view
2584   * are returned.
2585   * <p/>
2586   * Refer to the HDFS extended attributes user documentation for details.
2587   *
2588   * @param path Path to get extended attributes
2589   * @return List<String> of the XAttr names of the file or directory
2590   * @throws IOException
2591   */
2592  public List<String> listXAttrs(Path path) throws IOException {
2593    throw new UnsupportedOperationException(getClass().getSimpleName()
2594            + " doesn't support listXAttrs");
2595  }
2596
2597  /**
2598   * Remove an xattr of a file or directory.
2599   * The name must be prefixed with the namespace followed by ".". For example,
2600   * "user.attr".
2601   * <p/>
2602   * Refer to the HDFS extended attributes user documentation for details.
2603   *
2604   * @param path Path to remove extended attribute
2605   * @param name xattr name
2606   * @throws IOException
2607   */
2608  public void removeXAttr(Path path, String name) throws IOException {
2609    throw new UnsupportedOperationException(getClass().getSimpleName()
2610        + " doesn't support removeXAttr");
2611  }
2612
2613  // making it volatile to be able to do a double checked locking
2614  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2615
2616  private static final Map<String, Class<? extends FileSystem>>
2617    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2618
2619  private static void loadFileSystems() {
2620    synchronized (FileSystem.class) {
2621      if (!FILE_SYSTEMS_LOADED) {
2622        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2623        for (FileSystem fs : serviceLoader) {
2624          SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2625        }
2626        FILE_SYSTEMS_LOADED = true;
2627      }
2628    }
2629  }
2630
2631  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2632      Configuration conf) throws IOException {
2633    if (!FILE_SYSTEMS_LOADED) {
2634      loadFileSystems();
2635    }
2636    Class<? extends FileSystem> clazz = null;
2637    if (conf != null) {
2638      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2639    }
2640    if (clazz == null) {
2641      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2642    }
2643    if (clazz == null) {
2644      throw new IOException("No FileSystem for scheme: " + scheme);
2645    }
2646    return clazz;
2647  }
2648
2649  private static FileSystem createFileSystem(URI uri, Configuration conf
2650      ) throws IOException {
2651    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2652    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2653    fs.initialize(uri, conf);
2654    return fs;
2655  }
2656
2657  /** Caching FileSystem objects */
2658  static class Cache {
2659    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2660
2661    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2662    private final Set<Key> toAutoClose = new HashSet<Key>();
2663
2664    /** A variable that makes all objects in the cache unique */
2665    private static AtomicLong unique = new AtomicLong(1);
2666
2667    FileSystem get(URI uri, Configuration conf) throws IOException{
2668      Key key = new Key(uri, conf);
2669      return getInternal(uri, conf, key);
2670    }
2671
2672    /** The objects inserted into the cache using this method are all unique */
2673    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2674      Key key = new Key(uri, conf, unique.getAndIncrement());
2675      return getInternal(uri, conf, key);
2676    }
2677
2678    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2679      FileSystem fs;
2680      synchronized (this) {
2681        fs = map.get(key);
2682      }
2683      if (fs != null) {
2684        return fs;
2685      }
2686
2687      fs = createFileSystem(uri, conf);
2688      synchronized (this) { // refetch the lock again
2689        FileSystem oldfs = map.get(key);
2690        if (oldfs != null) { // a file system is created while lock is releasing
2691          fs.close(); // close the new file system
2692          return oldfs;  // return the old file system
2693        }
2694        
2695        // now insert the new file system into the map
2696        if (map.isEmpty()
2697                && !ShutdownHookManager.get().isShutdownInProgress()) {
2698          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2699        }
2700        fs.key = key;
2701        map.put(key, fs);
2702        if (conf.getBoolean("fs.automatic.close", true)) {
2703          toAutoClose.add(key);
2704        }
2705        return fs;
2706      }
2707    }
2708
2709    synchronized void remove(Key key, FileSystem fs) {
2710      if (map.containsKey(key) && fs == map.get(key)) {
2711        map.remove(key);
2712        toAutoClose.remove(key);
2713        }
2714    }
2715
2716    synchronized void closeAll() throws IOException {
2717      closeAll(false);
2718    }
2719
2720    /**
2721     * Close all FileSystem instances in the Cache.
2722     * @param onlyAutomatic only close those that are marked for automatic closing
2723     */
2724    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2725      List<IOException> exceptions = new ArrayList<IOException>();
2726
2727      // Make a copy of the keys in the map since we'll be modifying
2728      // the map while iterating over it, which isn't safe.
2729      List<Key> keys = new ArrayList<Key>();
2730      keys.addAll(map.keySet());
2731
2732      for (Key key : keys) {
2733        final FileSystem fs = map.get(key);
2734
2735        if (onlyAutomatic && !toAutoClose.contains(key)) {
2736          continue;
2737        }
2738
2739        //remove from cache
2740        remove(key, fs);
2741
2742        if (fs != null) {
2743          try {
2744            fs.close();
2745          }
2746          catch(IOException ioe) {
2747            exceptions.add(ioe);
2748          }
2749        }
2750      }
2751
2752      if (!exceptions.isEmpty()) {
2753        throw MultipleIOException.createIOException(exceptions);
2754      }
2755    }
2756
2757    private class ClientFinalizer implements Runnable {
2758      @Override
2759      public synchronized void run() {
2760        try {
2761          closeAll(true);
2762        } catch (IOException e) {
2763          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2764        }
2765      }
2766    }
2767
2768    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2769      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2770      //Make a pass over the list and collect the filesystems to close
2771      //we cannot close inline since close() removes the entry from the Map
2772      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2773        final Key key = entry.getKey();
2774        final FileSystem fs = entry.getValue();
2775        if (ugi.equals(key.ugi) && fs != null) {
2776          targetFSList.add(fs);   
2777        }
2778      }
2779      List<IOException> exceptions = new ArrayList<IOException>();
2780      //now make a pass over the target list and close each
2781      for (FileSystem fs : targetFSList) {
2782        try {
2783          fs.close();
2784        }
2785        catch(IOException ioe) {
2786          exceptions.add(ioe);
2787        }
2788      }
2789      if (!exceptions.isEmpty()) {
2790        throw MultipleIOException.createIOException(exceptions);
2791      }
2792    }
2793
2794    /** FileSystem.Cache.Key */
2795    static class Key {
2796      final String scheme;
2797      final String authority;
2798      final UserGroupInformation ugi;
2799      final long unique;   // an artificial way to make a key unique
2800
2801      Key(URI uri, Configuration conf) throws IOException {
2802        this(uri, conf, 0);
2803      }
2804
2805      Key(URI uri, Configuration conf, long unique) throws IOException {
2806        scheme = uri.getScheme()==null ?
2807            "" : StringUtils.toLowerCase(uri.getScheme());
2808        authority = uri.getAuthority()==null ?
2809            "" : StringUtils.toLowerCase(uri.getAuthority());
2810        this.unique = unique;
2811        
2812        this.ugi = UserGroupInformation.getCurrentUser();
2813      }
2814
2815      @Override
2816      public int hashCode() {
2817        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2818      }
2819
2820      static boolean isEqual(Object a, Object b) {
2821        return a == b || (a != null && a.equals(b));        
2822      }
2823
2824      @Override
2825      public boolean equals(Object obj) {
2826        if (obj == this) {
2827          return true;
2828        }
2829        if (obj != null && obj instanceof Key) {
2830          Key that = (Key)obj;
2831          return isEqual(this.scheme, that.scheme)
2832                 && isEqual(this.authority, that.authority)
2833                 && isEqual(this.ugi, that.ugi)
2834                 && (this.unique == that.unique);
2835        }
2836        return false;        
2837      }
2838
2839      @Override
2840      public String toString() {
2841        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2842      }
2843    }
2844  }
2845  
2846  /**
2847   * Tracks statistics about how many reads, writes, and so forth have been
2848   * done in a FileSystem.
2849   * 
2850   * Since there is only one of these objects per FileSystem, there will 
2851   * typically be many threads writing to this object.  Almost every operation
2852   * on an open file will involve a write to this object.  In contrast, reading
2853   * statistics is done infrequently by most programs, and not at all by others.
2854   * Hence, this is optimized for writes.
2855   * 
2856   * Each thread writes to its own thread-local area of memory.  This removes 
2857   * contention and allows us to scale up to many, many threads.  To read
2858   * statistics, the reader thread totals up the contents of all of the 
2859   * thread-local data areas.
2860   */
2861  public static final class Statistics {
2862    /**
2863     * Statistics data.
2864     * 
2865     * There is only a single writer to thread-local StatisticsData objects.
2866     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2867     * to prevent lost updates.
2868     * The Java specification guarantees that updates to volatile longs will
2869     * be perceived as atomic with respect to other threads, which is all we
2870     * need.
2871     */
2872    public static class StatisticsData {
2873      volatile long bytesRead;
2874      volatile long bytesWritten;
2875      volatile int readOps;
2876      volatile int largeReadOps;
2877      volatile int writeOps;
2878      /**
2879       * Stores a weak reference to the thread owning this StatisticsData.
2880       * This allows us to remove StatisticsData objects that pertain to
2881       * threads that no longer exist.
2882       */
2883      final WeakReference<Thread> owner;
2884
2885      StatisticsData(WeakReference<Thread> owner) {
2886        this.owner = owner;
2887      }
2888
2889      /**
2890       * Add another StatisticsData object to this one.
2891       */
2892      void add(StatisticsData other) {
2893        this.bytesRead += other.bytesRead;
2894        this.bytesWritten += other.bytesWritten;
2895        this.readOps += other.readOps;
2896        this.largeReadOps += other.largeReadOps;
2897        this.writeOps += other.writeOps;
2898      }
2899
2900      /**
2901       * Negate the values of all statistics.
2902       */
2903      void negate() {
2904        this.bytesRead = -this.bytesRead;
2905        this.bytesWritten = -this.bytesWritten;
2906        this.readOps = -this.readOps;
2907        this.largeReadOps = -this.largeReadOps;
2908        this.writeOps = -this.writeOps;
2909      }
2910
2911      @Override
2912      public String toString() {
2913        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2914            + readOps + " read ops, " + largeReadOps + " large read ops, "
2915            + writeOps + " write ops";
2916      }
2917      
2918      public long getBytesRead() {
2919        return bytesRead;
2920      }
2921      
2922      public long getBytesWritten() {
2923        return bytesWritten;
2924      }
2925      
2926      public int getReadOps() {
2927        return readOps;
2928      }
2929      
2930      public int getLargeReadOps() {
2931        return largeReadOps;
2932      }
2933      
2934      public int getWriteOps() {
2935        return writeOps;
2936      }
2937    }
2938
2939    private interface StatisticsAggregator<T> {
2940      void accept(StatisticsData data);
2941      T aggregate();
2942    }
2943
2944    private final String scheme;
2945
2946    /**
2947     * rootData is data that doesn't belong to any thread, but will be added
2948     * to the totals.  This is useful for making copies of Statistics objects,
2949     * and for storing data that pertains to threads that have been garbage
2950     * collected.  Protected by the Statistics lock.
2951     */
2952    private final StatisticsData rootData;
2953
2954    /**
2955     * Thread-local data.
2956     */
2957    private final ThreadLocal<StatisticsData> threadData;
2958    
2959    /**
2960     * List of all thread-local data areas.  Protected by the Statistics lock.
2961     */
2962    private LinkedList<StatisticsData> allData;
2963
2964    public Statistics(String scheme) {
2965      this.scheme = scheme;
2966      this.rootData = new StatisticsData(null);
2967      this.threadData = new ThreadLocal<StatisticsData>();
2968      this.allData = null;
2969    }
2970
2971    /**
2972     * Copy constructor.
2973     * 
2974     * @param other    The input Statistics object which is cloned.
2975     */
2976    public Statistics(Statistics other) {
2977      this.scheme = other.scheme;
2978      this.rootData = new StatisticsData(null);
2979      other.visitAll(new StatisticsAggregator<Void>() {
2980        @Override
2981        public void accept(StatisticsData data) {
2982          rootData.add(data);
2983        }
2984
2985        public Void aggregate() {
2986          return null;
2987        }
2988      });
2989      this.threadData = new ThreadLocal<StatisticsData>();
2990    }
2991
2992    /**
2993     * Get or create the thread-local data associated with the current thread.
2994     */
2995    public StatisticsData getThreadStatistics() {
2996      StatisticsData data = threadData.get();
2997      if (data == null) {
2998        data = new StatisticsData(
2999            new WeakReference<Thread>(Thread.currentThread()));
3000        threadData.set(data);
3001        synchronized(this) {
3002          if (allData == null) {
3003            allData = new LinkedList<StatisticsData>();
3004          }
3005          allData.add(data);
3006        }
3007      }
3008      return data;
3009    }
3010
3011    /**
3012     * Increment the bytes read in the statistics
3013     * @param newBytes the additional bytes read
3014     */
3015    public void incrementBytesRead(long newBytes) {
3016      getThreadStatistics().bytesRead += newBytes;
3017    }
3018    
3019    /**
3020     * Increment the bytes written in the statistics
3021     * @param newBytes the additional bytes written
3022     */
3023    public void incrementBytesWritten(long newBytes) {
3024      getThreadStatistics().bytesWritten += newBytes;
3025    }
3026    
3027    /**
3028     * Increment the number of read operations
3029     * @param count number of read operations
3030     */
3031    public void incrementReadOps(int count) {
3032      getThreadStatistics().readOps += count;
3033    }
3034
3035    /**
3036     * Increment the number of large read operations
3037     * @param count number of large read operations
3038     */
3039    public void incrementLargeReadOps(int count) {
3040      getThreadStatistics().largeReadOps += count;
3041    }
3042
3043    /**
3044     * Increment the number of write operations
3045     * @param count number of write operations
3046     */
3047    public void incrementWriteOps(int count) {
3048      getThreadStatistics().writeOps += count;
3049    }
3050
3051    /**
3052     * Apply the given aggregator to all StatisticsData objects associated with
3053     * this Statistics object.
3054     *
3055     * For each StatisticsData object, we will call accept on the visitor.
3056     * Finally, at the end, we will call aggregate to get the final total. 
3057     *
3058     * @param         The visitor to use.
3059     * @return        The total.
3060     */
3061    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3062      visitor.accept(rootData);
3063      if (allData != null) {
3064        for (Iterator<StatisticsData> iter = allData.iterator();
3065            iter.hasNext(); ) {
3066          StatisticsData data = iter.next();
3067          visitor.accept(data);
3068          if (data.owner.get() == null) {
3069            /*
3070             * If the thread that created this thread-local data no
3071             * longer exists, remove the StatisticsData from our list
3072             * and fold the values into rootData.
3073             */
3074            rootData.add(data);
3075            iter.remove();
3076          }
3077        }
3078      }
3079      return visitor.aggregate();
3080    }
3081
3082    /**
3083     * Get the total number of bytes read
3084     * @return the number of bytes
3085     */
3086    public long getBytesRead() {
3087      return visitAll(new StatisticsAggregator<Long>() {
3088        private long bytesRead = 0;
3089
3090        @Override
3091        public void accept(StatisticsData data) {
3092          bytesRead += data.bytesRead;
3093        }
3094
3095        public Long aggregate() {
3096          return bytesRead;
3097        }
3098      });
3099    }
3100    
3101    /**
3102     * Get the total number of bytes written
3103     * @return the number of bytes
3104     */
3105    public long getBytesWritten() {
3106      return visitAll(new StatisticsAggregator<Long>() {
3107        private long bytesWritten = 0;
3108
3109        @Override
3110        public void accept(StatisticsData data) {
3111          bytesWritten += data.bytesWritten;
3112        }
3113
3114        public Long aggregate() {
3115          return bytesWritten;
3116        }
3117      });
3118    }
3119    
3120    /**
3121     * Get the number of file system read operations such as list files
3122     * @return number of read operations
3123     */
3124    public int getReadOps() {
3125      return visitAll(new StatisticsAggregator<Integer>() {
3126        private int readOps = 0;
3127
3128        @Override
3129        public void accept(StatisticsData data) {
3130          readOps += data.readOps;
3131          readOps += data.largeReadOps;
3132        }
3133
3134        public Integer aggregate() {
3135          return readOps;
3136        }
3137      });
3138    }
3139
3140    /**
3141     * Get the number of large file system read operations such as list files
3142     * under a large directory
3143     * @return number of large read operations
3144     */
3145    public int getLargeReadOps() {
3146      return visitAll(new StatisticsAggregator<Integer>() {
3147        private int largeReadOps = 0;
3148
3149        @Override
3150        public void accept(StatisticsData data) {
3151          largeReadOps += data.largeReadOps;
3152        }
3153
3154        public Integer aggregate() {
3155          return largeReadOps;
3156        }
3157      });
3158    }
3159
3160    /**
3161     * Get the number of file system write operations such as create, append 
3162     * rename etc.
3163     * @return number of write operations
3164     */
3165    public int getWriteOps() {
3166      return visitAll(new StatisticsAggregator<Integer>() {
3167        private int writeOps = 0;
3168
3169        @Override
3170        public void accept(StatisticsData data) {
3171          writeOps += data.writeOps;
3172        }
3173
3174        public Integer aggregate() {
3175          return writeOps;
3176        }
3177      });
3178    }
3179
3180
3181    @Override
3182    public String toString() {
3183      return visitAll(new StatisticsAggregator<String>() {
3184        private StatisticsData total = new StatisticsData(null);
3185
3186        @Override
3187        public void accept(StatisticsData data) {
3188          total.add(data);
3189        }
3190
3191        public String aggregate() {
3192          return total.toString();
3193        }
3194      });
3195    }
3196
3197    /**
3198     * Resets all statistics to 0.
3199     *
3200     * In order to reset, we add up all the thread-local statistics data, and
3201     * set rootData to the negative of that.
3202     *
3203     * This may seem like a counterintuitive way to reset the statsitics.  Why
3204     * can't we just zero out all the thread-local data?  Well, thread-local
3205     * data can only be modified by the thread that owns it.  If we tried to
3206     * modify the thread-local data from this thread, our modification might get
3207     * interleaved with a read-modify-write operation done by the thread that
3208     * owns the data.  That would result in our update getting lost.
3209     *
3210     * The approach used here avoids this problem because it only ever reads
3211     * (not writes) the thread-local data.  Both reads and writes to rootData
3212     * are done under the lock, so we're free to modify rootData from any thread
3213     * that holds the lock.
3214     */
3215    public void reset() {
3216      visitAll(new StatisticsAggregator<Void>() {
3217        private StatisticsData total = new StatisticsData(null);
3218
3219        @Override
3220        public void accept(StatisticsData data) {
3221          total.add(data);
3222        }
3223
3224        public Void aggregate() {
3225          total.negate();
3226          rootData.add(total);
3227          return null;
3228        }
3229      });
3230    }
3231    
3232    /**
3233     * Get the uri scheme associated with this statistics object.
3234     * @return the schema associated with this set of statistics
3235     */
3236    public String getScheme() {
3237      return scheme;
3238    }
3239  }
3240  
3241  /**
3242   * Get the Map of Statistics object indexed by URI Scheme.
3243   * @return a Map having a key as URI scheme and value as Statistics object
3244   * @deprecated use {@link #getAllStatistics} instead
3245   */
3246  @Deprecated
3247  public static synchronized Map<String, Statistics> getStatistics() {
3248    Map<String, Statistics> result = new HashMap<String, Statistics>();
3249    for(Statistics stat: statisticsTable.values()) {
3250      result.put(stat.getScheme(), stat);
3251    }
3252    return result;
3253  }
3254
3255  /**
3256   * Return the FileSystem classes that have Statistics
3257   */
3258  public static synchronized List<Statistics> getAllStatistics() {
3259    return new ArrayList<Statistics>(statisticsTable.values());
3260  }
3261  
3262  /**
3263   * Get the statistics for a particular file system
3264   * @param cls the class to lookup
3265   * @return a statistics object
3266   */
3267  public static synchronized 
3268  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3269    Statistics result = statisticsTable.get(cls);
3270    if (result == null) {
3271      result = new Statistics(scheme);
3272      statisticsTable.put(cls, result);
3273    }
3274    return result;
3275  }
3276  
3277  /**
3278   * Reset all statistics for all file systems
3279   */
3280  public static synchronized void clearStatistics() {
3281    for(Statistics stat: statisticsTable.values()) {
3282      stat.reset();
3283    }
3284  }
3285
3286  /**
3287   * Print all statistics for all file systems
3288   */
3289  public static synchronized
3290  void printStatistics() throws IOException {
3291    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3292            statisticsTable.entrySet()) {
3293      System.out.println("  FileSystem " + pair.getKey().getName() + 
3294                         ": " + pair.getValue());
3295    }
3296  }
3297
3298  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
3299  private static boolean symlinksEnabled = false;
3300
3301  private static Configuration conf = null;
3302
3303  @VisibleForTesting
3304  public static boolean areSymlinksEnabled() {
3305    return symlinksEnabled;
3306  }
3307
3308  @VisibleForTesting
3309  public static void enableSymlinks() {
3310    symlinksEnabled = true;
3311  }
3312}