001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.IdentityHashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.NoSuchElementException;
038import java.util.ServiceLoader;
039import java.util.Set;
040import java.util.Stack;
041import java.util.TreeSet;
042import java.util.concurrent.atomic.AtomicLong;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.apache.hadoop.classification.InterfaceAudience;
047import org.apache.hadoop.classification.InterfaceStability;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.conf.Configured;
050import org.apache.hadoop.fs.Options.ChecksumOpt;
051import org.apache.hadoop.fs.Options.Rename;
052import org.apache.hadoop.fs.permission.AclEntry;
053import org.apache.hadoop.fs.permission.AclStatus;
054import org.apache.hadoop.fs.permission.FsAction;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.io.MultipleIOException;
057import org.apache.hadoop.io.Text;
058import org.apache.hadoop.net.NetUtils;
059import org.apache.hadoop.security.AccessControlException;
060import org.apache.hadoop.security.Credentials;
061import org.apache.hadoop.security.SecurityUtil;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.token.Token;
064import org.apache.hadoop.util.DataChecksum;
065import org.apache.hadoop.util.Progressable;
066import org.apache.hadoop.util.ReflectionUtils;
067import org.apache.hadoop.util.ShutdownHookManager;
068
069import com.google.common.annotations.VisibleForTesting;
070
071/****************************************************************
072 * An abstract base class for a fairly generic filesystem.  It
073 * may be implemented as a distributed filesystem, or as a "local"
074 * one that reflects the locally-connected disk.  The local version
075 * exists for small Hadoop instances and for testing.
076 *
077 * <p>
078 *
079 * All user code that may potentially use the Hadoop Distributed
080 * File System should be written to use a FileSystem object.  The
081 * Hadoop DFS is a multi-machine system that appears as a single
082 * disk.  It's useful because of its fault tolerance and potentially
083 * very large capacity.
084 * 
085 * <p>
086 * The local implementation is {@link LocalFileSystem} and distributed
087 * implementation is DistributedFileSystem.
088 *****************************************************************/
089@InterfaceAudience.Public
090@InterfaceStability.Stable
091public abstract class FileSystem extends Configured implements Closeable {
  /** Configuration key under which the default filesystem URI is stored. */
  public static final String FS_DEFAULT_NAME_KEY = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  /** Value used for the default filesystem when the key above is not set. */
  public static final String DEFAULT_FS = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;

  /** Logger for the FileSystem class. */
  public static final Log LOG = LogFactory.getLog(FileSystem.class);

  /**
   * Priority of the FileSystem shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 10;

  /** FileSystem cache */
  static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /** Statistics recorded per FileSystem class. */
  private static final Map<Class<? extends FileSystem>, Statistics> 
    statisticsTable =
      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
  
  /**
   * The statistics for this file system.
   */
  protected Statistics statistics;

  /**
   * A cache of files that should be deleted when the filesystem is closed
   * or the JVM is exited.
   */
  private Set<Path> deleteOnExit = new TreeSet<Path>();
  
  /**
   * Set by {@link #initialize(URI, Configuration)} from the
   * FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY configuration setting.
   */
  boolean resolveSymlinks;
127  /**
128   * This method adds a file system for testing so that we can find it later. It
129   * is only for testing.
130   * @param uri the uri to store it under
131   * @param conf the configuration to store it under
132   * @param fs the file system to store
133   * @throws IOException
134   */
135  static void addFileSystemForTesting(URI uri, Configuration conf,
136      FileSystem fs) throws IOException {
137    CACHE.map.put(new Cache.Key(uri, conf), fs);
138  }
139
140  /**
141   * Get a filesystem instance based on the uri, the passed
142   * configuration and the user
143   * @param uri of the filesystem
144   * @param conf the configuration to use
145   * @param user to perform the get as
146   * @return the filesystem instance
147   * @throws IOException
148   * @throws InterruptedException
149   */
150  public static FileSystem get(final URI uri, final Configuration conf,
151        final String user) throws IOException, InterruptedException {
152    String ticketCachePath =
153      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
154    UserGroupInformation ugi =
155        UserGroupInformation.getBestUGI(ticketCachePath, user);
156    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
157      @Override
158      public FileSystem run() throws IOException {
159        return get(uri, conf);
160      }
161    });
162  }
163
164  /**
165   * Returns the configured filesystem implementation.
166   * @param conf the configuration to use
167   */
168  public static FileSystem get(Configuration conf) throws IOException {
169    return get(getDefaultUri(conf), conf);
170  }
171  
172  /** Get the default filesystem URI from a configuration.
173   * @param conf the configuration to use
174   * @return the uri of the default filesystem
175   */
176  public static URI getDefaultUri(Configuration conf) {
177    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
178  }
179
180  /** Set the default filesystem URI in a configuration.
181   * @param conf the configuration to alter
182   * @param uri the new default filesystem uri
183   */
184  public static void setDefaultUri(Configuration conf, URI uri) {
185    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
186  }
187
188  /** Set the default filesystem URI in a configuration.
189   * @param conf the configuration to alter
190   * @param uri the new default filesystem uri
191   */
192  public static void setDefaultUri(Configuration conf, String uri) {
193    setDefaultUri(conf, URI.create(fixName(uri)));
194  }
195
196  /** Called after a new FileSystem instance is constructed.
197   * @param name a uri whose authority section names the host, port, etc.
198   *   for this FileSystem
199   * @param conf the configuration
200   */
201  public void initialize(URI name, Configuration conf) throws IOException {
202    statistics = getStatistics(name.getScheme(), getClass());    
203    resolveSymlinks = conf.getBoolean(
204        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
205        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
206  }
207
208  /**
209   * Return the protocol scheme for the FileSystem.
210   * <p/>
211   * This implementation throws an <code>UnsupportedOperationException</code>.
212   *
213   * @return the protocol scheme for the FileSystem.
214   */
215  public String getScheme() {
216    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
217  }
218
  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   *
   * @return the URI identifying this FileSystem
   */
  public abstract URI getUri();
221  
222  /**
223   * Return a canonicalized form of this FileSystem's URI.
224   * 
225   * The default implementation simply calls {@link #canonicalizeUri(URI)}
226   * on the filesystem's own URI, so subclasses typically only need to
227   * implement that method.
228   *
229   * @see #canonicalizeUri(URI)
230   */
231  protected URI getCanonicalUri() {
232    return canonicalizeUri(getUri());
233  }
234  
235  /**
236   * Canonicalize the given URI.
237   * 
238   * This is filesystem-dependent, but may for example consist of
239   * canonicalizing the hostname using DNS and adding the default
240   * port if not specified.
241   * 
242   * The default implementation simply fills in the default port if
243   * not specified and if the filesystem has a default port.
244   *
245   * @return URI
246   * @see NetUtils#getCanonicalUri(URI, int)
247   */
248  protected URI canonicalizeUri(URI uri) {
249    if (uri.getPort() == -1 && getDefaultPort() > 0) {
250      // reconstruct the uri with the default port set
251      try {
252        uri = new URI(uri.getScheme(), uri.getUserInfo(),
253            uri.getHost(), getDefaultPort(),
254            uri.getPath(), uri.getQuery(), uri.getFragment());
255      } catch (URISyntaxException e) {
256        // Should never happen!
257        throw new AssertionError("Valid URI became unparseable: " +
258            uri);
259      }
260    }
261    
262    return uri;
263  }
264  
  /**
   * Get the default port for this file system.
   * This base implementation returns 0.
   * @return the default port or 0 if there isn't one
   */
  protected int getDefaultPort() {
    return 0;
  }
272
273  protected static FileSystem getFSofPath(final Path absOrFqPath,
274      final Configuration conf)
275      throws UnsupportedFileSystemException, IOException {
276    absOrFqPath.checkNotSchemeWithRelative();
277    absOrFqPath.checkNotRelative();
278
279    // Uses the default file system if not fully qualified
280    return get(absOrFqPath.toUri(), conf);
281  }
282
283  /**
284   * Get a canonical service name for this file system.  The token cache is
285   * the only user of the canonical service name, and uses it to lookup this
286   * filesystem's service tokens.
287   * If file system provides a token of its own then it must have a canonical
288   * name, otherwise canonical name can be null.
289   * 
290   * Default Impl: If the file system has child file systems 
291   * (such as an embedded file system) then it is assumed that the fs has no
292   * tokens of its own and hence returns a null name; otherwise a service
293   * name is built using Uri and port.
294   * 
295   * @return a service string that uniquely identifies this file system, null
296   *         if the filesystem does not implement tokens
297   * @see SecurityUtil#buildDTServiceName(URI, int) 
298   */
299  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
300  public String getCanonicalServiceName() {
301    return (getChildFileSystems() == null)
302      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
303      : null;
304  }
305
306  /** @deprecated call #getUri() instead.*/
307  @Deprecated
308  public String getName() { return getUri().toString(); }
309
310  /** @deprecated call #get(URI,Configuration) instead. */
311  @Deprecated
312  public static FileSystem getNamed(String name, Configuration conf)
313    throws IOException {
314    return get(URI.create(fixName(name)), conf);
315  }
316  
317  /** Update old-format filesystem names, for back-compatibility.  This should
318   * eventually be replaced with a checkName() method that throws an exception
319   * for old-format names. */ 
320  private static String fixName(String name) {
321    // convert old-format name to new-format name
322    if (name.equals("local")) {         // "local" is now "file:///".
323      LOG.warn("\"local\" is a deprecated filesystem name."
324               +" Use \"file:///\" instead.");
325      name = "file:///";
326    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
327      LOG.warn("\""+name+"\" is a deprecated filesystem name."
328               +" Use \"hdfs://"+name+"/\" instead.");
329      name = "hdfs://"+name;
330    }
331    return name;
332  }
333
334  /**
335   * Get the local file system.
336   * @param conf the configuration to configure the file system with
337   * @return a LocalFileSystem
338   */
339  public static LocalFileSystem getLocal(Configuration conf)
340    throws IOException {
341    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
342  }
343
344  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
345   * of the URI determines a configuration property name,
346   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
347   * The entire URI is passed to the FileSystem instance's initialize method.
348   */
349  public static FileSystem get(URI uri, Configuration conf) throws IOException {
350    String scheme = uri.getScheme();
351    String authority = uri.getAuthority();
352
353    if (scheme == null && authority == null) {     // use default FS
354      return get(conf);
355    }
356
357    if (scheme != null && authority == null) {     // no authority
358      URI defaultUri = getDefaultUri(conf);
359      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
360          && defaultUri.getAuthority() != null) {  // & default has authority
361        return get(defaultUri, conf);              // return default
362      }
363    }
364    
365    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
366    if (conf.getBoolean(disableCacheName, false)) {
367      return createFileSystem(uri, conf);
368    }
369
370    return CACHE.get(uri, conf);
371  }
372
373  /**
374   * Returns the FileSystem for this URI's scheme and authority and the 
375   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
376   * @param uri of the filesystem
377   * @param conf the configuration to use
378   * @param user to perform the get as
379   * @return filesystem instance
380   * @throws IOException
381   * @throws InterruptedException
382   */
383  public static FileSystem newInstance(final URI uri, final Configuration conf,
384      final String user) throws IOException, InterruptedException {
385    String ticketCachePath =
386      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
387    UserGroupInformation ugi =
388        UserGroupInformation.getBestUGI(ticketCachePath, user);
389    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
390      @Override
391      public FileSystem run() throws IOException {
392        return newInstance(uri,conf); 
393      }
394    });
395  }
396  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
397   * of the URI determines a configuration property name,
398   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
399   * The entire URI is passed to the FileSystem instance's initialize method.
400   * This always returns a new FileSystem object.
401   */
402  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
403    String scheme = uri.getScheme();
404    String authority = uri.getAuthority();
405
406    if (scheme == null) {                       // no scheme: use default FS
407      return newInstance(conf);
408    }
409
410    if (authority == null) {                       // no authority
411      URI defaultUri = getDefaultUri(conf);
412      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
413          && defaultUri.getAuthority() != null) {  // & default has authority
414        return newInstance(defaultUri, conf);              // return default
415      }
416    }
417    return CACHE.getUnique(uri, conf);
418  }
419
420  /** Returns a unique configured filesystem implementation.
421   * This always returns a new FileSystem object.
422   * @param conf the configuration to use
423   */
424  public static FileSystem newInstance(Configuration conf) throws IOException {
425    return newInstance(getDefaultUri(conf), conf);
426  }
427
428  /**
429   * Get a unique local file system object
430   * @param conf the configuration to configure the file system with
431   * @return a LocalFileSystem
432   * This always returns a new FileSystem object.
433   */
434  public static LocalFileSystem newInstanceLocal(Configuration conf)
435    throws IOException {
436    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
437  }
438
  /**
   * Close all cached filesystems by delegating to the shared cache.
   * Be sure those filesystems are not used anymore.
   * 
   * @throws IOException
   */
  public static void closeAll() throws IOException {
    CACHE.closeAll();
  }
448
  /**
   * Close all cached filesystems for a given UGI by delegating to the
   * shared cache. Be sure those filesystems are not used anymore.
   * @param ugi user group info to close
   * @throws IOException
   */
  public static void closeAllForUGI(UserGroupInformation ugi) 
  throws IOException {
    CACHE.closeAll(ugi);
  }
459
460  /** 
461   * Make sure that a path specifies a FileSystem.
462   * @param path to use
463   */
464  public Path makeQualified(Path path) {
465    checkPath(path);
466    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
467  }
468    
  /**
   * Get a new delegation token for this file system.
   * This is an internal method that should have been declared protected
   * but wasn't historically.
   * Callers should use {@link #addDelegationTokens(String, Credentials)}
   * 
   * This base implementation returns null.
   * 
   * @param renewer the account name that is allowed to renew the token.
   * @return a new delegation token
   * @throws IOException
   */
  @InterfaceAudience.Private()
  public Token<?> getDelegationToken(String renewer) throws IOException {
    return null;
  }
483  
484  /**
485   * Obtain all delegation tokens used by this FileSystem that are not
486   * already present in the given Credentials.  Existing tokens will neither
487   * be verified as valid nor having the given renewer.  Missing tokens will
488   * be acquired and added to the given Credentials.
489   * 
490   * Default Impl: works for simple fs with its own token
491   * and also for an embedded fs whose tokens are those of its
492   * children file system (i.e. the embedded fs has not tokens of its
493   * own).
494   * 
495   * @param renewer the user allowed to renew the delegation tokens
496   * @param credentials cache in which to add new delegation tokens
497   * @return list of new delegation tokens
498   * @throws IOException
499   */
500  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
501  public Token<?>[] addDelegationTokens(
502      final String renewer, Credentials credentials) throws IOException {
503    if (credentials == null) {
504      credentials = new Credentials();
505    }
506    final List<Token<?>> tokens = new ArrayList<Token<?>>();
507    collectDelegationTokens(renewer, credentials, tokens);
508    return tokens.toArray(new Token<?>[tokens.size()]);
509  }
510  
511  /**
512   * Recursively obtain the tokens for this FileSystem and all descended
513   * FileSystems as determined by getChildFileSystems().
514   * @param renewer the user allowed to renew the delegation tokens
515   * @param credentials cache in which to add the new delegation tokens
516   * @param tokens list in which to add acquired tokens
517   * @throws IOException
518   */
519  private void collectDelegationTokens(final String renewer,
520                                       final Credentials credentials,
521                                       final List<Token<?>> tokens)
522                                           throws IOException {
523    final String serviceName = getCanonicalServiceName();
524    // Collect token of the this filesystem and then of its embedded children
525    if (serviceName != null) { // fs has token, grab it
526      final Text service = new Text(serviceName);
527      Token<?> token = credentials.getToken(service);
528      if (token == null) {
529        token = getDelegationToken(renewer);
530        if (token != null) {
531          tokens.add(token);
532          credentials.addToken(service, token);
533        }
534      }
535    }
536    // Now collect the tokens from the children
537    final FileSystem[] children = getChildFileSystems();
538    if (children != null) {
539      for (final FileSystem fs : children) {
540        fs.collectDelegationTokens(renewer, credentials, tokens);
541      }
542    }
543  }
544
  /**
   * Get all the immediate child FileSystems embedded in this FileSystem.
   * It does not recurse and get grand children.  If a FileSystem
   * has multiple child FileSystems, then it should return a unique list
   * of those FileSystems.  Default is to return null to signify no children.
   * 
   * @return FileSystems used by this FileSystem, or null if there are none
   */
  @InterfaceAudience.LimitedPrivate({ "HDFS" })
  @VisibleForTesting
  public FileSystem[] getChildFileSystems() {
    return null;
  }
558  
559  /** create a file with the provided permission
560   * The permission of the file is set to be the provided permission as in
561   * setPermission, not permission&~umask
562   * 
563   * It is implemented using two RPCs. It is understood that it is inefficient,
564   * but the implementation is thread-safe. The other option is to change the
565   * value of umask in configuration to be 0, but it is not thread-safe.
566   * 
567   * @param fs file system handle
568   * @param file the name of the file to be created
569   * @param permission the permission of the file
570   * @return an output stream
571   * @throws IOException
572   */
573  public static FSDataOutputStream create(FileSystem fs,
574      Path file, FsPermission permission) throws IOException {
575    // create the file with default permission
576    FSDataOutputStream out = fs.create(file);
577    // set its permission to the supplied one
578    fs.setPermission(file, permission);
579    return out;
580  }
581
582  /** create a directory with the provided permission
583   * The permission of the directory is set to be the provided permission as in
584   * setPermission, not permission&~umask
585   * 
586   * @see #create(FileSystem, Path, FsPermission)
587   * 
588   * @param fs file system handle
589   * @param dir the name of the directory to be created
590   * @param permission the permission of the directory
591   * @return true if the directory creation succeeds; false otherwise
592   * @throws IOException
593   */
594  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
595  throws IOException {
596    // create the directory using the default permission
597    boolean result = fs.mkdirs(dir);
598    // set its permission to be the supplied one
599    fs.setPermission(dir, permission);
600    return result;
601  }
602
603  ///////////////////////////////////////////////////////////////
604  // FileSystem
605  ///////////////////////////////////////////////////////////////
606
  /**
   * Creates a FileSystem without a configuration: null is passed to the
   * {@link Configured} superclass constructor.
   */
  protected FileSystem() {
    super(null);
  }
610
  /** 
   * Check that a Path belongs to this FileSystem.
   * <p>
   * A path without a scheme is always accepted. Otherwise the path's scheme
   * must match this filesystem's canonical URI scheme (ignoring case) and
   * the authorities must match (ignoring case) after canonicalization. A
   * path that has a scheme but no authority is compared using the default
   * filesystem URI, provided that URI has the same scheme.
   * @param path to check
   * @throws IllegalArgumentException if the path does not belong to this
   *         FileSystem
   */
  protected void checkPath(Path path) {
    URI uri = path.toUri();
    String thatScheme = uri.getScheme();
    if (thatScheme == null)                // fs is relative
      return;
    URI thisUri = getCanonicalUri();
    String thisScheme = thisUri.getScheme();
    //authority and scheme are not case sensitive
    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
      String thisAuthority = thisUri.getAuthority();
      String thatAuthority = uri.getAuthority();
      if (thatAuthority == null &&                // path's authority is null
          thisAuthority != null) {                // fs has an authority
        // The path names no authority: borrow the one of the default FS,
        // but only if the default FS uses the same scheme.
        URI defaultUri = getDefaultUri(getConf());
        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
          uri = defaultUri; // schemes match, so use this uri instead
        } else {
          uri = null; // can't determine auth of the path
        }
      }
      if (uri != null) {
        // canonicalize uri before comparing with this fs
        uri = canonicalizeUri(uri);
        thatAuthority = uri.getAuthority();
        // The reference comparison accepts the case where both authorities
        // are null; otherwise compare the text, ignoring case.
        if (thisAuthority == thatAuthority ||       // authorities match
            (thisAuthority != null &&
             thisAuthority.equalsIgnoreCase(thatAuthority)))
          return;
      }
    }
    // Scheme or authority mismatch: the path belongs to another filesystem.
    throw new IllegalArgumentException("Wrong FS: "+path+
                                       ", expected: "+this.getUri());
  }
648
649  /**
650   * Return an array containing hostnames, offset and size of 
651   * portions of the given file.  For a nonexistent 
652   * file or regions, null will be returned.
653   *
654   * This call is most helpful with DFS, where it returns 
655   * hostnames of machines that contain the given file.
656   *
657   * The FileSystem will simply return an elt containing 'localhost'.
658   *
659   * @param file FilesStatus to get data from
660   * @param start offset into the given file
661   * @param len length for which to get locations for
662   */
663  public BlockLocation[] getFileBlockLocations(FileStatus file, 
664      long start, long len) throws IOException {
665    if (file == null) {
666      return null;
667    }
668
669    if (start < 0 || len < 0) {
670      throw new IllegalArgumentException("Invalid start or len parameter");
671    }
672
673    if (file.getLen() <= start) {
674      return new BlockLocation[0];
675
676    }
677    String[] name = { "localhost:50010" };
678    String[] host = { "localhost" };
679    return new BlockLocation[] {
680      new BlockLocation(name, host, 0, file.getLen()) };
681  }
682 
683
684  /**
685   * Return an array containing hostnames, offset and size of 
686   * portions of the given file.  For a nonexistent 
687   * file or regions, null will be returned.
688   *
689   * This call is most helpful with DFS, where it returns 
690   * hostnames of machines that contain the given file.
691   *
692   * The FileSystem will simply return an elt containing 'localhost'.
693   *
694   * @param p path is used to identify an FS since an FS could have
695   *          another FS that it could be delegating the call to
696   * @param start offset into the given file
697   * @param len length for which to get locations for
698   */
699  public BlockLocation[] getFileBlockLocations(Path p, 
700      long start, long len) throws IOException {
701    if (p == null) {
702      throw new NullPointerException();
703    }
704    FileStatus file = getFileStatus(p);
705    return getFileBlockLocations(file, start, len);
706  }
707  
708  /**
709   * Return a set of server default configuration values
710   * @return server default configuration values
711   * @throws IOException
712   * @deprecated use {@link #getServerDefaults(Path)} instead
713   */
714  @Deprecated
715  public FsServerDefaults getServerDefaults() throws IOException {
716    Configuration conf = getConf();
717    // CRC32 is chosen as default as it is available in all 
718    // releases that support checksum.
719    // The client trash configuration is ignored.
720    return new FsServerDefaults(getDefaultBlockSize(), 
721        conf.getInt("io.bytes.per.checksum", 512), 
722        64 * 1024, 
723        getDefaultReplication(),
724        conf.getInt("io.file.buffer.size", 4096),
725        false,
726        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
727        DataChecksum.Type.CRC32);
728  }
729
  /**
   * Return a set of server default configuration values.
   * This base implementation does not use the path and delegates to the
   * no-argument {@code getServerDefaults()}.
   * @param p path is used to identify an FS since an FS could have
   *          another FS that it could be delegating the call to
   * @return server default configuration values
   * @throws IOException
   */
  public FsServerDefaults getServerDefaults(Path p) throws IOException {
    return getServerDefaults();
  }
740
741  /**
742   * Return the fully-qualified path of path f resolving the path
743   * through any symlinks or mount point
744   * @param p path to be resolved
745   * @return fully qualified path 
746   * @throws FileNotFoundException
747   */
748   public Path resolvePath(final Path p) throws IOException {
749     checkPath(p);
750     return getFileStatus(p).getPath();
751   }
752
  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return an input stream for the file
   * @throws IOException
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
760    
761  /**
762   * Opens an FSDataInputStream at the indicated Path.
763   * @param f the file to open
764   */
765  public FSDataInputStream open(Path f) throws IOException {
766    return open(f, getConf().getInt("io.file.buffer.size", 4096));
767  }
768
769  /**
770   * Create an FSDataOutputStream at the indicated Path.
771   * Files are overwritten by default.
772   * @param f the file to create
773   */
774  public FSDataOutputStream create(Path f) throws IOException {
775    return create(f, true);
776  }
777
778  /**
779   * Create an FSDataOutputStream at the indicated Path.
780   * @param f the file to create
781   * @param overwrite if a file with this name already exists, then if true,
782   *   the file will be overwritten, and if false an exception will be thrown.
783   */
784  public FSDataOutputStream create(Path f, boolean overwrite)
785      throws IOException {
786    return create(f, overwrite, 
787                  getConf().getInt("io.file.buffer.size", 4096),
788                  getDefaultReplication(f),
789                  getDefaultBlockSize(f));
790  }
791
792  /**
793   * Create an FSDataOutputStream at the indicated Path with write-progress
794   * reporting.
795   * Files are overwritten by default.
796   * @param f the file to create
797   * @param progress to report progress
798   */
799  public FSDataOutputStream create(Path f, Progressable progress) 
800      throws IOException {
801    return create(f, true, 
802                  getConf().getInt("io.file.buffer.size", 4096),
803                  getDefaultReplication(f),
804                  getDefaultBlockSize(f), progress);
805  }
806
807  /**
808   * Create an FSDataOutputStream at the indicated Path.
809   * Files are overwritten by default.
810   * @param f the file to create
811   * @param replication the replication factor
812   */
813  public FSDataOutputStream create(Path f, short replication)
814      throws IOException {
815    return create(f, true, 
816                  getConf().getInt("io.file.buffer.size", 4096),
817                  replication,
818                  getDefaultBlockSize(f));
819  }
820
821  /**
822   * Create an FSDataOutputStream at the indicated Path with write-progress
823   * reporting.
824   * Files are overwritten by default.
825   * @param f the file to create
826   * @param replication the replication factor
827   * @param progress to report progress
828   */
829  public FSDataOutputStream create(Path f, short replication, 
830      Progressable progress) throws IOException {
831    return create(f, true, 
832                  getConf().getInt(
833                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
834                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
835                  replication,
836                  getDefaultBlockSize(f), progress);
837  }
838
839    
840  /**
841   * Create an FSDataOutputStream at the indicated Path.
842   * @param f the file name to create
843   * @param overwrite if a file with this name already exists, then if true,
844   *   the file will be overwritten, and if false an error will be thrown.
845   * @param bufferSize the size of the buffer to be used.
846   */
847  public FSDataOutputStream create(Path f, 
848                                   boolean overwrite,
849                                   int bufferSize
850                                   ) throws IOException {
851    return create(f, overwrite, bufferSize, 
852                  getDefaultReplication(f),
853                  getDefaultBlockSize(f));
854  }
855    
856  /**
857   * Create an FSDataOutputStream at the indicated Path with write-progress
858   * reporting.
   * @param f the path of the file to create
860   * @param overwrite if a file with this name already exists, then if true,
861   *   the file will be overwritten, and if false an error will be thrown.
862   * @param bufferSize the size of the buffer to be used.
863   */
864  public FSDataOutputStream create(Path f, 
865                                   boolean overwrite,
866                                   int bufferSize,
867                                   Progressable progress
868                                   ) throws IOException {
869    return create(f, overwrite, bufferSize, 
870                  getDefaultReplication(f),
871                  getDefaultBlockSize(f), progress);
872  }
873    
874    
875  /**
876   * Create an FSDataOutputStream at the indicated Path.
877   * @param f the file name to open
878   * @param overwrite if a file with this name already exists, then if true,
879   *   the file will be overwritten, and if false an error will be thrown.
880   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize the block size to use for the file.
882   */
883  public FSDataOutputStream create(Path f, 
884                                   boolean overwrite,
885                                   int bufferSize,
886                                   short replication,
887                                   long blockSize
888                                   ) throws IOException {
889    return create(f, overwrite, bufferSize, replication, blockSize, null);
890  }
891
892  /**
893   * Create an FSDataOutputStream at the indicated Path with write-progress
894   * reporting.
895   * @param f the file name to open
896   * @param overwrite if a file with this name already exists, then if true,
897   *   the file will be overwritten, and if false an error will be thrown.
898   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize the block size to use for the file.
   * @param progress to report progress
900   */
901  public FSDataOutputStream create(Path f,
902                                            boolean overwrite,
903                                            int bufferSize,
904                                            short replication,
905                                            long blockSize,
906                                            Progressable progress
907                                            ) throws IOException {
908    return this.create(f, FsPermission.getFileDefault().applyUMask(
909        FsPermission.getUMask(getConf())), overwrite, bufferSize,
910        replication, blockSize, progress);
911  }
912
913  /**
914   * Create an FSDataOutputStream at the indicated Path with write-progress
915   * reporting.
916   * @param f the file name to open
917   * @param permission
918   * @param overwrite if a file with this name already exists, then if true,
919   *   the file will be overwritten, and if false an error will be thrown.
920   * @param bufferSize the size of the buffer to be used.
921   * @param replication required block replication for the file.
922   * @param blockSize
923   * @param progress
924   * @throws IOException
925   * @see #setPermission(Path, FsPermission)
926   */
  // Primitive create operation that concrete file systems must implement.
  // The overwrite-based create(...) overloads in this class, the
  // CreateFlag-based create(...) and primitiveCreate(...) all end up
  // delegating to this method.
  public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;
934  
935  /**
936   * Create an FSDataOutputStream at the indicated Path with write-progress
937   * reporting.
938   * @param f the file name to open
939   * @param permission
940   * @param flags {@link CreateFlag}s to use for this stream.
941   * @param bufferSize the size of the buffer to be used.
942   * @param replication required block replication for the file.
943   * @param blockSize
944   * @param progress
945   * @throws IOException
946   * @see #setPermission(Path, FsPermission)
947   */
948  public FSDataOutputStream create(Path f,
949      FsPermission permission,
950      EnumSet<CreateFlag> flags,
951      int bufferSize,
952      short replication,
953      long blockSize,
954      Progressable progress) throws IOException {
955    return create(f, permission, flags, bufferSize, replication,
956        blockSize, progress, null);
957  }
958  
959  /**
960   * Create an FSDataOutputStream at the indicated Path with a custom
961   * checksum option
962   * @param f the file name to open
963   * @param permission
964   * @param flags {@link CreateFlag}s to use for this stream.
965   * @param bufferSize the size of the buffer to be used.
966   * @param replication required block replication for the file.
967   * @param blockSize
968   * @param progress
969   * @param checksumOpt checksum parameter. If null, the values
970   *        found in conf will be used.
971   * @throws IOException
972   * @see #setPermission(Path, FsPermission)
973   */
974  public FSDataOutputStream create(Path f,
975      FsPermission permission,
976      EnumSet<CreateFlag> flags,
977      int bufferSize,
978      short replication,
979      long blockSize,
980      Progressable progress,
981      ChecksumOpt checksumOpt) throws IOException {
982    // Checksum options are ignored by default. The file systems that
983    // implement checksum need to override this method. The full
984    // support is currently only available in DFS.
985    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
986        bufferSize, replication, blockSize, progress);
987  }
988
  /**
   * This create has been added to support the FileContext that processes
   * the permission with umask before calling this method.
   * This is a temporary method added to support the transition from
   * FileSystem to FileContext for user applications.
   */
996  @Deprecated
997  protected FSDataOutputStream primitiveCreate(Path f,
998     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
999     short replication, long blockSize, Progressable progress,
1000     ChecksumOpt checksumOpt) throws IOException {
1001
1002    boolean pathExists = exists(f);
1003    CreateFlag.validate(f, pathExists, flag);
1004    
1005    // Default impl  assumes that permissions do not matter and 
1006    // nor does the bytesPerChecksum  hence
1007    // calling the regular create is good enough.
1008    // FSs that implement permissions should override this.
1009
1010    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1011      return append(f, bufferSize, progress);
1012    }
1013    
1014    return this.create(f, absolutePermission,
1015        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1016        blockSize, progress);
1017  }
1018  
1019  /**
1020   * This version of the mkdirs method assumes that the permission is absolute.
1021   * It has been added to support the FileContext that processes the permission
1022   * with umask before calling this method.
   * This is a temporary method added to support the transition from FileSystem
1024   * to FileContext for user applications.
1025   */
1026  @Deprecated
1027  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1028    throws IOException {
1029    // Default impl is to assume that permissions do not matter and hence
1030    // calling the regular mkdirs is good enough.
1031    // FSs that implement permissions should override this.
1032   return this.mkdirs(f, absolutePermission);
1033  }
1034
1035
1036  /**
1037   * This version of the mkdirs method assumes that the permission is absolute.
1038   * It has been added to support the FileContext that processes the permission
1039   * with umask before calling this method.
   * This is a temporary method added to support the transition from FileSystem
1041   * to FileContext for user applications.
1042   */
1043  @Deprecated
1044  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1045                    boolean createParent)
1046    throws IOException {
1047    
1048    if (!createParent) { // parent must exist.
1049      // since the this.mkdirs makes parent dirs automatically
1050      // we must throw exception if parent does not exist.
1051      final FileStatus stat = getFileStatus(f.getParent());
1052      if (stat == null) {
1053        throw new FileNotFoundException("Missing parent:" + f);
1054      }
1055      if (!stat.isDirectory()) {
1056        throw new ParentNotDirectoryException("parent is not a dir");
1057      }
1058      // parent does exist - go ahead with mkdir of leaf
1059    }
1060    // Default impl is to assume that permissions do not matter and hence
1061    // calling the regular mkdirs is good enough.
1062    // FSs that implement permissions should override this.
1063    if (!this.mkdirs(f, absolutePermission)) {
1064      throw new IOException("mkdir of "+ f + " failed");
1065    }
1066  }
1067
1068  /**
1069   * Opens an FSDataOutputStream at the indicated Path with write-progress
1070   * reporting. Same as create(), except fails if parent directory doesn't
1071   * already exist.
1072   * @param f the file name to open
1073   * @param overwrite if a file with this name already exists, then if true,
1074   * the file will be overwritten, and if false an error will be thrown.
1075   * @param bufferSize the size of the buffer to be used.
1076   * @param replication required block replication for the file.
1077   * @param blockSize
1078   * @param progress
1079   * @throws IOException
1080   * @see #setPermission(Path, FsPermission)
1081   * @deprecated API only for 0.20-append
1082   */
1083  @Deprecated
1084  public FSDataOutputStream createNonRecursive(Path f,
1085      boolean overwrite,
1086      int bufferSize, short replication, long blockSize,
1087      Progressable progress) throws IOException {
1088    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1089        overwrite, bufferSize, replication, blockSize, progress);
1090  }
1091
1092  /**
1093   * Opens an FSDataOutputStream at the indicated Path with write-progress
1094   * reporting. Same as create(), except fails if parent directory doesn't
1095   * already exist.
1096   * @param f the file name to open
1097   * @param permission
1098   * @param overwrite if a file with this name already exists, then if true,
1099   * the file will be overwritten, and if false an error will be thrown.
1100   * @param bufferSize the size of the buffer to be used.
1101   * @param replication required block replication for the file.
1102   * @param blockSize
1103   * @param progress
1104   * @throws IOException
1105   * @see #setPermission(Path, FsPermission)
1106   * @deprecated API only for 0.20-append
1107   */
1108   @Deprecated
1109   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1110       boolean overwrite, int bufferSize, short replication, long blockSize,
1111       Progressable progress) throws IOException {
1112     return createNonRecursive(f, permission,
1113         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1114             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1115             replication, blockSize, progress);
1116   }
1117
1118   /**
1119    * Opens an FSDataOutputStream at the indicated Path with write-progress
1120    * reporting. Same as create(), except fails if parent directory doesn't
1121    * already exist.
1122    * @param f the file name to open
1123    * @param permission
1124    * @param flags {@link CreateFlag}s to use for this stream.
1125    * @param bufferSize the size of the buffer to be used.
1126    * @param replication required block replication for the file.
1127    * @param blockSize
1128    * @param progress
1129    * @throws IOException
1130    * @see #setPermission(Path, FsPermission)
1131    * @deprecated API only for 0.20-append
1132    */
1133    @Deprecated
1134    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1135        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1136        Progressable progress) throws IOException {
1137      throw new IOException("createNonRecursive unsupported for this filesystem "
1138          + this.getClass());
1139    }
1140
1141  /**
1142   * Creates the given Path as a brand-new zero-length file.  If
1143   * create fails, or if it already existed, return false.
1144   *
1145   * @param f path to use for create
1146   */
1147  public boolean createNewFile(Path f) throws IOException {
1148    if (exists(f)) {
1149      return false;
1150    } else {
1151      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1152      return true;
1153    }
1154  }
1155
1156  /**
1157   * Append to an existing file (optional operation).
1158   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1159   * @param f the existing file to be appended.
1160   * @throws IOException
1161   */
1162  public FSDataOutputStream append(Path f) throws IOException {
1163    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1164  }
1165  /**
1166   * Append to an existing file (optional operation).
1167   * Same as append(f, bufferSize, null).
1168   * @param f the existing file to be appended.
1169   * @param bufferSize the size of the buffer to be used.
1170   * @throws IOException
1171   */
1172  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1173    return append(f, bufferSize, null);
1174  }
1175
1176  /**
1177   * Append to an existing file (optional operation).
1178   * @param f the existing file to be appended.
1179   * @param bufferSize the size of the buffer to be used.
1180   * @param progress for reporting progress if it is not null.
1181   * @throws IOException
1182   */
  // Primitive append operation supplied by concrete file systems. The
  // shorter append(...) overloads and primitiveCreate(...) (when APPEND is
  // requested on an existing path) delegate to this method; progress may be
  // null.
  public abstract FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException;
1185
1186  /**
1187   * Concat existing files together.
1188   * @param trg the path to the target destination.
1189   * @param psrcs the paths to the sources to use for the concatenation.
1190   * @throws IOException
1191   */
1192  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1193    throw new UnsupportedOperationException("Not implemented by the " + 
1194        getClass().getSimpleName() + " FileSystem implementation");
1195  }
1196
1197 /**
1198   * Get replication.
1199   * 
1200   * @deprecated Use getFileStatus() instead
1201   * @param src file name
1202   * @return file replication
1203   * @throws IOException
1204   */ 
1205  @Deprecated
1206  public short getReplication(Path src) throws IOException {
1207    return getFileStatus(src).getReplication();
1208  }
1209
1210  /**
1211   * Set replication for an existing file.
1212   * 
1213   * @param src file name
1214   * @param replication new replication
1215   * @throws IOException
1216   * @return true if successful;
1217   *         false if file does not exist or is a directory
1218   */
1219  public boolean setReplication(Path src, short replication)
1220    throws IOException {
1221    return true;
1222  }
1223
1224  /**
1225   * Renames Path src to Path dst.  Can take place on local fs
1226   * or remote DFS.
1227   * @param src path to be renamed
1228   * @param dst new path after rename
1229   * @throws IOException on failure
1230   * @return true if rename is successful
1231   */
  // Primitive rename supplied by concrete file systems. The options-based
  // rename(Path, Path, Rename...) in this class calls it after its own
  // checks and turns a false result into an IOException.
  public abstract boolean rename(Path src, Path dst) throws IOException;
1233
1234  /**
1235   * Renames Path src to Path dst
   * <ul>
   * <li>Fails if src is a file and dst is a directory.
1239   * <li>Fails if src is a directory and dst is a file.
1240   * <li>Fails if the parent of dst does not exist or is a file.
1241   * </ul>
1242   * <p>
1243   * If OVERWRITE option is not passed as an argument, rename fails
1244   * if the dst already exists.
1245   * <p>
1246   * If OVERWRITE option is passed as an argument, rename overwrites
1247   * the dst if it is a file or an empty directory. Rename fails if dst is
1248   * a non-empty directory.
1249   * <p>
1250   * Note that atomicity of rename is dependent on the file system
1251   * implementation. Please refer to the file system documentation for
1252   * details. This default implementation is non atomic.
1253   * <p>
1254   * This method is deprecated since it is a temporary method added to 
1255   * support the transition from FileSystem to FileContext for user 
1256   * applications.
1257   * 
1258   * @param src path to be renamed
1259   * @param dst new path after rename
1260   * @throws IOException on failure
1261   */
  @Deprecated
  protected void rename(final Path src, final Path dst,
      final Rename... options) throws IOException {
    // Default, non-atomic implementation: check source and destination,
    // optionally delete an existing destination, then call the primitive
    // rename(src, dst).
    final FileStatus srcStatus = getFileLinkStatus(src);
    if (srcStatus == null) {
      throw new FileNotFoundException("rename source " + src + " not found.");
    }

    // OVERWRITE is the only option examined here; a null options array is
    // treated the same as no options.
    boolean overwrite = false;
    if (null != options) {
      for (Rename option : options) {
        if (option == Rename.OVERWRITE) {
          overwrite = true;
        }
      }
    }

    // Any IOException while looking up dst (not only "not found") is
    // treated as "destination does not exist".
    FileStatus dstStatus;
    try {
      dstStatus = getFileLinkStatus(dst);
    } catch (IOException e) {
      dstStatus = null;
    }
    if (dstStatus != null) {
      // Destination exists: it must be the same kind (file vs. directory)
      // as the source, and OVERWRITE must have been requested.
      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
        throw new IOException("Source " + src + " Destination " + dst
            + " both should be either file or directory");
      }
      if (!overwrite) {
        throw new FileAlreadyExistsException("rename destination " + dst
            + " already exists.");
      }
      // Delete the destination that is a file or an empty directory;
      // a non-empty directory is never overwritten.
      if (dstStatus.isDirectory()) {
        FileStatus[] list = listStatus(dst);
        if (list != null && list.length != 0) {
          throw new IOException(
              "rename cannot overwrite non empty destination directory " + dst);
        }
      }
      // Non-recursive delete; its boolean result is not checked.
      delete(dst, false);
    } else {
      // Destination absent: its parent must exist and be a directory.
      final Path parent = dst.getParent();
      final FileStatus parentStatus = getFileStatus(parent);
      if (parentStatus == null) {
        throw new FileNotFoundException("rename destination parent " + parent
            + " not found.");
      }
      if (!parentStatus.isDirectory()) {
        throw new ParentNotDirectoryException("rename destination parent " + parent
            + " is a file.");
      }
    }
    // Hand over to the primitive rename; a false result becomes an exception.
    if (!rename(src, dst)) {
      throw new IOException("rename from " + src + " to " + dst + " failed.");
    }
  }
1320  
1321  /**
1322   * Delete a file 
1323   * @deprecated Use {@link #delete(Path, boolean)} instead.
1324   */
1325  @Deprecated
1326  public boolean delete(Path f) throws IOException {
1327    return delete(f, true);
1328  }
1329  
1330  /** Delete a file.
1331   *
1332   * @param f the path to delete.
1333   * @param recursive if path is a directory and set to 
1334   * true, the directory is deleted else throws an exception. In
1335   * case of a file the recursive can be set to either true or false. 
1336   * @return  true if delete is successful else false. 
1337   * @throws IOException
1338   */
  // Primitive delete supplied by concrete file systems. Within this class
  // it is used by delete(Path), by the options-based rename when
  // overwriting a destination, and by processDeleteOnExit().
  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1340
1341  /**
1342   * Mark a path to be deleted when FileSystem is closed.
1343   * When the JVM shuts down,
1344   * all FileSystem objects will be closed automatically.
1345   * Then,
1346   * the marked path will be deleted as a result of closing the FileSystem.
1347   *
1348   * The path has to exist in the file system.
1349   * 
1350   * @param f the path to delete.
1351   * @return  true if deleteOnExit is successful, otherwise false.
1352   * @throws IOException
1353   */
1354  public boolean deleteOnExit(Path f) throws IOException {
1355    if (!exists(f)) {
1356      return false;
1357    }
1358    synchronized (deleteOnExit) {
1359      deleteOnExit.add(f);
1360    }
1361    return true;
1362  }
1363  
1364  /**
1365   * Cancel the deletion of the path when the FileSystem is closed
1366   * @param f the path to cancel deletion
1367   */
1368  public boolean cancelDeleteOnExit(Path f) {
1369    synchronized (deleteOnExit) {
1370      return deleteOnExit.remove(f);
1371    }
1372  }
1373
1374  /**
1375   * Delete all files that were marked as delete-on-exit. This recursively
1376   * deletes all files in the specified paths.
1377   */
1378  protected void processDeleteOnExit() {
1379    synchronized (deleteOnExit) {
1380      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1381        Path path = iter.next();
1382        try {
1383          if (exists(path)) {
1384            delete(path, true);
1385          }
1386        }
1387        catch (IOException e) {
1388          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1389        }
1390        iter.remove();
1391      }
1392    }
1393  }
1394  
1395  /** Check if exists.
1396   * @param f source file
1397   */
1398  public boolean exists(Path f) throws IOException {
1399    try {
1400      return getFileStatus(f) != null;
1401    } catch (FileNotFoundException e) {
1402      return false;
1403    }
1404  }
1405
1406  /** True iff the named path is a directory.
1407   * Note: Avoid using this method. Instead reuse the FileStatus 
1408   * returned by getFileStatus() or listStatus() methods.
1409   * @param f path to check
1410   */
1411  public boolean isDirectory(Path f) throws IOException {
1412    try {
1413      return getFileStatus(f).isDirectory();
1414    } catch (FileNotFoundException e) {
1415      return false;               // f does not exist
1416    }
1417  }
1418
1419  /** True iff the named path is a regular file.
1420   * Note: Avoid using this method. Instead reuse the FileStatus 
1421   * returned by getFileStatus() or listStatus() methods.
1422   * @param f path to check
1423   */
1424  public boolean isFile(Path f) throws IOException {
1425    try {
1426      return getFileStatus(f).isFile();
1427    } catch (FileNotFoundException e) {
1428      return false;               // f does not exist
1429    }
1430  }
1431  
  /**
   * The number of bytes in a file.
   * @deprecated Use getFileStatus() instead
   */
1434  @Deprecated
1435  public long getLength(Path f) throws IOException {
1436    return getFileStatus(f).getLen();
1437  }
1438    
1439  /** Return the {@link ContentSummary} of a given {@link Path}.
1440  * @param f path to use
1441  */
1442  public ContentSummary getContentSummary(Path f) throws IOException {
1443    FileStatus status = getFileStatus(f);
1444    if (status.isFile()) {
1445      // f is a file
1446      return new ContentSummary(status.getLen(), 1, 0);
1447    }
1448    // f is a directory
1449    long[] summary = {0, 0, 1};
1450    for(FileStatus s : listStatus(f)) {
1451      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1452                                     new ContentSummary(s.getLen(), 1, 0);
1453      summary[0] += c.getLength();
1454      summary[1] += c.getFileCount();
1455      summary[2] += c.getDirectoryCount();
1456    }
1457    return new ContentSummary(summary[0], summary[1], summary[2]);
1458  }
1459
1460  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1461      @Override
1462      public boolean accept(Path file) {
1463        return true;
1464      }     
1465    };
1466    
1467  /**
1468   * List the statuses of the files/directories in the given path if the path is
1469   * a directory.
1470   * 
1471   * @param f given path
   * @return the statuses of the files/directories in the given path
1473   * @throws FileNotFoundException when the path does not exist;
1474   *         IOException see specific implementation
1475   */
  // Primitive listing supplied by concrete file systems. The filtering
  // listStatus(...) helper in this class treats a null return value as an
  // error and throws an IOException for it.
  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
                                                         IOException;
1478    
1479  /*
1480   * Filter files/directories in the given path using the user-supplied path
1481   * filter. Results are added to the given array <code>results</code>.
1482   */
1483  private void listStatus(ArrayList<FileStatus> results, Path f,
1484      PathFilter filter) throws FileNotFoundException, IOException {
1485    FileStatus listing[] = listStatus(f);
1486    if (listing == null) {
1487      throw new IOException("Error accessing " + f);
1488    }
1489
1490    for (int i = 0; i < listing.length; i++) {
1491      if (filter.accept(listing[i].getPath())) {
1492        results.add(listing[i]);
1493      }
1494    }
1495  }
1496
1497  /**
1498   * @return an iterator over the corrupt files under the given path
1499   * (may contain duplicates if a file has more than one corrupt block)
1500   * @throws IOException
1501   */
1502  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1503    throws IOException {
1504    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1505                                            " does not support" +
1506                                            " listCorruptFileBlocks");
1507  }
1508
1509  /**
1510   * Filter files/directories in the given path using the user-supplied path
1511   * filter.
1512   * 
1513   * @param f
1514   *          a path name
1515   * @param filter
1516   *          the user-supplied path filter
1517   * @return an array of FileStatus objects for the files under the given path
1518   *         after applying the filter
1519   * @throws FileNotFoundException when the path does not exist;
1520   *         IOException see specific implementation   
1521   */
1522  public FileStatus[] listStatus(Path f, PathFilter filter) 
1523                                   throws FileNotFoundException, IOException {
1524    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1525    listStatus(results, f, filter);
1526    return results.toArray(new FileStatus[results.size()]);
1527  }
1528
1529  /**
1530   * Filter files/directories in the given list of paths using default
1531   * path filter.
1532   * 
1533   * @param files
1534   *          a list of paths
1535   * @return a list of statuses for the files under the given paths after
1536   *         applying the filter default Path filter
1537   * @throws FileNotFoundException when the path does not exist;
1538   *         IOException see specific implementation
1539   */
1540  public FileStatus[] listStatus(Path[] files)
1541      throws FileNotFoundException, IOException {
1542    return listStatus(files, DEFAULT_FILTER);
1543  }
1544
1545  /**
1546   * Filter files/directories in the given list of paths using user-supplied
1547   * path filter.
1548   * 
1549   * @param files
1550   *          a list of paths
1551   * @param filter
1552   *          the user-supplied path filter
1553   * @return a list of statuses for the files under the given paths after
1554   *         applying the filter
1555   * @throws FileNotFoundException when the path does not exist;
1556   *         IOException see specific implementation
1557   */
1558  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1559      throws FileNotFoundException, IOException {
1560    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1561    for (int i = 0; i < files.length; i++) {
1562      listStatus(results, files[i], filter);
1563    }
1564    return results.toArray(new FileStatus[results.size()]);
1565  }
1566
1567  /**
1568   * <p>Return all the files that match filePattern and are not checksum
1569   * files. Results are sorted by their names.
1570   * 
1571   * <p>
1572   * A filename pattern is composed of <i>regular</i> characters and
1573   * <i>special pattern matching</i> characters, which are:
1574   *
1575   * <dl>
1576   *  <dd>
1577   *   <dl>
1578   *    <p>
1579   *    <dt> <tt> ? </tt>
1580   *    <dd> Matches any single character.
1581   *
1582   *    <p>
1583   *    <dt> <tt> * </tt>
1584   *    <dd> Matches zero or more characters.
1585   *
1586   *    <p>
1587   *    <dt> <tt> [<i>abc</i>] </tt>
1588   *    <dd> Matches a single character from character set
1589   *     <tt>{<i>a,b,c</i>}</tt>.
1590   *
1591   *    <p>
1592   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1593   *    <dd> Matches a single character from the character range
1594   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1595   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1596   *
1597   *    <p>
1598   *    <dt> <tt> [^<i>a</i>] </tt>
1599   *    <dd> Matches a single character that is not from character set or range
1600   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1601   *     immediately to the right of the opening bracket.
1602   *
1603   *    <p>
1604   *    <dt> <tt> \<i>c</i> </tt>
1605   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1606   *
1607   *    <p>
1608   *    <dt> <tt> {ab,cd} </tt>
1609   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1610   *    
1611   *    <p>
1612   *    <dt> <tt> {ab,c{de,fh}} </tt>
1613   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1614   *
1615   *   </dl>
1616   *  </dd>
1617   * </dl>
1618   *
   * @param pathPattern a glob-style pattern, as described above, specifying
   *          the paths to match
   *
1621   * @return an array of paths that match the path pattern
1622   * @throws IOException
1623   */
1624  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1625    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1626  }
1627  
1628  /**
1629   * Return an array of FileStatus objects whose path names match pathPattern
1630   * and is accepted by the user-supplied path filter. Results are sorted by
1631   * their path names.
1632   * Return null if pathPattern has no glob and the path does not exist.
1633   * Return an empty array if pathPattern has a glob and no path matches it. 
1634   * 
1635   * @param pathPattern
1636   *          a regular expression specifying the path pattern
1637   * @param filter
1638   *          a user-supplied path filter
1639   * @return an array of FileStatus objects
1640   * @throws IOException if any I/O error occurs when fetching file status
1641   */
1642  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1643      throws IOException {
1644    return new Globber(this, pathPattern, filter).glob();
1645  }
1646  
1647  /**
1648   * List the statuses of the files/directories in the given path if the path is
1649   * a directory. 
1650   * Return the file's status and block locations If the path is a file.
1651   * 
1652   * If a returned status is a file, it contains the file's block locations.
1653   * 
1654   * @param f is the path
1655   *
1656   * @return an iterator that traverses statuses of the files/directories 
1657   *         in the given path
1658   *
1659   * @throws FileNotFoundException If <code>f</code> does not exist
1660   * @throws IOException If an I/O error occurred
1661   */
1662  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1663  throws FileNotFoundException, IOException {
1664    return listLocatedStatus(f, DEFAULT_FILTER);
1665  }
1666
1667  /**
1668   * Listing a directory
1669   * The returned results include its block location if it is a file
1670   * The results are filtered by the given path filter
1671   * @param f a path
1672   * @param filter a path filter
1673   * @return an iterator that traverses statuses of the files/directories 
1674   *         in the given path
1675   * @throws FileNotFoundException if <code>f</code> does not exist
1676   * @throws IOException if any I/O error occurred
1677   */
1678  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1679      final PathFilter filter)
1680  throws FileNotFoundException, IOException {
1681    return new RemoteIterator<LocatedFileStatus>() {
1682      private final FileStatus[] stats = listStatus(f, filter);
1683      private int i = 0;
1684
1685      @Override
1686      public boolean hasNext() {
1687        return i<stats.length;
1688      }
1689
1690      @Override
1691      public LocatedFileStatus next() throws IOException {
1692        if (!hasNext()) {
1693          throw new NoSuchElementException("No more entry in " + f);
1694        }
1695        FileStatus result = stats[i++];
1696        // for files, use getBlockLocations(FileStatus, int, int) to avoid
1697        // calling getFileStatus(Path) to load the FileStatus again
1698        BlockLocation[] locs = result.isFile() ?
1699            getFileBlockLocations(result, 0, result.getLen()) :
1700            null;
1701        return new LocatedFileStatus(result, locs);
1702      }
1703    };
1704  }
1705
1706  /**
1707   * List the statuses and block locations of the files in the given path.
1708   * 
1709   * If the path is a directory, 
1710   *   if recursive is false, returns files in the directory;
1711   *   if recursive is true, return files in the subtree rooted at the path.
1712   * If the path is a file, return the file's status and block locations.
1713   * 
1714   * @param f is the path
1715   * @param recursive if the subdirectories need to be traversed recursively
1716   *
1717   * @return an iterator that traverses statuses of the files
1718   *
1719   * @throws FileNotFoundException when the path does not exist;
1720   *         IOException see specific implementation
1721   */
1722  public RemoteIterator<LocatedFileStatus> listFiles(
1723      final Path f, final boolean recursive)
1724  throws FileNotFoundException, IOException {
1725    return new RemoteIterator<LocatedFileStatus>() {
1726      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1727        new Stack<RemoteIterator<LocatedFileStatus>>();
1728      private RemoteIterator<LocatedFileStatus> curItor =
1729        listLocatedStatus(f);
1730      private LocatedFileStatus curFile;
1731     
1732      @Override
1733      public boolean hasNext() throws IOException {
1734        while (curFile == null) {
1735          if (curItor.hasNext()) {
1736            handleFileStat(curItor.next());
1737          } else if (!itors.empty()) {
1738            curItor = itors.pop();
1739          } else {
1740            return false;
1741          }
1742        }
1743        return true;
1744      }
1745
1746      /**
1747       * Process the input stat.
1748       * If it is a file, return the file stat.
1749       * If it is a directory, traverse the directory if recursive is true;
1750       * ignore it if recursive is false.
1751       * @param stat input status
1752       * @throws IOException if any IO error occurs
1753       */
1754      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1755        if (stat.isFile()) { // file
1756          curFile = stat;
1757        } else if (recursive) { // directory
1758          itors.push(curItor);
1759          curItor = listLocatedStatus(stat.getPath());
1760        }
1761      }
1762
1763      @Override
1764      public LocatedFileStatus next() throws IOException {
1765        if (hasNext()) {
1766          LocatedFileStatus result = curFile;
1767          curFile = null;
1768          return result;
1769        } 
1770        throw new java.util.NoSuchElementException("No more entry in " + f);
1771      }
1772    };
1773  }
1774  
1775  /** Return the current user's home directory in this filesystem.
1776   * The default implementation returns "/user/$USER/".
1777   */
1778  public Path getHomeDirectory() {
1779    return this.makeQualified(
1780        new Path("/user/"+System.getProperty("user.name")));
1781  }
1782
1783
  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   * 
   * @param new_dir the path to use as the new working directory
   */
  public abstract void setWorkingDirectory(Path new_dir);
1791    
  /**
   * Get the current working directory for the given file system.
   *
   * @return the directory pathname
   */
  public abstract Path getWorkingDirectory();
1797  
1798  
  /**
   * Return the working directory this file system starts with, if it has one.
   * <p>
   * Note: with the new {@link FileContext} class, getWorkingDirectory()
   * will be removed.
   * The working directory is implemented in FileContext.
   * <p>
   * Some file systems like LocalFileSystem have an initial workingDir
   * that we use as the starting workingDir. For other file systems
   * like HDFS there is no built in notion of an initial workingDir.
   * This default implementation always returns null.
   * 
   * @return if there is built in notion of workingDir then it
   * is returned; else a null is returned.
   */
  protected Path getInitialWorkingDirectory() {
    return null;
  }
1814
1815  /**
1816   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1817   */
1818  public boolean mkdirs(Path f) throws IOException {
1819    return mkdirs(f, FsPermission.getDirDefault());
1820  }
1821
  /**
   * Make the given file and all non-existent parents into
   * directories. Has the semantics of Unix 'mkdir -p'.
   * Existence of the directory hierarchy is not an error.
   *
   * @param f path to create
   * @param permission to apply to f
   * @return implementation-defined success flag; the exact meaning is not
   *         specified here
   * @throws IOException see specific implementation
   */
  public abstract boolean mkdirs(Path f, FsPermission permission
      ) throws IOException;
1831
1832  /**
1833   * The src file is on the local disk.  Add it to FS at
1834   * the given dst name and the source is kept intact afterwards
1835   * @param src path
1836   * @param dst path
1837   */
1838  public void copyFromLocalFile(Path src, Path dst)
1839    throws IOException {
1840    copyFromLocalFile(false, src, dst);
1841  }
1842
1843  /**
1844   * The src files is on the local disk.  Add it to FS at
1845   * the given dst name, removing the source afterwards.
1846   * @param srcs path
1847   * @param dst path
1848   */
1849  public void moveFromLocalFile(Path[] srcs, Path dst)
1850    throws IOException {
1851    copyFromLocalFile(true, true, srcs, dst);
1852  }
1853
1854  /**
1855   * The src file is on the local disk.  Add it to FS at
1856   * the given dst name, removing the source afterwards.
1857   * @param src path
1858   * @param dst path
1859   */
1860  public void moveFromLocalFile(Path src, Path dst)
1861    throws IOException {
1862    copyFromLocalFile(true, src, dst);
1863  }
1864
1865  /**
1866   * The src file is on the local disk.  Add it to FS at
1867   * the given dst name.
1868   * delSrc indicates if the source should be removed
1869   * @param delSrc whether to delete the src
1870   * @param src path
1871   * @param dst path
1872   */
1873  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1874    throws IOException {
1875    copyFromLocalFile(delSrc, true, src, dst);
1876  }
1877  
1878  /**
1879   * The src files are on the local disk.  Add it to FS at
1880   * the given dst name.
1881   * delSrc indicates if the source should be removed
1882   * @param delSrc whether to delete the src
1883   * @param overwrite whether to overwrite an existing file
1884   * @param srcs array of paths which are source
1885   * @param dst path
1886   */
1887  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1888                                Path[] srcs, Path dst)
1889    throws IOException {
1890    Configuration conf = getConf();
1891    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1892  }
1893  
1894  /**
1895   * The src file is on the local disk.  Add it to FS at
1896   * the given dst name.
1897   * delSrc indicates if the source should be removed
1898   * @param delSrc whether to delete the src
1899   * @param overwrite whether to overwrite an existing file
1900   * @param src path
1901   * @param dst path
1902   */
1903  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1904                                Path src, Path dst)
1905    throws IOException {
1906    Configuration conf = getConf();
1907    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1908  }
1909    
1910  /**
1911   * The src file is under FS, and the dst is on the local disk.
1912   * Copy it from FS control to the local dst name.
1913   * @param src path
1914   * @param dst path
1915   */
1916  public void copyToLocalFile(Path src, Path dst) throws IOException {
1917    copyToLocalFile(false, src, dst);
1918  }
1919    
1920  /**
1921   * The src file is under FS, and the dst is on the local disk.
1922   * Copy it from FS control to the local dst name.
1923   * Remove the source afterwards
1924   * @param src path
1925   * @param dst path
1926   */
1927  public void moveToLocalFile(Path src, Path dst) throws IOException {
1928    copyToLocalFile(true, src, dst);
1929  }
1930
1931  /**
1932   * The src file is under FS, and the dst is on the local disk.
1933   * Copy it from FS control to the local dst name.
1934   * delSrc indicates if the src will be removed or not.
1935   * @param delSrc whether to delete the src
1936   * @param src path
1937   * @param dst path
1938   */   
1939  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1940    throws IOException {
1941    copyToLocalFile(delSrc, src, dst, false);
1942  }
1943  
1944    /**
1945   * The src file is under FS, and the dst is on the local disk. Copy it from FS
1946   * control to the local dst name. delSrc indicates if the src will be removed
1947   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
1948   * as local file system or not. RawLocalFileSystem is non crc file system.So,
1949   * It will not create any crc files at local.
1950   * 
1951   * @param delSrc
1952   *          whether to delete the src
1953   * @param src
1954   *          path
1955   * @param dst
1956   *          path
1957   * @param useRawLocalFileSystem
1958   *          whether to use RawLocalFileSystem as local file system or not.
1959   * 
1960   * @throws IOException
1961   *           - if any IO error
1962   */
1963  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
1964      boolean useRawLocalFileSystem) throws IOException {
1965    Configuration conf = getConf();
1966    FileSystem local = null;
1967    if (useRawLocalFileSystem) {
1968      local = getLocal(conf).getRawFileSystem();
1969    } else {
1970      local = getLocal(conf);
1971    }
1972    FileUtil.copy(this, src, local, dst, delSrc, conf);
1973  }
1974
  /**
   * Returns a local File that the user can write output to.  The caller
   * provides both the eventual FS target name and the local working
   * file.  If the FS is local, we write directly into the target.  If
   * the FS is remote, we write into the tmp local area.
   * <p>
   * This default implementation ignores fsOutputFile and returns
   * tmpLocalFile unchanged.
   *
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path of local tmp file
   * @return the local path to write to
   * @throws IOException never thrown by this default implementation
   */
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }
1987
  /**
   * Called when we're all done writing to the target.  A local FS will
   * do nothing, because we've written to exactly the right place.  A remote
   * FS will copy the contents of tmpLocalFile to the correct target at
   * fsOutputFile.
   * <p>
   * This default implementation moves tmpLocalFile to fsOutputFile via
   * {@link #moveFromLocalFile(Path, Path)}.
   *
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path to local tmp file
   * @throws IOException propagated from the move
   */
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }
2000
  /**
   * No more filesystem operations are needed.  Will
   * release any held locks.
   * <p>
   * This implementation first processes the paths marked delete-on-exit and
   * then removes this instance from CACHE under its key.
   *
   * @throws IOException propagated from processDeleteOnExit(); in that case
   *         the CACHE entry is not removed by this call
   */
  @Override
  public void close() throws IOException {
    // delete all files that were marked as delete-on-exit.
    processDeleteOnExit();
    // two-argument remove: called with both the key and this instance
    CACHE.remove(this.key, this);
  }
2011
2012  /** Return the total size of all files in the filesystem.*/
2013  public long getUsed() throws IOException{
2014    long used = 0;
2015    FileStatus[] files = listStatus(new Path("/"));
2016    for(FileStatus file:files){
2017      used += file.getLen();
2018    }
2019    return used;
2020  }
2021  
2022  /**
2023   * Get the block size for a particular file.
2024   * @param f the filename
2025   * @return the number of bytes in a block
2026   */
2027  /** @deprecated Use getFileStatus() instead */
2028  @Deprecated
2029  public long getBlockSize(Path f) throws IOException {
2030    return getFileStatus(f).getBlockSize();
2031  }
2032
2033  /**
2034   * Return the number of bytes that large input files should be optimally
2035   * be split into to minimize i/o time.
2036   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2037   */
2038  @Deprecated
2039  public long getDefaultBlockSize() {
2040    // default to 32MB: large enough to minimize the impact of seeks
2041    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2042  }
2043    
  /**
   * Return the number of bytes that large input files should optimally
   * be split into to minimize i/o time.  The given path will be used to
   * locate the actual filesystem.  The full path does not have to exist.
   * <p>
   * This default implementation ignores the path and returns
   * {@link #getDefaultBlockSize()}.
   *
   * @param f path of file
   * @return the default block size for the path's filesystem
   */
  public long getDefaultBlockSize(Path f) {
    return getDefaultBlockSize();
  }
2053
  /**
   * Get the default replication. This default implementation returns 1.
   *
   * @return the default replication
   * @deprecated use {@link #getDefaultReplication(Path)} instead
   */
  @Deprecated
  public short getDefaultReplication() { return 1; }
2060
  /**
   * Get the default replication for a path.   The given path will be used to
   * locate the actual filesystem.  The full path does not have to exist.
   * <p>
   * This default implementation ignores the path and returns
   * {@link #getDefaultReplication()}.
   *
   * @param path of the file
   * @return default replication for the path's filesystem 
   */
  public short getDefaultReplication(Path path) {
    return getDefaultReplication();
  }
2070  
  /**
   * Return a file status object that represents the path.
   *
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public abstract FileStatus getFileStatus(Path f) throws IOException;
2079
2080  /**
2081   * Checks if the user can access a path.  The mode specifies which access
2082   * checks to perform.  If the requested permissions are granted, then the
2083   * method returns normally.  If access is denied, then the method throws an
2084   * {@link AccessControlException}.
2085   * <p/>
2086   * The default implementation of this method calls {@link #getFileStatus(Path)}
2087   * and checks the returned permissions against the requested permissions.
2088   * Note that the getFileStatus call will be subject to authorization checks.
2089   * Typically, this requires search (execute) permissions on each directory in
2090   * the path's prefix, but this is implementation-defined.  Any file system
2091   * that provides a richer authorization model (such as ACLs) may override the
2092   * default implementation so that it checks against that model instead.
2093   * <p>
2094   * In general, applications should avoid using this method, due to the risk of
2095   * time-of-check/time-of-use race conditions.  The permissions on a file may
2096   * change immediately after the access call returns.  Most applications should
2097   * prefer running specific file system actions as the desired user represented
2098   * by a {@link UserGroupInformation}.
2099   *
2100   * @param path Path to check
2101   * @param mode type of access to check
2102   * @throws AccessControlException if access is denied
2103   * @throws FileNotFoundException if the path does not exist
2104   * @throws IOException see specific implementation
2105   */
2106  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2107  public void access(Path path, FsAction mode) throws AccessControlException,
2108      FileNotFoundException, IOException {
2109    checkAccessPermissions(this.getFileStatus(path), mode);
2110  }
2111
2112  /**
2113   * This method provides the default implementation of
2114   * {@link #access(Path, FsAction)}.
2115   *
2116   * @param stat FileStatus to check
2117   * @param mode type of access to check
2118   * @throws IOException for any error
2119   */
2120  @InterfaceAudience.Private
2121  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2122      throws IOException {
2123    FsPermission perm = stat.getPermission();
2124    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2125    String user = ugi.getShortUserName();
2126    List<String> groups = Arrays.asList(ugi.getGroupNames());
2127    if (user.equals(stat.getOwner())) {
2128      if (perm.getUserAction().implies(mode)) {
2129        return;
2130      }
2131    } else if (groups.contains(stat.getGroup())) {
2132      if (perm.getGroupAction().implies(mode)) {
2133        return;
2134      }
2135    } else {
2136      if (perm.getOtherAction().implies(mode)) {
2137        return;
2138      }
2139    }
2140    throw new AccessControlException(String.format(
2141      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2142      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2143  }
2144
2145  /**
2146   * See {@link FileContext#fixRelativePart}
2147   */
2148  protected Path fixRelativePart(Path p) {
2149    if (p.isUriPathAbsolute()) {
2150      return p;
2151    } else {
2152      return new Path(getWorkingDirectory(), p);
2153    }
2154  }
2155
2156  /**
2157   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2158   */
2159  public void createSymlink(final Path target, final Path link,
2160      final boolean createParent) throws AccessControlException,
2161      FileAlreadyExistsException, FileNotFoundException,
2162      ParentNotDirectoryException, UnsupportedFileSystemException, 
2163      IOException {
2164    // Supporting filesystems should override this method
2165    throw new UnsupportedOperationException(
2166        "Filesystem does not support symlinks!");
2167  }
2168
2169  /**
2170   * See {@link FileContext#getFileLinkStatus(Path)}
2171   */
2172  public FileStatus getFileLinkStatus(final Path f)
2173      throws AccessControlException, FileNotFoundException,
2174      UnsupportedFileSystemException, IOException {
2175    // Supporting filesystems should override this method
2176    return getFileStatus(f);
2177  }
2178
  /**
   * See {@link AbstractFileSystem#supportsSymlinks()}.
   * This default implementation returns false.
   *
   * @return false
   */
  public boolean supportsSymlinks() {
    return false;
  }
2185
2186  /**
2187   * See {@link FileContext#getLinkTarget(Path)}
2188   */
2189  public Path getLinkTarget(Path f) throws IOException {
2190    // Supporting filesystems should override this method
2191    throw new UnsupportedOperationException(
2192        "Filesystem does not support symlinks!");
2193  }
2194
2195  /**
2196   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2197   */
2198  protected Path resolveLink(Path f) throws IOException {
2199    // Supporting filesystems should override this method
2200    throw new UnsupportedOperationException(
2201        "Filesystem does not support symlinks!");
2202  }
2203
2204  /**
2205   * Get the checksum of a file.
2206   *
2207   * @param f The file path
2208   * @return The file checksum.  The default return value is null,
2209   *  which indicates that no checksum algorithm is implemented
2210   *  in the corresponding FileSystem.
2211   */
2212  public FileChecksum getFileChecksum(Path f) throws IOException {
2213    return getFileChecksum(f, Long.MAX_VALUE);
2214  }
2215
  /**
   * Get the checksum of a file, from the beginning of the file till the
   * specific length.
   * This default implementation ignores both arguments and returns null.
   *
   * @param f The file path
   * @param length The length of the file range for checksum calculation
   * @return The file checksum; null in this default implementation.
   * @throws IOException never thrown by this default implementation
   */
  public FileChecksum getFileChecksum(Path f, final long length)
      throws IOException {
    return null;
  }
2227
  /**
   * Set the verify checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   *
   * @param verifyChecksum the flag value; ignored by this default
   *        implementation
   */
  public void setVerifyChecksum(boolean verifyChecksum) {
    //doesn't do anything
  }
2236
  /**
   * Set the write checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   *
   * @param writeChecksum the flag value; ignored by this default
   *        implementation
   */
  public void setWriteChecksum(boolean writeChecksum) {
    //doesn't do anything
  }
2245
2246  /**
2247   * Returns a status object describing the use and capacity of the
2248   * file system. If the file system has multiple partitions, the
2249   * use and capacity of the root partition is reflected.
2250   * 
2251   * @return a FsStatus object
2252   * @throws IOException
2253   *           see specific implementation
2254   */
2255  public FsStatus getStatus() throws IOException {
2256    return getStatus(null);
2257  }
2258
2259  /**
2260   * Returns a status object describing the use and capacity of the
2261   * file system. If the file system has multiple partitions, the
2262   * use and capacity of the partition pointed to by the specified
2263   * path is reflected.
2264   * @param p Path for which status should be obtained. null means
2265   * the default partition. 
2266   * @return a FsStatus object
2267   * @throws IOException
2268   *           see specific implementation
2269   */
2270  public FsStatus getStatus(Path p) throws IOException {
2271    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2272  }
2273
  /**
   * Set permission of a path.
   * This default implementation does nothing.
   *
   * @param p the path
   * @param permission the permission to set
   * @throws IOException never thrown by this default implementation
   */
  public void setPermission(Path p, FsPermission permission
      ) throws IOException {
  }
2282
  /**
   * Set owner of a path (i.e. a file or a directory).
   * The parameters username and groupname cannot both be null.
   * This default implementation does nothing and does not validate its
   * arguments.
   *
   * @param p The path
   * @param username If it is null, the original username remains unchanged.
   * @param groupname If it is null, the original groupname remains unchanged.
   * @throws IOException never thrown by this default implementation
   */
  public void setOwner(Path p, String username, String groupname
      ) throws IOException {
  }
2293
  /**
   * Set the modification time and access time of a file.
   * This default implementation does nothing.
   *
   * @param p The path
   * @param mtime Set the modification time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set modification time.
   * @param atime Set the access time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set access time.
   * @throws IOException never thrown by this default implementation
   */
  public void setTimes(Path p, long mtime, long atime
      ) throws IOException {
  }
2307
2308  /**
2309   * Create a snapshot with a default name.
2310   * @param path The directory where snapshots will be taken.
2311   * @return the snapshot path.
2312   */
2313  public final Path createSnapshot(Path path) throws IOException {
2314    return createSnapshot(path, null);
2315  }
2316
2317  /**
2318   * Create a snapshot
2319   * @param path The directory where snapshots will be taken.
2320   * @param snapshotName The name of the snapshot
2321   * @return the snapshot path.
2322   */
2323  public Path createSnapshot(Path path, String snapshotName)
2324      throws IOException {
2325    throw new UnsupportedOperationException(getClass().getSimpleName()
2326        + " doesn't support createSnapshot");
2327  }
2328  
2329  /**
2330   * Rename a snapshot
2331   * @param path The directory path where the snapshot was taken
2332   * @param snapshotOldName Old name of the snapshot
2333   * @param snapshotNewName New name of the snapshot
2334   * @throws IOException
2335   */
2336  public void renameSnapshot(Path path, String snapshotOldName,
2337      String snapshotNewName) throws IOException {
2338    throw new UnsupportedOperationException(getClass().getSimpleName()
2339        + " doesn't support renameSnapshot");
2340  }
2341  
2342  /**
2343   * Delete a snapshot of a directory
2344   * @param path  The directory that the to-be-deleted snapshot belongs to
2345   * @param snapshotName The name of the snapshot
2346   */
2347  public void deleteSnapshot(Path path, String snapshotName)
2348      throws IOException {
2349    throw new UnsupportedOperationException(getClass().getSimpleName()
2350        + " doesn't support deleteSnapshot");
2351  }
2352  
2353  /**
2354   * Modifies ACL entries of files and directories.  This method can add new ACL
2355   * entries or modify the permissions on existing ACL entries.  All existing
2356   * ACL entries that are not specified in this call are retained without
2357   * changes.  (Modifications are merged into the current ACL.)
2358   *
2359   * @param path Path to modify
2360   * @param aclSpec List<AclEntry> describing modifications
2361   * @throws IOException if an ACL could not be modified
2362   */
2363  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2364      throws IOException {
2365    throw new UnsupportedOperationException(getClass().getSimpleName()
2366        + " doesn't support modifyAclEntries");
2367  }
2368
2369  /**
2370   * Removes ACL entries from files and directories.  Other ACL entries are
2371   * retained.
2372   *
2373   * @param path Path to modify
2374   * @param aclSpec List<AclEntry> describing entries to remove
2375   * @throws IOException if an ACL could not be modified
2376   */
2377  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2378      throws IOException {
2379    throw new UnsupportedOperationException(getClass().getSimpleName()
2380        + " doesn't support removeAclEntries");
2381  }
2382
2383  /**
2384   * Removes all default ACL entries from files and directories.
2385   *
2386   * @param path Path to modify
2387   * @throws IOException if an ACL could not be modified
2388   */
2389  public void removeDefaultAcl(Path path)
2390      throws IOException {
2391    throw new UnsupportedOperationException(getClass().getSimpleName()
2392        + " doesn't support removeDefaultAcl");
2393  }
2394
2395  /**
2396   * Removes all but the base ACL entries of files and directories.  The entries
2397   * for user, group, and others are retained for compatibility with permission
2398   * bits.
2399   *
2400   * @param path Path to modify
2401   * @throws IOException if an ACL could not be removed
2402   */
2403  public void removeAcl(Path path)
2404      throws IOException {
2405    throw new UnsupportedOperationException(getClass().getSimpleName()
2406        + " doesn't support removeAcl");
2407  }
2408
2409  /**
2410   * Fully replaces ACL of files and directories, discarding all existing
2411   * entries.
2412   *
2413   * @param path Path to modify
2414   * @param aclSpec List<AclEntry> describing modifications, must include entries
2415   *   for user, group, and others for compatibility with permission bits.
2416   * @throws IOException if an ACL could not be modified
2417   */
2418  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2419    throw new UnsupportedOperationException(getClass().getSimpleName()
2420        + " doesn't support setAcl");
2421  }
2422
2423  /**
2424   * Gets the ACL of a file or directory.
2425   *
2426   * @param path Path to get
2427   * @return AclStatus describing the ACL of the file or directory
2428   * @throws IOException if an ACL could not be read
2429   */
2430  public AclStatus getAclStatus(Path path) throws IOException {
2431    throw new UnsupportedOperationException(getClass().getSimpleName()
2432        + " doesn't support getAclStatus");
2433  }
2434
2435  /**
2436   * Set an xattr of a file or directory.
2437   * The name must be prefixed with the namespace followed by ".". For example,
2438   * "user.attr".
2439   * <p/>
2440   * Refer to the HDFS extended attributes user documentation for details.
2441   *
2442   * @param path Path to modify
2443   * @param name xattr name.
2444   * @param value xattr value.
2445   * @throws IOException
2446   */
2447  public void setXAttr(Path path, String name, byte[] value)
2448      throws IOException {
2449    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2450        XAttrSetFlag.REPLACE));
2451  }
2452
2453  /**
2454   * Set an xattr of a file or directory.
2455   * The name must be prefixed with the namespace followed by ".". For example,
2456   * "user.attr".
2457   * <p/>
2458   * Refer to the HDFS extended attributes user documentation for details.
2459   *
2460   * @param path Path to modify
2461   * @param name xattr name.
2462   * @param value xattr value.
2463   * @param flag xattr set flag
2464   * @throws IOException
2465   */
2466  public void setXAttr(Path path, String name, byte[] value,
2467      EnumSet<XAttrSetFlag> flag) throws IOException {
2468    throw new UnsupportedOperationException(getClass().getSimpleName()
2469        + " doesn't support setXAttr");
2470  }
2471
2472  /**
2473   * Get an xattr name and value for a file or directory.
2474   * The name must be prefixed with the namespace followed by ".". For example,
2475   * "user.attr".
2476   * <p/>
2477   * Refer to the HDFS extended attributes user documentation for details.
2478   *
2479   * @param path Path to get extended attribute
2480   * @param name xattr name.
2481   * @return byte[] xattr value.
2482   * @throws IOException
2483   */
2484  public byte[] getXAttr(Path path, String name) throws IOException {
2485    throw new UnsupportedOperationException(getClass().getSimpleName()
2486        + " doesn't support getXAttr");
2487  }
2488
2489  /**
2490   * Get all of the xattr name/value pairs for a file or directory.
2491   * Only those xattrs which the logged-in user has permissions to view
2492   * are returned.
2493   * <p/>
2494   * Refer to the HDFS extended attributes user documentation for details.
2495   *
2496   * @param path Path to get extended attributes
2497   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2498   * @throws IOException
2499   */
2500  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2501    throw new UnsupportedOperationException(getClass().getSimpleName()
2502        + " doesn't support getXAttrs");
2503  }
2504
2505  /**
2506   * Get all of the xattrs name/value pairs for a file or directory.
2507   * Only those xattrs which the logged-in user has permissions to view
2508   * are returned.
2509   * <p/>
2510   * Refer to the HDFS extended attributes user documentation for details.
2511   *
2512   * @param path Path to get extended attributes
2513   * @param names XAttr names.
2514   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2515   * @throws IOException
2516   */
2517  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2518      throws IOException {
2519    throw new UnsupportedOperationException(getClass().getSimpleName()
2520        + " doesn't support getXAttrs");
2521  }
2522
2523  /**
2524   * Get all of the xattr names for a file or directory.
2525   * Only those xattr names which the logged-in user has permissions to view
2526   * are returned.
2527   * <p/>
2528   * Refer to the HDFS extended attributes user documentation for details.
2529   *
2530   * @param path Path to get extended attributes
2531   * @return List<String> of the XAttr names of the file or directory
2532   * @throws IOException
2533   */
2534  public List<String> listXAttrs(Path path) throws IOException {
2535    throw new UnsupportedOperationException(getClass().getSimpleName()
2536            + " doesn't support listXAttrs");
2537  }
2538
2539  /**
2540   * Remove an xattr of a file or directory.
2541   * The name must be prefixed with the namespace followed by ".". For example,
2542   * "user.attr".
2543   * <p/>
2544   * Refer to the HDFS extended attributes user documentation for details.
2545   *
2546   * @param path Path to remove extended attribute
2547   * @param name xattr name
2548   * @throws IOException
2549   */
2550  public void removeXAttr(Path path, String name) throws IOException {
2551    throw new UnsupportedOperationException(getClass().getSimpleName()
2552        + " doesn't support removeXAttr");
2553  }
2554
  // making it volatile to be able to do a double checked locking:
  // getFileSystemClass() reads it without a lock, loadFileSystems()
  // re-checks and sets it while holding the FileSystem.class lock.
  private volatile static boolean FILE_SYSTEMS_LOADED = false;

  /**
   * Maps a URI scheme to the FileSystem implementation class discovered
   * through {@link ServiceLoader}; populated by loadFileSystems().
   */
  private static final Map<String, Class<? extends FileSystem>>
    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2560
2561  private static void loadFileSystems() {
2562    synchronized (FileSystem.class) {
2563      if (!FILE_SYSTEMS_LOADED) {
2564        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2565        for (FileSystem fs : serviceLoader) {
2566          SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2567        }
2568        FILE_SYSTEMS_LOADED = true;
2569      }
2570    }
2571  }
2572
2573  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2574      Configuration conf) throws IOException {
2575    if (!FILE_SYSTEMS_LOADED) {
2576      loadFileSystems();
2577    }
2578    Class<? extends FileSystem> clazz = null;
2579    if (conf != null) {
2580      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2581    }
2582    if (clazz == null) {
2583      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2584    }
2585    if (clazz == null) {
2586      throw new IOException("No FileSystem for scheme: " + scheme);
2587    }
2588    return clazz;
2589  }
2590
2591  private static FileSystem createFileSystem(URI uri, Configuration conf
2592      ) throws IOException {
2593    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2594    if (clazz == null) {
2595      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2596    }
2597    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2598    fs.initialize(uri, conf);
2599    return fs;
2600  }
2601
2602  /** Caching FileSystem objects */
2603  static class Cache {
    /**
     * Task that closes the auto-close file systems; getInternal() registers
     * it as a shutdown hook when the first entry is added to an empty map.
     */
    private final ClientFinalizer clientFinalizer = new ClientFinalizer();

    /** The cached instances. Within this class, accessed under the lock. */
    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    /** Keys of entries created while "fs.automatic.close" was true. */
    private final Set<Key> toAutoClose = new HashSet<Key>();

    /** A variable that makes all objects in the cache unique */
    private static AtomicLong unique = new AtomicLong(1);
2611
2612    FileSystem get(URI uri, Configuration conf) throws IOException{
2613      Key key = new Key(uri, conf);
2614      return getInternal(uri, conf, key);
2615    }
2616
2617    /** The objects inserted into the cache using this method are all unique */
2618    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2619      Key key = new Key(uri, conf, unique.getAndIncrement());
2620      return getInternal(uri, conf, key);
2621    }
2622
2623    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2624      FileSystem fs;
2625      synchronized (this) {
2626        fs = map.get(key);
2627      }
2628      if (fs != null) {
2629        return fs;
2630      }
2631
2632      fs = createFileSystem(uri, conf);
2633      synchronized (this) { // refetch the lock again
2634        FileSystem oldfs = map.get(key);
2635        if (oldfs != null) { // a file system is created while lock is releasing
2636          fs.close(); // close the new file system
2637          return oldfs;  // return the old file system
2638        }
2639        
2640        // now insert the new file system into the map
2641        if (map.isEmpty()
2642                && !ShutdownHookManager.get().isShutdownInProgress()) {
2643          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2644        }
2645        fs.key = key;
2646        map.put(key, fs);
2647        if (conf.getBoolean("fs.automatic.close", true)) {
2648          toAutoClose.add(key);
2649        }
2650        return fs;
2651      }
2652    }
2653
2654    synchronized void remove(Key key, FileSystem fs) {
2655      if (map.containsKey(key) && fs == map.get(key)) {
2656        map.remove(key);
2657        toAutoClose.remove(key);
2658        }
2659    }
2660
2661    synchronized void closeAll() throws IOException {
2662      closeAll(false);
2663    }
2664
2665    /**
2666     * Close all FileSystem instances in the Cache.
2667     * @param onlyAutomatic only close those that are marked for automatic closing
2668     */
2669    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2670      List<IOException> exceptions = new ArrayList<IOException>();
2671
2672      // Make a copy of the keys in the map since we'll be modifying
2673      // the map while iterating over it, which isn't safe.
2674      List<Key> keys = new ArrayList<Key>();
2675      keys.addAll(map.keySet());
2676
2677      for (Key key : keys) {
2678        final FileSystem fs = map.get(key);
2679
2680        if (onlyAutomatic && !toAutoClose.contains(key)) {
2681          continue;
2682        }
2683
2684        //remove from cache
2685        remove(key, fs);
2686
2687        if (fs != null) {
2688          try {
2689            fs.close();
2690          }
2691          catch(IOException ioe) {
2692            exceptions.add(ioe);
2693          }
2694        }
2695      }
2696
2697      if (!exceptions.isEmpty()) {
2698        throw MultipleIOException.createIOException(exceptions);
2699      }
2700    }
2701
2702    private class ClientFinalizer implements Runnable {
2703      @Override
2704      public synchronized void run() {
2705        try {
2706          closeAll(true);
2707        } catch (IOException e) {
2708          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2709        }
2710      }
2711    }
2712
2713    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2714      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2715      //Make a pass over the list and collect the filesystems to close
2716      //we cannot close inline since close() removes the entry from the Map
2717      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2718        final Key key = entry.getKey();
2719        final FileSystem fs = entry.getValue();
2720        if (ugi.equals(key.ugi) && fs != null) {
2721          targetFSList.add(fs);   
2722        }
2723      }
2724      List<IOException> exceptions = new ArrayList<IOException>();
2725      //now make a pass over the target list and close each
2726      for (FileSystem fs : targetFSList) {
2727        try {
2728          fs.close();
2729        }
2730        catch(IOException ioe) {
2731          exceptions.add(ioe);
2732        }
2733      }
2734      if (!exceptions.isEmpty()) {
2735        throw MultipleIOException.createIOException(exceptions);
2736      }
2737    }
2738
2739    /** FileSystem.Cache.Key */
2740    static class Key {
2741      final String scheme;
2742      final String authority;
2743      final UserGroupInformation ugi;
2744      final long unique;   // an artificial way to make a key unique
2745
2746      Key(URI uri, Configuration conf) throws IOException {
2747        this(uri, conf, 0);
2748      }
2749
2750      Key(URI uri, Configuration conf, long unique) throws IOException {
2751        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2752        authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2753        this.unique = unique;
2754        
2755        this.ugi = UserGroupInformation.getCurrentUser();
2756      }
2757
2758      @Override
2759      public int hashCode() {
2760        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2761      }
2762
2763      static boolean isEqual(Object a, Object b) {
2764        return a == b || (a != null && a.equals(b));        
2765      }
2766
2767      @Override
2768      public boolean equals(Object obj) {
2769        if (obj == this) {
2770          return true;
2771        }
2772        if (obj != null && obj instanceof Key) {
2773          Key that = (Key)obj;
2774          return isEqual(this.scheme, that.scheme)
2775                 && isEqual(this.authority, that.authority)
2776                 && isEqual(this.ugi, that.ugi)
2777                 && (this.unique == that.unique);
2778        }
2779        return false;        
2780      }
2781
2782      @Override
2783      public String toString() {
2784        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2785      }
2786    }
2787  }
2788  
2789  /**
2790   * Tracks statistics about how many reads, writes, and so forth have been
2791   * done in a FileSystem.
2792   * 
2793   * Since there is only one of these objects per FileSystem, there will 
2794   * typically be many threads writing to this object.  Almost every operation
2795   * on an open file will involve a write to this object.  In contrast, reading
2796   * statistics is done infrequently by most programs, and not at all by others.
2797   * Hence, this is optimized for writes.
2798   * 
2799   * Each thread writes to its own thread-local area of memory.  This removes 
2800   * contention and allows us to scale up to many, many threads.  To read
2801   * statistics, the reader thread totals up the contents of all of the 
2802   * thread-local data areas.
2803   */
2804  public static final class Statistics {
2805    /**
2806     * Statistics data.
2807     * 
2808     * There is only a single writer to thread-local StatisticsData objects.
2809     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2810     * to prevent lost updates.
2811     * The Java specification guarantees that updates to volatile longs will
2812     * be perceived as atomic with respect to other threads, which is all we
2813     * need.
2814     */
2815    public static class StatisticsData {
2816      volatile long bytesRead;
2817      volatile long bytesWritten;
2818      volatile int readOps;
2819      volatile int largeReadOps;
2820      volatile int writeOps;
2821
2822      /**
2823       * Add another StatisticsData object to this one.
2824       */
2825      void add(StatisticsData other) {
2826        this.bytesRead += other.bytesRead;
2827        this.bytesWritten += other.bytesWritten;
2828        this.readOps += other.readOps;
2829        this.largeReadOps += other.largeReadOps;
2830        this.writeOps += other.writeOps;
2831      }
2832
2833      /**
2834       * Negate the values of all statistics.
2835       */
2836      void negate() {
2837        this.bytesRead = -this.bytesRead;
2838        this.bytesWritten = -this.bytesWritten;
2839        this.readOps = -this.readOps;
2840        this.largeReadOps = -this.largeReadOps;
2841        this.writeOps = -this.writeOps;
2842      }
2843
2844      @Override
2845      public String toString() {
2846        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2847            + readOps + " read ops, " + largeReadOps + " large read ops, "
2848            + writeOps + " write ops";
2849      }
2850      
2851      public long getBytesRead() {
2852        return bytesRead;
2853      }
2854      
2855      public long getBytesWritten() {
2856        return bytesWritten;
2857      }
2858      
2859      public int getReadOps() {
2860        return readOps;
2861      }
2862      
2863      public int getLargeReadOps() {
2864        return largeReadOps;
2865      }
2866      
2867      public int getWriteOps() {
2868        return writeOps;
2869      }
2870    }
2871
    /**
     * Visitor used by visitAll(): it is handed each StatisticsData object in
     * turn and finally asked for the combined result.
     *
     * @param <T> type of the combined result
     */
    private interface StatisticsAggregator<T> {
      /** Take one StatisticsData object into account. */
      void accept(StatisticsData data);
      /** Produce the result once every object has been accepted. */
      T aggregate();
    }
2876
    /** URI scheme these statistics belong to; returned by getScheme(). */
    private final String scheme;

    /**
     * rootData is data that doesn't belong to any thread, but will be added
     * to the totals.  This is useful for making copies of Statistics objects,
     * and for storing data that pertains to threads that have been garbage
     * collected.  Protected by the Statistics lock.
     */
    private final StatisticsData rootData;

    /**
     * Thread-local data.
     */
    private final ThreadLocal<StatisticsData> threadData;

    /**
     * Set of all thread-local data areas.  Protected by the Statistics lock.
     * The references to the statistics data are kept using weak references
     * to the associated threads. Proper clean-up is performed by the cleaner
     * thread when the threads are garbage collected.
     */
    private final Set<StatisticsDataReference> allData;

    /**
     * Global reference queue and a cleaner thread that manage statistics data
     * references from all filesystem instances.
     */
    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
    private static final Thread STATS_DATA_CLEANER;
2906
2907    static {
2908      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
2909      // start a single daemon cleaner thread
2910      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
2911      STATS_DATA_CLEANER.
2912          setName(StatisticsDataReferenceCleaner.class.getName());
2913      STATS_DATA_CLEANER.setDaemon(true);
2914      STATS_DATA_CLEANER.start();
2915    }
2916
2917    public Statistics(String scheme) {
2918      this.scheme = scheme;
2919      this.rootData = new StatisticsData();
2920      this.threadData = new ThreadLocal<StatisticsData>();
2921      this.allData = new HashSet<StatisticsDataReference>();
2922    }
2923
2924    /**
2925     * Copy constructor.
2926     * 
2927     * @param other    The input Statistics object which is cloned.
2928     */
2929    public Statistics(Statistics other) {
2930      this.scheme = other.scheme;
2931      this.rootData = new StatisticsData();
2932      other.visitAll(new StatisticsAggregator<Void>() {
2933        @Override
2934        public void accept(StatisticsData data) {
2935          rootData.add(data);
2936        }
2937
2938        public Void aggregate() {
2939          return null;
2940        }
2941      });
2942      this.threadData = new ThreadLocal<StatisticsData>();
2943      this.allData = new HashSet<StatisticsDataReference>();
2944    }
2945
2946    /**
2947     * A weak reference to a thread that also includes the data associated
2948     * with that thread. On the thread being garbage collected, it is enqueued
2949     * to the reference queue for clean-up.
2950     */
2951    private class StatisticsDataReference extends WeakReference<Thread> {
2952      private final StatisticsData data;
2953
2954      public StatisticsDataReference(StatisticsData data, Thread thread) {
2955        super(thread, STATS_DATA_REF_QUEUE);
2956        this.data = data;
2957      }
2958
2959      public StatisticsData getData() {
2960        return data;
2961      }
2962
2963      /**
2964       * Performs clean-up action when the associated thread is garbage
2965       * collected.
2966       */
2967      public void cleanUp() {
2968        // use the statistics lock for safety
2969        synchronized (Statistics.this) {
2970          /*
2971           * If the thread that created this thread-local data no longer exists,
2972           * remove the StatisticsData from our list and fold the values into
2973           * rootData.
2974           */
2975          rootData.add(data);
2976          allData.remove(this);
2977        }
2978      }
2979    }
2980
2981    /**
2982     * Background action to act on references being removed.
2983     */
2984    private static class StatisticsDataReferenceCleaner implements Runnable {
2985      @Override
2986      public void run() {
2987        while (true) {
2988          try {
2989            StatisticsDataReference ref =
2990                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
2991            ref.cleanUp();
2992          } catch (Throwable th) {
2993            // the cleaner thread should continue to run even if there are
2994            // exceptions, including InterruptedException
2995            LOG.warn("exception in the cleaner thread but it will continue to "
2996                + "run", th);
2997          }
2998        }
2999      }
3000    }
3001
3002    /**
3003     * Get or create the thread-local data associated with the current thread.
3004     */
3005    public StatisticsData getThreadStatistics() {
3006      StatisticsData data = threadData.get();
3007      if (data == null) {
3008        data = new StatisticsData();
3009        threadData.set(data);
3010        StatisticsDataReference ref =
3011            new StatisticsDataReference(data, Thread.currentThread());
3012        synchronized(this) {
3013          allData.add(ref);
3014        }
3015      }
3016      return data;
3017    }
3018
3019    /**
3020     * Increment the bytes read in the statistics
3021     * @param newBytes the additional bytes read
3022     */
3023    public void incrementBytesRead(long newBytes) {
3024      getThreadStatistics().bytesRead += newBytes;
3025    }
3026    
3027    /**
3028     * Increment the bytes written in the statistics
3029     * @param newBytes the additional bytes written
3030     */
3031    public void incrementBytesWritten(long newBytes) {
3032      getThreadStatistics().bytesWritten += newBytes;
3033    }
3034    
3035    /**
3036     * Increment the number of read operations
3037     * @param count number of read operations
3038     */
3039    public void incrementReadOps(int count) {
3040      getThreadStatistics().readOps += count;
3041    }
3042
3043    /**
3044     * Increment the number of large read operations
3045     * @param count number of large read operations
3046     */
3047    public void incrementLargeReadOps(int count) {
3048      getThreadStatistics().largeReadOps += count;
3049    }
3050
3051    /**
3052     * Increment the number of write operations
3053     * @param count number of write operations
3054     */
3055    public void incrementWriteOps(int count) {
3056      getThreadStatistics().writeOps += count;
3057    }
3058
3059    /**
3060     * Apply the given aggregator to all StatisticsData objects associated with
3061     * this Statistics object.
3062     *
3063     * For each StatisticsData object, we will call accept on the visitor.
3064     * Finally, at the end, we will call aggregate to get the final total. 
3065     *
3066     * @param         The visitor to use.
3067     * @return        The total.
3068     */
3069    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3070      visitor.accept(rootData);
3071      for (StatisticsDataReference ref: allData) {
3072        StatisticsData data = ref.getData();
3073        visitor.accept(data);
3074      }
3075      return visitor.aggregate();
3076    }
3077
3078    /**
3079     * Get the total number of bytes read
3080     * @return the number of bytes
3081     */
3082    public long getBytesRead() {
3083      return visitAll(new StatisticsAggregator<Long>() {
3084        private long bytesRead = 0;
3085
3086        @Override
3087        public void accept(StatisticsData data) {
3088          bytesRead += data.bytesRead;
3089        }
3090
3091        public Long aggregate() {
3092          return bytesRead;
3093        }
3094      });
3095    }
3096    
3097    /**
3098     * Get the total number of bytes written
3099     * @return the number of bytes
3100     */
3101    public long getBytesWritten() {
3102      return visitAll(new StatisticsAggregator<Long>() {
3103        private long bytesWritten = 0;
3104
3105        @Override
3106        public void accept(StatisticsData data) {
3107          bytesWritten += data.bytesWritten;
3108        }
3109
3110        public Long aggregate() {
3111          return bytesWritten;
3112        }
3113      });
3114    }
3115    
3116    /**
3117     * Get the number of file system read operations such as list files
3118     * @return number of read operations
3119     */
3120    public int getReadOps() {
3121      return visitAll(new StatisticsAggregator<Integer>() {
3122        private int readOps = 0;
3123
3124        @Override
3125        public void accept(StatisticsData data) {
3126          readOps += data.readOps;
3127          readOps += data.largeReadOps;
3128        }
3129
3130        public Integer aggregate() {
3131          return readOps;
3132        }
3133      });
3134    }
3135
3136    /**
3137     * Get the number of large file system read operations such as list files
3138     * under a large directory
3139     * @return number of large read operations
3140     */
3141    public int getLargeReadOps() {
3142      return visitAll(new StatisticsAggregator<Integer>() {
3143        private int largeReadOps = 0;
3144
3145        @Override
3146        public void accept(StatisticsData data) {
3147          largeReadOps += data.largeReadOps;
3148        }
3149
3150        public Integer aggregate() {
3151          return largeReadOps;
3152        }
3153      });
3154    }
3155
3156    /**
3157     * Get the number of file system write operations such as create, append 
3158     * rename etc.
3159     * @return number of write operations
3160     */
3161    public int getWriteOps() {
3162      return visitAll(new StatisticsAggregator<Integer>() {
3163        private int writeOps = 0;
3164
3165        @Override
3166        public void accept(StatisticsData data) {
3167          writeOps += data.writeOps;
3168        }
3169
3170        public Integer aggregate() {
3171          return writeOps;
3172        }
3173      });
3174    }
3175
3176
3177    @Override
3178    public String toString() {
3179      return visitAll(new StatisticsAggregator<String>() {
3180        private StatisticsData total = new StatisticsData();
3181
3182        @Override
3183        public void accept(StatisticsData data) {
3184          total.add(data);
3185        }
3186
3187        public String aggregate() {
3188          return total.toString();
3189        }
3190      });
3191    }
3192
3193    /**
3194     * Resets all statistics to 0.
3195     *
3196     * In order to reset, we add up all the thread-local statistics data, and
3197     * set rootData to the negative of that.
3198     *
3199     * This may seem like a counterintuitive way to reset the statsitics.  Why
3200     * can't we just zero out all the thread-local data?  Well, thread-local
3201     * data can only be modified by the thread that owns it.  If we tried to
3202     * modify the thread-local data from this thread, our modification might get
3203     * interleaved with a read-modify-write operation done by the thread that
3204     * owns the data.  That would result in our update getting lost.
3205     *
3206     * The approach used here avoids this problem because it only ever reads
3207     * (not writes) the thread-local data.  Both reads and writes to rootData
3208     * are done under the lock, so we're free to modify rootData from any thread
3209     * that holds the lock.
3210     */
3211    public void reset() {
3212      visitAll(new StatisticsAggregator<Void>() {
3213        private StatisticsData total = new StatisticsData();
3214
3215        @Override
3216        public void accept(StatisticsData data) {
3217          total.add(data);
3218        }
3219
3220        public Void aggregate() {
3221          total.negate();
3222          rootData.add(total);
3223          return null;
3224        }
3225      });
3226    }
3227    
3228    /**
3229     * Get the uri scheme associated with this statistics object.
3230     * @return the schema associated with this set of statistics
3231     */
3232    public String getScheme() {
3233      return scheme;
3234    }
3235
3236    @VisibleForTesting
3237    synchronized int getAllThreadLocalDataSize() {
3238      return allData.size();
3239    }
3240  }
3241  
3242  /**
3243   * Get the Map of Statistics object indexed by URI Scheme.
3244   * @return a Map having a key as URI scheme and value as Statistics object
3245   * @deprecated use {@link #getAllStatistics} instead
3246   */
3247  @Deprecated
3248  public static synchronized Map<String, Statistics> getStatistics() {
3249    Map<String, Statistics> result = new HashMap<String, Statistics>();
3250    for(Statistics stat: statisticsTable.values()) {
3251      result.put(stat.getScheme(), stat);
3252    }
3253    return result;
3254  }
3255
3256  /**
3257   * Return the FileSystem classes that have Statistics
3258   */
3259  public static synchronized List<Statistics> getAllStatistics() {
3260    return new ArrayList<Statistics>(statisticsTable.values());
3261  }
3262  
3263  /**
3264   * Get the statistics for a particular file system
3265   * @param cls the class to lookup
3266   * @return a statistics object
3267   */
3268  public static synchronized 
3269  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3270    Statistics result = statisticsTable.get(cls);
3271    if (result == null) {
3272      result = new Statistics(scheme);
3273      statisticsTable.put(cls, result);
3274    }
3275    return result;
3276  }
3277  
3278  /**
3279   * Reset all statistics for all file systems
3280   */
3281  public static synchronized void clearStatistics() {
3282    for(Statistics stat: statisticsTable.values()) {
3283      stat.reset();
3284    }
3285  }
3286
3287  /**
3288   * Print all statistics for all file systems
3289   */
3290  public static synchronized
3291  void printStatistics() throws IOException {
3292    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3293            statisticsTable.entrySet()) {
3294      System.out.println("  FileSystem " + pair.getKey().getName() + 
3295                         ": " + pair.getValue());
3296    }
3297  }
3298
  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
  // Switched on by enableSymlinks(); queried through areSymlinksEnabled().
  private static boolean symlinksEnabled = false;

  // NOTE(review): not referenced in the visible part of this file - confirm
  // whether anything still uses this field.
  private static Configuration conf = null;
3303
3304  @VisibleForTesting
3305  public static boolean areSymlinksEnabled() {
3306    return symlinksEnabled;
3307  }
3308
3309  @VisibleForTesting
3310  public static void enableSymlinks() {
3311    symlinksEnabled = true;
3312  }
3313}