001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.IdentityHashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.NoSuchElementException;
038import java.util.ServiceConfigurationError;
039import java.util.ServiceLoader;
040import java.util.Set;
041import java.util.Stack;
042import java.util.TreeSet;
043import java.util.concurrent.atomic.AtomicLong;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.apache.hadoop.classification.InterfaceAudience;
048import org.apache.hadoop.classification.InterfaceStability;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.Options.ChecksumOpt;
052import org.apache.hadoop.fs.Options.Rename;
053import org.apache.hadoop.fs.permission.AclEntry;
054import org.apache.hadoop.fs.permission.AclStatus;
055import org.apache.hadoop.fs.permission.FsAction;
056import org.apache.hadoop.fs.permission.FsPermission;
057import org.apache.hadoop.io.MultipleIOException;
058import org.apache.hadoop.io.Text;
059import org.apache.hadoop.net.NetUtils;
060import org.apache.hadoop.security.AccessControlException;
061import org.apache.hadoop.security.Credentials;
062import org.apache.hadoop.security.SecurityUtil;
063import org.apache.hadoop.security.UserGroupInformation;
064import org.apache.hadoop.security.token.Token;
065import org.apache.hadoop.util.ClassUtil;
066import org.apache.hadoop.util.DataChecksum;
067import org.apache.hadoop.util.Progressable;
068import org.apache.hadoop.util.ReflectionUtils;
069import org.apache.hadoop.util.ShutdownHookManager;
070import org.apache.hadoop.util.StringUtils;
071
072import com.google.common.annotations.VisibleForTesting;
073
074/****************************************************************
075 * An abstract base class for a fairly generic filesystem.  It
076 * may be implemented as a distributed filesystem, or as a "local"
077 * one that reflects the locally-connected disk.  The local version
078 * exists for small Hadoop instances and for testing.
079 *
080 * <p>
081 *
082 * All user code that may potentially use the Hadoop Distributed
083 * File System should be written to use a FileSystem object.  The
084 * Hadoop DFS is a multi-machine system that appears as a single
085 * disk.  It's useful because of its fault tolerance and potentially
086 * very large capacity.
087 * 
088 * <p>
089 * The local implementation is {@link LocalFileSystem} and distributed
090 * implementation is DistributedFileSystem.
091 *****************************************************************/
092@InterfaceAudience.Public
093@InterfaceStability.Stable
094public abstract class FileSystem extends Configured implements Closeable {
  /** Configuration key under which the default filesystem URI is stored. */
  public static final String FS_DEFAULT_NAME_KEY = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  /** Value used for the default filesystem when the key above is unset. */
  public static final String DEFAULT_FS = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;

  /** Logger for this class. */
  public static final Log LOG = LogFactory.getLog(FileSystem.class);

  /**
   * Priority of the FileSystem shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 10;

  /** FileSystem cache */
  static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /**
   * Statistics recorded per FileSystem class.  The map is an
   * IdentityHashMap, so classes are compared by reference.
   */
  private static final Map<Class<? extends FileSystem>, Statistics> 
    statisticsTable =
      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
  
  /**
   * The statistics for this file system.
   */
  protected Statistics statistics;

  /**
   * A cache of files that should be deleted when the filesystem is closed
   * or the JVM is exited.
   */
  private Set<Path> deleteOnExit = new TreeSet<Path>();
  
  /**
   * Value of the "resolve remote symlinks" client setting; assigned in
   * {@link #initialize(URI, Configuration)}.
   */
  boolean resolveSymlinks;
130  /**
131   * This method adds a file system for testing so that we can find it later. It
132   * is only for testing.
133   * @param uri the uri to store it under
134   * @param conf the configuration to store it under
135   * @param fs the file system to store
136   * @throws IOException
137   */
138  static void addFileSystemForTesting(URI uri, Configuration conf,
139      FileSystem fs) throws IOException {
140    CACHE.map.put(new Cache.Key(uri, conf), fs);
141  }
142
143  /**
144   * Get a filesystem instance based on the uri, the passed
145   * configuration and the user
146   * @param uri of the filesystem
147   * @param conf the configuration to use
148   * @param user to perform the get as
149   * @return the filesystem instance
150   * @throws IOException
151   * @throws InterruptedException
152   */
153  public static FileSystem get(final URI uri, final Configuration conf,
154        final String user) throws IOException, InterruptedException {
155    String ticketCachePath =
156      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
157    UserGroupInformation ugi =
158        UserGroupInformation.getBestUGI(ticketCachePath, user);
159    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
160      @Override
161      public FileSystem run() throws IOException {
162        return get(uri, conf);
163      }
164    });
165  }
166
167  /**
168   * Returns the configured filesystem implementation.
169   * @param conf the configuration to use
170   */
171  public static FileSystem get(Configuration conf) throws IOException {
172    return get(getDefaultUri(conf), conf);
173  }
174  
175  /** Get the default filesystem URI from a configuration.
176   * @param conf the configuration to use
177   * @return the uri of the default filesystem
178   */
179  public static URI getDefaultUri(Configuration conf) {
180    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
181  }
182
183  /** Set the default filesystem URI in a configuration.
184   * @param conf the configuration to alter
185   * @param uri the new default filesystem uri
186   */
187  public static void setDefaultUri(Configuration conf, URI uri) {
188    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
189  }
190
191  /** Set the default filesystem URI in a configuration.
192   * @param conf the configuration to alter
193   * @param uri the new default filesystem uri
194   */
195  public static void setDefaultUri(Configuration conf, String uri) {
196    setDefaultUri(conf, URI.create(fixName(uri)));
197  }
198
199  /** Called after a new FileSystem instance is constructed.
200   * @param name a uri whose authority section names the host, port, etc.
201   *   for this FileSystem
202   * @param conf the configuration
203   */
204  public void initialize(URI name, Configuration conf) throws IOException {
205    statistics = getStatistics(name.getScheme(), getClass());    
206    resolveSymlinks = conf.getBoolean(
207        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
208        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
209  }
210
211  /**
212   * Return the protocol scheme for the FileSystem.
213   * <p/>
214   * This implementation throws an <code>UnsupportedOperationException</code>.
215   *
216   * @return the protocol scheme for the FileSystem.
217   */
218  public String getScheme() {
219    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
220  }
221
  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   *
   * @return the URI identifying this FileSystem
   */
  public abstract URI getUri();
224  
225  /**
226   * Return a canonicalized form of this FileSystem's URI.
227   * 
228   * The default implementation simply calls {@link #canonicalizeUri(URI)}
229   * on the filesystem's own URI, so subclasses typically only need to
230   * implement that method.
231   *
232   * @see #canonicalizeUri(URI)
233   */
234  protected URI getCanonicalUri() {
235    return canonicalizeUri(getUri());
236  }
237  
238  /**
239   * Canonicalize the given URI.
240   * 
241   * This is filesystem-dependent, but may for example consist of
242   * canonicalizing the hostname using DNS and adding the default
243   * port if not specified.
244   * 
245   * The default implementation simply fills in the default port if
246   * not specified and if the filesystem has a default port.
247   *
248   * @return URI
249   * @see NetUtils#getCanonicalUri(URI, int)
250   */
251  protected URI canonicalizeUri(URI uri) {
252    if (uri.getPort() == -1 && getDefaultPort() > 0) {
253      // reconstruct the uri with the default port set
254      try {
255        uri = new URI(uri.getScheme(), uri.getUserInfo(),
256            uri.getHost(), getDefaultPort(),
257            uri.getPath(), uri.getQuery(), uri.getFragment());
258      } catch (URISyntaxException e) {
259        // Should never happen!
260        throw new AssertionError("Valid URI became unparseable: " +
261            uri);
262      }
263    }
264    
265    return uri;
266  }
267  
  /**
   * Get the default port for this file system.
   * <p>
   * This base implementation always returns 0;
   * {@link #canonicalizeUri(URI)} only applies a default port that is
   * greater than 0.
   *
   * @return the default port or 0 if there isn't one
   */
  protected int getDefaultPort() {
    return 0;
  }
275
276  protected static FileSystem getFSofPath(final Path absOrFqPath,
277      final Configuration conf)
278      throws UnsupportedFileSystemException, IOException {
279    absOrFqPath.checkNotSchemeWithRelative();
280    absOrFqPath.checkNotRelative();
281
282    // Uses the default file system if not fully qualified
283    return get(absOrFqPath.toUri(), conf);
284  }
285
286  /**
287   * Get a canonical service name for this file system.  The token cache is
288   * the only user of the canonical service name, and uses it to lookup this
289   * filesystem's service tokens.
290   * If file system provides a token of its own then it must have a canonical
291   * name, otherwise canonical name can be null.
292   * 
293   * Default Impl: If the file system has child file systems 
294   * (such as an embedded file system) then it is assumed that the fs has no
295   * tokens of its own and hence returns a null name; otherwise a service
296   * name is built using Uri and port.
297   * 
298   * @return a service string that uniquely identifies this file system, null
299   *         if the filesystem does not implement tokens
300   * @see SecurityUtil#buildDTServiceName(URI, int) 
301   */
302  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
303  public String getCanonicalServiceName() {
304    return (getChildFileSystems() == null)
305      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
306      : null;
307  }
308
309  /** @deprecated call #getUri() instead.*/
310  @Deprecated
311  public String getName() { return getUri().toString(); }
312
313  /** @deprecated call #get(URI,Configuration) instead. */
314  @Deprecated
315  public static FileSystem getNamed(String name, Configuration conf)
316    throws IOException {
317    return get(URI.create(fixName(name)), conf);
318  }
319  
320  /** Update old-format filesystem names, for back-compatibility.  This should
321   * eventually be replaced with a checkName() method that throws an exception
322   * for old-format names. */ 
323  private static String fixName(String name) {
324    // convert old-format name to new-format name
325    if (name.equals("local")) {         // "local" is now "file:///".
326      LOG.warn("\"local\" is a deprecated filesystem name."
327               +" Use \"file:///\" instead.");
328      name = "file:///";
329    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
330      LOG.warn("\""+name+"\" is a deprecated filesystem name."
331               +" Use \"hdfs://"+name+"/\" instead.");
332      name = "hdfs://"+name;
333    }
334    return name;
335  }
336
337  /**
338   * Get the local file system.
339   * @param conf the configuration to configure the file system with
340   * @return a LocalFileSystem
341   */
342  public static LocalFileSystem getLocal(Configuration conf)
343    throws IOException {
344    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
345  }
346
347  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
348   * of the URI determines a configuration property name,
349   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
350   * The entire URI is passed to the FileSystem instance's initialize method.
351   */
352  public static FileSystem get(URI uri, Configuration conf) throws IOException {
353    String scheme = uri.getScheme();
354    String authority = uri.getAuthority();
355
356    if (scheme == null && authority == null) {     // use default FS
357      return get(conf);
358    }
359
360    if (scheme != null && authority == null) {     // no authority
361      URI defaultUri = getDefaultUri(conf);
362      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
363          && defaultUri.getAuthority() != null) {  // & default has authority
364        return get(defaultUri, conf);              // return default
365      }
366    }
367    
368    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
369    if (conf.getBoolean(disableCacheName, false)) {
370      return createFileSystem(uri, conf);
371    }
372
373    return CACHE.get(uri, conf);
374  }
375
376  /**
377   * Returns the FileSystem for this URI's scheme and authority and the 
378   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
379   * @param uri of the filesystem
380   * @param conf the configuration to use
381   * @param user to perform the get as
382   * @return filesystem instance
383   * @throws IOException
384   * @throws InterruptedException
385   */
386  public static FileSystem newInstance(final URI uri, final Configuration conf,
387      final String user) throws IOException, InterruptedException {
388    String ticketCachePath =
389      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
390    UserGroupInformation ugi =
391        UserGroupInformation.getBestUGI(ticketCachePath, user);
392    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
393      @Override
394      public FileSystem run() throws IOException {
395        return newInstance(uri,conf); 
396      }
397    });
398  }
399  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
400   * of the URI determines a configuration property name,
401   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
402   * The entire URI is passed to the FileSystem instance's initialize method.
403   * This always returns a new FileSystem object.
404   */
405  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
406    String scheme = uri.getScheme();
407    String authority = uri.getAuthority();
408
409    if (scheme == null) {                       // no scheme: use default FS
410      return newInstance(conf);
411    }
412
413    if (authority == null) {                       // no authority
414      URI defaultUri = getDefaultUri(conf);
415      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
416          && defaultUri.getAuthority() != null) {  // & default has authority
417        return newInstance(defaultUri, conf);              // return default
418      }
419    }
420    return CACHE.getUnique(uri, conf);
421  }
422
423  /** Returns a unique configured filesystem implementation.
424   * This always returns a new FileSystem object.
425   * @param conf the configuration to use
426   */
427  public static FileSystem newInstance(Configuration conf) throws IOException {
428    return newInstance(getDefaultUri(conf), conf);
429  }
430
431  /**
432   * Get a unique local file system object
433   * @param conf the configuration to configure the file system with
434   * @return a LocalFileSystem
435   * This always returns a new FileSystem object.
436   */
437  public static LocalFileSystem newInstanceLocal(Configuration conf)
438    throws IOException {
439    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
440  }
441
  /**
   * Close all cached filesystems by delegating to the shared cache.
   * Be sure those filesystems are not used anymore.
   *
   * @throws IOException propagated from the cache's closeAll
   */
  public static void closeAll() throws IOException {
    CACHE.closeAll();
  }
451
  /**
   * Close all cached filesystems for a given UGI by delegating to the
   * shared cache.  Be sure those filesystems are not used anymore.
   *
   * @param ugi user group info to close
   * @throws IOException propagated from the cache's closeAll
   */
  public static void closeAllForUGI(UserGroupInformation ugi) 
  throws IOException {
    CACHE.closeAll(ugi);
  }
462
463  /** 
464   * Make sure that a path specifies a FileSystem.
465   * @param path to use
466   */
467  public Path makeQualified(Path path) {
468    checkPath(path);
469    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
470  }
471    
  /**
   * Get a new delegation token for this file system.
   * This is an internal method that should have been declared protected
   * but wasn't historically.
   * Callers should use {@link #addDelegationTokens(String, Credentials)}
   * <p>
   * This base implementation returns null, i.e. it issues no token.
   *
   * @param renewer the account name that is allowed to renew the token.
   * @return a new delegation token, or null when none is issued
   * @throws IOException declared for overriding implementations
   */
  @InterfaceAudience.Private()
  public Token<?> getDelegationToken(String renewer) throws IOException {
    return null;
  }
486  
487  /**
488   * Obtain all delegation tokens used by this FileSystem that are not
489   * already present in the given Credentials.  Existing tokens will neither
490   * be verified as valid nor having the given renewer.  Missing tokens will
491   * be acquired and added to the given Credentials.
492   * 
493   * Default Impl: works for simple fs with its own token
494   * and also for an embedded fs whose tokens are those of its
495   * children file system (i.e. the embedded fs has not tokens of its
496   * own).
497   * 
498   * @param renewer the user allowed to renew the delegation tokens
499   * @param credentials cache in which to add new delegation tokens
500   * @return list of new delegation tokens
501   * @throws IOException
502   */
503  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
504  public Token<?>[] addDelegationTokens(
505      final String renewer, Credentials credentials) throws IOException {
506    if (credentials == null) {
507      credentials = new Credentials();
508    }
509    final List<Token<?>> tokens = new ArrayList<Token<?>>();
510    collectDelegationTokens(renewer, credentials, tokens);
511    return tokens.toArray(new Token<?>[tokens.size()]);
512  }
513  
514  /**
515   * Recursively obtain the tokens for this FileSystem and all descended
516   * FileSystems as determined by getChildFileSystems().
517   * @param renewer the user allowed to renew the delegation tokens
518   * @param credentials cache in which to add the new delegation tokens
519   * @param tokens list in which to add acquired tokens
520   * @throws IOException
521   */
522  private void collectDelegationTokens(final String renewer,
523                                       final Credentials credentials,
524                                       final List<Token<?>> tokens)
525                                           throws IOException {
526    final String serviceName = getCanonicalServiceName();
527    // Collect token of the this filesystem and then of its embedded children
528    if (serviceName != null) { // fs has token, grab it
529      final Text service = new Text(serviceName);
530      Token<?> token = credentials.getToken(service);
531      if (token == null) {
532        token = getDelegationToken(renewer);
533        if (token != null) {
534          tokens.add(token);
535          credentials.addToken(service, token);
536        }
537      }
538    }
539    // Now collect the tokens from the children
540    final FileSystem[] children = getChildFileSystems();
541    if (children != null) {
542      for (final FileSystem fs : children) {
543        fs.collectDelegationTokens(renewer, credentials, tokens);
544      }
545    }
546  }
547
  /**
   * Get all the immediate child FileSystems embedded in this FileSystem.
   * It does not recurse and get grand children.  If a FileSystem
   * has multiple child FileSystems, then it should return a unique list
   * of those FileSystems.  Default is to return null to signify no children.
   * <p>
   * {@link #getCanonicalServiceName()} treats a null return as "this
   * filesystem has its own token".
   *
   * @return FileSystems used by this FileSystem, or null when there are none
   */
  @InterfaceAudience.LimitedPrivate({ "HDFS" })
  @VisibleForTesting
  public FileSystem[] getChildFileSystems() {
    return null;
  }
561  
562  /** create a file with the provided permission
563   * The permission of the file is set to be the provided permission as in
564   * setPermission, not permission&~umask
565   * 
566   * It is implemented using two RPCs. It is understood that it is inefficient,
567   * but the implementation is thread-safe. The other option is to change the
568   * value of umask in configuration to be 0, but it is not thread-safe.
569   * 
570   * @param fs file system handle
571   * @param file the name of the file to be created
572   * @param permission the permission of the file
573   * @return an output stream
574   * @throws IOException
575   */
576  public static FSDataOutputStream create(FileSystem fs,
577      Path file, FsPermission permission) throws IOException {
578    // create the file with default permission
579    FSDataOutputStream out = fs.create(file);
580    // set its permission to the supplied one
581    fs.setPermission(file, permission);
582    return out;
583  }
584
585  /** create a directory with the provided permission
586   * The permission of the directory is set to be the provided permission as in
587   * setPermission, not permission&~umask
588   * 
589   * @see #create(FileSystem, Path, FsPermission)
590   * 
591   * @param fs file system handle
592   * @param dir the name of the directory to be created
593   * @param permission the permission of the directory
594   * @return true if the directory creation succeeds; false otherwise
595   * @throws IOException
596   */
597  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
598  throws IOException {
599    // create the directory using the default permission
600    boolean result = fs.mkdirs(dir);
601    // set its permission to be the supplied one
602    fs.setPermission(dir, permission);
603    return result;
604  }
605
606  ///////////////////////////////////////////////////////////////
607  // FileSystem
608  ///////////////////////////////////////////////////////////////
609
  /**
   * Creates a FileSystem without a configuration: null is passed to the
   * {@code Configured} superclass constructor.
   */
  protected FileSystem() {
    super(null);
  }
613
614  /** 
615   * Check that a Path belongs to this FileSystem.
616   * @param path to check
617   */
618  protected void checkPath(Path path) {
619    URI uri = path.toUri();
620    String thatScheme = uri.getScheme();
621    if (thatScheme == null)                // fs is relative
622      return;
623    URI thisUri = getCanonicalUri();
624    String thisScheme = thisUri.getScheme();
625    //authority and scheme are not case sensitive
626    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
627      String thisAuthority = thisUri.getAuthority();
628      String thatAuthority = uri.getAuthority();
629      if (thatAuthority == null &&                // path's authority is null
630          thisAuthority != null) {                // fs has an authority
631        URI defaultUri = getDefaultUri(getConf());
632        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
633          uri = defaultUri; // schemes match, so use this uri instead
634        } else {
635          uri = null; // can't determine auth of the path
636        }
637      }
638      if (uri != null) {
639        // canonicalize uri before comparing with this fs
640        uri = canonicalizeUri(uri);
641        thatAuthority = uri.getAuthority();
642        if (thisAuthority == thatAuthority ||       // authorities match
643            (thisAuthority != null &&
644             thisAuthority.equalsIgnoreCase(thatAuthority)))
645          return;
646      }
647    }
648    throw new IllegalArgumentException("Wrong FS: "+path+
649                                       ", expected: "+this.getUri());
650  }
651
652  /**
653   * Return an array containing hostnames, offset and size of 
654   * portions of the given file.  For a nonexistent 
655   * file or regions, null will be returned.
656   *
657   * This call is most helpful with DFS, where it returns 
658   * hostnames of machines that contain the given file.
659   *
660   * The FileSystem will simply return an elt containing 'localhost'.
661   *
662   * @param file FilesStatus to get data from
663   * @param start offset into the given file
664   * @param len length for which to get locations for
665   */
666  public BlockLocation[] getFileBlockLocations(FileStatus file, 
667      long start, long len) throws IOException {
668    if (file == null) {
669      return null;
670    }
671
672    if (start < 0 || len < 0) {
673      throw new IllegalArgumentException("Invalid start or len parameter");
674    }
675
676    if (file.getLen() <= start) {
677      return new BlockLocation[0];
678
679    }
680    String[] name = { "localhost:50010" };
681    String[] host = { "localhost" };
682    return new BlockLocation[] {
683      new BlockLocation(name, host, 0, file.getLen()) };
684  }
685 
686
687  /**
688   * Return an array containing hostnames, offset and size of 
689   * portions of the given file.  For a nonexistent 
690   * file or regions, null will be returned.
691   *
692   * This call is most helpful with DFS, where it returns 
693   * hostnames of machines that contain the given file.
694   *
695   * The FileSystem will simply return an elt containing 'localhost'.
696   *
697   * @param p path is used to identify an FS since an FS could have
698   *          another FS that it could be delegating the call to
699   * @param start offset into the given file
700   * @param len length for which to get locations for
701   */
702  public BlockLocation[] getFileBlockLocations(Path p, 
703      long start, long len) throws IOException {
704    if (p == null) {
705      throw new NullPointerException();
706    }
707    FileStatus file = getFileStatus(p);
708    return getFileBlockLocations(file, start, len);
709  }
710  
  /**
   * Return a set of server default configuration values, assembled on the
   * client from this filesystem's defaults and its configuration.
   *
   * @return server default configuration values
   * @throws IOException declared for overriding implementations
   * @deprecated use {@link #getServerDefaults(Path)} instead
   */
  @Deprecated
  public FsServerDefaults getServerDefaults() throws IOException {
    Configuration conf = getConf();
    // CRC32 is chosen as default as it is available in all 
    // releases that support checksum.
    // The client trash configuration is ignored.
    // NOTE(review): the meaning of the positional arguments 64 * 1024 and
    // false is not visible here - confirm against the FsServerDefaults
    // constructor.
    return new FsServerDefaults(getDefaultBlockSize(), 
        conf.getInt("io.bytes.per.checksum", 512), 
        64 * 1024, 
        getDefaultReplication(),
        conf.getInt("io.file.buffer.size", 4096),
        false,
        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
        DataChecksum.Type.CRC32);
  }
732
  /**
   * Return a set of server default configuration values.
   * <p>
   * This base implementation ignores the path and delegates to the
   * no-argument {@link #getServerDefaults()}.
   *
   * @param p path is used to identify an FS since an FS could have
   *          another FS that it could be delegating the call to
   * @return server default configuration values
   * @throws IOException propagated from {@link #getServerDefaults()}
   */
  public FsServerDefaults getServerDefaults(Path p) throws IOException {
    return getServerDefaults();
  }
743
744  /**
745   * Return the fully-qualified path of path f resolving the path
746   * through any symlinks or mount point
747   * @param p path to be resolved
748   * @return fully qualified path 
749   * @throws FileNotFoundException
750   */
751   public Path resolvePath(final Path p) throws IOException {
752     checkPath(p);
753     return getFileStatus(p).getPath();
754   }
755
  /**
   * Opens an FSDataInputStream at the indicated Path.
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return the opened input stream
   * @throws IOException declared for implementations
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
763    
764  /**
765   * Opens an FSDataInputStream at the indicated Path.
766   * @param f the file to open
767   */
768  public FSDataInputStream open(Path f) throws IOException {
769    return open(f, getConf().getInt("io.file.buffer.size", 4096));
770  }
771
772  /**
773   * Create an FSDataOutputStream at the indicated Path.
774   * Files are overwritten by default.
775   * @param f the file to create
776   */
777  public FSDataOutputStream create(Path f) throws IOException {
778    return create(f, true);
779  }
780
781  /**
782   * Create an FSDataOutputStream at the indicated Path.
783   * @param f the file to create
784   * @param overwrite if a file with this name already exists, then if true,
785   *   the file will be overwritten, and if false an exception will be thrown.
786   */
787  public FSDataOutputStream create(Path f, boolean overwrite)
788      throws IOException {
789    return create(f, overwrite, 
790                  getConf().getInt("io.file.buffer.size", 4096),
791                  getDefaultReplication(f),
792                  getDefaultBlockSize(f));
793  }
794
795  /**
796   * Create an FSDataOutputStream at the indicated Path with write-progress
797   * reporting.
798   * Files are overwritten by default.
799   * @param f the file to create
800   * @param progress to report progress
801   */
802  public FSDataOutputStream create(Path f, Progressable progress) 
803      throws IOException {
804    return create(f, true, 
805                  getConf().getInt("io.file.buffer.size", 4096),
806                  getDefaultReplication(f),
807                  getDefaultBlockSize(f), progress);
808  }
809
810  /**
811   * Create an FSDataOutputStream at the indicated Path.
812   * Files are overwritten by default.
813   * @param f the file to create
814   * @param replication the replication factor
815   */
816  public FSDataOutputStream create(Path f, short replication)
817      throws IOException {
818    return create(f, true, 
819                  getConf().getInt("io.file.buffer.size", 4096),
820                  replication,
821                  getDefaultBlockSize(f));
822  }
823
824  /**
825   * Create an FSDataOutputStream at the indicated Path with write-progress
826   * reporting.
827   * Files are overwritten by default.
828   * @param f the file to create
829   * @param replication the replication factor
830   * @param progress to report progress
831   */
832  public FSDataOutputStream create(Path f, short replication, 
833      Progressable progress) throws IOException {
834    return create(f, true, 
835                  getConf().getInt(
836                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
837                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
838                  replication,
839                  getDefaultBlockSize(f), progress);
840  }
841
842    
843  /**
844   * Create an FSDataOutputStream at the indicated Path.
845   * @param f the file name to create
846   * @param overwrite if a file with this name already exists, then if true,
847   *   the file will be overwritten, and if false an error will be thrown.
848   * @param bufferSize the size of the buffer to be used.
849   */
850  public FSDataOutputStream create(Path f, 
851                                   boolean overwrite,
852                                   int bufferSize
853                                   ) throws IOException {
854    return create(f, overwrite, bufferSize, 
855                  getDefaultReplication(f),
856                  getDefaultBlockSize(f));
857  }
858    
859  /**
860   * Create an FSDataOutputStream at the indicated Path with write-progress
861   * reporting.
862   * @param f the path of the file to open
863   * @param overwrite if a file with this name already exists, then if true,
864   *   the file will be overwritten, and if false an error will be thrown.
865   * @param bufferSize the size of the buffer to be used.
866   */
867  public FSDataOutputStream create(Path f, 
868                                   boolean overwrite,
869                                   int bufferSize,
870                                   Progressable progress
871                                   ) throws IOException {
872    return create(f, overwrite, bufferSize, 
873                  getDefaultReplication(f),
874                  getDefaultBlockSize(f), progress);
875  }
876    
877    
878  /**
879   * Create an FSDataOutputStream at the indicated Path.
880   * @param f the file name to open
881   * @param overwrite if a file with this name already exists, then if true,
882   *   the file will be overwritten, and if false an error will be thrown.
883   * @param bufferSize the size of the buffer to be used.
884   * @param replication required block replication for the file. 
885   */
886  public FSDataOutputStream create(Path f, 
887                                   boolean overwrite,
888                                   int bufferSize,
889                                   short replication,
890                                   long blockSize
891                                   ) throws IOException {
892    return create(f, overwrite, bufferSize, replication, blockSize, null);
893  }
894
895  /**
896   * Create an FSDataOutputStream at the indicated Path with write-progress
897   * reporting.
898   * @param f the file name to open
899   * @param overwrite if a file with this name already exists, then if true,
900   *   the file will be overwritten, and if false an error will be thrown.
901   * @param bufferSize the size of the buffer to be used.
902   * @param replication required block replication for the file. 
903   */
904  public FSDataOutputStream create(Path f,
905                                            boolean overwrite,
906                                            int bufferSize,
907                                            short replication,
908                                            long blockSize,
909                                            Progressable progress
910                                            ) throws IOException {
911    return this.create(f, FsPermission.getFileDefault().applyUMask(
912        FsPermission.getUMask(getConf())), overwrite, bufferSize,
913        replication, blockSize, progress);
914  }
915
916  /**
917   * Create an FSDataOutputStream at the indicated Path with write-progress
918   * reporting.
919   * @param f the file name to open
920   * @param permission
921   * @param overwrite if a file with this name already exists, then if true,
922   *   the file will be overwritten, and if false an error will be thrown.
923   * @param bufferSize the size of the buffer to be used.
924   * @param replication required block replication for the file.
925   * @param blockSize
926   * @param progress
927   * @throws IOException
928   * @see #setPermission(Path, FsPermission)
929   */
930  public abstract FSDataOutputStream create(Path f,
931      FsPermission permission,
932      boolean overwrite,
933      int bufferSize,
934      short replication,
935      long blockSize,
936      Progressable progress) throws IOException;
937  
938  /**
939   * Create an FSDataOutputStream at the indicated Path with write-progress
940   * reporting.
941   * @param f the file name to open
942   * @param permission
943   * @param flags {@link CreateFlag}s to use for this stream.
944   * @param bufferSize the size of the buffer to be used.
945   * @param replication required block replication for the file.
946   * @param blockSize
947   * @param progress
948   * @throws IOException
949   * @see #setPermission(Path, FsPermission)
950   */
951  public FSDataOutputStream create(Path f,
952      FsPermission permission,
953      EnumSet<CreateFlag> flags,
954      int bufferSize,
955      short replication,
956      long blockSize,
957      Progressable progress) throws IOException {
958    return create(f, permission, flags, bufferSize, replication,
959        blockSize, progress, null);
960  }
961  
962  /**
963   * Create an FSDataOutputStream at the indicated Path with a custom
964   * checksum option
965   * @param f the file name to open
966   * @param permission
967   * @param flags {@link CreateFlag}s to use for this stream.
968   * @param bufferSize the size of the buffer to be used.
969   * @param replication required block replication for the file.
970   * @param blockSize
971   * @param progress
972   * @param checksumOpt checksum parameter. If null, the values
973   *        found in conf will be used.
974   * @throws IOException
975   * @see #setPermission(Path, FsPermission)
976   */
977  public FSDataOutputStream create(Path f,
978      FsPermission permission,
979      EnumSet<CreateFlag> flags,
980      int bufferSize,
981      short replication,
982      long blockSize,
983      Progressable progress,
984      ChecksumOpt checksumOpt) throws IOException {
985    // Checksum options are ignored by default. The file systems that
986    // implement checksum need to override this method. The full
987    // support is currently only available in DFS.
988    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
989        bufferSize, replication, blockSize, progress);
990  }
991
992  /*.
993   * This create has been added to support the FileContext that processes
994   * the permission
995   * with umask before calling this method.
996   * This a temporary method added to support the transition from FileSystem
997   * to FileContext for user applications.
998   */
999  @Deprecated
1000  protected FSDataOutputStream primitiveCreate(Path f,
1001     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1002     short replication, long blockSize, Progressable progress,
1003     ChecksumOpt checksumOpt) throws IOException {
1004
1005    boolean pathExists = exists(f);
1006    CreateFlag.validate(f, pathExists, flag);
1007    
1008    // Default impl  assumes that permissions do not matter and 
1009    // nor does the bytesPerChecksum  hence
1010    // calling the regular create is good enough.
1011    // FSs that implement permissions should override this.
1012
1013    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1014      return append(f, bufferSize, progress);
1015    }
1016    
1017    return this.create(f, absolutePermission,
1018        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1019        blockSize, progress);
1020  }
1021  
1022  /**
1023   * This version of the mkdirs method assumes that the permission is absolute.
1024   * It has been added to support the FileContext that processes the permission
1025   * with umask before calling this method.
1026   * This a temporary method added to support the transition from FileSystem
1027   * to FileContext for user applications.
1028   */
1029  @Deprecated
1030  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1031    throws IOException {
1032    // Default impl is to assume that permissions do not matter and hence
1033    // calling the regular mkdirs is good enough.
1034    // FSs that implement permissions should override this.
1035   return this.mkdirs(f, absolutePermission);
1036  }
1037
1038
1039  /**
1040   * This version of the mkdirs method assumes that the permission is absolute.
1041   * It has been added to support the FileContext that processes the permission
1042   * with umask before calling this method.
1043   * This a temporary method added to support the transition from FileSystem
1044   * to FileContext for user applications.
1045   */
1046  @Deprecated
1047  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1048                    boolean createParent)
1049    throws IOException {
1050    
1051    if (!createParent) { // parent must exist.
1052      // since the this.mkdirs makes parent dirs automatically
1053      // we must throw exception if parent does not exist.
1054      final FileStatus stat = getFileStatus(f.getParent());
1055      if (stat == null) {
1056        throw new FileNotFoundException("Missing parent:" + f);
1057      }
1058      if (!stat.isDirectory()) {
1059        throw new ParentNotDirectoryException("parent is not a dir");
1060      }
1061      // parent does exist - go ahead with mkdir of leaf
1062    }
1063    // Default impl is to assume that permissions do not matter and hence
1064    // calling the regular mkdirs is good enough.
1065    // FSs that implement permissions should override this.
1066    if (!this.mkdirs(f, absolutePermission)) {
1067      throw new IOException("mkdir of "+ f + " failed");
1068    }
1069  }
1070
1071  /**
1072   * Opens an FSDataOutputStream at the indicated Path with write-progress
1073   * reporting. Same as create(), except fails if parent directory doesn't
1074   * already exist.
1075   * @param f the file name to open
1076   * @param overwrite if a file with this name already exists, then if true,
1077   * the file will be overwritten, and if false an error will be thrown.
1078   * @param bufferSize the size of the buffer to be used.
1079   * @param replication required block replication for the file.
1080   * @param blockSize
1081   * @param progress
1082   * @throws IOException
1083   * @see #setPermission(Path, FsPermission)
1084   * @deprecated API only for 0.20-append
1085   */
1086  @Deprecated
1087  public FSDataOutputStream createNonRecursive(Path f,
1088      boolean overwrite,
1089      int bufferSize, short replication, long blockSize,
1090      Progressable progress) throws IOException {
1091    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1092        overwrite, bufferSize, replication, blockSize, progress);
1093  }
1094
1095  /**
1096   * Opens an FSDataOutputStream at the indicated Path with write-progress
1097   * reporting. Same as create(), except fails if parent directory doesn't
1098   * already exist.
1099   * @param f the file name to open
1100   * @param permission
1101   * @param overwrite if a file with this name already exists, then if true,
1102   * the file will be overwritten, and if false an error will be thrown.
1103   * @param bufferSize the size of the buffer to be used.
1104   * @param replication required block replication for the file.
1105   * @param blockSize
1106   * @param progress
1107   * @throws IOException
1108   * @see #setPermission(Path, FsPermission)
1109   * @deprecated API only for 0.20-append
1110   */
1111   @Deprecated
1112   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1113       boolean overwrite, int bufferSize, short replication, long blockSize,
1114       Progressable progress) throws IOException {
1115     return createNonRecursive(f, permission,
1116         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1117             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1118             replication, blockSize, progress);
1119   }
1120
1121   /**
1122    * Opens an FSDataOutputStream at the indicated Path with write-progress
1123    * reporting. Same as create(), except fails if parent directory doesn't
1124    * already exist.
1125    * @param f the file name to open
1126    * @param permission
1127    * @param flags {@link CreateFlag}s to use for this stream.
1128    * @param bufferSize the size of the buffer to be used.
1129    * @param replication required block replication for the file.
1130    * @param blockSize
1131    * @param progress
1132    * @throws IOException
1133    * @see #setPermission(Path, FsPermission)
1134    * @deprecated API only for 0.20-append
1135    */
1136    @Deprecated
1137    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1138        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1139        Progressable progress) throws IOException {
1140      throw new IOException("createNonRecursive unsupported for this filesystem "
1141          + this.getClass());
1142    }
1143
1144  /**
1145   * Creates the given Path as a brand-new zero-length file.  If
1146   * create fails, or if it already existed, return false.
1147   *
1148   * @param f path to use for create
1149   */
1150  public boolean createNewFile(Path f) throws IOException {
1151    if (exists(f)) {
1152      return false;
1153    } else {
1154      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1155      return true;
1156    }
1157  }
1158
1159  /**
1160   * Append to an existing file (optional operation).
1161   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1162   * @param f the existing file to be appended.
1163   * @throws IOException
1164   */
1165  public FSDataOutputStream append(Path f) throws IOException {
1166    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1167  }
1168  /**
1169   * Append to an existing file (optional operation).
1170   * Same as append(f, bufferSize, null).
1171   * @param f the existing file to be appended.
1172   * @param bufferSize the size of the buffer to be used.
1173   * @throws IOException
1174   */
1175  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1176    return append(f, bufferSize, null);
1177  }
1178
1179  /**
1180   * Append to an existing file (optional operation).
1181   * @param f the existing file to be appended.
1182   * @param bufferSize the size of the buffer to be used.
1183   * @param progress for reporting progress if it is not null.
1184   * @throws IOException
1185   */
1186  public abstract FSDataOutputStream append(Path f, int bufferSize,
1187      Progressable progress) throws IOException;
1188
1189  /**
1190   * Concat existing files together.
1191   * @param trg the path to the target destination.
1192   * @param psrcs the paths to the sources to use for the concatenation.
1193   * @throws IOException
1194   */
1195  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1196    throw new UnsupportedOperationException("Not implemented by the " + 
1197        getClass().getSimpleName() + " FileSystem implementation");
1198  }
1199
1200 /**
1201   * Get replication.
1202   * 
1203   * @deprecated Use getFileStatus() instead
1204   * @param src file name
1205   * @return file replication
1206   * @throws IOException
1207   */ 
1208  @Deprecated
1209  public short getReplication(Path src) throws IOException {
1210    return getFileStatus(src).getReplication();
1211  }
1212
1213  /**
1214   * Set replication for an existing file.
1215   * 
1216   * @param src file name
1217   * @param replication new replication
1218   * @throws IOException
1219   * @return true if successful;
1220   *         false if file does not exist or is a directory
1221   */
1222  public boolean setReplication(Path src, short replication)
1223    throws IOException {
1224    return true;
1225  }
1226
1227  /**
1228   * Renames Path src to Path dst.  Can take place on local fs
1229   * or remote DFS.
1230   * @param src path to be renamed
1231   * @param dst new path after rename
1232   * @throws IOException on failure
1233   * @return true if rename is successful
1234   */
1235  public abstract boolean rename(Path src, Path dst) throws IOException;
1236
1237  /**
1238   * Renames Path src to Path dst
1239   * <ul>
1240   * <li
1241   * <li>Fails if src is a file and dst is a directory.
1242   * <li>Fails if src is a directory and dst is a file.
1243   * <li>Fails if the parent of dst does not exist or is a file.
1244   * </ul>
1245   * <p>
1246   * If OVERWRITE option is not passed as an argument, rename fails
1247   * if the dst already exists.
1248   * <p>
1249   * If OVERWRITE option is passed as an argument, rename overwrites
1250   * the dst if it is a file or an empty directory. Rename fails if dst is
1251   * a non-empty directory.
1252   * <p>
1253   * Note that atomicity of rename is dependent on the file system
1254   * implementation. Please refer to the file system documentation for
1255   * details. This default implementation is non atomic.
1256   * <p>
1257   * This method is deprecated since it is a temporary method added to 
1258   * support the transition from FileSystem to FileContext for user 
1259   * applications.
1260   * 
1261   * @param src path to be renamed
1262   * @param dst new path after rename
1263   * @throws IOException on failure
1264   */
1265  @Deprecated
1266  protected void rename(final Path src, final Path dst,
1267      final Rename... options) throws IOException {
1268    // Default implementation
1269    final FileStatus srcStatus = getFileLinkStatus(src);
1270    if (srcStatus == null) {
1271      throw new FileNotFoundException("rename source " + src + " not found.");
1272    }
1273
1274    boolean overwrite = false;
1275    if (null != options) {
1276      for (Rename option : options) {
1277        if (option == Rename.OVERWRITE) {
1278          overwrite = true;
1279        }
1280      }
1281    }
1282
1283    FileStatus dstStatus;
1284    try {
1285      dstStatus = getFileLinkStatus(dst);
1286    } catch (IOException e) {
1287      dstStatus = null;
1288    }
1289    if (dstStatus != null) {
1290      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1291        throw new IOException("Source " + src + " Destination " + dst
1292            + " both should be either file or directory");
1293      }
1294      if (!overwrite) {
1295        throw new FileAlreadyExistsException("rename destination " + dst
1296            + " already exists.");
1297      }
1298      // Delete the destination that is a file or an empty directory
1299      if (dstStatus.isDirectory()) {
1300        FileStatus[] list = listStatus(dst);
1301        if (list != null && list.length != 0) {
1302          throw new IOException(
1303              "rename cannot overwrite non empty destination directory " + dst);
1304        }
1305      }
1306      delete(dst, false);
1307    } else {
1308      final Path parent = dst.getParent();
1309      final FileStatus parentStatus = getFileStatus(parent);
1310      if (parentStatus == null) {
1311        throw new FileNotFoundException("rename destination parent " + parent
1312            + " not found.");
1313      }
1314      if (!parentStatus.isDirectory()) {
1315        throw new ParentNotDirectoryException("rename destination parent " + parent
1316            + " is a file.");
1317      }
1318    }
1319    if (!rename(src, dst)) {
1320      throw new IOException("rename from " + src + " to " + dst + " failed.");
1321    }
1322  }
1323
1324  /**
1325   * Truncate the file in the indicated path to the indicated size.
1326   * <ul>
1327   * <li>Fails if path is a directory.
1328   * <li>Fails if path does not exist.
1329   * <li>Fails if path is not closed.
1330   * <li>Fails if new size is greater than current size.
1331   * </ul>
1332   * @param f The path to the file to be truncated
1333   * @param newLength The size the file is to be truncated to
1334   *
1335   * @return <code>true</code> if the file has been truncated to the desired
1336   * <code>newLength</code> and is immediately available to be reused for
1337   * write operations such as <code>append</code>, or
1338   * <code>false</code> if a background process of adjusting the length of
1339   * the last block has been started, and clients should wait for it to
1340   * complete before proceeding with further file updates.
1341   */
1342  public boolean truncate(Path f, long newLength) throws IOException {
1343    throw new UnsupportedOperationException("Not implemented by the " +
1344        getClass().getSimpleName() + " FileSystem implementation");
1345  }
1346  
1347  /**
1348   * Delete a file 
1349   * @deprecated Use {@link #delete(Path, boolean)} instead.
1350   */
1351  @Deprecated
1352  public boolean delete(Path f) throws IOException {
1353    return delete(f, true);
1354  }
1355  
  /** Delete a file.
   * <p>
   * The deprecated single-argument {@code delete(Path)} delegates here with
   * {@code recursive} set to true.
   *
   * @param f the path to delete.
   * @param recursive if path is a directory and set to
   * true, the directory is deleted else throws an exception. In
   * case of a file the recursive can be set to either true or false.
   * @return  true if delete is successful else false.
   * @throws IOException on failure; details are implementation-specific
   */
  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1366
1367  /**
1368   * Mark a path to be deleted when FileSystem is closed.
1369   * When the JVM shuts down,
1370   * all FileSystem objects will be closed automatically.
1371   * Then,
1372   * the marked path will be deleted as a result of closing the FileSystem.
1373   *
1374   * The path has to exist in the file system.
1375   * 
1376   * @param f the path to delete.
1377   * @return  true if deleteOnExit is successful, otherwise false.
1378   * @throws IOException
1379   */
1380  public boolean deleteOnExit(Path f) throws IOException {
1381    if (!exists(f)) {
1382      return false;
1383    }
1384    synchronized (deleteOnExit) {
1385      deleteOnExit.add(f);
1386    }
1387    return true;
1388  }
1389  
1390  /**
1391   * Cancel the deletion of the path when the FileSystem is closed
1392   * @param f the path to cancel deletion
1393   */
1394  public boolean cancelDeleteOnExit(Path f) {
1395    synchronized (deleteOnExit) {
1396      return deleteOnExit.remove(f);
1397    }
1398  }
1399
1400  /**
1401   * Delete all files that were marked as delete-on-exit. This recursively
1402   * deletes all files in the specified paths.
1403   */
1404  protected void processDeleteOnExit() {
1405    synchronized (deleteOnExit) {
1406      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1407        Path path = iter.next();
1408        try {
1409          if (exists(path)) {
1410            delete(path, true);
1411          }
1412        }
1413        catch (IOException e) {
1414          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1415        }
1416        iter.remove();
1417      }
1418    }
1419  }
1420  
1421  /** Check if exists.
1422   * @param f source file
1423   */
1424  public boolean exists(Path f) throws IOException {
1425    try {
1426      return getFileStatus(f) != null;
1427    } catch (FileNotFoundException e) {
1428      return false;
1429    }
1430  }
1431
1432  /** True iff the named path is a directory.
1433   * Note: Avoid using this method. Instead reuse the FileStatus 
1434   * returned by getFileStatus() or listStatus() methods.
1435   * @param f path to check
1436   */
1437  public boolean isDirectory(Path f) throws IOException {
1438    try {
1439      return getFileStatus(f).isDirectory();
1440    } catch (FileNotFoundException e) {
1441      return false;               // f does not exist
1442    }
1443  }
1444
1445  /** True iff the named path is a regular file.
1446   * Note: Avoid using this method. Instead reuse the FileStatus 
1447   * returned by getFileStatus() or listStatus() methods.
1448   * @param f path to check
1449   */
1450  public boolean isFile(Path f) throws IOException {
1451    try {
1452      return getFileStatus(f).isFile();
1453    } catch (FileNotFoundException e) {
1454      return false;               // f does not exist
1455    }
1456  }
1457  
1458  /** The number of bytes in a file. */
1459  /** @deprecated Use getFileStatus() instead */
1460  @Deprecated
1461  public long getLength(Path f) throws IOException {
1462    return getFileStatus(f).getLen();
1463  }
1464    
1465  /** Return the {@link ContentSummary} of a given {@link Path}.
1466  * @param f path to use
1467  */
1468  public ContentSummary getContentSummary(Path f) throws IOException {
1469    FileStatus status = getFileStatus(f);
1470    if (status.isFile()) {
1471      // f is a file
1472      long length = status.getLen();
1473      return new ContentSummary.Builder().length(length).
1474          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1475    }
1476    // f is a directory
1477    long[] summary = {0, 0, 1};
1478    for(FileStatus s : listStatus(f)) {
1479      long length = s.getLen();
1480      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1481          new ContentSummary.Builder().length(length).
1482          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1483      summary[0] += c.getLength();
1484      summary[1] += c.getFileCount();
1485      summary[2] += c.getDirectoryCount();
1486    }
1487    return new ContentSummary.Builder().length(summary[0]).
1488        fileCount(summary[1]).directoryCount(summary[2]).
1489        spaceConsumed(summary[0]).build();
1490  }
1491
1492  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1493      @Override
1494      public boolean accept(Path file) {
1495        return true;
1496      }     
1497    };
1498    
1499  /**
1500   * List the statuses of the files/directories in the given path if the path is
1501   * a directory.
1502   * 
1503   * @param f given path
1504   * @return the statuses of the files/directories in the given patch
1505   * @throws FileNotFoundException when the path does not exist;
1506   *         IOException see specific implementation
1507   */
1508  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1509                                                         IOException;
1510    
1511  /*
1512   * Filter files/directories in the given path using the user-supplied path
1513   * filter. Results are added to the given array <code>results</code>.
1514   */
1515  private void listStatus(ArrayList<FileStatus> results, Path f,
1516      PathFilter filter) throws FileNotFoundException, IOException {
1517    FileStatus listing[] = listStatus(f);
1518    if (listing == null) {
1519      throw new IOException("Error accessing " + f);
1520    }
1521
1522    for (int i = 0; i < listing.length; i++) {
1523      if (filter.accept(listing[i].getPath())) {
1524        results.add(listing[i]);
1525      }
1526    }
1527  }
1528
1529  /**
1530   * @return an iterator over the corrupt files under the given path
1531   * (may contain duplicates if a file has more than one corrupt block)
1532   * @throws IOException
1533   */
1534  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1535    throws IOException {
1536    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1537                                            " does not support" +
1538                                            " listCorruptFileBlocks");
1539  }
1540
1541  /**
1542   * Filter files/directories in the given path using the user-supplied path
1543   * filter.
1544   * 
1545   * @param f
1546   *          a path name
1547   * @param filter
1548   *          the user-supplied path filter
1549   * @return an array of FileStatus objects for the files under the given path
1550   *         after applying the filter
1551   * @throws FileNotFoundException when the path does not exist;
1552   *         IOException see specific implementation   
1553   */
1554  public FileStatus[] listStatus(Path f, PathFilter filter) 
1555                                   throws FileNotFoundException, IOException {
1556    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1557    listStatus(results, f, filter);
1558    return results.toArray(new FileStatus[results.size()]);
1559  }
1560
1561  /**
1562   * Filter files/directories in the given list of paths using default
1563   * path filter.
1564   * 
1565   * @param files
1566   *          a list of paths
1567   * @return a list of statuses for the files under the given paths after
1568   *         applying the filter default Path filter
1569   * @throws FileNotFoundException when the path does not exist;
1570   *         IOException see specific implementation
1571   */
1572  public FileStatus[] listStatus(Path[] files)
1573      throws FileNotFoundException, IOException {
1574    return listStatus(files, DEFAULT_FILTER);
1575  }
1576
1577  /**
1578   * Filter files/directories in the given list of paths using user-supplied
1579   * path filter.
1580   * 
1581   * @param files
1582   *          a list of paths
1583   * @param filter
1584   *          the user-supplied path filter
1585   * @return a list of statuses for the files under the given paths after
1586   *         applying the filter
1587   * @throws FileNotFoundException when the path does not exist;
1588   *         IOException see specific implementation
1589   */
1590  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1591      throws FileNotFoundException, IOException {
1592    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1593    for (int i = 0; i < files.length; i++) {
1594      listStatus(results, files[i], filter);
1595    }
1596    return results.toArray(new FileStatus[results.size()]);
1597  }
1598
1599  /**
1600   * <p>Return all the files that match filePattern and are not checksum
1601   * files. Results are sorted by their names.
1602   * 
1603   * <p>
1604   * A filename pattern is composed of <i>regular</i> characters and
1605   * <i>special pattern matching</i> characters, which are:
1606   *
1607   * <dl>
1608   *  <dd>
1609   *   <dl>
1610   *    <p>
1611   *    <dt> <tt> ? </tt>
1612   *    <dd> Matches any single character.
1613   *
1614   *    <p>
1615   *    <dt> <tt> * </tt>
1616   *    <dd> Matches zero or more characters.
1617   *
1618   *    <p>
1619   *    <dt> <tt> [<i>abc</i>] </tt>
1620   *    <dd> Matches a single character from character set
1621   *     <tt>{<i>a,b,c</i>}</tt>.
1622   *
1623   *    <p>
1624   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1625   *    <dd> Matches a single character from the character range
1626   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1627   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1628   *
1629   *    <p>
1630   *    <dt> <tt> [^<i>a</i>] </tt>
1631   *    <dd> Matches a single character that is not from character set or range
1632   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1633   *     immediately to the right of the opening bracket.
1634   *
1635   *    <p>
1636   *    <dt> <tt> \<i>c</i> </tt>
1637   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1638   *
1639   *    <p>
1640   *    <dt> <tt> {ab,cd} </tt>
1641   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1642   *    
1643   *    <p>
1644   *    <dt> <tt> {ab,c{de,fh}} </tt>
1645   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1646   *
1647   *   </dl>
1648   *  </dd>
1649   * </dl>
1650   *
1651   * @param pathPattern a regular expression specifying a pth pattern
1652
1653   * @return an array of paths that match the path pattern
1654   * @throws IOException
1655   */
1656  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1657    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1658  }
1659  
1660  /**
1661   * Return an array of FileStatus objects whose path names match pathPattern
1662   * and is accepted by the user-supplied path filter. Results are sorted by
1663   * their path names.
1664   * Return null if pathPattern has no glob and the path does not exist.
1665   * Return an empty array if pathPattern has a glob and no path matches it. 
1666   * 
1667   * @param pathPattern
1668   *          a regular expression specifying the path pattern
1669   * @param filter
1670   *          a user-supplied path filter
1671   * @return an array of FileStatus objects
1672   * @throws IOException if any I/O error occurs when fetching file status
1673   */
1674  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1675      throws IOException {
1676    return new Globber(this, pathPattern, filter).glob();
1677  }
1678  
1679  /**
1680   * List the statuses of the files/directories in the given path if the path is
1681   * a directory. 
1682   * Return the file's status and block locations If the path is a file.
1683   * 
1684   * If a returned status is a file, it contains the file's block locations.
1685   * 
1686   * @param f is the path
1687   *
1688   * @return an iterator that traverses statuses of the files/directories 
1689   *         in the given path
1690   *
1691   * @throws FileNotFoundException If <code>f</code> does not exist
1692   * @throws IOException If an I/O error occurred
1693   */
1694  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1695  throws FileNotFoundException, IOException {
1696    return listLocatedStatus(f, DEFAULT_FILTER);
1697  }
1698
1699  /**
1700   * Listing a directory
1701   * The returned results include its block location if it is a file
1702   * The results are filtered by the given path filter
1703   * @param f a path
1704   * @param filter a path filter
1705   * @return an iterator that traverses statuses of the files/directories 
1706   *         in the given path
1707   * @throws FileNotFoundException if <code>f</code> does not exist
1708   * @throws IOException if any I/O error occurred
1709   */
1710  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1711      final PathFilter filter)
1712  throws FileNotFoundException, IOException {
1713    return new RemoteIterator<LocatedFileStatus>() {
1714      private final FileStatus[] stats = listStatus(f, filter);
1715      private int i = 0;
1716
1717      @Override
1718      public boolean hasNext() {
1719        return i<stats.length;
1720      }
1721
1722      @Override
1723      public LocatedFileStatus next() throws IOException {
1724        if (!hasNext()) {
1725          throw new NoSuchElementException("No more entry in " + f);
1726        }
1727        FileStatus result = stats[i++];
1728        // for files, use getBlockLocations(FileStatus, int, int) to avoid
1729        // calling getFileStatus(Path) to load the FileStatus again
1730        BlockLocation[] locs = result.isFile() ?
1731            getFileBlockLocations(result, 0, result.getLen()) :
1732            null;
1733        return new LocatedFileStatus(result, locs);
1734      }
1735    };
1736  }
1737
1738  /**
1739   * Returns a remote iterator so that followup calls are made on demand
1740   * while consuming the entries. Each file system implementation should
1741   * override this method and provide a more efficient implementation, if
1742   * possible. 
1743   *
1744   * @param p target path
1745   * @return remote iterator
1746   */
1747  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1748  throws FileNotFoundException, IOException {
1749    return new RemoteIterator<FileStatus>() {
1750      private final FileStatus[] stats = listStatus(p);
1751      private int i = 0;
1752
1753      @Override
1754      public boolean hasNext() {
1755        return i<stats.length;
1756      }
1757
1758      @Override
1759      public FileStatus next() throws IOException {
1760        if (!hasNext()) {
1761          throw new NoSuchElementException("No more entry in " + p);
1762        }
1763        return stats[i++];
1764      }
1765    };
1766  }
1767
1768  /**
1769   * List the statuses and block locations of the files in the given path.
1770   * 
1771   * If the path is a directory, 
1772   *   if recursive is false, returns files in the directory;
1773   *   if recursive is true, return files in the subtree rooted at the path.
1774   * If the path is a file, return the file's status and block locations.
1775   * 
1776   * @param f is the path
1777   * @param recursive if the subdirectories need to be traversed recursively
1778   *
1779   * @return an iterator that traverses statuses of the files
1780   *
1781   * @throws FileNotFoundException when the path does not exist;
1782   *         IOException see specific implementation
1783   */
1784  public RemoteIterator<LocatedFileStatus> listFiles(
1785      final Path f, final boolean recursive)
1786  throws FileNotFoundException, IOException {
1787    return new RemoteIterator<LocatedFileStatus>() {
1788      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1789        new Stack<RemoteIterator<LocatedFileStatus>>();
1790      private RemoteIterator<LocatedFileStatus> curItor =
1791        listLocatedStatus(f);
1792      private LocatedFileStatus curFile;
1793     
1794      @Override
1795      public boolean hasNext() throws IOException {
1796        while (curFile == null) {
1797          if (curItor.hasNext()) {
1798            handleFileStat(curItor.next());
1799          } else if (!itors.empty()) {
1800            curItor = itors.pop();
1801          } else {
1802            return false;
1803          }
1804        }
1805        return true;
1806      }
1807
1808      /**
1809       * Process the input stat.
1810       * If it is a file, return the file stat.
1811       * If it is a directory, traverse the directory if recursive is true;
1812       * ignore it if recursive is false.
1813       * @param stat input status
1814       * @throws IOException if any IO error occurs
1815       */
1816      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1817        if (stat.isFile()) { // file
1818          curFile = stat;
1819        } else if (recursive) { // directory
1820          itors.push(curItor);
1821          curItor = listLocatedStatus(stat.getPath());
1822        }
1823      }
1824
1825      @Override
1826      public LocatedFileStatus next() throws IOException {
1827        if (hasNext()) {
1828          LocatedFileStatus result = curFile;
1829          curFile = null;
1830          return result;
1831        } 
1832        throw new java.util.NoSuchElementException("No more entry in " + f);
1833      }
1834    };
1835  }
1836  
1837  /** Return the current user's home directory in this filesystem.
1838   * The default implementation returns "/user/$USER/".
1839   */
1840  public Path getHomeDirectory() {
1841    return this.makeQualified(
1842        new Path("/user/"+System.getProperty("user.name")));
1843  }
1844
1845
  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   *
   * @param new_dir the path to use as the new working directory
   */
  public abstract void setWorkingDirectory(Path new_dir);
1853    
  /**
   * Get the current working directory for the given file system.
   *
   * @return the directory pathname
   */
  public abstract Path getWorkingDirectory();
1859  
1860  
  /**
   * Note: with the new FileContext class, getWorkingDirectory()
   * will be removed.
   * The working directory is implemented in FileContext.
   *
   * Some file systems like LocalFileSystem have an initial workingDir
   * that we use as the starting workingDir. For other file systems
   * like HDFS there is no built in notion of an initial workingDir.
   *
   * This default implementation always returns null.
   *
   * @return if there is built in notion of workingDir then it
   * is returned; else a null is returned.
   */
  protected Path getInitialWorkingDirectory() {
    return null;
  }
1876
1877  /**
1878   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1879   */
1880  public boolean mkdirs(Path f) throws IOException {
1881    return mkdirs(f, FsPermission.getDirDefault());
1882  }
1883
  /**
   * Make the given file and all non-existent parents into
   * directories. Has the semantics of Unix 'mkdir -p'.
   * Existence of the directory hierarchy is not an error.
   *
   * @param f path to create
   * @param permission to apply to f
   * @return implementation-specific success flag
   *         (NOTE(review): presumably true on success; confirm against the
   *         concrete file systems)
   * @throws IOException see specific implementation
   */
  public abstract boolean mkdirs(Path f, FsPermission permission
      ) throws IOException;
1893
1894  /**
1895   * The src file is on the local disk.  Add it to FS at
1896   * the given dst name and the source is kept intact afterwards
1897   * @param src path
1898   * @param dst path
1899   */
1900  public void copyFromLocalFile(Path src, Path dst)
1901    throws IOException {
1902    copyFromLocalFile(false, src, dst);
1903  }
1904
1905  /**
1906   * The src files is on the local disk.  Add it to FS at
1907   * the given dst name, removing the source afterwards.
1908   * @param srcs path
1909   * @param dst path
1910   */
1911  public void moveFromLocalFile(Path[] srcs, Path dst)
1912    throws IOException {
1913    copyFromLocalFile(true, true, srcs, dst);
1914  }
1915
1916  /**
1917   * The src file is on the local disk.  Add it to FS at
1918   * the given dst name, removing the source afterwards.
1919   * @param src path
1920   * @param dst path
1921   */
1922  public void moveFromLocalFile(Path src, Path dst)
1923    throws IOException {
1924    copyFromLocalFile(true, src, dst);
1925  }
1926
1927  /**
1928   * The src file is on the local disk.  Add it to FS at
1929   * the given dst name.
1930   * delSrc indicates if the source should be removed
1931   * @param delSrc whether to delete the src
1932   * @param src path
1933   * @param dst path
1934   */
1935  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1936    throws IOException {
1937    copyFromLocalFile(delSrc, true, src, dst);
1938  }
1939  
1940  /**
1941   * The src files are on the local disk.  Add it to FS at
1942   * the given dst name.
1943   * delSrc indicates if the source should be removed
1944   * @param delSrc whether to delete the src
1945   * @param overwrite whether to overwrite an existing file
1946   * @param srcs array of paths which are source
1947   * @param dst path
1948   */
1949  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1950                                Path[] srcs, Path dst)
1951    throws IOException {
1952    Configuration conf = getConf();
1953    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1954  }
1955  
1956  /**
1957   * The src file is on the local disk.  Add it to FS at
1958   * the given dst name.
1959   * delSrc indicates if the source should be removed
1960   * @param delSrc whether to delete the src
1961   * @param overwrite whether to overwrite an existing file
1962   * @param src path
1963   * @param dst path
1964   */
1965  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1966                                Path src, Path dst)
1967    throws IOException {
1968    Configuration conf = getConf();
1969    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1970  }
1971    
1972  /**
1973   * The src file is under FS, and the dst is on the local disk.
1974   * Copy it from FS control to the local dst name.
1975   * @param src path
1976   * @param dst path
1977   */
1978  public void copyToLocalFile(Path src, Path dst) throws IOException {
1979    copyToLocalFile(false, src, dst);
1980  }
1981    
1982  /**
1983   * The src file is under FS, and the dst is on the local disk.
1984   * Copy it from FS control to the local dst name.
1985   * Remove the source afterwards
1986   * @param src path
1987   * @param dst path
1988   */
1989  public void moveToLocalFile(Path src, Path dst) throws IOException {
1990    copyToLocalFile(true, src, dst);
1991  }
1992
1993  /**
1994   * The src file is under FS, and the dst is on the local disk.
1995   * Copy it from FS control to the local dst name.
1996   * delSrc indicates if the src will be removed or not.
1997   * @param delSrc whether to delete the src
1998   * @param src path
1999   * @param dst path
2000   */   
2001  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
2002    throws IOException {
2003    copyToLocalFile(delSrc, src, dst, false);
2004  }
2005  
2006    /**
2007   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2008   * control to the local dst name. delSrc indicates if the src will be removed
2009   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2010   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2011   * It will not create any crc files at local.
2012   * 
2013   * @param delSrc
2014   *          whether to delete the src
2015   * @param src
2016   *          path
2017   * @param dst
2018   *          path
2019   * @param useRawLocalFileSystem
2020   *          whether to use RawLocalFileSystem as local file system or not.
2021   * 
2022   * @throws IOException
2023   *           - if any IO error
2024   */
2025  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2026      boolean useRawLocalFileSystem) throws IOException {
2027    Configuration conf = getConf();
2028    FileSystem local = null;
2029    if (useRawLocalFileSystem) {
2030      local = getLocal(conf).getRawFileSystem();
2031    } else {
2032      local = getLocal(conf);
2033    }
2034    FileUtil.copy(this, src, local, dst, delSrc, conf);
2035  }
2036
  /**
   * Returns a local File that the user can write output to.  The caller
   * provides both the eventual FS target name and the local working
   * file.  If the FS is local, we write directly into the target.  If
   * the FS is remote, we write into the tmp local area.
   *
   * This default implementation always returns tmpLocalFile unchanged.
   *
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path of local tmp file
   * @return the local path the caller should write to
   * @throws IOException declared for overriding implementations; not thrown
   *         by this default implementation
   */
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }
2049
  /**
   * Called when we're all done writing to the target.  A local FS will
   * do nothing, because we've written to exactly the right place.  A remote
   * FS will copy the contents of tmpLocalFile to the correct target at
   * fsOutputFile.
   *
   * This default implementation moves tmpLocalFile to fsOutputFile via
   * {@link #moveFromLocalFile(Path, Path)}.
   *
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path to local tmp file
   * @throws IOException if the move fails
   */
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }
2062
2063  /**
2064   * No more filesystem operations are needed.  Will
2065   * release any held locks.
2066   */
2067  @Override
2068  public void close() throws IOException {
2069    // delete all files that were marked as delete-on-exit.
2070    processDeleteOnExit();
2071    CACHE.remove(this.key, this);
2072  }
2073
2074  /** Return the total size of all files in the filesystem.*/
2075  public long getUsed() throws IOException{
2076    long used = 0;
2077    FileStatus[] files = listStatus(new Path("/"));
2078    for(FileStatus file:files){
2079      used += file.getLen();
2080    }
2081    return used;
2082  }
2083  
2084  /**
2085   * Get the block size for a particular file.
2086   * @param f the filename
2087   * @return the number of bytes in a block
2088   */
2089  /** @deprecated Use getFileStatus() instead */
2090  @Deprecated
2091  public long getBlockSize(Path f) throws IOException {
2092    return getFileStatus(f).getBlockSize();
2093  }
2094
2095  /**
2096   * Return the number of bytes that large input files should be optimally
2097   * be split into to minimize i/o time.
2098   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2099   */
2100  @Deprecated
2101  public long getDefaultBlockSize() {
2102    // default to 32MB: large enough to minimize the impact of seeks
2103    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2104  }
2105    
2106  /** Return the number of bytes that large input files should be optimally
2107   * be split into to minimize i/o time.  The given path will be used to
2108   * locate the actual filesystem.  The full path does not have to exist.
2109   * @param f path of file
2110   * @return the default block size for the path's filesystem
2111   */
2112  public long getDefaultBlockSize(Path f) {
2113    return getDefaultBlockSize();
2114  }
2115
  /**
   * Get the default replication. This default implementation returns 1.
   *
   * @return the default replication factor
   * @deprecated use {@link #getDefaultReplication(Path)} instead
   */
  @Deprecated
  public short getDefaultReplication() { return 1; }
2122
2123  /**
2124   * Get the default replication for a path.   The given path will be used to
2125   * locate the actual filesystem.  The full path does not have to exist.
2126   * @param path of the file
2127   * @return default replication for the path's filesystem 
2128   */
2129  public short getDefaultReplication(Path path) {
2130    return getDefaultReplication();
2131  }
2132  
  /**
   * Return a file status object that represents the path.
   *
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public abstract FileStatus getFileStatus(Path f) throws IOException;
2141
2142  /**
2143   * Checks if the user can access a path.  The mode specifies which access
2144   * checks to perform.  If the requested permissions are granted, then the
2145   * method returns normally.  If access is denied, then the method throws an
2146   * {@link AccessControlException}.
2147   * <p/>
2148   * The default implementation of this method calls {@link #getFileStatus(Path)}
2149   * and checks the returned permissions against the requested permissions.
2150   * Note that the getFileStatus call will be subject to authorization checks.
2151   * Typically, this requires search (execute) permissions on each directory in
2152   * the path's prefix, but this is implementation-defined.  Any file system
2153   * that provides a richer authorization model (such as ACLs) may override the
2154   * default implementation so that it checks against that model instead.
2155   * <p>
2156   * In general, applications should avoid using this method, due to the risk of
2157   * time-of-check/time-of-use race conditions.  The permissions on a file may
2158   * change immediately after the access call returns.  Most applications should
2159   * prefer running specific file system actions as the desired user represented
2160   * by a {@link UserGroupInformation}.
2161   *
2162   * @param path Path to check
2163   * @param mode type of access to check
2164   * @throws AccessControlException if access is denied
2165   * @throws FileNotFoundException if the path does not exist
2166   * @throws IOException see specific implementation
2167   */
2168  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2169  public void access(Path path, FsAction mode) throws AccessControlException,
2170      FileNotFoundException, IOException {
2171    checkAccessPermissions(this.getFileStatus(path), mode);
2172  }
2173
2174  /**
2175   * This method provides the default implementation of
2176   * {@link #access(Path, FsAction)}.
2177   *
2178   * @param stat FileStatus to check
2179   * @param mode type of access to check
2180   * @throws IOException for any error
2181   */
2182  @InterfaceAudience.Private
2183  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2184      throws IOException {
2185    FsPermission perm = stat.getPermission();
2186    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2187    String user = ugi.getShortUserName();
2188    List<String> groups = Arrays.asList(ugi.getGroupNames());
2189    if (user.equals(stat.getOwner())) {
2190      if (perm.getUserAction().implies(mode)) {
2191        return;
2192      }
2193    } else if (groups.contains(stat.getGroup())) {
2194      if (perm.getGroupAction().implies(mode)) {
2195        return;
2196      }
2197    } else {
2198      if (perm.getOtherAction().implies(mode)) {
2199        return;
2200      }
2201    }
2202    throw new AccessControlException(String.format(
2203      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2204      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2205  }
2206
2207  /**
2208   * See {@link FileContext#fixRelativePart}
2209   */
2210  protected Path fixRelativePart(Path p) {
2211    if (p.isUriPathAbsolute()) {
2212      return p;
2213    } else {
2214      return new Path(getWorkingDirectory(), p);
2215    }
2216  }
2217
2218  /**
2219   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2220   */
2221  public void createSymlink(final Path target, final Path link,
2222      final boolean createParent) throws AccessControlException,
2223      FileAlreadyExistsException, FileNotFoundException,
2224      ParentNotDirectoryException, UnsupportedFileSystemException, 
2225      IOException {
2226    // Supporting filesystems should override this method
2227    throw new UnsupportedOperationException(
2228        "Filesystem does not support symlinks!");
2229  }
2230
2231  /**
2232   * See {@link FileContext#getFileLinkStatus(Path)}
2233   */
2234  public FileStatus getFileLinkStatus(final Path f)
2235      throws AccessControlException, FileNotFoundException,
2236      UnsupportedFileSystemException, IOException {
2237    // Supporting filesystems should override this method
2238    return getFileStatus(f);
2239  }
2240
  /**
   * See {@link AbstractFileSystem#supportsSymlinks()}.
   *
   * @return false in this default implementation
   */
  public boolean supportsSymlinks() {
    return false;
  }
2247
2248  /**
2249   * See {@link FileContext#getLinkTarget(Path)}
2250   */
2251  public Path getLinkTarget(Path f) throws IOException {
2252    // Supporting filesystems should override this method
2253    throw new UnsupportedOperationException(
2254        "Filesystem does not support symlinks!");
2255  }
2256
2257  /**
2258   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2259   */
2260  protected Path resolveLink(Path f) throws IOException {
2261    // Supporting filesystems should override this method
2262    throw new UnsupportedOperationException(
2263        "Filesystem does not support symlinks!");
2264  }
2265
2266  /**
2267   * Get the checksum of a file.
2268   *
2269   * @param f The file path
2270   * @return The file checksum.  The default return value is null,
2271   *  which indicates that no checksum algorithm is implemented
2272   *  in the corresponding FileSystem.
2273   */
2274  public FileChecksum getFileChecksum(Path f) throws IOException {
2275    return getFileChecksum(f, Long.MAX_VALUE);
2276  }
2277
  /**
   * Get the checksum of a file, from the beginning of the file till the
   * specific length.
   *
   * This default implementation always returns null.
   *
   * @param f The file path
   * @param length The length of the file range for checksum calculation
   * @return The file checksum; null in this default implementation.
   * @throws IOException declared for overriding implementations; not thrown
   *         by this default implementation
   */
  public FileChecksum getFileChecksum(Path f, final long length)
      throws IOException {
    return null;
  }
2289
  /**
   * Set the verify checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   *
   * @param verifyChecksum the requested flag value; ignored by this default
   *          implementation
   */
  public void setVerifyChecksum(boolean verifyChecksum) {
    // intentionally empty: this default implementation ignores the flag
  }
2298
  /**
   * Set the write checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   *
   * @param writeChecksum the requested flag value; ignored by this default
   *          implementation
   */
  public void setWriteChecksum(boolean writeChecksum) {
    // intentionally empty: this default implementation ignores the flag
  }
2307
2308  /**
2309   * Returns a status object describing the use and capacity of the
2310   * file system. If the file system has multiple partitions, the
2311   * use and capacity of the root partition is reflected.
2312   * 
2313   * @return a FsStatus object
2314   * @throws IOException
2315   *           see specific implementation
2316   */
2317  public FsStatus getStatus() throws IOException {
2318    return getStatus(null);
2319  }
2320
2321  /**
2322   * Returns a status object describing the use and capacity of the
2323   * file system. If the file system has multiple partitions, the
2324   * use and capacity of the partition pointed to by the specified
2325   * path is reflected.
2326   * @param p Path for which status should be obtained. null means
2327   * the default partition. 
2328   * @return a FsStatus object
2329   * @throws IOException
2330   *           see specific implementation
2331   */
2332  public FsStatus getStatus(Path p) throws IOException {
2333    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2334  }
2335
  /**
   * Set permission of a path.
   * This default implementation does nothing.
   *
   * @param p the path whose permission should be changed
   * @param permission the permission to apply
   * @throws IOException declared for overriding implementations; not thrown
   *         by this default implementation
   */
  public void setPermission(Path p, FsPermission permission
      ) throws IOException {
  }
2344
  /**
   * Set owner of a path (i.e. a file or a directory).
   * The parameters username and groupname cannot both be null.
   * This default implementation does nothing (it does not validate the
   * arguments either).
   *
   * @param p The path
   * @param username If it is null, the original username remains unchanged.
   * @param groupname If it is null, the original groupname remains unchanged.
   * @throws IOException declared for overriding implementations; not thrown
   *         by this default implementation
   */
  public void setOwner(Path p, String username, String groupname
      ) throws IOException {
  }
2355
  /**
   * Set the modification time and/or the access time of a file.
   * This default implementation does nothing.
   *
   * @param p The path
   * @param mtime Set the modification time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set modification time.
   * @param atime Set the access time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set access time.
   * @throws IOException declared for overriding implementations; not thrown
   *         by this default implementation
   */
  public void setTimes(Path p, long mtime, long atime
      ) throws IOException {
  }
2369
2370  /**
2371   * Create a snapshot with a default name.
2372   * @param path The directory where snapshots will be taken.
2373   * @return the snapshot path.
2374   */
2375  public final Path createSnapshot(Path path) throws IOException {
2376    return createSnapshot(path, null);
2377  }
2378
2379  /**
2380   * Create a snapshot
2381   * @param path The directory where snapshots will be taken.
2382   * @param snapshotName The name of the snapshot
2383   * @return the snapshot path.
2384   */
2385  public Path createSnapshot(Path path, String snapshotName)
2386      throws IOException {
2387    throw new UnsupportedOperationException(getClass().getSimpleName()
2388        + " doesn't support createSnapshot");
2389  }
2390  
2391  /**
2392   * Rename a snapshot
2393   * @param path The directory path where the snapshot was taken
2394   * @param snapshotOldName Old name of the snapshot
2395   * @param snapshotNewName New name of the snapshot
2396   * @throws IOException
2397   */
2398  public void renameSnapshot(Path path, String snapshotOldName,
2399      String snapshotNewName) throws IOException {
2400    throw new UnsupportedOperationException(getClass().getSimpleName()
2401        + " doesn't support renameSnapshot");
2402  }
2403  
2404  /**
2405   * Delete a snapshot of a directory
2406   * @param path  The directory that the to-be-deleted snapshot belongs to
2407   * @param snapshotName The name of the snapshot
2408   */
2409  public void deleteSnapshot(Path path, String snapshotName)
2410      throws IOException {
2411    throw new UnsupportedOperationException(getClass().getSimpleName()
2412        + " doesn't support deleteSnapshot");
2413  }
2414  
2415  /**
2416   * Modifies ACL entries of files and directories.  This method can add new ACL
2417   * entries or modify the permissions on existing ACL entries.  All existing
2418   * ACL entries that are not specified in this call are retained without
2419   * changes.  (Modifications are merged into the current ACL.)
2420   *
2421   * @param path Path to modify
2422   * @param aclSpec List<AclEntry> describing modifications
2423   * @throws IOException if an ACL could not be modified
2424   */
2425  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2426      throws IOException {
2427    throw new UnsupportedOperationException(getClass().getSimpleName()
2428        + " doesn't support modifyAclEntries");
2429  }
2430
2431  /**
2432   * Removes ACL entries from files and directories.  Other ACL entries are
2433   * retained.
2434   *
2435   * @param path Path to modify
2436   * @param aclSpec List<AclEntry> describing entries to remove
2437   * @throws IOException if an ACL could not be modified
2438   */
2439  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2440      throws IOException {
2441    throw new UnsupportedOperationException(getClass().getSimpleName()
2442        + " doesn't support removeAclEntries");
2443  }
2444
2445  /**
2446   * Removes all default ACL entries from files and directories.
2447   *
2448   * @param path Path to modify
2449   * @throws IOException if an ACL could not be modified
2450   */
2451  public void removeDefaultAcl(Path path)
2452      throws IOException {
2453    throw new UnsupportedOperationException(getClass().getSimpleName()
2454        + " doesn't support removeDefaultAcl");
2455  }
2456
2457  /**
2458   * Removes all but the base ACL entries of files and directories.  The entries
2459   * for user, group, and others are retained for compatibility with permission
2460   * bits.
2461   *
2462   * @param path Path to modify
2463   * @throws IOException if an ACL could not be removed
2464   */
2465  public void removeAcl(Path path)
2466      throws IOException {
2467    throw new UnsupportedOperationException(getClass().getSimpleName()
2468        + " doesn't support removeAcl");
2469  }
2470
2471  /**
2472   * Fully replaces ACL of files and directories, discarding all existing
2473   * entries.
2474   *
2475   * @param path Path to modify
2476   * @param aclSpec List<AclEntry> describing modifications, must include entries
2477   *   for user, group, and others for compatibility with permission bits.
2478   * @throws IOException if an ACL could not be modified
2479   */
2480  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2481    throw new UnsupportedOperationException(getClass().getSimpleName()
2482        + " doesn't support setAcl");
2483  }
2484
2485  /**
2486   * Gets the ACL of a file or directory.
2487   *
2488   * @param path Path to get
2489   * @return AclStatus describing the ACL of the file or directory
2490   * @throws IOException if an ACL could not be read
2491   */
2492  public AclStatus getAclStatus(Path path) throws IOException {
2493    throw new UnsupportedOperationException(getClass().getSimpleName()
2494        + " doesn't support getAclStatus");
2495  }
2496
2497  /**
2498   * Set an xattr of a file or directory.
2499   * The name must be prefixed with the namespace followed by ".". For example,
2500   * "user.attr".
2501   * <p/>
2502   * Refer to the HDFS extended attributes user documentation for details.
2503   *
2504   * @param path Path to modify
2505   * @param name xattr name.
2506   * @param value xattr value.
2507   * @throws IOException
2508   */
2509  public void setXAttr(Path path, String name, byte[] value)
2510      throws IOException {
2511    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2512        XAttrSetFlag.REPLACE));
2513  }
2514
2515  /**
2516   * Set an xattr of a file or directory.
2517   * The name must be prefixed with the namespace followed by ".". For example,
2518   * "user.attr".
2519   * <p/>
2520   * Refer to the HDFS extended attributes user documentation for details.
2521   *
2522   * @param path Path to modify
2523   * @param name xattr name.
2524   * @param value xattr value.
2525   * @param flag xattr set flag
2526   * @throws IOException
2527   */
2528  public void setXAttr(Path path, String name, byte[] value,
2529      EnumSet<XAttrSetFlag> flag) throws IOException {
2530    throw new UnsupportedOperationException(getClass().getSimpleName()
2531        + " doesn't support setXAttr");
2532  }
2533
2534  /**
2535   * Get an xattr name and value for a file or directory.
2536   * The name must be prefixed with the namespace followed by ".". For example,
2537   * "user.attr".
2538   * <p/>
2539   * Refer to the HDFS extended attributes user documentation for details.
2540   *
2541   * @param path Path to get extended attribute
2542   * @param name xattr name.
2543   * @return byte[] xattr value.
2544   * @throws IOException
2545   */
2546  public byte[] getXAttr(Path path, String name) throws IOException {
2547    throw new UnsupportedOperationException(getClass().getSimpleName()
2548        + " doesn't support getXAttr");
2549  }
2550
2551  /**
2552   * Get all of the xattr name/value pairs for a file or directory.
2553   * Only those xattrs which the logged-in user has permissions to view
2554   * are returned.
2555   * <p/>
2556   * Refer to the HDFS extended attributes user documentation for details.
2557   *
2558   * @param path Path to get extended attributes
2559   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2560   * @throws IOException
2561   */
2562  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2563    throw new UnsupportedOperationException(getClass().getSimpleName()
2564        + " doesn't support getXAttrs");
2565  }
2566
2567  /**
2568   * Get all of the xattrs name/value pairs for a file or directory.
2569   * Only those xattrs which the logged-in user has permissions to view
2570   * are returned.
2571   * <p/>
2572   * Refer to the HDFS extended attributes user documentation for details.
2573   *
2574   * @param path Path to get extended attributes
2575   * @param names XAttr names.
2576   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2577   * @throws IOException
2578   */
2579  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2580      throws IOException {
2581    throw new UnsupportedOperationException(getClass().getSimpleName()
2582        + " doesn't support getXAttrs");
2583  }
2584
2585  /**
2586   * Get all of the xattr names for a file or directory.
2587   * Only those xattr names which the logged-in user has permissions to view
2588   * are returned.
2589   * <p/>
2590   * Refer to the HDFS extended attributes user documentation for details.
2591   *
2592   * @param path Path to get extended attributes
2593   * @return List<String> of the XAttr names of the file or directory
2594   * @throws IOException
2595   */
2596  public List<String> listXAttrs(Path path) throws IOException {
2597    throw new UnsupportedOperationException(getClass().getSimpleName()
2598            + " doesn't support listXAttrs");
2599  }
2600
2601  /**
2602   * Remove an xattr of a file or directory.
2603   * The name must be prefixed with the namespace followed by ".". For example,
2604   * "user.attr".
2605   * <p/>
2606   * Refer to the HDFS extended attributes user documentation for details.
2607   *
2608   * @param path Path to remove extended attribute
2609   * @param name xattr name
2610   * @throws IOException
2611   */
2612  public void removeXAttr(Path path, String name) throws IOException {
2613    throw new UnsupportedOperationException(getClass().getSimpleName()
2614        + " doesn't support removeXAttr");
2615  }
2616
2617  // making it volatile to be able to do a double checked locking
2618  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2619
2620  private static final Map<String, Class<? extends FileSystem>>
2621    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2622
2623  private static void loadFileSystems() {
2624    synchronized (FileSystem.class) {
2625      if (!FILE_SYSTEMS_LOADED) {
2626        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2627        Iterator<FileSystem> it = serviceLoader.iterator();
2628        while (it.hasNext()) {
2629          FileSystem fs = null;
2630          try {
2631            fs = it.next();
2632            try {
2633              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2634            } catch (Exception e) {
2635              LOG.warn("Cannot load: " + fs + " from " +
2636                  ClassUtil.findContainingJar(fs.getClass()), e);
2637            }
2638          } catch (ServiceConfigurationError ee) {
2639            LOG.warn("Cannot load filesystem", ee);
2640          }
2641        }
2642        FILE_SYSTEMS_LOADED = true;
2643      }
2644    }
2645  }
2646
2647  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2648      Configuration conf) throws IOException {
2649    if (!FILE_SYSTEMS_LOADED) {
2650      loadFileSystems();
2651    }
2652    Class<? extends FileSystem> clazz = null;
2653    if (conf != null) {
2654      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2655    }
2656    if (clazz == null) {
2657      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2658    }
2659    if (clazz == null) {
2660      throw new IOException("No FileSystem for scheme: " + scheme);
2661    }
2662    return clazz;
2663  }
2664
2665  private static FileSystem createFileSystem(URI uri, Configuration conf
2666      ) throws IOException {
2667    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2668    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2669    fs.initialize(uri, conf);
2670    return fs;
2671  }
2672
2673  /** Caching FileSystem objects */
2674  static class Cache {
2675    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2676
2677    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2678    private final Set<Key> toAutoClose = new HashSet<Key>();
2679
2680    /** A variable that makes all objects in the cache unique */
2681    private static AtomicLong unique = new AtomicLong(1);
2682
2683    FileSystem get(URI uri, Configuration conf) throws IOException{
2684      Key key = new Key(uri, conf);
2685      return getInternal(uri, conf, key);
2686    }
2687
2688    /** The objects inserted into the cache using this method are all unique */
2689    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2690      Key key = new Key(uri, conf, unique.getAndIncrement());
2691      return getInternal(uri, conf, key);
2692    }
2693
2694    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2695      FileSystem fs;
2696      synchronized (this) {
2697        fs = map.get(key);
2698      }
2699      if (fs != null) {
2700        return fs;
2701      }
2702
2703      fs = createFileSystem(uri, conf);
2704      synchronized (this) { // refetch the lock again
2705        FileSystem oldfs = map.get(key);
2706        if (oldfs != null) { // a file system is created while lock is releasing
2707          fs.close(); // close the new file system
2708          return oldfs;  // return the old file system
2709        }
2710        
2711        // now insert the new file system into the map
2712        if (map.isEmpty()
2713                && !ShutdownHookManager.get().isShutdownInProgress()) {
2714          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2715        }
2716        fs.key = key;
2717        map.put(key, fs);
2718        if (conf.getBoolean("fs.automatic.close", true)) {
2719          toAutoClose.add(key);
2720        }
2721        return fs;
2722      }
2723    }
2724
2725    synchronized void remove(Key key, FileSystem fs) {
2726      if (map.containsKey(key) && fs == map.get(key)) {
2727        map.remove(key);
2728        toAutoClose.remove(key);
2729        }
2730    }
2731
2732    synchronized void closeAll() throws IOException {
2733      closeAll(false);
2734    }
2735
2736    /**
2737     * Close all FileSystem instances in the Cache.
2738     * @param onlyAutomatic only close those that are marked for automatic closing
2739     */
2740    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2741      List<IOException> exceptions = new ArrayList<IOException>();
2742
2743      // Make a copy of the keys in the map since we'll be modifying
2744      // the map while iterating over it, which isn't safe.
2745      List<Key> keys = new ArrayList<Key>();
2746      keys.addAll(map.keySet());
2747
2748      for (Key key : keys) {
2749        final FileSystem fs = map.get(key);
2750
2751        if (onlyAutomatic && !toAutoClose.contains(key)) {
2752          continue;
2753        }
2754
2755        //remove from cache
2756        remove(key, fs);
2757
2758        if (fs != null) {
2759          try {
2760            fs.close();
2761          }
2762          catch(IOException ioe) {
2763            exceptions.add(ioe);
2764          }
2765        }
2766      }
2767
2768      if (!exceptions.isEmpty()) {
2769        throw MultipleIOException.createIOException(exceptions);
2770      }
2771    }
2772
2773    private class ClientFinalizer implements Runnable {
2774      @Override
2775      public synchronized void run() {
2776        try {
2777          closeAll(true);
2778        } catch (IOException e) {
2779          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2780        }
2781      }
2782    }
2783
2784    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2785      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2786      //Make a pass over the list and collect the filesystems to close
2787      //we cannot close inline since close() removes the entry from the Map
2788      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2789        final Key key = entry.getKey();
2790        final FileSystem fs = entry.getValue();
2791        if (ugi.equals(key.ugi) && fs != null) {
2792          targetFSList.add(fs);   
2793        }
2794      }
2795      List<IOException> exceptions = new ArrayList<IOException>();
2796      //now make a pass over the target list and close each
2797      for (FileSystem fs : targetFSList) {
2798        try {
2799          fs.close();
2800        }
2801        catch(IOException ioe) {
2802          exceptions.add(ioe);
2803        }
2804      }
2805      if (!exceptions.isEmpty()) {
2806        throw MultipleIOException.createIOException(exceptions);
2807      }
2808    }
2809
2810    /** FileSystem.Cache.Key */
2811    static class Key {
2812      final String scheme;
2813      final String authority;
2814      final UserGroupInformation ugi;
2815      final long unique;   // an artificial way to make a key unique
2816
2817      Key(URI uri, Configuration conf) throws IOException {
2818        this(uri, conf, 0);
2819      }
2820
2821      Key(URI uri, Configuration conf, long unique) throws IOException {
2822        scheme = uri.getScheme()==null ?
2823            "" : StringUtils.toLowerCase(uri.getScheme());
2824        authority = uri.getAuthority()==null ?
2825            "" : StringUtils.toLowerCase(uri.getAuthority());
2826        this.unique = unique;
2827        
2828        this.ugi = UserGroupInformation.getCurrentUser();
2829      }
2830
2831      @Override
2832      public int hashCode() {
2833        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2834      }
2835
2836      static boolean isEqual(Object a, Object b) {
2837        return a == b || (a != null && a.equals(b));        
2838      }
2839
2840      @Override
2841      public boolean equals(Object obj) {
2842        if (obj == this) {
2843          return true;
2844        }
2845        if (obj != null && obj instanceof Key) {
2846          Key that = (Key)obj;
2847          return isEqual(this.scheme, that.scheme)
2848                 && isEqual(this.authority, that.authority)
2849                 && isEqual(this.ugi, that.ugi)
2850                 && (this.unique == that.unique);
2851        }
2852        return false;        
2853      }
2854
2855      @Override
2856      public String toString() {
2857        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2858      }
2859    }
2860  }
2861  
2862  /**
2863   * Tracks statistics about how many reads, writes, and so forth have been
2864   * done in a FileSystem.
2865   * 
2866   * Since there is only one of these objects per FileSystem, there will 
2867   * typically be many threads writing to this object.  Almost every operation
2868   * on an open file will involve a write to this object.  In contrast, reading
2869   * statistics is done infrequently by most programs, and not at all by others.
2870   * Hence, this is optimized for writes.
2871   * 
2872   * Each thread writes to its own thread-local area of memory.  This removes 
2873   * contention and allows us to scale up to many, many threads.  To read
2874   * statistics, the reader thread totals up the contents of all of the 
2875   * thread-local data areas.
2876   */
2877  public static final class Statistics {
2878    /**
2879     * Statistics data.
2880     * 
2881     * There is only a single writer to thread-local StatisticsData objects.
2882     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2883     * to prevent lost updates.
2884     * The Java specification guarantees that updates to volatile longs will
2885     * be perceived as atomic with respect to other threads, which is all we
2886     * need.
2887     */
2888    public static class StatisticsData {
2889      volatile long bytesRead;
2890      volatile long bytesWritten;
2891      volatile int readOps;
2892      volatile int largeReadOps;
2893      volatile int writeOps;
2894
2895      /**
2896       * Add another StatisticsData object to this one.
2897       */
2898      void add(StatisticsData other) {
2899        this.bytesRead += other.bytesRead;
2900        this.bytesWritten += other.bytesWritten;
2901        this.readOps += other.readOps;
2902        this.largeReadOps += other.largeReadOps;
2903        this.writeOps += other.writeOps;
2904      }
2905
2906      /**
2907       * Negate the values of all statistics.
2908       */
2909      void negate() {
2910        this.bytesRead = -this.bytesRead;
2911        this.bytesWritten = -this.bytesWritten;
2912        this.readOps = -this.readOps;
2913        this.largeReadOps = -this.largeReadOps;
2914        this.writeOps = -this.writeOps;
2915      }
2916
2917      @Override
2918      public String toString() {
2919        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2920            + readOps + " read ops, " + largeReadOps + " large read ops, "
2921            + writeOps + " write ops";
2922      }
2923      
2924      public long getBytesRead() {
2925        return bytesRead;
2926      }
2927      
2928      public long getBytesWritten() {
2929        return bytesWritten;
2930      }
2931      
2932      public int getReadOps() {
2933        return readOps;
2934      }
2935      
2936      public int getLargeReadOps() {
2937        return largeReadOps;
2938      }
2939      
2940      public int getWriteOps() {
2941        return writeOps;
2942      }
2943    }
2944
2945    private interface StatisticsAggregator<T> {
2946      void accept(StatisticsData data);
2947      T aggregate();
2948    }
2949
2950    private final String scheme;
2951
2952    /**
2953     * rootData is data that doesn't belong to any thread, but will be added
2954     * to the totals.  This is useful for making copies of Statistics objects,
2955     * and for storing data that pertains to threads that have been garbage
2956     * collected.  Protected by the Statistics lock.
2957     */
2958    private final StatisticsData rootData;
2959
2960    /**
2961     * Thread-local data.
2962     */
2963    private final ThreadLocal<StatisticsData> threadData;
2964
2965    /**
2966     * Set of all thread-local data areas.  Protected by the Statistics lock.
2967     * The references to the statistics data are kept using weak references
2968     * to the associated threads. Proper clean-up is performed by the cleaner
2969     * thread when the threads are garbage collected.
2970     */
2971    private final Set<StatisticsDataReference> allData;
2972
2973    /**
2974     * Global reference queue and a cleaner thread that manage statistics data
2975     * references from all filesystem instances.
2976     */
2977    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
2978    private static final Thread STATS_DATA_CLEANER;
2979
2980    static {
2981      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
2982      // start a single daemon cleaner thread
2983      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
2984      STATS_DATA_CLEANER.
2985          setName(StatisticsDataReferenceCleaner.class.getName());
2986      STATS_DATA_CLEANER.setDaemon(true);
2987      STATS_DATA_CLEANER.start();
2988    }
2989
2990    public Statistics(String scheme) {
2991      this.scheme = scheme;
2992      this.rootData = new StatisticsData();
2993      this.threadData = new ThreadLocal<StatisticsData>();
2994      this.allData = new HashSet<StatisticsDataReference>();
2995    }
2996
2997    /**
2998     * Copy constructor.
2999     * 
3000     * @param other    The input Statistics object which is cloned.
3001     */
3002    public Statistics(Statistics other) {
3003      this.scheme = other.scheme;
3004      this.rootData = new StatisticsData();
3005      other.visitAll(new StatisticsAggregator<Void>() {
3006        @Override
3007        public void accept(StatisticsData data) {
3008          rootData.add(data);
3009        }
3010
3011        public Void aggregate() {
3012          return null;
3013        }
3014      });
3015      this.threadData = new ThreadLocal<StatisticsData>();
3016      this.allData = new HashSet<StatisticsDataReference>();
3017    }
3018
3019    /**
3020     * A weak reference to a thread that also includes the data associated
3021     * with that thread. On the thread being garbage collected, it is enqueued
3022     * to the reference queue for clean-up.
3023     */
3024    private class StatisticsDataReference extends WeakReference<Thread> {
3025      private final StatisticsData data;
3026
3027      public StatisticsDataReference(StatisticsData data, Thread thread) {
3028        super(thread, STATS_DATA_REF_QUEUE);
3029        this.data = data;
3030      }
3031
3032      public StatisticsData getData() {
3033        return data;
3034      }
3035
3036      /**
3037       * Performs clean-up action when the associated thread is garbage
3038       * collected.
3039       */
3040      public void cleanUp() {
3041        // use the statistics lock for safety
3042        synchronized (Statistics.this) {
3043          /*
3044           * If the thread that created this thread-local data no longer exists,
3045           * remove the StatisticsData from our list and fold the values into
3046           * rootData.
3047           */
3048          rootData.add(data);
3049          allData.remove(this);
3050        }
3051      }
3052    }
3053
3054    /**
3055     * Background action to act on references being removed.
3056     */
3057    private static class StatisticsDataReferenceCleaner implements Runnable {
3058      @Override
3059      public void run() {
3060        while (true) {
3061          try {
3062            StatisticsDataReference ref =
3063                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
3064            ref.cleanUp();
3065          } catch (Throwable th) {
3066            // the cleaner thread should continue to run even if there are
3067            // exceptions, including InterruptedException
3068            LOG.warn("exception in the cleaner thread but it will continue to "
3069                + "run", th);
3070          }
3071        }
3072      }
3073    }
3074
3075    /**
3076     * Get or create the thread-local data associated with the current thread.
3077     */
3078    public StatisticsData getThreadStatistics() {
3079      StatisticsData data = threadData.get();
3080      if (data == null) {
3081        data = new StatisticsData();
3082        threadData.set(data);
3083        StatisticsDataReference ref =
3084            new StatisticsDataReference(data, Thread.currentThread());
3085        synchronized(this) {
3086          allData.add(ref);
3087        }
3088      }
3089      return data;
3090    }
3091
3092    /**
3093     * Increment the bytes read in the statistics
3094     * @param newBytes the additional bytes read
3095     */
3096    public void incrementBytesRead(long newBytes) {
3097      getThreadStatistics().bytesRead += newBytes;
3098    }
3099    
3100    /**
3101     * Increment the bytes written in the statistics
3102     * @param newBytes the additional bytes written
3103     */
3104    public void incrementBytesWritten(long newBytes) {
3105      getThreadStatistics().bytesWritten += newBytes;
3106    }
3107    
3108    /**
3109     * Increment the number of read operations
3110     * @param count number of read operations
3111     */
3112    public void incrementReadOps(int count) {
3113      getThreadStatistics().readOps += count;
3114    }
3115
3116    /**
3117     * Increment the number of large read operations
3118     * @param count number of large read operations
3119     */
3120    public void incrementLargeReadOps(int count) {
3121      getThreadStatistics().largeReadOps += count;
3122    }
3123
3124    /**
3125     * Increment the number of write operations
3126     * @param count number of write operations
3127     */
3128    public void incrementWriteOps(int count) {
3129      getThreadStatistics().writeOps += count;
3130    }
3131
3132    /**
3133     * Apply the given aggregator to all StatisticsData objects associated with
3134     * this Statistics object.
3135     *
3136     * For each StatisticsData object, we will call accept on the visitor.
3137     * Finally, at the end, we will call aggregate to get the final total. 
3138     *
3139     * @param         The visitor to use.
3140     * @return        The total.
3141     */
3142    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3143      visitor.accept(rootData);
3144      for (StatisticsDataReference ref: allData) {
3145        StatisticsData data = ref.getData();
3146        visitor.accept(data);
3147      }
3148      return visitor.aggregate();
3149    }
3150
3151    /**
3152     * Get the total number of bytes read
3153     * @return the number of bytes
3154     */
3155    public long getBytesRead() {
3156      return visitAll(new StatisticsAggregator<Long>() {
3157        private long bytesRead = 0;
3158
3159        @Override
3160        public void accept(StatisticsData data) {
3161          bytesRead += data.bytesRead;
3162        }
3163
3164        public Long aggregate() {
3165          return bytesRead;
3166        }
3167      });
3168    }
3169    
3170    /**
3171     * Get the total number of bytes written
3172     * @return the number of bytes
3173     */
3174    public long getBytesWritten() {
3175      return visitAll(new StatisticsAggregator<Long>() {
3176        private long bytesWritten = 0;
3177
3178        @Override
3179        public void accept(StatisticsData data) {
3180          bytesWritten += data.bytesWritten;
3181        }
3182
3183        public Long aggregate() {
3184          return bytesWritten;
3185        }
3186      });
3187    }
3188    
3189    /**
3190     * Get the number of file system read operations such as list files
3191     * @return number of read operations
3192     */
3193    public int getReadOps() {
3194      return visitAll(new StatisticsAggregator<Integer>() {
3195        private int readOps = 0;
3196
3197        @Override
3198        public void accept(StatisticsData data) {
3199          readOps += data.readOps;
3200          readOps += data.largeReadOps;
3201        }
3202
3203        public Integer aggregate() {
3204          return readOps;
3205        }
3206      });
3207    }
3208
3209    /**
3210     * Get the number of large file system read operations such as list files
3211     * under a large directory
3212     * @return number of large read operations
3213     */
3214    public int getLargeReadOps() {
3215      return visitAll(new StatisticsAggregator<Integer>() {
3216        private int largeReadOps = 0;
3217
3218        @Override
3219        public void accept(StatisticsData data) {
3220          largeReadOps += data.largeReadOps;
3221        }
3222
3223        public Integer aggregate() {
3224          return largeReadOps;
3225        }
3226      });
3227    }
3228
3229    /**
3230     * Get the number of file system write operations such as create, append 
3231     * rename etc.
3232     * @return number of write operations
3233     */
3234    public int getWriteOps() {
3235      return visitAll(new StatisticsAggregator<Integer>() {
3236        private int writeOps = 0;
3237
3238        @Override
3239        public void accept(StatisticsData data) {
3240          writeOps += data.writeOps;
3241        }
3242
3243        public Integer aggregate() {
3244          return writeOps;
3245        }
3246      });
3247    }
3248
3249
3250    @Override
3251    public String toString() {
3252      return visitAll(new StatisticsAggregator<String>() {
3253        private StatisticsData total = new StatisticsData();
3254
3255        @Override
3256        public void accept(StatisticsData data) {
3257          total.add(data);
3258        }
3259
3260        public String aggregate() {
3261          return total.toString();
3262        }
3263      });
3264    }
3265
3266    /**
3267     * Resets all statistics to 0.
3268     *
3269     * In order to reset, we add up all the thread-local statistics data, and
3270     * set rootData to the negative of that.
3271     *
3272     * This may seem like a counterintuitive way to reset the statsitics.  Why
3273     * can't we just zero out all the thread-local data?  Well, thread-local
3274     * data can only be modified by the thread that owns it.  If we tried to
3275     * modify the thread-local data from this thread, our modification might get
3276     * interleaved with a read-modify-write operation done by the thread that
3277     * owns the data.  That would result in our update getting lost.
3278     *
3279     * The approach used here avoids this problem because it only ever reads
3280     * (not writes) the thread-local data.  Both reads and writes to rootData
3281     * are done under the lock, so we're free to modify rootData from any thread
3282     * that holds the lock.
3283     */
3284    public void reset() {
3285      visitAll(new StatisticsAggregator<Void>() {
3286        private StatisticsData total = new StatisticsData();
3287
3288        @Override
3289        public void accept(StatisticsData data) {
3290          total.add(data);
3291        }
3292
3293        public Void aggregate() {
3294          total.negate();
3295          rootData.add(total);
3296          return null;
3297        }
3298      });
3299    }
3300    
3301    /**
3302     * Get the uri scheme associated with this statistics object.
3303     * @return the schema associated with this set of statistics
3304     */
3305    public String getScheme() {
3306      return scheme;
3307    }
3308
3309    @VisibleForTesting
3310    synchronized int getAllThreadLocalDataSize() {
3311      return allData.size();
3312    }
3313  }
3314  
3315  /**
3316   * Get the Map of Statistics object indexed by URI Scheme.
3317   * @return a Map having a key as URI scheme and value as Statistics object
3318   * @deprecated use {@link #getAllStatistics} instead
3319   */
3320  @Deprecated
3321  public static synchronized Map<String, Statistics> getStatistics() {
3322    Map<String, Statistics> result = new HashMap<String, Statistics>();
3323    for(Statistics stat: statisticsTable.values()) {
3324      result.put(stat.getScheme(), stat);
3325    }
3326    return result;
3327  }
3328
3329  /**
3330   * Return the FileSystem classes that have Statistics
3331   */
3332  public static synchronized List<Statistics> getAllStatistics() {
3333    return new ArrayList<Statistics>(statisticsTable.values());
3334  }
3335  
3336  /**
3337   * Get the statistics for a particular file system
3338   * @param cls the class to lookup
3339   * @return a statistics object
3340   */
3341  public static synchronized 
3342  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3343    Statistics result = statisticsTable.get(cls);
3344    if (result == null) {
3345      result = new Statistics(scheme);
3346      statisticsTable.put(cls, result);
3347    }
3348    return result;
3349  }
3350  
3351  /**
3352   * Reset all statistics for all file systems
3353   */
3354  public static synchronized void clearStatistics() {
3355    for(Statistics stat: statisticsTable.values()) {
3356      stat.reset();
3357    }
3358  }
3359
3360  /**
3361   * Print all statistics for all file systems
3362   */
3363  public static synchronized
3364  void printStatistics() throws IOException {
3365    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3366            statisticsTable.entrySet()) {
3367      System.out.println("  FileSystem " + pair.getKey().getName() + 
3368                         ": " + pair.getValue());
3369    }
3370  }
3371
3372  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
3373  private static boolean symlinksEnabled = false;
3374
3375  private static Configuration conf = null;
3376
3377  @VisibleForTesting
3378  public static boolean areSymlinksEnabled() {
3379    return symlinksEnabled;
3380  }
3381
3382  @VisibleForTesting
3383  public static void enableSymlinks() {
3384    symlinksEnabled = true;
3385  }
3386}