001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.security.PrivilegedExceptionAction;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.EnumSet;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.IdentityHashMap;
033import java.util.Iterator;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.NoSuchElementException;
038import java.util.ServiceLoader;
039import java.util.Set;
040import java.util.Stack;
041import java.util.TreeSet;
042import java.util.concurrent.atomic.AtomicLong;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.apache.hadoop.classification.InterfaceAudience;
047import org.apache.hadoop.classification.InterfaceStability;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.conf.Configured;
050import org.apache.hadoop.fs.Options.ChecksumOpt;
051import org.apache.hadoop.fs.Options.Rename;
052import org.apache.hadoop.fs.permission.AclEntry;
053import org.apache.hadoop.fs.permission.AclStatus;
054import org.apache.hadoop.fs.permission.FsAction;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.io.MultipleIOException;
057import org.apache.hadoop.io.Text;
058import org.apache.hadoop.net.NetUtils;
059import org.apache.hadoop.security.AccessControlException;
060import org.apache.hadoop.security.Credentials;
061import org.apache.hadoop.security.SecurityUtil;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.token.Token;
064import org.apache.hadoop.util.DataChecksum;
065import org.apache.hadoop.util.Progressable;
066import org.apache.hadoop.util.ReflectionUtils;
067import org.apache.hadoop.util.ShutdownHookManager;
068
069import com.google.common.annotations.VisibleForTesting;
070
071/****************************************************************
072 * An abstract base class for a fairly generic filesystem.  It
073 * may be implemented as a distributed filesystem, or as a "local"
074 * one that reflects the locally-connected disk.  The local version
075 * exists for small Hadoop instances and for testing.
076 *
077 * <p>
078 *
079 * All user code that may potentially use the Hadoop Distributed
080 * File System should be written to use a FileSystem object.  The
081 * Hadoop DFS is a multi-machine system that appears as a single
082 * disk.  It's useful because of its fault tolerance and potentially
083 * very large capacity.
084 * 
085 * <p>
086 * The local implementation is {@link LocalFileSystem} and distributed
087 * implementation is DistributedFileSystem.
088 *****************************************************************/
089@InterfaceAudience.Public
090@InterfaceStability.Stable
091public abstract class FileSystem extends Configured implements Closeable {
092  public static final String FS_DEFAULT_NAME_KEY = 
093                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
094  public static final String DEFAULT_FS = 
095                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
096
097  public static final Log LOG = LogFactory.getLog(FileSystem.class);
098
099  /**
100   * Priority of the FileSystem shutdown hook.
101   */
102  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
103
104  /** FileSystem cache */
105  static final Cache CACHE = new Cache();
106
107  /** The key this instance is stored under in the cache. */
108  private Cache.Key key;
109
110  /** Recording statistics per a FileSystem class */
111  private static final Map<Class<? extends FileSystem>, Statistics> 
112    statisticsTable =
113      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
114  
115  /**
116   * The statistics for this file system.
117   */
118  protected Statistics statistics;
119
120  /**
121   * A cache of files that should be deleted when filsystem is closed
122   * or the JVM is exited.
123   */
124  private Set<Path> deleteOnExit = new TreeSet<Path>();
125  
126  boolean resolveSymlinks;
127  /**
128   * This method adds a file system for testing so that we can find it later. It
129   * is only for testing.
130   * @param uri the uri to store it under
131   * @param conf the configuration to store it under
132   * @param fs the file system to store
133   * @throws IOException
134   */
135  static void addFileSystemForTesting(URI uri, Configuration conf,
136      FileSystem fs) throws IOException {
137    CACHE.map.put(new Cache.Key(uri, conf), fs);
138  }
139
140  /**
141   * Get a filesystem instance based on the uri, the passed
142   * configuration and the user
143   * @param uri of the filesystem
144   * @param conf the configuration to use
145   * @param user to perform the get as
146   * @return the filesystem instance
147   * @throws IOException
148   * @throws InterruptedException
149   */
150  public static FileSystem get(final URI uri, final Configuration conf,
151        final String user) throws IOException, InterruptedException {
152    String ticketCachePath =
153      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
154    UserGroupInformation ugi =
155        UserGroupInformation.getBestUGI(ticketCachePath, user);
156    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
157      @Override
158      public FileSystem run() throws IOException {
159        return get(uri, conf);
160      }
161    });
162  }
163
164  /**
165   * Returns the configured filesystem implementation.
166   * @param conf the configuration to use
167   */
168  public static FileSystem get(Configuration conf) throws IOException {
169    return get(getDefaultUri(conf), conf);
170  }
171  
172  /** Get the default filesystem URI from a configuration.
173   * @param conf the configuration to use
174   * @return the uri of the default filesystem
175   */
176  public static URI getDefaultUri(Configuration conf) {
177    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
178  }
179
180  /** Set the default filesystem URI in a configuration.
181   * @param conf the configuration to alter
182   * @param uri the new default filesystem uri
183   */
184  public static void setDefaultUri(Configuration conf, URI uri) {
185    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
186  }
187
188  /** Set the default filesystem URI in a configuration.
189   * @param conf the configuration to alter
190   * @param uri the new default filesystem uri
191   */
192  public static void setDefaultUri(Configuration conf, String uri) {
193    setDefaultUri(conf, URI.create(fixName(uri)));
194  }
195
196  /** Called after a new FileSystem instance is constructed.
197   * @param name a uri whose authority section names the host, port, etc.
198   *   for this FileSystem
199   * @param conf the configuration
200   */
201  public void initialize(URI name, Configuration conf) throws IOException {
202    statistics = getStatistics(name.getScheme(), getClass());    
203    resolveSymlinks = conf.getBoolean(
204        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
205        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
206  }
207
208  /**
209   * Return the protocol scheme for the FileSystem.
210   * <p/>
211   * This implementation throws an <code>UnsupportedOperationException</code>.
212   *
213   * @return the protocol scheme for the FileSystem.
214   */
215  public String getScheme() {
216    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
217  }
218
219  /** Returns a URI whose scheme and authority identify this FileSystem.*/
220  public abstract URI getUri();
221  
222  /**
223   * Return a canonicalized form of this FileSystem's URI.
224   * 
225   * The default implementation simply calls {@link #canonicalizeUri(URI)}
226   * on the filesystem's own URI, so subclasses typically only need to
227   * implement that method.
228   *
229   * @see #canonicalizeUri(URI)
230   */
231  protected URI getCanonicalUri() {
232    return canonicalizeUri(getUri());
233  }
234  
235  /**
236   * Canonicalize the given URI.
237   * 
238   * This is filesystem-dependent, but may for example consist of
239   * canonicalizing the hostname using DNS and adding the default
240   * port if not specified.
241   * 
242   * The default implementation simply fills in the default port if
243   * not specified and if the filesystem has a default port.
244   *
245   * @return URI
246   * @see NetUtils#getCanonicalUri(URI, int)
247   */
248  protected URI canonicalizeUri(URI uri) {
249    if (uri.getPort() == -1 && getDefaultPort() > 0) {
250      // reconstruct the uri with the default port set
251      try {
252        uri = new URI(uri.getScheme(), uri.getUserInfo(),
253            uri.getHost(), getDefaultPort(),
254            uri.getPath(), uri.getQuery(), uri.getFragment());
255      } catch (URISyntaxException e) {
256        // Should never happen!
257        throw new AssertionError("Valid URI became unparseable: " +
258            uri);
259      }
260    }
261    
262    return uri;
263  }
264  
265  /**
266   * Get the default port for this file system.
267   * @return the default port or 0 if there isn't one
268   */
269  protected int getDefaultPort() {
270    return 0;
271  }
272
273  protected static FileSystem getFSofPath(final Path absOrFqPath,
274      final Configuration conf)
275      throws UnsupportedFileSystemException, IOException {
276    absOrFqPath.checkNotSchemeWithRelative();
277    absOrFqPath.checkNotRelative();
278
279    // Uses the default file system if not fully qualified
280    return get(absOrFqPath.toUri(), conf);
281  }
282
283  /**
284   * Get a canonical service name for this file system.  The token cache is
285   * the only user of the canonical service name, and uses it to lookup this
286   * filesystem's service tokens.
287   * If file system provides a token of its own then it must have a canonical
288   * name, otherwise canonical name can be null.
289   * 
290   * Default Impl: If the file system has child file systems 
291   * (such as an embedded file system) then it is assumed that the fs has no
292   * tokens of its own and hence returns a null name; otherwise a service
293   * name is built using Uri and port.
294   * 
295   * @return a service string that uniquely identifies this file system, null
296   *         if the filesystem does not implement tokens
297   * @see SecurityUtil#buildDTServiceName(URI, int) 
298   */
299  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
300  public String getCanonicalServiceName() {
301    return (getChildFileSystems() == null)
302      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
303      : null;
304  }
305
306  /** @deprecated call #getUri() instead.*/
307  @Deprecated
308  public String getName() { return getUri().toString(); }
309
310  /** @deprecated call #get(URI,Configuration) instead. */
311  @Deprecated
312  public static FileSystem getNamed(String name, Configuration conf)
313    throws IOException {
314    return get(URI.create(fixName(name)), conf);
315  }
316  
317  /** Update old-format filesystem names, for back-compatibility.  This should
318   * eventually be replaced with a checkName() method that throws an exception
319   * for old-format names. */ 
320  private static String fixName(String name) {
321    // convert old-format name to new-format name
322    if (name.equals("local")) {         // "local" is now "file:///".
323      LOG.warn("\"local\" is a deprecated filesystem name."
324               +" Use \"file:///\" instead.");
325      name = "file:///";
326    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
327      LOG.warn("\""+name+"\" is a deprecated filesystem name."
328               +" Use \"hdfs://"+name+"/\" instead.");
329      name = "hdfs://"+name;
330    }
331    return name;
332  }
333
334  /**
335   * Get the local file system.
336   * @param conf the configuration to configure the file system with
337   * @return a LocalFileSystem
338   */
339  public static LocalFileSystem getLocal(Configuration conf)
340    throws IOException {
341    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
342  }
343
344  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
345   * of the URI determines a configuration property name,
346   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
347   * The entire URI is passed to the FileSystem instance's initialize method.
348   */
349  public static FileSystem get(URI uri, Configuration conf) throws IOException {
350    String scheme = uri.getScheme();
351    String authority = uri.getAuthority();
352
353    if (scheme == null && authority == null) {     // use default FS
354      return get(conf);
355    }
356
357    if (scheme != null && authority == null) {     // no authority
358      URI defaultUri = getDefaultUri(conf);
359      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
360          && defaultUri.getAuthority() != null) {  // & default has authority
361        return get(defaultUri, conf);              // return default
362      }
363    }
364    
365    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
366    if (conf.getBoolean(disableCacheName, false)) {
367      return createFileSystem(uri, conf);
368    }
369
370    return CACHE.get(uri, conf);
371  }
372
373  /**
374   * Returns the FileSystem for this URI's scheme and authority and the 
375   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
376   * @param uri of the filesystem
377   * @param conf the configuration to use
378   * @param user to perform the get as
379   * @return filesystem instance
380   * @throws IOException
381   * @throws InterruptedException
382   */
383  public static FileSystem newInstance(final URI uri, final Configuration conf,
384      final String user) throws IOException, InterruptedException {
385    String ticketCachePath =
386      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
387    UserGroupInformation ugi =
388        UserGroupInformation.getBestUGI(ticketCachePath, user);
389    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
390      @Override
391      public FileSystem run() throws IOException {
392        return newInstance(uri,conf); 
393      }
394    });
395  }
396  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
397   * of the URI determines a configuration property name,
398   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
399   * The entire URI is passed to the FileSystem instance's initialize method.
400   * This always returns a new FileSystem object.
401   */
402  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
403    String scheme = uri.getScheme();
404    String authority = uri.getAuthority();
405
406    if (scheme == null) {                       // no scheme: use default FS
407      return newInstance(conf);
408    }
409
410    if (authority == null) {                       // no authority
411      URI defaultUri = getDefaultUri(conf);
412      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
413          && defaultUri.getAuthority() != null) {  // & default has authority
414        return newInstance(defaultUri, conf);              // return default
415      }
416    }
417    return CACHE.getUnique(uri, conf);
418  }
419
420  /** Returns a unique configured filesystem implementation.
421   * This always returns a new FileSystem object.
422   * @param conf the configuration to use
423   */
424  public static FileSystem newInstance(Configuration conf) throws IOException {
425    return newInstance(getDefaultUri(conf), conf);
426  }
427
428  /**
429   * Get a unique local file system object
430   * @param conf the configuration to configure the file system with
431   * @return a LocalFileSystem
432   * This always returns a new FileSystem object.
433   */
434  public static LocalFileSystem newInstanceLocal(Configuration conf)
435    throws IOException {
436    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
437  }
438
439  /**
440   * Close all cached filesystems. Be sure those filesystems are not
441   * used anymore.
442   * 
443   * @throws IOException
444   */
445  public static void closeAll() throws IOException {
446    CACHE.closeAll();
447  }
448
449  /**
450   * Close all cached filesystems for a given UGI. Be sure those filesystems 
451   * are not used anymore.
452   * @param ugi user group info to close
453   * @throws IOException
454   */
455  public static void closeAllForUGI(UserGroupInformation ugi) 
456  throws IOException {
457    CACHE.closeAll(ugi);
458  }
459
460  /** 
461   * Make sure that a path specifies a FileSystem.
462   * @param path to use
463   */
464  public Path makeQualified(Path path) {
465    checkPath(path);
466    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
467  }
468    
469  /**
470   * Get a new delegation token for this file system.
471   * This is an internal method that should have been declared protected
472   * but wasn't historically.
473   * Callers should use {@link #addDelegationTokens(String, Credentials)}
474   * 
475   * @param renewer the account name that is allowed to renew the token.
476   * @return a new delegation token
477   * @throws IOException
478   */
479  @InterfaceAudience.Private()
480  public Token<?> getDelegationToken(String renewer) throws IOException {
481    return null;
482  }
483  
484  /**
485   * Obtain all delegation tokens used by this FileSystem that are not
486   * already present in the given Credentials.  Existing tokens will neither
487   * be verified as valid nor having the given renewer.  Missing tokens will
488   * be acquired and added to the given Credentials.
489   * 
490   * Default Impl: works for simple fs with its own token
491   * and also for an embedded fs whose tokens are those of its
492   * children file system (i.e. the embedded fs has not tokens of its
493   * own).
494   * 
495   * @param renewer the user allowed to renew the delegation tokens
496   * @param credentials cache in which to add new delegation tokens
497   * @return list of new delegation tokens
498   * @throws IOException
499   */
500  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
501  public Token<?>[] addDelegationTokens(
502      final String renewer, Credentials credentials) throws IOException {
503    if (credentials == null) {
504      credentials = new Credentials();
505    }
506    final List<Token<?>> tokens = new ArrayList<Token<?>>();
507    collectDelegationTokens(renewer, credentials, tokens);
508    return tokens.toArray(new Token<?>[tokens.size()]);
509  }
510  
511  /**
512   * Recursively obtain the tokens for this FileSystem and all descended
513   * FileSystems as determined by getChildFileSystems().
514   * @param renewer the user allowed to renew the delegation tokens
515   * @param credentials cache in which to add the new delegation tokens
516   * @param tokens list in which to add acquired tokens
517   * @throws IOException
518   */
519  private void collectDelegationTokens(final String renewer,
520                                       final Credentials credentials,
521                                       final List<Token<?>> tokens)
522                                           throws IOException {
523    final String serviceName = getCanonicalServiceName();
524    // Collect token of the this filesystem and then of its embedded children
525    if (serviceName != null) { // fs has token, grab it
526      final Text service = new Text(serviceName);
527      Token<?> token = credentials.getToken(service);
528      if (token == null) {
529        token = getDelegationToken(renewer);
530        if (token != null) {
531          tokens.add(token);
532          credentials.addToken(service, token);
533        }
534      }
535    }
536    // Now collect the tokens from the children
537    final FileSystem[] children = getChildFileSystems();
538    if (children != null) {
539      for (final FileSystem fs : children) {
540        fs.collectDelegationTokens(renewer, credentials, tokens);
541      }
542    }
543  }
544
545  /**
546   * Get all the immediate child FileSystems embedded in this FileSystem.
547   * It does not recurse and get grand children.  If a FileSystem
548   * has multiple child FileSystems, then it should return a unique list
549   * of those FileSystems.  Default is to return null to signify no children.
550   * 
551   * @return FileSystems used by this FileSystem
552   */
553  @InterfaceAudience.LimitedPrivate({ "HDFS" })
554  @VisibleForTesting
555  public FileSystem[] getChildFileSystems() {
556    return null;
557  }
558  
559  /** create a file with the provided permission
560   * The permission of the file is set to be the provided permission as in
561   * setPermission, not permission&~umask
562   * 
563   * It is implemented using two RPCs. It is understood that it is inefficient,
564   * but the implementation is thread-safe. The other option is to change the
565   * value of umask in configuration to be 0, but it is not thread-safe.
566   * 
567   * @param fs file system handle
568   * @param file the name of the file to be created
569   * @param permission the permission of the file
570   * @return an output stream
571   * @throws IOException
572   */
573  public static FSDataOutputStream create(FileSystem fs,
574      Path file, FsPermission permission) throws IOException {
575    // create the file with default permission
576    FSDataOutputStream out = fs.create(file);
577    // set its permission to the supplied one
578    fs.setPermission(file, permission);
579    return out;
580  }
581
582  /** create a directory with the provided permission
583   * The permission of the directory is set to be the provided permission as in
584   * setPermission, not permission&~umask
585   * 
586   * @see #create(FileSystem, Path, FsPermission)
587   * 
588   * @param fs file system handle
589   * @param dir the name of the directory to be created
590   * @param permission the permission of the directory
591   * @return true if the directory creation succeeds; false otherwise
592   * @throws IOException
593   */
594  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
595  throws IOException {
596    // create the directory using the default permission
597    boolean result = fs.mkdirs(dir);
598    // set its permission to be the supplied one
599    fs.setPermission(dir, permission);
600    return result;
601  }
602
603  ///////////////////////////////////////////////////////////////
604  // FileSystem
605  ///////////////////////////////////////////////////////////////
606
607  protected FileSystem() {
608    super(null);
609  }
610
611  /** 
612   * Check that a Path belongs to this FileSystem.
613   * @param path to check
614   */
615  protected void checkPath(Path path) {
616    URI uri = path.toUri();
617    String thatScheme = uri.getScheme();
618    if (thatScheme == null)                // fs is relative
619      return;
620    URI thisUri = getCanonicalUri();
621    String thisScheme = thisUri.getScheme();
622    //authority and scheme are not case sensitive
623    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
624      String thisAuthority = thisUri.getAuthority();
625      String thatAuthority = uri.getAuthority();
626      if (thatAuthority == null &&                // path's authority is null
627          thisAuthority != null) {                // fs has an authority
628        URI defaultUri = getDefaultUri(getConf());
629        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
630          uri = defaultUri; // schemes match, so use this uri instead
631        } else {
632          uri = null; // can't determine auth of the path
633        }
634      }
635      if (uri != null) {
636        // canonicalize uri before comparing with this fs
637        uri = canonicalizeUri(uri);
638        thatAuthority = uri.getAuthority();
639        if (thisAuthority == thatAuthority ||       // authorities match
640            (thisAuthority != null &&
641             thisAuthority.equalsIgnoreCase(thatAuthority)))
642          return;
643      }
644    }
645    throw new IllegalArgumentException("Wrong FS: "+path+
646                                       ", expected: "+this.getUri());
647  }
648
649  /**
650   * Return an array containing hostnames, offset and size of 
651   * portions of the given file.  For a nonexistent 
652   * file or regions, null will be returned.
653   *
654   * This call is most helpful with DFS, where it returns 
655   * hostnames of machines that contain the given file.
656   *
657   * The FileSystem will simply return an elt containing 'localhost'.
658   *
659   * @param file FilesStatus to get data from
660   * @param start offset into the given file
661   * @param len length for which to get locations for
662   */
663  public BlockLocation[] getFileBlockLocations(FileStatus file, 
664      long start, long len) throws IOException {
665    if (file == null) {
666      return null;
667    }
668
669    if (start < 0 || len < 0) {
670      throw new IllegalArgumentException("Invalid start or len parameter");
671    }
672
673    if (file.getLen() <= start) {
674      return new BlockLocation[0];
675
676    }
677    String[] name = { "localhost:50010" };
678    String[] host = { "localhost" };
679    return new BlockLocation[] {
680      new BlockLocation(name, host, 0, file.getLen()) };
681  }
682 
683
684  /**
685   * Return an array containing hostnames, offset and size of 
686   * portions of the given file.  For a nonexistent 
687   * file or regions, null will be returned.
688   *
689   * This call is most helpful with DFS, where it returns 
690   * hostnames of machines that contain the given file.
691   *
692   * The FileSystem will simply return an elt containing 'localhost'.
693   *
694   * @param p path is used to identify an FS since an FS could have
695   *          another FS that it could be delegating the call to
696   * @param start offset into the given file
697   * @param len length for which to get locations for
698   */
699  public BlockLocation[] getFileBlockLocations(Path p, 
700      long start, long len) throws IOException {
701    if (p == null) {
702      throw new NullPointerException();
703    }
704    FileStatus file = getFileStatus(p);
705    return getFileBlockLocations(file, start, len);
706  }
707  
708  /**
709   * Return a set of server default configuration values
710   * @return server default configuration values
711   * @throws IOException
712   * @deprecated use {@link #getServerDefaults(Path)} instead
713   */
714  @Deprecated
715  public FsServerDefaults getServerDefaults() throws IOException {
716    Configuration conf = getConf();
717    // CRC32 is chosen as default as it is available in all 
718    // releases that support checksum.
719    // The client trash configuration is ignored.
720    return new FsServerDefaults(getDefaultBlockSize(), 
721        conf.getInt("io.bytes.per.checksum", 512), 
722        64 * 1024, 
723        getDefaultReplication(),
724        conf.getInt("io.file.buffer.size", 4096),
725        false,
726        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
727        DataChecksum.Type.CRC32);
728  }
729
730  /**
731   * Return a set of server default configuration values
732   * @param p path is used to identify an FS since an FS could have
733   *          another FS that it could be delegating the call to
734   * @return server default configuration values
735   * @throws IOException
736   */
737  public FsServerDefaults getServerDefaults(Path p) throws IOException {
738    return getServerDefaults();
739  }
740
741  /**
742   * Return the fully-qualified path of path f resolving the path
743   * through any symlinks or mount point
744   * @param p path to be resolved
745   * @return fully qualified path 
746   * @throws FileNotFoundException
747   */
748   public Path resolvePath(final Path p) throws IOException {
749     checkPath(p);
750     return getFileStatus(p).getPath();
751   }
752
753  /**
754   * Opens an FSDataInputStream at the indicated Path.
755   * @param f the file name to open
756   * @param bufferSize the size of the buffer to be used.
757   */
758  public abstract FSDataInputStream open(Path f, int bufferSize)
759    throws IOException;
760    
761  /**
762   * Opens an FSDataInputStream at the indicated Path.
763   * @param f the file to open
764   */
765  public FSDataInputStream open(Path f) throws IOException {
766    return open(f, getConf().getInt("io.file.buffer.size", 4096));
767  }
768
769  /**
770   * Create an FSDataOutputStream at the indicated Path.
771   * Files are overwritten by default.
772   * @param f the file to create
773   */
774  public FSDataOutputStream create(Path f) throws IOException {
775    return create(f, true);
776  }
777
778  /**
779   * Create an FSDataOutputStream at the indicated Path.
780   * @param f the file to create
781   * @param overwrite if a file with this name already exists, then if true,
782   *   the file will be overwritten, and if false an exception will be thrown.
783   */
784  public FSDataOutputStream create(Path f, boolean overwrite)
785      throws IOException {
786    return create(f, overwrite, 
787                  getConf().getInt("io.file.buffer.size", 4096),
788                  getDefaultReplication(f),
789                  getDefaultBlockSize(f));
790  }
791
792  /**
793   * Create an FSDataOutputStream at the indicated Path with write-progress
794   * reporting.
795   * Files are overwritten by default.
796   * @param f the file to create
797   * @param progress to report progress
798   */
799  public FSDataOutputStream create(Path f, Progressable progress) 
800      throws IOException {
801    return create(f, true, 
802                  getConf().getInt("io.file.buffer.size", 4096),
803                  getDefaultReplication(f),
804                  getDefaultBlockSize(f), progress);
805  }
806
807  /**
808   * Create an FSDataOutputStream at the indicated Path.
809   * Files are overwritten by default.
810   * @param f the file to create
811   * @param replication the replication factor
812   */
813  public FSDataOutputStream create(Path f, short replication)
814      throws IOException {
815    return create(f, true, 
816                  getConf().getInt("io.file.buffer.size", 4096),
817                  replication,
818                  getDefaultBlockSize(f));
819  }
820
821  /**
822   * Create an FSDataOutputStream at the indicated Path with write-progress
823   * reporting.
824   * Files are overwritten by default.
825   * @param f the file to create
826   * @param replication the replication factor
827   * @param progress to report progress
828   */
829  public FSDataOutputStream create(Path f, short replication, 
830      Progressable progress) throws IOException {
831    return create(f, true, 
832                  getConf().getInt(
833                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
834                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
835                  replication,
836                  getDefaultBlockSize(f), progress);
837  }
838
839    
840  /**
841   * Create an FSDataOutputStream at the indicated Path.
842   * @param f the file name to create
843   * @param overwrite if a file with this name already exists, then if true,
844   *   the file will be overwritten, and if false an error will be thrown.
845   * @param bufferSize the size of the buffer to be used.
846   */
847  public FSDataOutputStream create(Path f, 
848                                   boolean overwrite,
849                                   int bufferSize
850                                   ) throws IOException {
851    return create(f, overwrite, bufferSize, 
852                  getDefaultReplication(f),
853                  getDefaultBlockSize(f));
854  }
855    
856  /**
857   * Create an FSDataOutputStream at the indicated Path with write-progress
858   * reporting.
859   * @param f the path of the file to open
860   * @param overwrite if a file with this name already exists, then if true,
861   *   the file will be overwritten, and if false an error will be thrown.
862   * @param bufferSize the size of the buffer to be used.
863   */
864  public FSDataOutputStream create(Path f, 
865                                   boolean overwrite,
866                                   int bufferSize,
867                                   Progressable progress
868                                   ) throws IOException {
869    return create(f, overwrite, bufferSize, 
870                  getDefaultReplication(f),
871                  getDefaultBlockSize(f), progress);
872  }
873    
874    
875  /**
876   * Create an FSDataOutputStream at the indicated Path.
877   * @param f the file name to open
878   * @param overwrite if a file with this name already exists, then if true,
879   *   the file will be overwritten, and if false an error will be thrown.
880   * @param bufferSize the size of the buffer to be used.
881   * @param replication required block replication for the file. 
882   */
883  public FSDataOutputStream create(Path f, 
884                                   boolean overwrite,
885                                   int bufferSize,
886                                   short replication,
887                                   long blockSize
888                                   ) throws IOException {
889    return create(f, overwrite, bufferSize, replication, blockSize, null);
890  }
891
892  /**
893   * Create an FSDataOutputStream at the indicated Path with write-progress
894   * reporting.
895   * @param f the file name to open
896   * @param overwrite if a file with this name already exists, then if true,
897   *   the file will be overwritten, and if false an error will be thrown.
898   * @param bufferSize the size of the buffer to be used.
899   * @param replication required block replication for the file. 
900   */
901  public FSDataOutputStream create(Path f,
902                                            boolean overwrite,
903                                            int bufferSize,
904                                            short replication,
905                                            long blockSize,
906                                            Progressable progress
907                                            ) throws IOException {
908    return this.create(f, FsPermission.getFileDefault().applyUMask(
909        FsPermission.getUMask(getConf())), overwrite, bufferSize,
910        replication, blockSize, progress);
911  }
912
913  /**
914   * Create an FSDataOutputStream at the indicated Path with write-progress
915   * reporting.
916   * @param f the file name to open
917   * @param permission
918   * @param overwrite if a file with this name already exists, then if true,
919   *   the file will be overwritten, and if false an error will be thrown.
920   * @param bufferSize the size of the buffer to be used.
921   * @param replication required block replication for the file.
922   * @param blockSize
923   * @param progress
924   * @throws IOException
925   * @see #setPermission(Path, FsPermission)
926   */
927  public abstract FSDataOutputStream create(Path f,
928      FsPermission permission,
929      boolean overwrite,
930      int bufferSize,
931      short replication,
932      long blockSize,
933      Progressable progress) throws IOException;
934  
935  /**
936   * Create an FSDataOutputStream at the indicated Path with write-progress
937   * reporting.
938   * @param f the file name to open
939   * @param permission
940   * @param flags {@link CreateFlag}s to use for this stream.
941   * @param bufferSize the size of the buffer to be used.
942   * @param replication required block replication for the file.
943   * @param blockSize
944   * @param progress
945   * @throws IOException
946   * @see #setPermission(Path, FsPermission)
947   */
948  public FSDataOutputStream create(Path f,
949      FsPermission permission,
950      EnumSet<CreateFlag> flags,
951      int bufferSize,
952      short replication,
953      long blockSize,
954      Progressable progress) throws IOException {
955    return create(f, permission, flags, bufferSize, replication,
956        blockSize, progress, null);
957  }
958  
959  /**
960   * Create an FSDataOutputStream at the indicated Path with a custom
961   * checksum option
962   * @param f the file name to open
963   * @param permission
964   * @param flags {@link CreateFlag}s to use for this stream.
965   * @param bufferSize the size of the buffer to be used.
966   * @param replication required block replication for the file.
967   * @param blockSize
968   * @param progress
969   * @param checksumOpt checksum parameter. If null, the values
970   *        found in conf will be used.
971   * @throws IOException
972   * @see #setPermission(Path, FsPermission)
973   */
974  public FSDataOutputStream create(Path f,
975      FsPermission permission,
976      EnumSet<CreateFlag> flags,
977      int bufferSize,
978      short replication,
979      long blockSize,
980      Progressable progress,
981      ChecksumOpt checksumOpt) throws IOException {
982    // Checksum options are ignored by default. The file systems that
983    // implement checksum need to override this method. The full
984    // support is currently only available in DFS.
985    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
986        bufferSize, replication, blockSize, progress);
987  }
988
989  /*.
990   * This create has been added to support the FileContext that processes
991   * the permission
992   * with umask before calling this method.
993   * This a temporary method added to support the transition from FileSystem
994   * to FileContext for user applications.
995   */
996  @Deprecated
997  protected FSDataOutputStream primitiveCreate(Path f,
998     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
999     short replication, long blockSize, Progressable progress,
1000     ChecksumOpt checksumOpt) throws IOException {
1001
1002    boolean pathExists = exists(f);
1003    CreateFlag.validate(f, pathExists, flag);
1004    
1005    // Default impl  assumes that permissions do not matter and 
1006    // nor does the bytesPerChecksum  hence
1007    // calling the regular create is good enough.
1008    // FSs that implement permissions should override this.
1009
1010    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1011      return append(f, bufferSize, progress);
1012    }
1013    
1014    return this.create(f, absolutePermission,
1015        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1016        blockSize, progress);
1017  }
1018  
1019  /**
1020   * This version of the mkdirs method assumes that the permission is absolute.
1021   * It has been added to support the FileContext that processes the permission
1022   * with umask before calling this method.
1023   * This a temporary method added to support the transition from FileSystem
1024   * to FileContext for user applications.
1025   */
1026  @Deprecated
1027  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1028    throws IOException {
1029    // Default impl is to assume that permissions do not matter and hence
1030    // calling the regular mkdirs is good enough.
1031    // FSs that implement permissions should override this.
1032   return this.mkdirs(f, absolutePermission);
1033  }
1034
1035
1036  /**
1037   * This version of the mkdirs method assumes that the permission is absolute.
1038   * It has been added to support the FileContext that processes the permission
1039   * with umask before calling this method.
1040   * This a temporary method added to support the transition from FileSystem
1041   * to FileContext for user applications.
1042   */
1043  @Deprecated
1044  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1045                    boolean createParent)
1046    throws IOException {
1047    
1048    if (!createParent) { // parent must exist.
1049      // since the this.mkdirs makes parent dirs automatically
1050      // we must throw exception if parent does not exist.
1051      final FileStatus stat = getFileStatus(f.getParent());
1052      if (stat == null) {
1053        throw new FileNotFoundException("Missing parent:" + f);
1054      }
1055      if (!stat.isDirectory()) {
1056        throw new ParentNotDirectoryException("parent is not a dir");
1057      }
1058      // parent does exist - go ahead with mkdir of leaf
1059    }
1060    // Default impl is to assume that permissions do not matter and hence
1061    // calling the regular mkdirs is good enough.
1062    // FSs that implement permissions should override this.
1063    if (!this.mkdirs(f, absolutePermission)) {
1064      throw new IOException("mkdir of "+ f + " failed");
1065    }
1066  }
1067
1068  /**
1069   * Opens an FSDataOutputStream at the indicated Path with write-progress
1070   * reporting. Same as create(), except fails if parent directory doesn't
1071   * already exist.
1072   * @param f the file name to open
1073   * @param overwrite if a file with this name already exists, then if true,
1074   * the file will be overwritten, and if false an error will be thrown.
1075   * @param bufferSize the size of the buffer to be used.
1076   * @param replication required block replication for the file.
1077   * @param blockSize
1078   * @param progress
1079   * @throws IOException
1080   * @see #setPermission(Path, FsPermission)
1081   * @deprecated API only for 0.20-append
1082   */
1083  @Deprecated
1084  public FSDataOutputStream createNonRecursive(Path f,
1085      boolean overwrite,
1086      int bufferSize, short replication, long blockSize,
1087      Progressable progress) throws IOException {
1088    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1089        overwrite, bufferSize, replication, blockSize, progress);
1090  }
1091
1092  /**
1093   * Opens an FSDataOutputStream at the indicated Path with write-progress
1094   * reporting. Same as create(), except fails if parent directory doesn't
1095   * already exist.
1096   * @param f the file name to open
1097   * @param permission
1098   * @param overwrite if a file with this name already exists, then if true,
1099   * the file will be overwritten, and if false an error will be thrown.
1100   * @param bufferSize the size of the buffer to be used.
1101   * @param replication required block replication for the file.
1102   * @param blockSize
1103   * @param progress
1104   * @throws IOException
1105   * @see #setPermission(Path, FsPermission)
1106   * @deprecated API only for 0.20-append
1107   */
1108   @Deprecated
1109   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1110       boolean overwrite, int bufferSize, short replication, long blockSize,
1111       Progressable progress) throws IOException {
1112     return createNonRecursive(f, permission,
1113         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1114             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1115             replication, blockSize, progress);
1116   }
1117
1118   /**
1119    * Opens an FSDataOutputStream at the indicated Path with write-progress
1120    * reporting. Same as create(), except fails if parent directory doesn't
1121    * already exist.
1122    * @param f the file name to open
1123    * @param permission
1124    * @param flags {@link CreateFlag}s to use for this stream.
1125    * @param bufferSize the size of the buffer to be used.
1126    * @param replication required block replication for the file.
1127    * @param blockSize
1128    * @param progress
1129    * @throws IOException
1130    * @see #setPermission(Path, FsPermission)
1131    * @deprecated API only for 0.20-append
1132    */
1133    @Deprecated
1134    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1135        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1136        Progressable progress) throws IOException {
1137      throw new IOException("createNonRecursive unsupported for this filesystem "
1138          + this.getClass());
1139    }
1140
1141  /**
1142   * Creates the given Path as a brand-new zero-length file.  If
1143   * create fails, or if it already existed, return false.
1144   *
1145   * @param f path to use for create
1146   */
1147  public boolean createNewFile(Path f) throws IOException {
1148    if (exists(f)) {
1149      return false;
1150    } else {
1151      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1152      return true;
1153    }
1154  }
1155
1156  /**
1157   * Append to an existing file (optional operation).
1158   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1159   * @param f the existing file to be appended.
1160   * @throws IOException
1161   */
1162  public FSDataOutputStream append(Path f) throws IOException {
1163    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1164  }
1165  /**
1166   * Append to an existing file (optional operation).
1167   * Same as append(f, bufferSize, null).
1168   * @param f the existing file to be appended.
1169   * @param bufferSize the size of the buffer to be used.
1170   * @throws IOException
1171   */
1172  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1173    return append(f, bufferSize, null);
1174  }
1175
1176  /**
1177   * Append to an existing file (optional operation).
1178   * @param f the existing file to be appended.
1179   * @param bufferSize the size of the buffer to be used.
1180   * @param progress for reporting progress if it is not null.
1181   * @throws IOException
1182   */
1183  public abstract FSDataOutputStream append(Path f, int bufferSize,
1184      Progressable progress) throws IOException;
1185
1186  /**
1187   * Concat existing files together.
1188   * @param trg the path to the target destination.
1189   * @param psrcs the paths to the sources to use for the concatenation.
1190   * @throws IOException
1191   */
1192  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1193    throw new UnsupportedOperationException("Not implemented by the " + 
1194        getClass().getSimpleName() + " FileSystem implementation");
1195  }
1196
1197 /**
1198   * Get replication.
1199   * 
1200   * @deprecated Use getFileStatus() instead
1201   * @param src file name
1202   * @return file replication
1203   * @throws IOException
1204   */ 
1205  @Deprecated
1206  public short getReplication(Path src) throws IOException {
1207    return getFileStatus(src).getReplication();
1208  }
1209
1210  /**
1211   * Set replication for an existing file.
1212   * 
1213   * @param src file name
1214   * @param replication new replication
1215   * @throws IOException
1216   * @return true if successful;
1217   *         false if file does not exist or is a directory
1218   */
1219  public boolean setReplication(Path src, short replication)
1220    throws IOException {
1221    return true;
1222  }
1223
1224  /**
1225   * Renames Path src to Path dst.  Can take place on local fs
1226   * or remote DFS.
1227   * @param src path to be renamed
1228   * @param dst new path after rename
1229   * @throws IOException on failure
1230   * @return true if rename is successful
1231   */
1232  public abstract boolean rename(Path src, Path dst) throws IOException;
1233
1234  /**
1235   * Renames Path src to Path dst
1236   * <ul>
1237   * <li
1238   * <li>Fails if src is a file and dst is a directory.
1239   * <li>Fails if src is a directory and dst is a file.
1240   * <li>Fails if the parent of dst does not exist or is a file.
1241   * </ul>
1242   * <p>
1243   * If OVERWRITE option is not passed as an argument, rename fails
1244   * if the dst already exists.
1245   * <p>
1246   * If OVERWRITE option is passed as an argument, rename overwrites
1247   * the dst if it is a file or an empty directory. Rename fails if dst is
1248   * a non-empty directory.
1249   * <p>
1250   * Note that atomicity of rename is dependent on the file system
1251   * implementation. Please refer to the file system documentation for
1252   * details. This default implementation is non atomic.
1253   * <p>
1254   * This method is deprecated since it is a temporary method added to 
1255   * support the transition from FileSystem to FileContext for user 
1256   * applications.
1257   * 
1258   * @param src path to be renamed
1259   * @param dst new path after rename
1260   * @throws IOException on failure
1261   */
1262  @Deprecated
1263  protected void rename(final Path src, final Path dst,
1264      final Rename... options) throws IOException {
1265    // Default implementation
1266    final FileStatus srcStatus = getFileLinkStatus(src);
1267    if (srcStatus == null) {
1268      throw new FileNotFoundException("rename source " + src + " not found.");
1269    }
1270
1271    boolean overwrite = false;
1272    if (null != options) {
1273      for (Rename option : options) {
1274        if (option == Rename.OVERWRITE) {
1275          overwrite = true;
1276        }
1277      }
1278    }
1279
1280    FileStatus dstStatus;
1281    try {
1282      dstStatus = getFileLinkStatus(dst);
1283    } catch (IOException e) {
1284      dstStatus = null;
1285    }
1286    if (dstStatus != null) {
1287      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1288        throw new IOException("Source " + src + " Destination " + dst
1289            + " both should be either file or directory");
1290      }
1291      if (!overwrite) {
1292        throw new FileAlreadyExistsException("rename destination " + dst
1293            + " already exists.");
1294      }
1295      // Delete the destination that is a file or an empty directory
1296      if (dstStatus.isDirectory()) {
1297        FileStatus[] list = listStatus(dst);
1298        if (list != null && list.length != 0) {
1299          throw new IOException(
1300              "rename cannot overwrite non empty destination directory " + dst);
1301        }
1302      }
1303      delete(dst, false);
1304    } else {
1305      final Path parent = dst.getParent();
1306      final FileStatus parentStatus = getFileStatus(parent);
1307      if (parentStatus == null) {
1308        throw new FileNotFoundException("rename destination parent " + parent
1309            + " not found.");
1310      }
1311      if (!parentStatus.isDirectory()) {
1312        throw new ParentNotDirectoryException("rename destination parent " + parent
1313            + " is a file.");
1314      }
1315    }
1316    if (!rename(src, dst)) {
1317      throw new IOException("rename from " + src + " to " + dst + " failed.");
1318    }
1319  }
1320  
1321  /**
1322   * Delete a file 
1323   * @deprecated Use {@link #delete(Path, boolean)} instead.
1324   */
1325  @Deprecated
1326  public boolean delete(Path f) throws IOException {
1327    return delete(f, true);
1328  }
1329  
1330  /** Delete a file.
1331   *
1332   * @param f the path to delete.
1333   * @param recursive if path is a directory and set to 
1334   * true, the directory is deleted else throws an exception. In
1335   * case of a file the recursive can be set to either true or false. 
1336   * @return  true if delete is successful else false. 
1337   * @throws IOException
1338   */
1339  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1340
1341  /**
1342   * Mark a path to be deleted when FileSystem is closed.
1343   * When the JVM shuts down,
1344   * all FileSystem objects will be closed automatically.
1345   * Then,
1346   * the marked path will be deleted as a result of closing the FileSystem.
1347   *
1348   * The path has to exist in the file system.
1349   * 
1350   * @param f the path to delete.
1351   * @return  true if deleteOnExit is successful, otherwise false.
1352   * @throws IOException
1353   */
1354  public boolean deleteOnExit(Path f) throws IOException {
1355    if (!exists(f)) {
1356      return false;
1357    }
1358    synchronized (deleteOnExit) {
1359      deleteOnExit.add(f);
1360    }
1361    return true;
1362  }
1363  
1364  /**
1365   * Cancel the deletion of the path when the FileSystem is closed
1366   * @param f the path to cancel deletion
1367   */
1368  public boolean cancelDeleteOnExit(Path f) {
1369    synchronized (deleteOnExit) {
1370      return deleteOnExit.remove(f);
1371    }
1372  }
1373
1374  /**
1375   * Delete all files that were marked as delete-on-exit. This recursively
1376   * deletes all files in the specified paths.
1377   */
1378  protected void processDeleteOnExit() {
1379    synchronized (deleteOnExit) {
1380      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1381        Path path = iter.next();
1382        try {
1383          if (exists(path)) {
1384            delete(path, true);
1385          }
1386        }
1387        catch (IOException e) {
1388          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1389        }
1390        iter.remove();
1391      }
1392    }
1393  }
1394  
1395  /** Check if exists.
1396   * @param f source file
1397   */
1398  public boolean exists(Path f) throws IOException {
1399    try {
1400      return getFileStatus(f) != null;
1401    } catch (FileNotFoundException e) {
1402      return false;
1403    }
1404  }
1405
1406  /** True iff the named path is a directory.
1407   * Note: Avoid using this method. Instead reuse the FileStatus 
1408   * returned by getFileStatus() or listStatus() methods.
1409   * @param f path to check
1410   */
1411  public boolean isDirectory(Path f) throws IOException {
1412    try {
1413      return getFileStatus(f).isDirectory();
1414    } catch (FileNotFoundException e) {
1415      return false;               // f does not exist
1416    }
1417  }
1418
1419  /** True iff the named path is a regular file.
1420   * Note: Avoid using this method. Instead reuse the FileStatus 
1421   * returned by getFileStatus() or listStatus() methods.
1422   * @param f path to check
1423   */
1424  public boolean isFile(Path f) throws IOException {
1425    try {
1426      return getFileStatus(f).isFile();
1427    } catch (FileNotFoundException e) {
1428      return false;               // f does not exist
1429    }
1430  }
1431  
1432  /** The number of bytes in a file. */
1433  /** @deprecated Use getFileStatus() instead */
1434  @Deprecated
1435  public long getLength(Path f) throws IOException {
1436    return getFileStatus(f).getLen();
1437  }
1438    
1439  /** Return the {@link ContentSummary} of a given {@link Path}.
1440  * @param f path to use
1441  */
1442  public ContentSummary getContentSummary(Path f) throws IOException {
1443    FileStatus status = getFileStatus(f);
1444    if (status.isFile()) {
1445      // f is a file
1446      return new ContentSummary(status.getLen(), 1, 0);
1447    }
1448    // f is a directory
1449    long[] summary = {0, 0, 1};
1450    for(FileStatus s : listStatus(f)) {
1451      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1452                                     new ContentSummary(s.getLen(), 1, 0);
1453      summary[0] += c.getLength();
1454      summary[1] += c.getFileCount();
1455      summary[2] += c.getDirectoryCount();
1456    }
1457    return new ContentSummary(summary[0], summary[1], summary[2]);
1458  }
1459
1460  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1461      @Override
1462      public boolean accept(Path file) {
1463        return true;
1464      }     
1465    };
1466    
1467  /**
1468   * List the statuses of the files/directories in the given path if the path is
1469   * a directory.
1470   * 
1471   * @param f given path
1472   * @return the statuses of the files/directories in the given patch
1473   * @throws FileNotFoundException when the path does not exist;
1474   *         IOException see specific implementation
1475   */
1476  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1477                                                         IOException;
1478    
1479  /*
1480   * Filter files/directories in the given path using the user-supplied path
1481   * filter. Results are added to the given array <code>results</code>.
1482   */
1483  private void listStatus(ArrayList<FileStatus> results, Path f,
1484      PathFilter filter) throws FileNotFoundException, IOException {
1485    FileStatus listing[] = listStatus(f);
1486    if (listing == null) {
1487      throw new IOException("Error accessing " + f);
1488    }
1489
1490    for (int i = 0; i < listing.length; i++) {
1491      if (filter.accept(listing[i].getPath())) {
1492        results.add(listing[i]);
1493      }
1494    }
1495  }
1496
1497  /**
1498   * @return an iterator over the corrupt files under the given path
1499   * (may contain duplicates if a file has more than one corrupt block)
1500   * @throws IOException
1501   */
1502  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1503    throws IOException {
1504    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1505                                            " does not support" +
1506                                            " listCorruptFileBlocks");
1507  }
1508
1509  /**
1510   * Filter files/directories in the given path using the user-supplied path
1511   * filter.
1512   * 
1513   * @param f
1514   *          a path name
1515   * @param filter
1516   *          the user-supplied path filter
1517   * @return an array of FileStatus objects for the files under the given path
1518   *         after applying the filter
1519   * @throws FileNotFoundException when the path does not exist;
1520   *         IOException see specific implementation   
1521   */
1522  public FileStatus[] listStatus(Path f, PathFilter filter) 
1523                                   throws FileNotFoundException, IOException {
1524    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1525    listStatus(results, f, filter);
1526    return results.toArray(new FileStatus[results.size()]);
1527  }
1528
1529  /**
1530   * Filter files/directories in the given list of paths using default
1531   * path filter.
1532   * 
1533   * @param files
1534   *          a list of paths
1535   * @return a list of statuses for the files under the given paths after
1536   *         applying the filter default Path filter
1537   * @throws FileNotFoundException when the path does not exist;
1538   *         IOException see specific implementation
1539   */
1540  public FileStatus[] listStatus(Path[] files)
1541      throws FileNotFoundException, IOException {
1542    return listStatus(files, DEFAULT_FILTER);
1543  }
1544
1545  /**
1546   * Filter files/directories in the given list of paths using user-supplied
1547   * path filter.
1548   * 
1549   * @param files
1550   *          a list of paths
1551   * @param filter
1552   *          the user-supplied path filter
1553   * @return a list of statuses for the files under the given paths after
1554   *         applying the filter
1555   * @throws FileNotFoundException when the path does not exist;
1556   *         IOException see specific implementation
1557   */
1558  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1559      throws FileNotFoundException, IOException {
1560    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1561    for (int i = 0; i < files.length; i++) {
1562      listStatus(results, files[i], filter);
1563    }
1564    return results.toArray(new FileStatus[results.size()]);
1565  }
1566
1567  /**
1568   * <p>Return all the files that match filePattern and are not checksum
1569   * files. Results are sorted by their names.
1570   * 
1571   * <p>
1572   * A filename pattern is composed of <i>regular</i> characters and
1573   * <i>special pattern matching</i> characters, which are:
1574   *
1575   * <dl>
1576   *  <dd>
1577   *   <dl>
1578   *    <p>
1579   *    <dt> <tt> ? </tt>
1580   *    <dd> Matches any single character.
1581   *
1582   *    <p>
1583   *    <dt> <tt> * </tt>
1584   *    <dd> Matches zero or more characters.
1585   *
1586   *    <p>
1587   *    <dt> <tt> [<i>abc</i>] </tt>
1588   *    <dd> Matches a single character from character set
1589   *     <tt>{<i>a,b,c</i>}</tt>.
1590   *
1591   *    <p>
1592   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1593   *    <dd> Matches a single character from the character range
1594   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1595   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1596   *
1597   *    <p>
1598   *    <dt> <tt> [^<i>a</i>] </tt>
1599   *    <dd> Matches a single character that is not from character set or range
1600   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1601   *     immediately to the right of the opening bracket.
1602   *
1603   *    <p>
1604   *    <dt> <tt> \<i>c</i> </tt>
1605   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1606   *
1607   *    <p>
1608   *    <dt> <tt> {ab,cd} </tt>
1609   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1610   *    
1611   *    <p>
1612   *    <dt> <tt> {ab,c{de,fh}} </tt>
1613   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1614   *
1615   *   </dl>
1616   *  </dd>
1617   * </dl>
1618   *
   * @param pathPattern a glob pattern specifying the paths to match
   *
1621   * @return an array of paths that match the path pattern
1622   * @throws IOException
1623   */
1624  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1625    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1626  }
1627  
1628  /**
1629   * Return an array of FileStatus objects whose path names match pathPattern
1630   * and is accepted by the user-supplied path filter. Results are sorted by
1631   * their path names.
1632   * Return null if pathPattern has no glob and the path does not exist.
1633   * Return an empty array if pathPattern has a glob and no path matches it. 
1634   * 
1635   * @param pathPattern
1636   *          a regular expression specifying the path pattern
1637   * @param filter
1638   *          a user-supplied path filter
1639   * @return an array of FileStatus objects
1640   * @throws IOException if any I/O error occurs when fetching file status
1641   */
1642  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1643      throws IOException {
1644    return new Globber(this, pathPattern, filter).glob();
1645  }
1646  
1647  /**
1648   * List the statuses of the files/directories in the given path if the path is
1649   * a directory. 
1650   * Return the file's status and block locations If the path is a file.
1651   * 
1652   * If a returned status is a file, it contains the file's block locations.
1653   * 
1654   * @param f is the path
1655   *
1656   * @return an iterator that traverses statuses of the files/directories 
1657   *         in the given path
1658   *
1659   * @throws FileNotFoundException If <code>f</code> does not exist
1660   * @throws IOException If an I/O error occurred
1661   */
1662  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1663  throws FileNotFoundException, IOException {
1664    return listLocatedStatus(f, DEFAULT_FILTER);
1665  }
1666
1667  /**
1668   * Listing a directory
1669   * The returned results include its block location if it is a file
1670   * The results are filtered by the given path filter
1671   * @param f a path
1672   * @param filter a path filter
1673   * @return an iterator that traverses statuses of the files/directories 
1674   *         in the given path
1675   * @throws FileNotFoundException if <code>f</code> does not exist
1676   * @throws IOException if any I/O error occurred
1677   */
1678  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1679      final PathFilter filter)
1680  throws FileNotFoundException, IOException {
1681    return new RemoteIterator<LocatedFileStatus>() {
1682      private final FileStatus[] stats = listStatus(f, filter);
1683      private int i = 0;
1684
1685      @Override
1686      public boolean hasNext() {
1687        return i<stats.length;
1688      }
1689
1690      @Override
1691      public LocatedFileStatus next() throws IOException {
1692        if (!hasNext()) {
1693          throw new NoSuchElementException("No more entry in " + f);
1694        }
1695        FileStatus result = stats[i++];
1696        BlockLocation[] locs = result.isFile() ?
1697            getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1698            null;
1699        return new LocatedFileStatus(result, locs);
1700      }
1701    };
1702  }
1703
1704  /**
1705   * List the statuses and block locations of the files in the given path.
1706   * 
1707   * If the path is a directory, 
1708   *   if recursive is false, returns files in the directory;
1709   *   if recursive is true, return files in the subtree rooted at the path.
1710   * If the path is a file, return the file's status and block locations.
1711   * 
1712   * @param f is the path
1713   * @param recursive if the subdirectories need to be traversed recursively
1714   *
1715   * @return an iterator that traverses statuses of the files
1716   *
1717   * @throws FileNotFoundException when the path does not exist;
1718   *         IOException see specific implementation
1719   */
1720  public RemoteIterator<LocatedFileStatus> listFiles(
1721      final Path f, final boolean recursive)
1722  throws FileNotFoundException, IOException {
1723    return new RemoteIterator<LocatedFileStatus>() {
1724      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1725        new Stack<RemoteIterator<LocatedFileStatus>>();
1726      private RemoteIterator<LocatedFileStatus> curItor =
1727        listLocatedStatus(f);
1728      private LocatedFileStatus curFile;
1729     
1730      @Override
1731      public boolean hasNext() throws IOException {
1732        while (curFile == null) {
1733          if (curItor.hasNext()) {
1734            handleFileStat(curItor.next());
1735          } else if (!itors.empty()) {
1736            curItor = itors.pop();
1737          } else {
1738            return false;
1739          }
1740        }
1741        return true;
1742      }
1743
1744      /**
1745       * Process the input stat.
1746       * If it is a file, return the file stat.
1747       * If it is a directory, traverse the directory if recursive is true;
1748       * ignore it if recursive is false.
1749       * @param stat input status
1750       * @throws IOException if any IO error occurs
1751       */
1752      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1753        if (stat.isFile()) { // file
1754          curFile = stat;
1755        } else if (recursive) { // directory
1756          itors.push(curItor);
1757          curItor = listLocatedStatus(stat.getPath());
1758        }
1759      }
1760
1761      @Override
1762      public LocatedFileStatus next() throws IOException {
1763        if (hasNext()) {
1764          LocatedFileStatus result = curFile;
1765          curFile = null;
1766          return result;
1767        } 
1768        throw new java.util.NoSuchElementException("No more entry in " + f);
1769      }
1770    };
1771  }
1772  
1773  /** Return the current user's home directory in this filesystem.
1774   * The default implementation returns "/user/$USER/".
1775   */
1776  public Path getHomeDirectory() {
1777    return this.makeQualified(
1778        new Path("/user/"+System.getProperty("user.name")));
1779  }
1780
1781
  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   * 
   * @param new_dir the path to use as the new working directory
   */
  public abstract void setWorkingDirectory(Path new_dir);
1789    
  /**
   * Get the current working directory for the given file system.
   * @return the directory pathname
   */
  public abstract Path getWorkingDirectory();
1795  
1796  
  /**
   * Note: with the new FileContext class, getWorkingDirectory()
   * will be removed. 
   * The working directory is implemented in FileContext.
   * 
   * Some file systems like LocalFileSystem have an initial workingDir
   * that we use as the starting workingDir. For other file systems
   * like HDFS there is no built in notion of an initial workingDir.
   * 
   * @return if there is built in notion of workingDir then it
   * is returned; else a null is returned. This default implementation
   * always returns null.
   */
  protected Path getInitialWorkingDirectory() {
    return null;
  }
1812
1813  /**
1814   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1815   */
1816  public boolean mkdirs(Path f) throws IOException {
1817    return mkdirs(f, FsPermission.getDirDefault());
1818  }
1819
  /**
   * Make the given file and all non-existent parents into
   * directories. Has the semantics of Unix 'mkdir -p'.
   * Existence of the directory hierarchy is not an error.
   * @param f path to create
   * @param permission to apply to f
   * @return implementation-defined success flag (see specific implementation)
   * @throws IOException see specific implementation
   */
  public abstract boolean mkdirs(Path f, FsPermission permission
      ) throws IOException;
1829
1830  /**
1831   * The src file is on the local disk.  Add it to FS at
1832   * the given dst name and the source is kept intact afterwards
1833   * @param src path
1834   * @param dst path
1835   */
1836  public void copyFromLocalFile(Path src, Path dst)
1837    throws IOException {
1838    copyFromLocalFile(false, src, dst);
1839  }
1840
1841  /**
1842   * The src files is on the local disk.  Add it to FS at
1843   * the given dst name, removing the source afterwards.
1844   * @param srcs path
1845   * @param dst path
1846   */
1847  public void moveFromLocalFile(Path[] srcs, Path dst)
1848    throws IOException {
1849    copyFromLocalFile(true, true, srcs, dst);
1850  }
1851
1852  /**
1853   * The src file is on the local disk.  Add it to FS at
1854   * the given dst name, removing the source afterwards.
1855   * @param src path
1856   * @param dst path
1857   */
1858  public void moveFromLocalFile(Path src, Path dst)
1859    throws IOException {
1860    copyFromLocalFile(true, src, dst);
1861  }
1862
1863  /**
1864   * The src file is on the local disk.  Add it to FS at
1865   * the given dst name.
1866   * delSrc indicates if the source should be removed
1867   * @param delSrc whether to delete the src
1868   * @param src path
1869   * @param dst path
1870   */
1871  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1872    throws IOException {
1873    copyFromLocalFile(delSrc, true, src, dst);
1874  }
1875  
1876  /**
1877   * The src files are on the local disk.  Add it to FS at
1878   * the given dst name.
1879   * delSrc indicates if the source should be removed
1880   * @param delSrc whether to delete the src
1881   * @param overwrite whether to overwrite an existing file
1882   * @param srcs array of paths which are source
1883   * @param dst path
1884   */
1885  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1886                                Path[] srcs, Path dst)
1887    throws IOException {
1888    Configuration conf = getConf();
1889    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1890  }
1891  
1892  /**
1893   * The src file is on the local disk.  Add it to FS at
1894   * the given dst name.
1895   * delSrc indicates if the source should be removed
1896   * @param delSrc whether to delete the src
1897   * @param overwrite whether to overwrite an existing file
1898   * @param src path
1899   * @param dst path
1900   */
1901  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1902                                Path src, Path dst)
1903    throws IOException {
1904    Configuration conf = getConf();
1905    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1906  }
1907    
1908  /**
1909   * The src file is under FS, and the dst is on the local disk.
1910   * Copy it from FS control to the local dst name.
1911   * @param src path
1912   * @param dst path
1913   */
1914  public void copyToLocalFile(Path src, Path dst) throws IOException {
1915    copyToLocalFile(false, src, dst);
1916  }
1917    
1918  /**
1919   * The src file is under FS, and the dst is on the local disk.
1920   * Copy it from FS control to the local dst name.
1921   * Remove the source afterwards
1922   * @param src path
1923   * @param dst path
1924   */
1925  public void moveToLocalFile(Path src, Path dst) throws IOException {
1926    copyToLocalFile(true, src, dst);
1927  }
1928
1929  /**
1930   * The src file is under FS, and the dst is on the local disk.
1931   * Copy it from FS control to the local dst name.
1932   * delSrc indicates if the src will be removed or not.
1933   * @param delSrc whether to delete the src
1934   * @param src path
1935   * @param dst path
1936   */   
1937  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1938    throws IOException {
1939    copyToLocalFile(delSrc, src, dst, false);
1940  }
1941  
1942    /**
1943   * The src file is under FS, and the dst is on the local disk. Copy it from FS
1944   * control to the local dst name. delSrc indicates if the src will be removed
1945   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
1946   * as local file system or not. RawLocalFileSystem is non crc file system.So,
1947   * It will not create any crc files at local.
1948   * 
1949   * @param delSrc
1950   *          whether to delete the src
1951   * @param src
1952   *          path
1953   * @param dst
1954   *          path
1955   * @param useRawLocalFileSystem
1956   *          whether to use RawLocalFileSystem as local file system or not.
1957   * 
1958   * @throws IOException
1959   *           - if any IO error
1960   */
1961  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
1962      boolean useRawLocalFileSystem) throws IOException {
1963    Configuration conf = getConf();
1964    FileSystem local = null;
1965    if (useRawLocalFileSystem) {
1966      local = getLocal(conf).getRawFileSystem();
1967    } else {
1968      local = getLocal(conf);
1969    }
1970    FileUtil.copy(this, src, local, dst, delSrc, conf);
1971  }
1972
  /**
   * Returns a local File that the user can write output to.  The caller
   * provides both the eventual FS target name and the local working
   * file.  If the FS is local, we write directly into the target.  If
   * the FS is remote, we write into the tmp local area.
   * <p>
   * This default implementation always returns <code>tmpLocalFile</code>
   * and does not use <code>fsOutputFile</code>.
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path of local tmp file
   * @return the local path the caller should write to
   * @throws IOException not thrown by this default implementation
   */
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }
1985
1986  /**
1987   * Called when we're all done writing to the target.  A local FS will
1988   * do nothing, because we've written to exactly the right place.  A remote
1989   * FS will copy the contents of tmpLocalFile to the correct target at
1990   * fsOutputFile.
1991   * @param fsOutputFile path of output file
1992   * @param tmpLocalFile path to local tmp file
1993   */
1994  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1995    throws IOException {
1996    moveFromLocalFile(tmpLocalFile, fsOutputFile);
1997  }
1998
1999  /**
2000   * No more filesystem operations are needed.  Will
2001   * release any held locks.
2002   */
2003  @Override
2004  public void close() throws IOException {
2005    // delete all files that were marked as delete-on-exit.
2006    processDeleteOnExit();
2007    CACHE.remove(this.key, this);
2008  }
2009
2010  /** Return the total size of all files in the filesystem.*/
2011  public long getUsed() throws IOException{
2012    long used = 0;
2013    FileStatus[] files = listStatus(new Path("/"));
2014    for(FileStatus file:files){
2015      used += file.getLen();
2016    }
2017    return used;
2018  }
2019  
  /**
   * Get the block size for a particular file.
   * @param f the filename
   * @return the number of bytes in a block
   * @throws IOException if the file status cannot be obtained
   * @deprecated Use getFileStatus() instead
   */
  @Deprecated
  public long getBlockSize(Path f) throws IOException {
    return getFileStatus(f).getBlockSize();
  }
2030
2031  /**
2032   * Return the number of bytes that large input files should be optimally
2033   * be split into to minimize i/o time.
2034   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2035   */
2036  @Deprecated
2037  public long getDefaultBlockSize() {
2038    // default to 32MB: large enough to minimize the impact of seeks
2039    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2040  }
2041    
2042  /** Return the number of bytes that large input files should be optimally
2043   * be split into to minimize i/o time.  The given path will be used to
2044   * locate the actual filesystem.  The full path does not have to exist.
2045   * @param f path of file
2046   * @return the default block size for the path's filesystem
2047   */
2048  public long getDefaultBlockSize(Path f) {
2049    return getDefaultBlockSize();
2050  }
2051
  /**
   * Get the default replication.
   * @return the default replication; this implementation always returns 1
   * @deprecated use {@link #getDefaultReplication(Path)} instead
   */
  @Deprecated
  public short getDefaultReplication() { return 1; }
2058
2059  /**
2060   * Get the default replication for a path.   The given path will be used to
2061   * locate the actual filesystem.  The full path does not have to exist.
2062   * @param path of the file
2063   * @return default replication for the path's filesystem 
2064   */
2065  public short getDefaultReplication(Path path) {
2066    return getDefaultReplication();
2067  }
2068  
  /**
   * Return a file status object that represents the path.
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public abstract FileStatus getFileStatus(Path f) throws IOException;
2077
2078  /**
2079   * Checks if the user can access a path.  The mode specifies which access
2080   * checks to perform.  If the requested permissions are granted, then the
2081   * method returns normally.  If access is denied, then the method throws an
2082   * {@link AccessControlException}.
2083   * <p/>
2084   * The default implementation of this method calls {@link #getFileStatus(Path)}
2085   * and checks the returned permissions against the requested permissions.
2086   * Note that the getFileStatus call will be subject to authorization checks.
2087   * Typically, this requires search (execute) permissions on each directory in
2088   * the path's prefix, but this is implementation-defined.  Any file system
2089   * that provides a richer authorization model (such as ACLs) may override the
2090   * default implementation so that it checks against that model instead.
2091   * <p>
2092   * In general, applications should avoid using this method, due to the risk of
2093   * time-of-check/time-of-use race conditions.  The permissions on a file may
2094   * change immediately after the access call returns.  Most applications should
2095   * prefer running specific file system actions as the desired user represented
2096   * by a {@link UserGroupInformation}.
2097   *
2098   * @param path Path to check
2099   * @param mode type of access to check
2100   * @throws AccessControlException if access is denied
2101   * @throws FileNotFoundException if the path does not exist
2102   * @throws IOException see specific implementation
2103   */
2104  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2105  public void access(Path path, FsAction mode) throws AccessControlException,
2106      FileNotFoundException, IOException {
2107    checkAccessPermissions(this.getFileStatus(path), mode);
2108  }
2109
2110  /**
2111   * This method provides the default implementation of
2112   * {@link #access(Path, FsAction)}.
2113   *
2114   * @param stat FileStatus to check
2115   * @param mode type of access to check
2116   * @throws IOException for any error
2117   */
2118  @InterfaceAudience.Private
2119  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2120      throws IOException {
2121    FsPermission perm = stat.getPermission();
2122    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2123    String user = ugi.getShortUserName();
2124    List<String> groups = Arrays.asList(ugi.getGroupNames());
2125    if (user.equals(stat.getOwner())) {
2126      if (perm.getUserAction().implies(mode)) {
2127        return;
2128      }
2129    } else if (groups.contains(stat.getGroup())) {
2130      if (perm.getGroupAction().implies(mode)) {
2131        return;
2132      }
2133    } else {
2134      if (perm.getOtherAction().implies(mode)) {
2135        return;
2136      }
2137    }
2138    throw new AccessControlException(String.format(
2139      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2140      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2141  }
2142
2143  /**
2144   * See {@link FileContext#fixRelativePart}
2145   */
2146  protected Path fixRelativePart(Path p) {
2147    if (p.isUriPathAbsolute()) {
2148      return p;
2149    } else {
2150      return new Path(getWorkingDirectory(), p);
2151    }
2152  }
2153
2154  /**
2155   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2156   */
2157  public void createSymlink(final Path target, final Path link,
2158      final boolean createParent) throws AccessControlException,
2159      FileAlreadyExistsException, FileNotFoundException,
2160      ParentNotDirectoryException, UnsupportedFileSystemException, 
2161      IOException {
2162    // Supporting filesystems should override this method
2163    throw new UnsupportedOperationException(
2164        "Filesystem does not support symlinks!");
2165  }
2166
2167  /**
2168   * See {@link FileContext#getFileLinkStatus(Path)}
2169   */
2170  public FileStatus getFileLinkStatus(final Path f)
2171      throws AccessControlException, FileNotFoundException,
2172      UnsupportedFileSystemException, IOException {
2173    // Supporting filesystems should override this method
2174    return getFileStatus(f);
2175  }
2176
  /**
   * See {@link AbstractFileSystem#supportsSymlinks()}.
   *
   * @return this default implementation always returns false
   */
  public boolean supportsSymlinks() {
    return false;
  }
2183
2184  /**
2185   * See {@link FileContext#getLinkTarget(Path)}
2186   */
2187  public Path getLinkTarget(Path f) throws IOException {
2188    // Supporting filesystems should override this method
2189    throw new UnsupportedOperationException(
2190        "Filesystem does not support symlinks!");
2191  }
2192
2193  /**
2194   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2195   */
2196  protected Path resolveLink(Path f) throws IOException {
2197    // Supporting filesystems should override this method
2198    throw new UnsupportedOperationException(
2199        "Filesystem does not support symlinks!");
2200  }
2201
2202  /**
2203   * Get the checksum of a file.
2204   *
2205   * @param f The file path
2206   * @return The file checksum.  The default return value is null,
2207   *  which indicates that no checksum algorithm is implemented
2208   *  in the corresponding FileSystem.
2209   */
2210  public FileChecksum getFileChecksum(Path f) throws IOException {
2211    return getFileChecksum(f, Long.MAX_VALUE);
2212  }
2213
  /**
   * Get the checksum of a file, from the beginning of the file till the
   * specific length.
   * @param f The file path
   * @param length The length of the file range for checksum calculation
   * @return The file checksum. This default implementation always returns
   *  null.
   * @throws IOException not thrown by this default implementation
   */
  public FileChecksum getFileChecksum(Path f, final long length)
      throws IOException {
    return null;
  }
2225
  /**
   * Set the verify checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   * @param verifyChecksum the new value of the verify checksum flag
   */
  public void setVerifyChecksum(boolean verifyChecksum) {
    //doesn't do anything
  }
2234
  /**
   * Set the write checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   * @param writeChecksum the new value of the write checksum flag
   */
  public void setWriteChecksum(boolean writeChecksum) {
    //doesn't do anything
  }
2243
2244  /**
2245   * Returns a status object describing the use and capacity of the
2246   * file system. If the file system has multiple partitions, the
2247   * use and capacity of the root partition is reflected.
2248   * 
2249   * @return a FsStatus object
2250   * @throws IOException
2251   *           see specific implementation
2252   */
2253  public FsStatus getStatus() throws IOException {
2254    return getStatus(null);
2255  }
2256
2257  /**
2258   * Returns a status object describing the use and capacity of the
2259   * file system. If the file system has multiple partitions, the
2260   * use and capacity of the partition pointed to by the specified
2261   * path is reflected.
2262   * @param p Path for which status should be obtained. null means
2263   * the default partition. 
2264   * @return a FsStatus object
2265   * @throws IOException
2266   *           see specific implementation
2267   */
2268  public FsStatus getStatus(Path p) throws IOException {
2269    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2270  }
2271
  /**
   * Set permission of a path.
   * This default implementation does nothing.
   * @param p the path
   * @param permission the permission to set
   * @throws IOException not thrown by this default implementation
   */
  public void setPermission(Path p, FsPermission permission
      ) throws IOException {
  }
2280
  /**
   * Set owner of a path (i.e. a file or a directory).
   * The parameters username and groupname cannot both be null.
   * This default implementation does nothing.
   * @param p The path
   * @param username If it is null, the original username remains unchanged.
   * @param groupname If it is null, the original groupname remains unchanged.
   * @throws IOException not thrown by this default implementation
   */
  public void setOwner(Path p, String username, String groupname
      ) throws IOException {
  }
2291
  /**
   * Set the modification time and access time of a file.
   * This default implementation does nothing.
   * @param p The path
   * @param mtime Set the modification time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set modification time.
   * @param atime Set the access time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set access time.
   * @throws IOException not thrown by this default implementation
   */
  public void setTimes(Path p, long mtime, long atime
      ) throws IOException {
  }
2305
2306  /**
2307   * Create a snapshot with a default name.
2308   * @param path The directory where snapshots will be taken.
2309   * @return the snapshot path.
2310   */
2311  public final Path createSnapshot(Path path) throws IOException {
2312    return createSnapshot(path, null);
2313  }
2314
2315  /**
2316   * Create a snapshot
2317   * @param path The directory where snapshots will be taken.
2318   * @param snapshotName The name of the snapshot
2319   * @return the snapshot path.
2320   */
2321  public Path createSnapshot(Path path, String snapshotName)
2322      throws IOException {
2323    throw new UnsupportedOperationException(getClass().getSimpleName()
2324        + " doesn't support createSnapshot");
2325  }
2326  
2327  /**
2328   * Rename a snapshot
2329   * @param path The directory path where the snapshot was taken
2330   * @param snapshotOldName Old name of the snapshot
2331   * @param snapshotNewName New name of the snapshot
2332   * @throws IOException
2333   */
2334  public void renameSnapshot(Path path, String snapshotOldName,
2335      String snapshotNewName) throws IOException {
2336    throw new UnsupportedOperationException(getClass().getSimpleName()
2337        + " doesn't support renameSnapshot");
2338  }
2339  
2340  /**
2341   * Delete a snapshot of a directory
2342   * @param path  The directory that the to-be-deleted snapshot belongs to
2343   * @param snapshotName The name of the snapshot
2344   */
2345  public void deleteSnapshot(Path path, String snapshotName)
2346      throws IOException {
2347    throw new UnsupportedOperationException(getClass().getSimpleName()
2348        + " doesn't support deleteSnapshot");
2349  }
2350  
2351  /**
2352   * Modifies ACL entries of files and directories.  This method can add new ACL
2353   * entries or modify the permissions on existing ACL entries.  All existing
2354   * ACL entries that are not specified in this call are retained without
2355   * changes.  (Modifications are merged into the current ACL.)
2356   *
2357   * @param path Path to modify
2358   * @param aclSpec List<AclEntry> describing modifications
2359   * @throws IOException if an ACL could not be modified
2360   */
2361  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2362      throws IOException {
2363    throw new UnsupportedOperationException(getClass().getSimpleName()
2364        + " doesn't support modifyAclEntries");
2365  }
2366
2367  /**
2368   * Removes ACL entries from files and directories.  Other ACL entries are
2369   * retained.
2370   *
2371   * @param path Path to modify
2372   * @param aclSpec List<AclEntry> describing entries to remove
2373   * @throws IOException if an ACL could not be modified
2374   */
2375  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2376      throws IOException {
2377    throw new UnsupportedOperationException(getClass().getSimpleName()
2378        + " doesn't support removeAclEntries");
2379  }
2380
2381  /**
2382   * Removes all default ACL entries from files and directories.
2383   *
2384   * @param path Path to modify
2385   * @throws IOException if an ACL could not be modified
2386   */
2387  public void removeDefaultAcl(Path path)
2388      throws IOException {
2389    throw new UnsupportedOperationException(getClass().getSimpleName()
2390        + " doesn't support removeDefaultAcl");
2391  }
2392
2393  /**
2394   * Removes all but the base ACL entries of files and directories.  The entries
2395   * for user, group, and others are retained for compatibility with permission
2396   * bits.
2397   *
2398   * @param path Path to modify
2399   * @throws IOException if an ACL could not be removed
2400   */
2401  public void removeAcl(Path path)
2402      throws IOException {
2403    throw new UnsupportedOperationException(getClass().getSimpleName()
2404        + " doesn't support removeAcl");
2405  }
2406
2407  /**
2408   * Fully replaces ACL of files and directories, discarding all existing
2409   * entries.
2410   *
2411   * @param path Path to modify
2412   * @param aclSpec List<AclEntry> describing modifications, must include entries
2413   *   for user, group, and others for compatibility with permission bits.
2414   * @throws IOException if an ACL could not be modified
2415   */
2416  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2417    throw new UnsupportedOperationException(getClass().getSimpleName()
2418        + " doesn't support setAcl");
2419  }
2420
2421  /**
2422   * Gets the ACL of a file or directory.
2423   *
2424   * @param path Path to get
2425   * @return AclStatus describing the ACL of the file or directory
2426   * @throws IOException if an ACL could not be read
2427   */
2428  public AclStatus getAclStatus(Path path) throws IOException {
2429    throw new UnsupportedOperationException(getClass().getSimpleName()
2430        + " doesn't support getAclStatus");
2431  }
2432
2433  /**
2434   * Set an xattr of a file or directory.
2435   * The name must be prefixed with the namespace followed by ".". For example,
2436   * "user.attr".
2437   * <p/>
2438   * Refer to the HDFS extended attributes user documentation for details.
2439   *
2440   * @param path Path to modify
2441   * @param name xattr name.
2442   * @param value xattr value.
2443   * @throws IOException
2444   */
2445  public void setXAttr(Path path, String name, byte[] value)
2446      throws IOException {
2447    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2448        XAttrSetFlag.REPLACE));
2449  }
2450
2451  /**
2452   * Set an xattr of a file or directory.
2453   * The name must be prefixed with the namespace followed by ".". For example,
2454   * "user.attr".
2455   * <p/>
2456   * Refer to the HDFS extended attributes user documentation for details.
2457   *
2458   * @param path Path to modify
2459   * @param name xattr name.
2460   * @param value xattr value.
2461   * @param flag xattr set flag
2462   * @throws IOException
2463   */
2464  public void setXAttr(Path path, String name, byte[] value,
2465      EnumSet<XAttrSetFlag> flag) throws IOException {
2466    throw new UnsupportedOperationException(getClass().getSimpleName()
2467        + " doesn't support setXAttr");
2468  }
2469
2470  /**
2471   * Get an xattr name and value for a file or directory.
2472   * The name must be prefixed with the namespace followed by ".". For example,
2473   * "user.attr".
2474   * <p/>
2475   * Refer to the HDFS extended attributes user documentation for details.
2476   *
2477   * @param path Path to get extended attribute
2478   * @param name xattr name.
2479   * @return byte[] xattr value.
2480   * @throws IOException
2481   */
2482  public byte[] getXAttr(Path path, String name) throws IOException {
2483    throw new UnsupportedOperationException(getClass().getSimpleName()
2484        + " doesn't support getXAttr");
2485  }
2486
2487  /**
2488   * Get all of the xattr name/value pairs for a file or directory.
2489   * Only those xattrs which the logged-in user has permissions to view
2490   * are returned.
2491   * <p/>
2492   * Refer to the HDFS extended attributes user documentation for details.
2493   *
2494   * @param path Path to get extended attributes
2495   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2496   * @throws IOException
2497   */
2498  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2499    throw new UnsupportedOperationException(getClass().getSimpleName()
2500        + " doesn't support getXAttrs");
2501  }
2502
2503  /**
2504   * Get all of the xattrs name/value pairs for a file or directory.
2505   * Only those xattrs which the logged-in user has permissions to view
2506   * are returned.
2507   * <p/>
2508   * Refer to the HDFS extended attributes user documentation for details.
2509   *
2510   * @param path Path to get extended attributes
2511   * @param names XAttr names.
2512   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2513   * @throws IOException
2514   */
2515  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2516      throws IOException {
2517    throw new UnsupportedOperationException(getClass().getSimpleName()
2518        + " doesn't support getXAttrs");
2519  }
2520
2521  /**
2522   * Get all of the xattr names for a file or directory.
2523   * Only those xattr names which the logged-in user has permissions to view
2524   * are returned.
2525   * <p/>
2526   * Refer to the HDFS extended attributes user documentation for details.
2527   *
2528   * @param path Path to get extended attributes
2529   * @return List<String> of the XAttr names of the file or directory
2530   * @throws IOException
2531   */
2532  public List<String> listXAttrs(Path path) throws IOException {
2533    throw new UnsupportedOperationException(getClass().getSimpleName()
2534            + " doesn't support listXAttrs");
2535  }
2536
2537  /**
2538   * Remove an xattr of a file or directory.
2539   * The name must be prefixed with the namespace followed by ".". For example,
2540   * "user.attr".
2541   * <p/>
2542   * Refer to the HDFS extended attributes user documentation for details.
2543   *
2544   * @param path Path to remove extended attribute
2545   * @param name xattr name
2546   * @throws IOException
2547   */
2548  public void removeXAttr(Path path, String name) throws IOException {
2549    throw new UnsupportedOperationException(getClass().getSimpleName()
2550        + " doesn't support removeXAttr");
2551  }
2552
2553  // making it volatile to be able to do a double checked locking
2554  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2555
2556  private static final Map<String, Class<? extends FileSystem>>
2557    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2558
2559  private static void loadFileSystems() {
2560    synchronized (FileSystem.class) {
2561      if (!FILE_SYSTEMS_LOADED) {
2562        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2563        for (FileSystem fs : serviceLoader) {
2564          SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2565        }
2566        FILE_SYSTEMS_LOADED = true;
2567      }
2568    }
2569  }
2570
2571  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2572      Configuration conf) throws IOException {
2573    if (!FILE_SYSTEMS_LOADED) {
2574      loadFileSystems();
2575    }
2576    Class<? extends FileSystem> clazz = null;
2577    if (conf != null) {
2578      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2579    }
2580    if (clazz == null) {
2581      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2582    }
2583    if (clazz == null) {
2584      throw new IOException("No FileSystem for scheme: " + scheme);
2585    }
2586    return clazz;
2587  }
2588
2589  private static FileSystem createFileSystem(URI uri, Configuration conf
2590      ) throws IOException {
2591    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2592    if (clazz == null) {
2593      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2594    }
2595    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2596    fs.initialize(uri, conf);
2597    return fs;
2598  }
2599
2600  /** Caching FileSystem objects */
2601  static class Cache {
2602    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2603
2604    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2605    private final Set<Key> toAutoClose = new HashSet<Key>();
2606
2607    /** A variable that makes all objects in the cache unique */
2608    private static AtomicLong unique = new AtomicLong(1);
2609
2610    FileSystem get(URI uri, Configuration conf) throws IOException{
2611      Key key = new Key(uri, conf);
2612      return getInternal(uri, conf, key);
2613    }
2614
2615    /** The objects inserted into the cache using this method are all unique */
2616    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2617      Key key = new Key(uri, conf, unique.getAndIncrement());
2618      return getInternal(uri, conf, key);
2619    }
2620
2621    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2622      FileSystem fs;
2623      synchronized (this) {
2624        fs = map.get(key);
2625      }
2626      if (fs != null) {
2627        return fs;
2628      }
2629
2630      fs = createFileSystem(uri, conf);
2631      synchronized (this) { // refetch the lock again
2632        FileSystem oldfs = map.get(key);
2633        if (oldfs != null) { // a file system is created while lock is releasing
2634          fs.close(); // close the new file system
2635          return oldfs;  // return the old file system
2636        }
2637        
2638        // now insert the new file system into the map
2639        if (map.isEmpty()
2640                && !ShutdownHookManager.get().isShutdownInProgress()) {
2641          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2642        }
2643        fs.key = key;
2644        map.put(key, fs);
2645        if (conf.getBoolean("fs.automatic.close", true)) {
2646          toAutoClose.add(key);
2647        }
2648        return fs;
2649      }
2650    }
2651
2652    synchronized void remove(Key key, FileSystem fs) {
2653      if (map.containsKey(key) && fs == map.get(key)) {
2654        map.remove(key);
2655        toAutoClose.remove(key);
2656        }
2657    }
2658
2659    synchronized void closeAll() throws IOException {
2660      closeAll(false);
2661    }
2662
2663    /**
2664     * Close all FileSystem instances in the Cache.
2665     * @param onlyAutomatic only close those that are marked for automatic closing
2666     */
2667    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2668      List<IOException> exceptions = new ArrayList<IOException>();
2669
2670      // Make a copy of the keys in the map since we'll be modifying
2671      // the map while iterating over it, which isn't safe.
2672      List<Key> keys = new ArrayList<Key>();
2673      keys.addAll(map.keySet());
2674
2675      for (Key key : keys) {
2676        final FileSystem fs = map.get(key);
2677
2678        if (onlyAutomatic && !toAutoClose.contains(key)) {
2679          continue;
2680        }
2681
2682        //remove from cache
2683        remove(key, fs);
2684
2685        if (fs != null) {
2686          try {
2687            fs.close();
2688          }
2689          catch(IOException ioe) {
2690            exceptions.add(ioe);
2691          }
2692        }
2693      }
2694
2695      if (!exceptions.isEmpty()) {
2696        throw MultipleIOException.createIOException(exceptions);
2697      }
2698    }
2699
2700    private class ClientFinalizer implements Runnable {
2701      @Override
2702      public synchronized void run() {
2703        try {
2704          closeAll(true);
2705        } catch (IOException e) {
2706          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2707        }
2708      }
2709    }
2710
2711    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2712      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2713      //Make a pass over the list and collect the filesystems to close
2714      //we cannot close inline since close() removes the entry from the Map
2715      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2716        final Key key = entry.getKey();
2717        final FileSystem fs = entry.getValue();
2718        if (ugi.equals(key.ugi) && fs != null) {
2719          targetFSList.add(fs);   
2720        }
2721      }
2722      List<IOException> exceptions = new ArrayList<IOException>();
2723      //now make a pass over the target list and close each
2724      for (FileSystem fs : targetFSList) {
2725        try {
2726          fs.close();
2727        }
2728        catch(IOException ioe) {
2729          exceptions.add(ioe);
2730        }
2731      }
2732      if (!exceptions.isEmpty()) {
2733        throw MultipleIOException.createIOException(exceptions);
2734      }
2735    }
2736
2737    /** FileSystem.Cache.Key */
2738    static class Key {
2739      final String scheme;
2740      final String authority;
2741      final UserGroupInformation ugi;
2742      final long unique;   // an artificial way to make a key unique
2743
2744      Key(URI uri, Configuration conf) throws IOException {
2745        this(uri, conf, 0);
2746      }
2747
2748      Key(URI uri, Configuration conf, long unique) throws IOException {
2749        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2750        authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2751        this.unique = unique;
2752        
2753        this.ugi = UserGroupInformation.getCurrentUser();
2754      }
2755
2756      @Override
2757      public int hashCode() {
2758        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2759      }
2760
2761      static boolean isEqual(Object a, Object b) {
2762        return a == b || (a != null && a.equals(b));        
2763      }
2764
2765      @Override
2766      public boolean equals(Object obj) {
2767        if (obj == this) {
2768          return true;
2769        }
2770        if (obj != null && obj instanceof Key) {
2771          Key that = (Key)obj;
2772          return isEqual(this.scheme, that.scheme)
2773                 && isEqual(this.authority, that.authority)
2774                 && isEqual(this.ugi, that.ugi)
2775                 && (this.unique == that.unique);
2776        }
2777        return false;        
2778      }
2779
2780      @Override
2781      public String toString() {
2782        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2783      }
2784    }
2785  }
2786  
2787  /**
2788   * Tracks statistics about how many reads, writes, and so forth have been
2789   * done in a FileSystem.
2790   * 
2791   * Since there is only one of these objects per FileSystem, there will 
2792   * typically be many threads writing to this object.  Almost every operation
2793   * on an open file will involve a write to this object.  In contrast, reading
2794   * statistics is done infrequently by most programs, and not at all by others.
2795   * Hence, this is optimized for writes.
2796   * 
2797   * Each thread writes to its own thread-local area of memory.  This removes 
2798   * contention and allows us to scale up to many, many threads.  To read
2799   * statistics, the reader thread totals up the contents of all of the 
2800   * thread-local data areas.
2801   */
2802  public static final class Statistics {
2803    /**
2804     * Statistics data.
2805     * 
2806     * There is only a single writer to thread-local StatisticsData objects.
2807     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2808     * to prevent lost updates.
2809     * The Java specification guarantees that updates to volatile longs will
2810     * be perceived as atomic with respect to other threads, which is all we
2811     * need.
2812     */
2813    public static class StatisticsData {
2814      volatile long bytesRead;
2815      volatile long bytesWritten;
2816      volatile int readOps;
2817      volatile int largeReadOps;
2818      volatile int writeOps;
2819      /**
2820       * Stores a weak reference to the thread owning this StatisticsData.
2821       * This allows us to remove StatisticsData objects that pertain to
2822       * threads that no longer exist.
2823       */
2824      final WeakReference<Thread> owner;
2825
2826      StatisticsData(WeakReference<Thread> owner) {
2827        this.owner = owner;
2828      }
2829
2830      /**
2831       * Add another StatisticsData object to this one.
2832       */
2833      void add(StatisticsData other) {
2834        this.bytesRead += other.bytesRead;
2835        this.bytesWritten += other.bytesWritten;
2836        this.readOps += other.readOps;
2837        this.largeReadOps += other.largeReadOps;
2838        this.writeOps += other.writeOps;
2839      }
2840
2841      /**
2842       * Negate the values of all statistics.
2843       */
2844      void negate() {
2845        this.bytesRead = -this.bytesRead;
2846        this.bytesWritten = -this.bytesWritten;
2847        this.readOps = -this.readOps;
2848        this.largeReadOps = -this.largeReadOps;
2849        this.writeOps = -this.writeOps;
2850      }
2851
2852      @Override
2853      public String toString() {
2854        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2855            + readOps + " read ops, " + largeReadOps + " large read ops, "
2856            + writeOps + " write ops";
2857      }
2858      
2859      public long getBytesRead() {
2860        return bytesRead;
2861      }
2862      
2863      public long getBytesWritten() {
2864        return bytesWritten;
2865      }
2866      
2867      public int getReadOps() {
2868        return readOps;
2869      }
2870      
2871      public int getLargeReadOps() {
2872        return largeReadOps;
2873      }
2874      
2875      public int getWriteOps() {
2876        return writeOps;
2877      }
2878    }
2879
2880    private interface StatisticsAggregator<T> {
2881      void accept(StatisticsData data);
2882      T aggregate();
2883    }
2884
2885    private final String scheme;
2886
2887    /**
2888     * rootData is data that doesn't belong to any thread, but will be added
2889     * to the totals.  This is useful for making copies of Statistics objects,
2890     * and for storing data that pertains to threads that have been garbage
2891     * collected.  Protected by the Statistics lock.
2892     */
2893    private final StatisticsData rootData;
2894
2895    /**
2896     * Thread-local data.
2897     */
2898    private final ThreadLocal<StatisticsData> threadData;
2899    
2900    /**
2901     * List of all thread-local data areas.  Protected by the Statistics lock.
2902     */
2903    private LinkedList<StatisticsData> allData;
2904
2905    public Statistics(String scheme) {
2906      this.scheme = scheme;
2907      this.rootData = new StatisticsData(null);
2908      this.threadData = new ThreadLocal<StatisticsData>();
2909      this.allData = null;
2910    }
2911
2912    /**
2913     * Copy constructor.
2914     * 
2915     * @param other    The input Statistics object which is cloned.
2916     */
2917    public Statistics(Statistics other) {
2918      this.scheme = other.scheme;
2919      this.rootData = new StatisticsData(null);
2920      other.visitAll(new StatisticsAggregator<Void>() {
2921        @Override
2922        public void accept(StatisticsData data) {
2923          rootData.add(data);
2924        }
2925
2926        public Void aggregate() {
2927          return null;
2928        }
2929      });
2930      this.threadData = new ThreadLocal<StatisticsData>();
2931    }
2932
2933    /**
2934     * Get or create the thread-local data associated with the current thread.
2935     */
2936    public StatisticsData getThreadStatistics() {
2937      StatisticsData data = threadData.get();
2938      if (data == null) {
2939        data = new StatisticsData(
2940            new WeakReference<Thread>(Thread.currentThread()));
2941        threadData.set(data);
2942        synchronized(this) {
2943          if (allData == null) {
2944            allData = new LinkedList<StatisticsData>();
2945          }
2946          allData.add(data);
2947        }
2948      }
2949      return data;
2950    }
2951
2952    /**
2953     * Increment the bytes read in the statistics
2954     * @param newBytes the additional bytes read
2955     */
2956    public void incrementBytesRead(long newBytes) {
2957      getThreadStatistics().bytesRead += newBytes;
2958    }
2959    
2960    /**
2961     * Increment the bytes written in the statistics
2962     * @param newBytes the additional bytes written
2963     */
2964    public void incrementBytesWritten(long newBytes) {
2965      getThreadStatistics().bytesWritten += newBytes;
2966    }
2967    
2968    /**
2969     * Increment the number of read operations
2970     * @param count number of read operations
2971     */
2972    public void incrementReadOps(int count) {
2973      getThreadStatistics().readOps += count;
2974    }
2975
2976    /**
2977     * Increment the number of large read operations
2978     * @param count number of large read operations
2979     */
2980    public void incrementLargeReadOps(int count) {
2981      getThreadStatistics().largeReadOps += count;
2982    }
2983
2984    /**
2985     * Increment the number of write operations
2986     * @param count number of write operations
2987     */
2988    public void incrementWriteOps(int count) {
2989      getThreadStatistics().writeOps += count;
2990    }
2991
2992    /**
2993     * Apply the given aggregator to all StatisticsData objects associated with
2994     * this Statistics object.
2995     *
2996     * For each StatisticsData object, we will call accept on the visitor.
2997     * Finally, at the end, we will call aggregate to get the final total. 
2998     *
2999     * @param         The visitor to use.
3000     * @return        The total.
3001     */
3002    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3003      visitor.accept(rootData);
3004      if (allData != null) {
3005        for (Iterator<StatisticsData> iter = allData.iterator();
3006            iter.hasNext(); ) {
3007          StatisticsData data = iter.next();
3008          visitor.accept(data);
3009          if (data.owner.get() == null) {
3010            /*
3011             * If the thread that created this thread-local data no
3012             * longer exists, remove the StatisticsData from our list
3013             * and fold the values into rootData.
3014             */
3015            rootData.add(data);
3016            iter.remove();
3017          }
3018        }
3019      }
3020      return visitor.aggregate();
3021    }
3022
3023    /**
3024     * Get the total number of bytes read
3025     * @return the number of bytes
3026     */
3027    public long getBytesRead() {
3028      return visitAll(new StatisticsAggregator<Long>() {
3029        private long bytesRead = 0;
3030
3031        @Override
3032        public void accept(StatisticsData data) {
3033          bytesRead += data.bytesRead;
3034        }
3035
3036        public Long aggregate() {
3037          return bytesRead;
3038        }
3039      });
3040    }
3041    
3042    /**
3043     * Get the total number of bytes written
3044     * @return the number of bytes
3045     */
3046    public long getBytesWritten() {
3047      return visitAll(new StatisticsAggregator<Long>() {
3048        private long bytesWritten = 0;
3049
3050        @Override
3051        public void accept(StatisticsData data) {
3052          bytesWritten += data.bytesWritten;
3053        }
3054
3055        public Long aggregate() {
3056          return bytesWritten;
3057        }
3058      });
3059    }
3060    
3061    /**
3062     * Get the number of file system read operations such as list files
3063     * @return number of read operations
3064     */
3065    public int getReadOps() {
3066      return visitAll(new StatisticsAggregator<Integer>() {
3067        private int readOps = 0;
3068
3069        @Override
3070        public void accept(StatisticsData data) {
3071          readOps += data.readOps;
3072          readOps += data.largeReadOps;
3073        }
3074
3075        public Integer aggregate() {
3076          return readOps;
3077        }
3078      });
3079    }
3080
3081    /**
3082     * Get the number of large file system read operations such as list files
3083     * under a large directory
3084     * @return number of large read operations
3085     */
3086    public int getLargeReadOps() {
3087      return visitAll(new StatisticsAggregator<Integer>() {
3088        private int largeReadOps = 0;
3089
3090        @Override
3091        public void accept(StatisticsData data) {
3092          largeReadOps += data.largeReadOps;
3093        }
3094
3095        public Integer aggregate() {
3096          return largeReadOps;
3097        }
3098      });
3099    }
3100
3101    /**
3102     * Get the number of file system write operations such as create, append 
3103     * rename etc.
3104     * @return number of write operations
3105     */
3106    public int getWriteOps() {
3107      return visitAll(new StatisticsAggregator<Integer>() {
3108        private int writeOps = 0;
3109
3110        @Override
3111        public void accept(StatisticsData data) {
3112          writeOps += data.writeOps;
3113        }
3114
3115        public Integer aggregate() {
3116          return writeOps;
3117        }
3118      });
3119    }
3120
3121
3122    @Override
3123    public String toString() {
3124      return visitAll(new StatisticsAggregator<String>() {
3125        private StatisticsData total = new StatisticsData(null);
3126
3127        @Override
3128        public void accept(StatisticsData data) {
3129          total.add(data);
3130        }
3131
3132        public String aggregate() {
3133          return total.toString();
3134        }
3135      });
3136    }
3137
3138    /**
3139     * Resets all statistics to 0.
3140     *
3141     * In order to reset, we add up all the thread-local statistics data, and
3142     * set rootData to the negative of that.
3143     *
3144     * This may seem like a counterintuitive way to reset the statsitics.  Why
3145     * can't we just zero out all the thread-local data?  Well, thread-local
3146     * data can only be modified by the thread that owns it.  If we tried to
3147     * modify the thread-local data from this thread, our modification might get
3148     * interleaved with a read-modify-write operation done by the thread that
3149     * owns the data.  That would result in our update getting lost.
3150     *
3151     * The approach used here avoids this problem because it only ever reads
3152     * (not writes) the thread-local data.  Both reads and writes to rootData
3153     * are done under the lock, so we're free to modify rootData from any thread
3154     * that holds the lock.
3155     */
3156    public void reset() {
3157      visitAll(new StatisticsAggregator<Void>() {
3158        private StatisticsData total = new StatisticsData(null);
3159
3160        @Override
3161        public void accept(StatisticsData data) {
3162          total.add(data);
3163        }
3164
3165        public Void aggregate() {
3166          total.negate();
3167          rootData.add(total);
3168          return null;
3169        }
3170      });
3171    }
3172    
3173    /**
3174     * Get the uri scheme associated with this statistics object.
3175     * @return the schema associated with this set of statistics
3176     */
3177    public String getScheme() {
3178      return scheme;
3179    }
3180  }
3181  
3182  /**
3183   * Get the Map of Statistics object indexed by URI Scheme.
3184   * @return a Map having a key as URI scheme and value as Statistics object
3185   * @deprecated use {@link #getAllStatistics} instead
3186   */
3187  @Deprecated
3188  public static synchronized Map<String, Statistics> getStatistics() {
3189    Map<String, Statistics> result = new HashMap<String, Statistics>();
3190    for(Statistics stat: statisticsTable.values()) {
3191      result.put(stat.getScheme(), stat);
3192    }
3193    return result;
3194  }
3195
3196  /**
3197   * Return the FileSystem classes that have Statistics
3198   */
3199  public static synchronized List<Statistics> getAllStatistics() {
3200    return new ArrayList<Statistics>(statisticsTable.values());
3201  }
3202  
3203  /**
3204   * Get the statistics for a particular file system
3205   * @param cls the class to lookup
3206   * @return a statistics object
3207   */
3208  public static synchronized 
3209  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3210    Statistics result = statisticsTable.get(cls);
3211    if (result == null) {
3212      result = new Statistics(scheme);
3213      statisticsTable.put(cls, result);
3214    }
3215    return result;
3216  }
3217  
3218  /**
3219   * Reset all statistics for all file systems
3220   */
3221  public static synchronized void clearStatistics() {
3222    for(Statistics stat: statisticsTable.values()) {
3223      stat.reset();
3224    }
3225  }
3226
3227  /**
3228   * Print all statistics for all file systems
3229   */
3230  public static synchronized
3231  void printStatistics() throws IOException {
3232    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3233            statisticsTable.entrySet()) {
3234      System.out.println("  FileSystem " + pair.getKey().getName() + 
3235                         ": " + pair.getValue());
3236    }
3237  }
3238
  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
  // Static flag read by areSymlinksEnabled() and set to true by
  // enableSymlinks().
  private static boolean symlinksEnabled = false;

  // NOTE(review): no use of this static field is visible in this part of the
  // file - confirm whether it is still referenced elsewhere.
  private static Configuration conf = null;
3243
  /**
   * Report the current value of the static symlinks flag.
   * @return true if symlinks have been enabled, false otherwise
   */
  @VisibleForTesting
  public static boolean areSymlinksEnabled() {
    return symlinksEnabled;
  }
3248
  /**
   * Turn the static symlinks flag on. The flag is static, so the change is
   * not limited to a single FileSystem instance.
   */
  @VisibleForTesting
  public static void enableSymlinks() {
    symlinksEnabled = true;
  }
3253}