001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.EnumSet;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.IdentityHashMap;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.NoSuchElementException;
037import java.util.ServiceConfigurationError;
038import java.util.ServiceLoader;
039import java.util.Set;
040import java.util.Stack;
041import java.util.TreeSet;
042import java.util.concurrent.atomic.AtomicLong;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.apache.hadoop.classification.InterfaceAudience;
047import org.apache.hadoop.classification.InterfaceStability;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.conf.Configured;
050import org.apache.hadoop.fs.Options.ChecksumOpt;
051import org.apache.hadoop.fs.Options.Rename;
052import org.apache.hadoop.fs.permission.AclEntry;
053import org.apache.hadoop.fs.permission.AclStatus;
054import org.apache.hadoop.fs.permission.FsAction;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.io.MultipleIOException;
057import org.apache.hadoop.io.Text;
058import org.apache.hadoop.net.NetUtils;
059import org.apache.hadoop.security.AccessControlException;
060import org.apache.hadoop.security.Credentials;
061import org.apache.hadoop.security.SecurityUtil;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.token.Token;
064import org.apache.hadoop.util.ClassUtil;
065import org.apache.hadoop.util.DataChecksum;
066import org.apache.hadoop.util.Progressable;
067import org.apache.hadoop.util.ReflectionUtils;
068import org.apache.hadoop.util.ShutdownHookManager;
069import org.apache.hadoop.util.StringUtils;
070
071import com.google.common.annotations.VisibleForTesting;
072
073/****************************************************************
074 * An abstract base class for a fairly generic filesystem.  It
075 * may be implemented as a distributed filesystem, or as a "local"
076 * one that reflects the locally-connected disk.  The local version
077 * exists for small Hadoop instances and for testing.
078 *
079 * <p>
080 *
081 * All user code that may potentially use the Hadoop Distributed
082 * File System should be written to use a FileSystem object.  The
083 * Hadoop DFS is a multi-machine system that appears as a single
084 * disk.  It's useful because of its fault tolerance and potentially
085 * very large capacity.
086 * 
087 * <p>
088 * The local implementation is {@link LocalFileSystem} and distributed
089 * implementation is DistributedFileSystem.
090 *****************************************************************/
091@InterfaceAudience.Public
092@InterfaceStability.Stable
093public abstract class FileSystem extends Configured implements Closeable {
094  public static final String FS_DEFAULT_NAME_KEY = 
095                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
096  public static final String DEFAULT_FS = 
097                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
098
099  public static final Log LOG = LogFactory.getLog(FileSystem.class);
100
101  /**
102   * Priority of the FileSystem shutdown hook.
103   */
104  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
105
106  /** FileSystem cache */
107  static final Cache CACHE = new Cache();
108
109  /** The key this instance is stored under in the cache. */
110  private Cache.Key key;
111
112  /** Recording statistics per a FileSystem class */
113  private static final Map<Class<? extends FileSystem>, Statistics> 
114    statisticsTable =
115      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
116  
117  /**
118   * The statistics for this file system.
119   */
120  protected Statistics statistics;
121
122  /**
123   * A cache of files that should be deleted when filsystem is closed
124   * or the JVM is exited.
125   */
126  private Set<Path> deleteOnExit = new TreeSet<Path>();
127  
128  boolean resolveSymlinks;
129  /**
130   * This method adds a file system for testing so that we can find it later. It
131   * is only for testing.
132   * @param uri the uri to store it under
133   * @param conf the configuration to store it under
134   * @param fs the file system to store
135   * @throws IOException
136   */
137  static void addFileSystemForTesting(URI uri, Configuration conf,
138      FileSystem fs) throws IOException {
139    CACHE.map.put(new Cache.Key(uri, conf), fs);
140  }
141
142  /**
143   * Get a filesystem instance based on the uri, the passed
144   * configuration and the user
145   * @param uri of the filesystem
146   * @param conf the configuration to use
147   * @param user to perform the get as
148   * @return the filesystem instance
149   * @throws IOException
150   * @throws InterruptedException
151   */
152  public static FileSystem get(final URI uri, final Configuration conf,
153        final String user) throws IOException, InterruptedException {
154    String ticketCachePath =
155      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
156    UserGroupInformation ugi =
157        UserGroupInformation.getBestUGI(ticketCachePath, user);
158    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
159      @Override
160      public FileSystem run() throws IOException {
161        return get(uri, conf);
162      }
163    });
164  }
165
166  /**
167   * Returns the configured filesystem implementation.
168   * @param conf the configuration to use
169   */
170  public static FileSystem get(Configuration conf) throws IOException {
171    return get(getDefaultUri(conf), conf);
172  }
173  
174  /** Get the default filesystem URI from a configuration.
175   * @param conf the configuration to use
176   * @return the uri of the default filesystem
177   */
178  public static URI getDefaultUri(Configuration conf) {
179    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
180  }
181
182  /** Set the default filesystem URI in a configuration.
183   * @param conf the configuration to alter
184   * @param uri the new default filesystem uri
185   */
186  public static void setDefaultUri(Configuration conf, URI uri) {
187    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
188  }
189
190  /** Set the default filesystem URI in a configuration.
191   * @param conf the configuration to alter
192   * @param uri the new default filesystem uri
193   */
194  public static void setDefaultUri(Configuration conf, String uri) {
195    setDefaultUri(conf, URI.create(fixName(uri)));
196  }
197
198  /** Called after a new FileSystem instance is constructed.
199   * @param name a uri whose authority section names the host, port, etc.
200   *   for this FileSystem
201   * @param conf the configuration
202   */
203  public void initialize(URI name, Configuration conf) throws IOException {
204    statistics = getStatistics(name.getScheme(), getClass());    
205    resolveSymlinks = conf.getBoolean(
206        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
207        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
208  }
209
210  /**
211   * Return the protocol scheme for the FileSystem.
212   * <p/>
213   * This implementation throws an <code>UnsupportedOperationException</code>.
214   *
215   * @return the protocol scheme for the FileSystem.
216   */
217  public String getScheme() {
218    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
219  }
220
221  /** Returns a URI whose scheme and authority identify this FileSystem.*/
222  public abstract URI getUri();
223  
224  /**
225   * Return a canonicalized form of this FileSystem's URI.
226   * 
227   * The default implementation simply calls {@link #canonicalizeUri(URI)}
228   * on the filesystem's own URI, so subclasses typically only need to
229   * implement that method.
230   *
231   * @see #canonicalizeUri(URI)
232   */
233  protected URI getCanonicalUri() {
234    return canonicalizeUri(getUri());
235  }
236  
237  /**
238   * Canonicalize the given URI.
239   * 
240   * This is filesystem-dependent, but may for example consist of
241   * canonicalizing the hostname using DNS and adding the default
242   * port if not specified.
243   * 
244   * The default implementation simply fills in the default port if
245   * not specified and if the filesystem has a default port.
246   *
247   * @return URI
248   * @see NetUtils#getCanonicalUri(URI, int)
249   */
250  protected URI canonicalizeUri(URI uri) {
251    if (uri.getPort() == -1 && getDefaultPort() > 0) {
252      // reconstruct the uri with the default port set
253      try {
254        uri = new URI(uri.getScheme(), uri.getUserInfo(),
255            uri.getHost(), getDefaultPort(),
256            uri.getPath(), uri.getQuery(), uri.getFragment());
257      } catch (URISyntaxException e) {
258        // Should never happen!
259        throw new AssertionError("Valid URI became unparseable: " +
260            uri);
261      }
262    }
263    
264    return uri;
265  }
266  
267  /**
268   * Get the default port for this file system.
269   * @return the default port or 0 if there isn't one
270   */
271  protected int getDefaultPort() {
272    return 0;
273  }
274
275  protected static FileSystem getFSofPath(final Path absOrFqPath,
276      final Configuration conf)
277      throws UnsupportedFileSystemException, IOException {
278    absOrFqPath.checkNotSchemeWithRelative();
279    absOrFqPath.checkNotRelative();
280
281    // Uses the default file system if not fully qualified
282    return get(absOrFqPath.toUri(), conf);
283  }
284
285  /**
286   * Get a canonical service name for this file system.  The token cache is
287   * the only user of the canonical service name, and uses it to lookup this
288   * filesystem's service tokens.
289   * If file system provides a token of its own then it must have a canonical
290   * name, otherwise canonical name can be null.
291   * 
292   * Default Impl: If the file system has child file systems 
293   * (such as an embedded file system) then it is assumed that the fs has no
294   * tokens of its own and hence returns a null name; otherwise a service
295   * name is built using Uri and port.
296   * 
297   * @return a service string that uniquely identifies this file system, null
298   *         if the filesystem does not implement tokens
299   * @see SecurityUtil#buildDTServiceName(URI, int) 
300   */
301  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
302  public String getCanonicalServiceName() {
303    return (getChildFileSystems() == null)
304      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
305      : null;
306  }
307
308  /** @deprecated call #getUri() instead.*/
309  @Deprecated
310  public String getName() { return getUri().toString(); }
311
312  /** @deprecated call #get(URI,Configuration) instead. */
313  @Deprecated
314  public static FileSystem getNamed(String name, Configuration conf)
315    throws IOException {
316    return get(URI.create(fixName(name)), conf);
317  }
318  
319  /** Update old-format filesystem names, for back-compatibility.  This should
320   * eventually be replaced with a checkName() method that throws an exception
321   * for old-format names. */ 
322  private static String fixName(String name) {
323    // convert old-format name to new-format name
324    if (name.equals("local")) {         // "local" is now "file:///".
325      LOG.warn("\"local\" is a deprecated filesystem name."
326               +" Use \"file:///\" instead.");
327      name = "file:///";
328    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
329      LOG.warn("\""+name+"\" is a deprecated filesystem name."
330               +" Use \"hdfs://"+name+"/\" instead.");
331      name = "hdfs://"+name;
332    }
333    return name;
334  }
335
336  /**
337   * Get the local file system.
338   * @param conf the configuration to configure the file system with
339   * @return a LocalFileSystem
340   */
341  public static LocalFileSystem getLocal(Configuration conf)
342    throws IOException {
343    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
344  }
345
346  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
347   * of the URI determines a configuration property name,
348   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
349   * The entire URI is passed to the FileSystem instance's initialize method.
350   */
351  public static FileSystem get(URI uri, Configuration conf) throws IOException {
352    String scheme = uri.getScheme();
353    String authority = uri.getAuthority();
354
355    if (scheme == null && authority == null) {     // use default FS
356      return get(conf);
357    }
358
359    if (scheme != null && authority == null) {     // no authority
360      URI defaultUri = getDefaultUri(conf);
361      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
362          && defaultUri.getAuthority() != null) {  // & default has authority
363        return get(defaultUri, conf);              // return default
364      }
365    }
366    
367    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
368    if (conf.getBoolean(disableCacheName, false)) {
369      return createFileSystem(uri, conf);
370    }
371
372    return CACHE.get(uri, conf);
373  }
374
375  /**
376   * Returns the FileSystem for this URI's scheme and authority and the 
377   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
378   * @param uri of the filesystem
379   * @param conf the configuration to use
380   * @param user to perform the get as
381   * @return filesystem instance
382   * @throws IOException
383   * @throws InterruptedException
384   */
385  public static FileSystem newInstance(final URI uri, final Configuration conf,
386      final String user) throws IOException, InterruptedException {
387    String ticketCachePath =
388      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
389    UserGroupInformation ugi =
390        UserGroupInformation.getBestUGI(ticketCachePath, user);
391    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
392      @Override
393      public FileSystem run() throws IOException {
394        return newInstance(uri,conf); 
395      }
396    });
397  }
398  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
399   * of the URI determines a configuration property name,
400   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
401   * The entire URI is passed to the FileSystem instance's initialize method.
402   * This always returns a new FileSystem object.
403   */
404  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
405    String scheme = uri.getScheme();
406    String authority = uri.getAuthority();
407
408    if (scheme == null) {                       // no scheme: use default FS
409      return newInstance(conf);
410    }
411
412    if (authority == null) {                       // no authority
413      URI defaultUri = getDefaultUri(conf);
414      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
415          && defaultUri.getAuthority() != null) {  // & default has authority
416        return newInstance(defaultUri, conf);              // return default
417      }
418    }
419    return CACHE.getUnique(uri, conf);
420  }
421
422  /** Returns a unique configured filesystem implementation.
423   * This always returns a new FileSystem object.
424   * @param conf the configuration to use
425   */
426  public static FileSystem newInstance(Configuration conf) throws IOException {
427    return newInstance(getDefaultUri(conf), conf);
428  }
429
430  /**
431   * Get a unique local file system object
432   * @param conf the configuration to configure the file system with
433   * @return a LocalFileSystem
434   * This always returns a new FileSystem object.
435   */
436  public static LocalFileSystem newInstanceLocal(Configuration conf)
437    throws IOException {
438    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
439  }
440
441  /**
442   * Close all cached filesystems. Be sure those filesystems are not
443   * used anymore.
444   * 
445   * @throws IOException
446   */
447  public static void closeAll() throws IOException {
448    CACHE.closeAll();
449  }
450
451  /**
452   * Close all cached filesystems for a given UGI. Be sure those filesystems 
453   * are not used anymore.
454   * @param ugi user group info to close
455   * @throws IOException
456   */
457  public static void closeAllForUGI(UserGroupInformation ugi) 
458  throws IOException {
459    CACHE.closeAll(ugi);
460  }
461
462  /** 
463   * Make sure that a path specifies a FileSystem.
464   * @param path to use
465   */
466  public Path makeQualified(Path path) {
467    checkPath(path);
468    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
469  }
470    
471  /**
472   * Get a new delegation token for this file system.
473   * This is an internal method that should have been declared protected
474   * but wasn't historically.
475   * Callers should use {@link #addDelegationTokens(String, Credentials)}
476   * 
477   * @param renewer the account name that is allowed to renew the token.
478   * @return a new delegation token
479   * @throws IOException
480   */
481  @InterfaceAudience.Private()
482  public Token<?> getDelegationToken(String renewer) throws IOException {
483    return null;
484  }
485  
486  /**
487   * Obtain all delegation tokens used by this FileSystem that are not
488   * already present in the given Credentials.  Existing tokens will neither
489   * be verified as valid nor having the given renewer.  Missing tokens will
490   * be acquired and added to the given Credentials.
491   * 
492   * Default Impl: works for simple fs with its own token
493   * and also for an embedded fs whose tokens are those of its
494   * children file system (i.e. the embedded fs has not tokens of its
495   * own).
496   * 
497   * @param renewer the user allowed to renew the delegation tokens
498   * @param credentials cache in which to add new delegation tokens
499   * @return list of new delegation tokens
500   * @throws IOException
501   */
502  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
503  public Token<?>[] addDelegationTokens(
504      final String renewer, Credentials credentials) throws IOException {
505    if (credentials == null) {
506      credentials = new Credentials();
507    }
508    final List<Token<?>> tokens = new ArrayList<Token<?>>();
509    collectDelegationTokens(renewer, credentials, tokens);
510    return tokens.toArray(new Token<?>[tokens.size()]);
511  }
512  
513  /**
514   * Recursively obtain the tokens for this FileSystem and all descended
515   * FileSystems as determined by getChildFileSystems().
516   * @param renewer the user allowed to renew the delegation tokens
517   * @param credentials cache in which to add the new delegation tokens
518   * @param tokens list in which to add acquired tokens
519   * @throws IOException
520   */
521  private void collectDelegationTokens(final String renewer,
522                                       final Credentials credentials,
523                                       final List<Token<?>> tokens)
524                                           throws IOException {
525    final String serviceName = getCanonicalServiceName();
526    // Collect token of the this filesystem and then of its embedded children
527    if (serviceName != null) { // fs has token, grab it
528      final Text service = new Text(serviceName);
529      Token<?> token = credentials.getToken(service);
530      if (token == null) {
531        token = getDelegationToken(renewer);
532        if (token != null) {
533          tokens.add(token);
534          credentials.addToken(service, token);
535        }
536      }
537    }
538    // Now collect the tokens from the children
539    final FileSystem[] children = getChildFileSystems();
540    if (children != null) {
541      for (final FileSystem fs : children) {
542        fs.collectDelegationTokens(renewer, credentials, tokens);
543      }
544    }
545  }
546
547  /**
548   * Get all the immediate child FileSystems embedded in this FileSystem.
549   * It does not recurse and get grand children.  If a FileSystem
550   * has multiple child FileSystems, then it should return a unique list
551   * of those FileSystems.  Default is to return null to signify no children.
552   * 
553   * @return FileSystems used by this FileSystem
554   */
555  @InterfaceAudience.LimitedPrivate({ "HDFS" })
556  @VisibleForTesting
557  public FileSystem[] getChildFileSystems() {
558    return null;
559  }
560  
561  /** create a file with the provided permission
562   * The permission of the file is set to be the provided permission as in
563   * setPermission, not permission&~umask
564   * 
565   * It is implemented using two RPCs. It is understood that it is inefficient,
566   * but the implementation is thread-safe. The other option is to change the
567   * value of umask in configuration to be 0, but it is not thread-safe.
568   * 
569   * @param fs file system handle
570   * @param file the name of the file to be created
571   * @param permission the permission of the file
572   * @return an output stream
573   * @throws IOException
574   */
575  public static FSDataOutputStream create(FileSystem fs,
576      Path file, FsPermission permission) throws IOException {
577    // create the file with default permission
578    FSDataOutputStream out = fs.create(file);
579    // set its permission to the supplied one
580    fs.setPermission(file, permission);
581    return out;
582  }
583
584  /** create a directory with the provided permission
585   * The permission of the directory is set to be the provided permission as in
586   * setPermission, not permission&~umask
587   * 
588   * @see #create(FileSystem, Path, FsPermission)
589   * 
590   * @param fs file system handle
591   * @param dir the name of the directory to be created
592   * @param permission the permission of the directory
593   * @return true if the directory creation succeeds; false otherwise
594   * @throws IOException
595   */
596  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
597  throws IOException {
598    // create the directory using the default permission
599    boolean result = fs.mkdirs(dir);
600    // set its permission to be the supplied one
601    fs.setPermission(dir, permission);
602    return result;
603  }
604
605  ///////////////////////////////////////////////////////////////
606  // FileSystem
607  ///////////////////////////////////////////////////////////////
608
609  protected FileSystem() {
610    super(null);
611  }
612
613  /** 
614   * Check that a Path belongs to this FileSystem.
615   * @param path to check
616   */
617  protected void checkPath(Path path) {
618    URI uri = path.toUri();
619    String thatScheme = uri.getScheme();
620    if (thatScheme == null)                // fs is relative
621      return;
622    URI thisUri = getCanonicalUri();
623    String thisScheme = thisUri.getScheme();
624    //authority and scheme are not case sensitive
625    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
626      String thisAuthority = thisUri.getAuthority();
627      String thatAuthority = uri.getAuthority();
628      if (thatAuthority == null &&                // path's authority is null
629          thisAuthority != null) {                // fs has an authority
630        URI defaultUri = getDefaultUri(getConf());
631        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
632          uri = defaultUri; // schemes match, so use this uri instead
633        } else {
634          uri = null; // can't determine auth of the path
635        }
636      }
637      if (uri != null) {
638        // canonicalize uri before comparing with this fs
639        uri = canonicalizeUri(uri);
640        thatAuthority = uri.getAuthority();
641        if (thisAuthority == thatAuthority ||       // authorities match
642            (thisAuthority != null &&
643             thisAuthority.equalsIgnoreCase(thatAuthority)))
644          return;
645      }
646    }
647    throw new IllegalArgumentException("Wrong FS: "+path+
648                                       ", expected: "+this.getUri());
649  }
650
651  /**
652   * Return an array containing hostnames, offset and size of 
653   * portions of the given file.  For a nonexistent 
654   * file or regions, null will be returned.
655   *
656   * This call is most helpful with DFS, where it returns 
657   * hostnames of machines that contain the given file.
658   *
659   * The FileSystem will simply return an elt containing 'localhost'.
660   *
661   * @param file FilesStatus to get data from
662   * @param start offset into the given file
663   * @param len length for which to get locations for
664   */
665  public BlockLocation[] getFileBlockLocations(FileStatus file, 
666      long start, long len) throws IOException {
667    if (file == null) {
668      return null;
669    }
670
671    if (start < 0 || len < 0) {
672      throw new IllegalArgumentException("Invalid start or len parameter");
673    }
674
675    if (file.getLen() <= start) {
676      return new BlockLocation[0];
677
678    }
679    String[] name = { "localhost:50010" };
680    String[] host = { "localhost" };
681    return new BlockLocation[] {
682      new BlockLocation(name, host, 0, file.getLen()) };
683  }
684 
685
686  /**
687   * Return an array containing hostnames, offset and size of 
688   * portions of the given file.  For a nonexistent 
689   * file or regions, null will be returned.
690   *
691   * This call is most helpful with DFS, where it returns 
692   * hostnames of machines that contain the given file.
693   *
694   * The FileSystem will simply return an elt containing 'localhost'.
695   *
696   * @param p path is used to identify an FS since an FS could have
697   *          another FS that it could be delegating the call to
698   * @param start offset into the given file
699   * @param len length for which to get locations for
700   */
701  public BlockLocation[] getFileBlockLocations(Path p, 
702      long start, long len) throws IOException {
703    if (p == null) {
704      throw new NullPointerException();
705    }
706    FileStatus file = getFileStatus(p);
707    return getFileBlockLocations(file, start, len);
708  }
709  
710  /**
711   * Return a set of server default configuration values
712   * @return server default configuration values
713   * @throws IOException
714   * @deprecated use {@link #getServerDefaults(Path)} instead
715   */
716  @Deprecated
717  public FsServerDefaults getServerDefaults() throws IOException {
718    Configuration conf = getConf();
719    // CRC32 is chosen as default as it is available in all 
720    // releases that support checksum.
721    // The client trash configuration is ignored.
722    return new FsServerDefaults(getDefaultBlockSize(), 
723        conf.getInt("io.bytes.per.checksum", 512), 
724        64 * 1024, 
725        getDefaultReplication(),
726        conf.getInt("io.file.buffer.size", 4096),
727        false,
728        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
729        DataChecksum.Type.CRC32);
730  }
731
732  /**
733   * Return a set of server default configuration values
734   * @param p path is used to identify an FS since an FS could have
735   *          another FS that it could be delegating the call to
736   * @return server default configuration values
737   * @throws IOException
738   */
739  public FsServerDefaults getServerDefaults(Path p) throws IOException {
740    return getServerDefaults();
741  }
742
743  /**
744   * Return the fully-qualified path of path f resolving the path
745   * through any symlinks or mount point
746   * @param p path to be resolved
747   * @return fully qualified path 
748   * @throws FileNotFoundException
749   */
750   public Path resolvePath(final Path p) throws IOException {
751     checkPath(p);
752     return getFileStatus(p).getPath();
753   }
754
755  /**
756   * Opens an FSDataInputStream at the indicated Path.
757   * @param f the file name to open
758   * @param bufferSize the size of the buffer to be used.
759   */
760  public abstract FSDataInputStream open(Path f, int bufferSize)
761    throws IOException;
762    
763  /**
764   * Opens an FSDataInputStream at the indicated Path.
765   * @param f the file to open
766   */
767  public FSDataInputStream open(Path f) throws IOException {
768    return open(f, getConf().getInt("io.file.buffer.size", 4096));
769  }
770
771  /**
772   * Create an FSDataOutputStream at the indicated Path.
773   * Files are overwritten by default.
774   * @param f the file to create
775   */
776  public FSDataOutputStream create(Path f) throws IOException {
777    return create(f, true);
778  }
779
780  /**
781   * Create an FSDataOutputStream at the indicated Path.
782   * @param f the file to create
783   * @param overwrite if a file with this name already exists, then if true,
784   *   the file will be overwritten, and if false an exception will be thrown.
785   */
786  public FSDataOutputStream create(Path f, boolean overwrite)
787      throws IOException {
788    return create(f, overwrite, 
789                  getConf().getInt("io.file.buffer.size", 4096),
790                  getDefaultReplication(f),
791                  getDefaultBlockSize(f));
792  }
793
794  /**
795   * Create an FSDataOutputStream at the indicated Path with write-progress
796   * reporting.
797   * Files are overwritten by default.
798   * @param f the file to create
799   * @param progress to report progress
800   */
801  public FSDataOutputStream create(Path f, Progressable progress) 
802      throws IOException {
803    return create(f, true, 
804                  getConf().getInt("io.file.buffer.size", 4096),
805                  getDefaultReplication(f),
806                  getDefaultBlockSize(f), progress);
807  }
808
809  /**
810   * Create an FSDataOutputStream at the indicated Path.
811   * Files are overwritten by default.
812   * @param f the file to create
813   * @param replication the replication factor
814   */
815  public FSDataOutputStream create(Path f, short replication)
816      throws IOException {
817    return create(f, true, 
818                  getConf().getInt("io.file.buffer.size", 4096),
819                  replication,
820                  getDefaultBlockSize(f));
821  }
822
823  /**
824   * Create an FSDataOutputStream at the indicated Path with write-progress
825   * reporting.
826   * Files are overwritten by default.
827   * @param f the file to create
828   * @param replication the replication factor
829   * @param progress to report progress
830   */
831  public FSDataOutputStream create(Path f, short replication, 
832      Progressable progress) throws IOException {
833    return create(f, true, 
834                  getConf().getInt(
835                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
836                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
837                  replication,
838                  getDefaultBlockSize(f), progress);
839  }
840
841    
842  /**
843   * Create an FSDataOutputStream at the indicated Path.
844   * @param f the file name to create
845   * @param overwrite if a file with this name already exists, then if true,
846   *   the file will be overwritten, and if false an error will be thrown.
847   * @param bufferSize the size of the buffer to be used.
848   */
849  public FSDataOutputStream create(Path f, 
850                                   boolean overwrite,
851                                   int bufferSize
852                                   ) throws IOException {
853    return create(f, overwrite, bufferSize, 
854                  getDefaultReplication(f),
855                  getDefaultBlockSize(f));
856  }
857    
858  /**
859   * Create an FSDataOutputStream at the indicated Path with write-progress
860   * reporting.
861   * @param f the path of the file to open
862   * @param overwrite if a file with this name already exists, then if true,
863   *   the file will be overwritten, and if false an error will be thrown.
864   * @param bufferSize the size of the buffer to be used.
865   */
866  public FSDataOutputStream create(Path f, 
867                                   boolean overwrite,
868                                   int bufferSize,
869                                   Progressable progress
870                                   ) throws IOException {
871    return create(f, overwrite, bufferSize, 
872                  getDefaultReplication(f),
873                  getDefaultBlockSize(f), progress);
874  }
875    
876    
877  /**
878   * Create an FSDataOutputStream at the indicated Path.
879   * @param f the file name to open
880   * @param overwrite if a file with this name already exists, then if true,
881   *   the file will be overwritten, and if false an error will be thrown.
882   * @param bufferSize the size of the buffer to be used.
883   * @param replication required block replication for the file. 
884   */
885  public FSDataOutputStream create(Path f, 
886                                   boolean overwrite,
887                                   int bufferSize,
888                                   short replication,
889                                   long blockSize
890                                   ) throws IOException {
891    return create(f, overwrite, bufferSize, replication, blockSize, null);
892  }
893
894  /**
895   * Create an FSDataOutputStream at the indicated Path with write-progress
896   * reporting.
897   * @param f the file name to open
898   * @param overwrite if a file with this name already exists, then if true,
899   *   the file will be overwritten, and if false an error will be thrown.
900   * @param bufferSize the size of the buffer to be used.
901   * @param replication required block replication for the file. 
902   */
903  public FSDataOutputStream create(Path f,
904                                            boolean overwrite,
905                                            int bufferSize,
906                                            short replication,
907                                            long blockSize,
908                                            Progressable progress
909                                            ) throws IOException {
910    return this.create(f, FsPermission.getFileDefault().applyUMask(
911        FsPermission.getUMask(getConf())), overwrite, bufferSize,
912        replication, blockSize, progress);
913  }
914
915  /**
916   * Create an FSDataOutputStream at the indicated Path with write-progress
917   * reporting.
918   * @param f the file name to open
919   * @param permission
920   * @param overwrite if a file with this name already exists, then if true,
921   *   the file will be overwritten, and if false an error will be thrown.
922   * @param bufferSize the size of the buffer to be used.
923   * @param replication required block replication for the file.
924   * @param blockSize
925   * @param progress
926   * @throws IOException
927   * @see #setPermission(Path, FsPermission)
928   */
929  public abstract FSDataOutputStream create(Path f,
930      FsPermission permission,
931      boolean overwrite,
932      int bufferSize,
933      short replication,
934      long blockSize,
935      Progressable progress) throws IOException;
936  
937  /**
938   * Create an FSDataOutputStream at the indicated Path with write-progress
939   * reporting.
940   * @param f the file name to open
941   * @param permission
942   * @param flags {@link CreateFlag}s to use for this stream.
943   * @param bufferSize the size of the buffer to be used.
944   * @param replication required block replication for the file.
945   * @param blockSize
946   * @param progress
947   * @throws IOException
948   * @see #setPermission(Path, FsPermission)
949   */
950  public FSDataOutputStream create(Path f,
951      FsPermission permission,
952      EnumSet<CreateFlag> flags,
953      int bufferSize,
954      short replication,
955      long blockSize,
956      Progressable progress) throws IOException {
957    return create(f, permission, flags, bufferSize, replication,
958        blockSize, progress, null);
959  }
960  
961  /**
962   * Create an FSDataOutputStream at the indicated Path with a custom
963   * checksum option
964   * @param f the file name to open
965   * @param permission
966   * @param flags {@link CreateFlag}s to use for this stream.
967   * @param bufferSize the size of the buffer to be used.
968   * @param replication required block replication for the file.
969   * @param blockSize
970   * @param progress
971   * @param checksumOpt checksum parameter. If null, the values
972   *        found in conf will be used.
973   * @throws IOException
974   * @see #setPermission(Path, FsPermission)
975   */
976  public FSDataOutputStream create(Path f,
977      FsPermission permission,
978      EnumSet<CreateFlag> flags,
979      int bufferSize,
980      short replication,
981      long blockSize,
982      Progressable progress,
983      ChecksumOpt checksumOpt) throws IOException {
984    // Checksum options are ignored by default. The file systems that
985    // implement checksum need to override this method. The full
986    // support is currently only available in DFS.
987    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
988        bufferSize, replication, blockSize, progress);
989  }
990
991  /*.
992   * This create has been added to support the FileContext that processes
993   * the permission
994   * with umask before calling this method.
995   * This a temporary method added to support the transition from FileSystem
996   * to FileContext for user applications.
997   */
998  @Deprecated
999  protected FSDataOutputStream primitiveCreate(Path f,
1000     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1001     short replication, long blockSize, Progressable progress,
1002     ChecksumOpt checksumOpt) throws IOException {
1003
1004    boolean pathExists = exists(f);
1005    CreateFlag.validate(f, pathExists, flag);
1006    
1007    // Default impl  assumes that permissions do not matter and 
1008    // nor does the bytesPerChecksum  hence
1009    // calling the regular create is good enough.
1010    // FSs that implement permissions should override this.
1011
1012    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1013      return append(f, bufferSize, progress);
1014    }
1015    
1016    return this.create(f, absolutePermission,
1017        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1018        blockSize, progress);
1019  }
1020  
1021  /**
1022   * This version of the mkdirs method assumes that the permission is absolute.
1023   * It has been added to support the FileContext that processes the permission
1024   * with umask before calling this method.
1025   * This a temporary method added to support the transition from FileSystem
1026   * to FileContext for user applications.
1027   */
1028  @Deprecated
1029  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1030    throws IOException {
1031    // Default impl is to assume that permissions do not matter and hence
1032    // calling the regular mkdirs is good enough.
1033    // FSs that implement permissions should override this.
1034   return this.mkdirs(f, absolutePermission);
1035  }
1036
1037
1038  /**
1039   * This version of the mkdirs method assumes that the permission is absolute.
1040   * It has been added to support the FileContext that processes the permission
1041   * with umask before calling this method.
1042   * This a temporary method added to support the transition from FileSystem
1043   * to FileContext for user applications.
1044   */
1045  @Deprecated
1046  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1047                    boolean createParent)
1048    throws IOException {
1049    
1050    if (!createParent) { // parent must exist.
1051      // since the this.mkdirs makes parent dirs automatically
1052      // we must throw exception if parent does not exist.
1053      final FileStatus stat = getFileStatus(f.getParent());
1054      if (stat == null) {
1055        throw new FileNotFoundException("Missing parent:" + f);
1056      }
1057      if (!stat.isDirectory()) {
1058        throw new ParentNotDirectoryException("parent is not a dir");
1059      }
1060      // parent does exist - go ahead with mkdir of leaf
1061    }
1062    // Default impl is to assume that permissions do not matter and hence
1063    // calling the regular mkdirs is good enough.
1064    // FSs that implement permissions should override this.
1065    if (!this.mkdirs(f, absolutePermission)) {
1066      throw new IOException("mkdir of "+ f + " failed");
1067    }
1068  }
1069
1070  /**
1071   * Opens an FSDataOutputStream at the indicated Path with write-progress
1072   * reporting. Same as create(), except fails if parent directory doesn't
1073   * already exist.
1074   * @param f the file name to open
1075   * @param overwrite if a file with this name already exists, then if true,
1076   * the file will be overwritten, and if false an error will be thrown.
1077   * @param bufferSize the size of the buffer to be used.
1078   * @param replication required block replication for the file.
1079   * @param blockSize
1080   * @param progress
1081   * @throws IOException
1082   * @see #setPermission(Path, FsPermission)
1083   * @deprecated API only for 0.20-append
1084   */
1085  @Deprecated
1086  public FSDataOutputStream createNonRecursive(Path f,
1087      boolean overwrite,
1088      int bufferSize, short replication, long blockSize,
1089      Progressable progress) throws IOException {
1090    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1091        overwrite, bufferSize, replication, blockSize, progress);
1092  }
1093
1094  /**
1095   * Opens an FSDataOutputStream at the indicated Path with write-progress
1096   * reporting. Same as create(), except fails if parent directory doesn't
1097   * already exist.
1098   * @param f the file name to open
1099   * @param permission
1100   * @param overwrite if a file with this name already exists, then if true,
1101   * the file will be overwritten, and if false an error will be thrown.
1102   * @param bufferSize the size of the buffer to be used.
1103   * @param replication required block replication for the file.
1104   * @param blockSize
1105   * @param progress
1106   * @throws IOException
1107   * @see #setPermission(Path, FsPermission)
1108   * @deprecated API only for 0.20-append
1109   */
1110   @Deprecated
1111   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1112       boolean overwrite, int bufferSize, short replication, long blockSize,
1113       Progressable progress) throws IOException {
1114     return createNonRecursive(f, permission,
1115         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1116             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1117             replication, blockSize, progress);
1118   }
1119
1120   /**
1121    * Opens an FSDataOutputStream at the indicated Path with write-progress
1122    * reporting. Same as create(), except fails if parent directory doesn't
1123    * already exist.
1124    * @param f the file name to open
1125    * @param permission
1126    * @param flags {@link CreateFlag}s to use for this stream.
1127    * @param bufferSize the size of the buffer to be used.
1128    * @param replication required block replication for the file.
1129    * @param blockSize
1130    * @param progress
1131    * @throws IOException
1132    * @see #setPermission(Path, FsPermission)
1133    * @deprecated API only for 0.20-append
1134    */
1135    @Deprecated
1136    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1137        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1138        Progressable progress) throws IOException {
1139      throw new IOException("createNonRecursive unsupported for this filesystem "
1140          + this.getClass());
1141    }
1142
1143  /**
1144   * Creates the given Path as a brand-new zero-length file.  If
1145   * create fails, or if it already existed, return false.
1146   *
1147   * @param f path to use for create
1148   */
1149  public boolean createNewFile(Path f) throws IOException {
1150    if (exists(f)) {
1151      return false;
1152    } else {
1153      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1154      return true;
1155    }
1156  }
1157
1158  /**
1159   * Append to an existing file (optional operation).
1160   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1161   * @param f the existing file to be appended.
1162   * @throws IOException
1163   */
1164  public FSDataOutputStream append(Path f) throws IOException {
1165    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1166  }
1167  /**
1168   * Append to an existing file (optional operation).
1169   * Same as append(f, bufferSize, null).
1170   * @param f the existing file to be appended.
1171   * @param bufferSize the size of the buffer to be used.
1172   * @throws IOException
1173   */
1174  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1175    return append(f, bufferSize, null);
1176  }
1177
1178  /**
1179   * Append to an existing file (optional operation).
1180   * @param f the existing file to be appended.
1181   * @param bufferSize the size of the buffer to be used.
1182   * @param progress for reporting progress if it is not null.
1183   * @throws IOException
1184   */
1185  public abstract FSDataOutputStream append(Path f, int bufferSize,
1186      Progressable progress) throws IOException;
1187
1188  /**
1189   * Concat existing files together.
1190   * @param trg the path to the target destination.
1191   * @param psrcs the paths to the sources to use for the concatenation.
1192   * @throws IOException
1193   */
1194  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1195    throw new UnsupportedOperationException("Not implemented by the " + 
1196        getClass().getSimpleName() + " FileSystem implementation");
1197  }
1198
1199 /**
1200   * Get replication.
1201   * 
1202   * @deprecated Use getFileStatus() instead
1203   * @param src file name
1204   * @return file replication
1205   * @throws IOException
1206   */ 
1207  @Deprecated
1208  public short getReplication(Path src) throws IOException {
1209    return getFileStatus(src).getReplication();
1210  }
1211
1212  /**
1213   * Set replication for an existing file.
1214   * 
1215   * @param src file name
1216   * @param replication new replication
1217   * @throws IOException
1218   * @return true if successful;
1219   *         false if file does not exist or is a directory
1220   */
1221  public boolean setReplication(Path src, short replication)
1222    throws IOException {
1223    return true;
1224  }
1225
1226  /**
1227   * Renames Path src to Path dst.  Can take place on local fs
1228   * or remote DFS.
1229   * @param src path to be renamed
1230   * @param dst new path after rename
1231   * @throws IOException on failure
1232   * @return true if rename is successful
1233   */
1234  public abstract boolean rename(Path src, Path dst) throws IOException;
1235
1236  /**
1237   * Renames Path src to Path dst
1238   * <ul>
1239   * <li
1240   * <li>Fails if src is a file and dst is a directory.
1241   * <li>Fails if src is a directory and dst is a file.
1242   * <li>Fails if the parent of dst does not exist or is a file.
1243   * </ul>
1244   * <p>
1245   * If OVERWRITE option is not passed as an argument, rename fails
1246   * if the dst already exists.
1247   * <p>
1248   * If OVERWRITE option is passed as an argument, rename overwrites
1249   * the dst if it is a file or an empty directory. Rename fails if dst is
1250   * a non-empty directory.
1251   * <p>
1252   * Note that atomicity of rename is dependent on the file system
1253   * implementation. Please refer to the file system documentation for
1254   * details. This default implementation is non atomic.
1255   * <p>
1256   * This method is deprecated since it is a temporary method added to 
1257   * support the transition from FileSystem to FileContext for user 
1258   * applications.
1259   * 
1260   * @param src path to be renamed
1261   * @param dst new path after rename
1262   * @throws IOException on failure
1263   */
1264  @Deprecated
1265  protected void rename(final Path src, final Path dst,
1266      final Rename... options) throws IOException {
1267    // Default implementation
1268    final FileStatus srcStatus = getFileLinkStatus(src);
1269    if (srcStatus == null) {
1270      throw new FileNotFoundException("rename source " + src + " not found.");
1271    }
1272
1273    boolean overwrite = false;
1274    if (null != options) {
1275      for (Rename option : options) {
1276        if (option == Rename.OVERWRITE) {
1277          overwrite = true;
1278        }
1279      }
1280    }
1281
1282    FileStatus dstStatus;
1283    try {
1284      dstStatus = getFileLinkStatus(dst);
1285    } catch (IOException e) {
1286      dstStatus = null;
1287    }
1288    if (dstStatus != null) {
1289      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1290        throw new IOException("Source " + src + " Destination " + dst
1291            + " both should be either file or directory");
1292      }
1293      if (!overwrite) {
1294        throw new FileAlreadyExistsException("rename destination " + dst
1295            + " already exists.");
1296      }
1297      // Delete the destination that is a file or an empty directory
1298      if (dstStatus.isDirectory()) {
1299        FileStatus[] list = listStatus(dst);
1300        if (list != null && list.length != 0) {
1301          throw new IOException(
1302              "rename cannot overwrite non empty destination directory " + dst);
1303        }
1304      }
1305      delete(dst, false);
1306    } else {
1307      final Path parent = dst.getParent();
1308      final FileStatus parentStatus = getFileStatus(parent);
1309      if (parentStatus == null) {
1310        throw new FileNotFoundException("rename destination parent " + parent
1311            + " not found.");
1312      }
1313      if (!parentStatus.isDirectory()) {
1314        throw new ParentNotDirectoryException("rename destination parent " + parent
1315            + " is a file.");
1316      }
1317    }
1318    if (!rename(src, dst)) {
1319      throw new IOException("rename from " + src + " to " + dst + " failed.");
1320    }
1321  }
1322
1323  /**
1324   * Truncate the file in the indicated path to the indicated size.
1325   * <ul>
1326   * <li>Fails if path is a directory.
1327   * <li>Fails if path does not exist.
1328   * <li>Fails if path is not closed.
1329   * <li>Fails if new size is greater than current size.
1330   * </ul>
1331   * @param f The path to the file to be truncated
1332   * @param newLength The size the file is to be truncated to
1333   *
1334   * @return <code>true</code> if the file has been truncated to the desired
1335   * <code>newLength</code> and is immediately available to be reused for
1336   * write operations such as <code>append</code>, or
1337   * <code>false</code> if a background process of adjusting the length of
1338   * the last block has been started, and clients should wait for it to
1339   * complete before proceeding with further file updates.
1340   */
1341  public boolean truncate(Path f, long newLength) throws IOException {
1342    throw new UnsupportedOperationException("Not implemented by the " +
1343        getClass().getSimpleName() + " FileSystem implementation");
1344  }
1345  
1346  /**
1347   * Delete a file 
1348   * @deprecated Use {@link #delete(Path, boolean)} instead.
1349   */
1350  @Deprecated
1351  public boolean delete(Path f) throws IOException {
1352    return delete(f, true);
1353  }
1354  
1355  /** Delete a file.
1356   *
1357   * @param f the path to delete.
1358   * @param recursive if path is a directory and set to 
1359   * true, the directory is deleted else throws an exception. In
1360   * case of a file the recursive can be set to either true or false. 
1361   * @return  true if delete is successful else false. 
1362   * @throws IOException
1363   */
1364  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1365
1366  /**
1367   * Mark a path to be deleted when FileSystem is closed.
1368   * When the JVM shuts down,
1369   * all FileSystem objects will be closed automatically.
1370   * Then,
1371   * the marked path will be deleted as a result of closing the FileSystem.
1372   *
1373   * The path has to exist in the file system.
1374   * 
1375   * @param f the path to delete.
1376   * @return  true if deleteOnExit is successful, otherwise false.
1377   * @throws IOException
1378   */
1379  public boolean deleteOnExit(Path f) throws IOException {
1380    if (!exists(f)) {
1381      return false;
1382    }
1383    synchronized (deleteOnExit) {
1384      deleteOnExit.add(f);
1385    }
1386    return true;
1387  }
1388  
1389  /**
1390   * Cancel the deletion of the path when the FileSystem is closed
1391   * @param f the path to cancel deletion
1392   */
1393  public boolean cancelDeleteOnExit(Path f) {
1394    synchronized (deleteOnExit) {
1395      return deleteOnExit.remove(f);
1396    }
1397  }
1398
1399  /**
1400   * Delete all files that were marked as delete-on-exit. This recursively
1401   * deletes all files in the specified paths.
1402   */
1403  protected void processDeleteOnExit() {
1404    synchronized (deleteOnExit) {
1405      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1406        Path path = iter.next();
1407        try {
1408          if (exists(path)) {
1409            delete(path, true);
1410          }
1411        }
1412        catch (IOException e) {
1413          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1414        }
1415        iter.remove();
1416      }
1417    }
1418  }
1419  
1420  /** Check if exists.
1421   * @param f source file
1422   */
1423  public boolean exists(Path f) throws IOException {
1424    try {
1425      return getFileStatus(f) != null;
1426    } catch (FileNotFoundException e) {
1427      return false;
1428    }
1429  }
1430
1431  /** True iff the named path is a directory.
1432   * Note: Avoid using this method. Instead reuse the FileStatus 
1433   * returned by getFileStatus() or listStatus() methods.
1434   * @param f path to check
1435   */
1436  public boolean isDirectory(Path f) throws IOException {
1437    try {
1438      return getFileStatus(f).isDirectory();
1439    } catch (FileNotFoundException e) {
1440      return false;               // f does not exist
1441    }
1442  }
1443
1444  /** True iff the named path is a regular file.
1445   * Note: Avoid using this method. Instead reuse the FileStatus 
1446   * returned by getFileStatus() or listStatus() methods.
1447   * @param f path to check
1448   */
1449  public boolean isFile(Path f) throws IOException {
1450    try {
1451      return getFileStatus(f).isFile();
1452    } catch (FileNotFoundException e) {
1453      return false;               // f does not exist
1454    }
1455  }
1456  
1457  /** The number of bytes in a file. */
1458  /** @deprecated Use getFileStatus() instead */
1459  @Deprecated
1460  public long getLength(Path f) throws IOException {
1461    return getFileStatus(f).getLen();
1462  }
1463    
1464  /** Return the {@link ContentSummary} of a given {@link Path}.
1465  * @param f path to use
1466  */
1467  public ContentSummary getContentSummary(Path f) throws IOException {
1468    FileStatus status = getFileStatus(f);
1469    if (status.isFile()) {
1470      // f is a file
1471      long length = status.getLen();
1472      return new ContentSummary.Builder().length(length).
1473          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1474    }
1475    // f is a directory
1476    long[] summary = {0, 0, 1};
1477    for(FileStatus s : listStatus(f)) {
1478      long length = s.getLen();
1479      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1480          new ContentSummary.Builder().length(length).
1481          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1482      summary[0] += c.getLength();
1483      summary[1] += c.getFileCount();
1484      summary[2] += c.getDirectoryCount();
1485    }
1486    return new ContentSummary.Builder().length(summary[0]).
1487        fileCount(summary[1]).directoryCount(summary[2]).
1488        spaceConsumed(summary[0]).build();
1489  }
1490
1491  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1492      @Override
1493      public boolean accept(Path file) {
1494        return true;
1495      }     
1496    };
1497    
1498  /**
1499   * List the statuses of the files/directories in the given path if the path is
1500   * a directory.
1501   * 
1502   * @param f given path
1503   * @return the statuses of the files/directories in the given patch
1504   * @throws FileNotFoundException when the path does not exist;
1505   *         IOException see specific implementation
1506   */
1507  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1508                                                         IOException;
1509    
1510  /*
1511   * Filter files/directories in the given path using the user-supplied path
1512   * filter. Results are added to the given array <code>results</code>.
1513   */
1514  private void listStatus(ArrayList<FileStatus> results, Path f,
1515      PathFilter filter) throws FileNotFoundException, IOException {
1516    FileStatus listing[] = listStatus(f);
1517    if (listing == null) {
1518      throw new IOException("Error accessing " + f);
1519    }
1520
1521    for (int i = 0; i < listing.length; i++) {
1522      if (filter.accept(listing[i].getPath())) {
1523        results.add(listing[i]);
1524      }
1525    }
1526  }
1527
1528  /**
1529   * @return an iterator over the corrupt files under the given path
1530   * (may contain duplicates if a file has more than one corrupt block)
1531   * @throws IOException
1532   */
1533  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1534    throws IOException {
1535    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1536                                            " does not support" +
1537                                            " listCorruptFileBlocks");
1538  }
1539
1540  /**
1541   * Filter files/directories in the given path using the user-supplied path
1542   * filter.
1543   * 
1544   * @param f
1545   *          a path name
1546   * @param filter
1547   *          the user-supplied path filter
1548   * @return an array of FileStatus objects for the files under the given path
1549   *         after applying the filter
1550   * @throws FileNotFoundException when the path does not exist;
1551   *         IOException see specific implementation   
1552   */
1553  public FileStatus[] listStatus(Path f, PathFilter filter) 
1554                                   throws FileNotFoundException, IOException {
1555    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1556    listStatus(results, f, filter);
1557    return results.toArray(new FileStatus[results.size()]);
1558  }
1559
1560  /**
1561   * Filter files/directories in the given list of paths using default
1562   * path filter.
1563   * 
1564   * @param files
1565   *          a list of paths
1566   * @return a list of statuses for the files under the given paths after
1567   *         applying the filter default Path filter
1568   * @throws FileNotFoundException when the path does not exist;
1569   *         IOException see specific implementation
1570   */
1571  public FileStatus[] listStatus(Path[] files)
1572      throws FileNotFoundException, IOException {
1573    return listStatus(files, DEFAULT_FILTER);
1574  }
1575
1576  /**
1577   * Filter files/directories in the given list of paths using user-supplied
1578   * path filter.
1579   * 
1580   * @param files
1581   *          a list of paths
1582   * @param filter
1583   *          the user-supplied path filter
1584   * @return a list of statuses for the files under the given paths after
1585   *         applying the filter
1586   * @throws FileNotFoundException when the path does not exist;
1587   *         IOException see specific implementation
1588   */
1589  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1590      throws FileNotFoundException, IOException {
1591    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1592    for (int i = 0; i < files.length; i++) {
1593      listStatus(results, files[i], filter);
1594    }
1595    return results.toArray(new FileStatus[results.size()]);
1596  }
1597
1598  /**
1599   * <p>Return all the files that match filePattern and are not checksum
1600   * files. Results are sorted by their names.
1601   * 
1602   * <p>
1603   * A filename pattern is composed of <i>regular</i> characters and
1604   * <i>special pattern matching</i> characters, which are:
1605   *
1606   * <dl>
1607   *  <dd>
1608   *   <dl>
1609   *    <p>
1610   *    <dt> <tt> ? </tt>
1611   *    <dd> Matches any single character.
1612   *
1613   *    <p>
1614   *    <dt> <tt> * </tt>
1615   *    <dd> Matches zero or more characters.
1616   *
1617   *    <p>
1618   *    <dt> <tt> [<i>abc</i>] </tt>
1619   *    <dd> Matches a single character from character set
1620   *     <tt>{<i>a,b,c</i>}</tt>.
1621   *
1622   *    <p>
1623   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1624   *    <dd> Matches a single character from the character range
1625   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1626   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1627   *
1628   *    <p>
1629   *    <dt> <tt> [^<i>a</i>] </tt>
1630   *    <dd> Matches a single character that is not from character set or range
1631   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1632   *     immediately to the right of the opening bracket.
1633   *
1634   *    <p>
1635   *    <dt> <tt> \<i>c</i> </tt>
1636   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1637   *
1638   *    <p>
1639   *    <dt> <tt> {ab,cd} </tt>
1640   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1641   *    
1642   *    <p>
1643   *    <dt> <tt> {ab,c{de,fh}} </tt>
1644   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1645   *
1646   *   </dl>
1647   *  </dd>
1648   * </dl>
1649   *
1650   * @param pathPattern a regular expression specifying a pth pattern
1651
1652   * @return an array of paths that match the path pattern
1653   * @throws IOException
1654   */
1655  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1656    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1657  }
1658  
1659  /**
1660   * Return an array of FileStatus objects whose path names match pathPattern
1661   * and is accepted by the user-supplied path filter. Results are sorted by
1662   * their path names.
1663   * Return null if pathPattern has no glob and the path does not exist.
1664   * Return an empty array if pathPattern has a glob and no path matches it. 
1665   * 
1666   * @param pathPattern
1667   *          a regular expression specifying the path pattern
1668   * @param filter
1669   *          a user-supplied path filter
1670   * @return an array of FileStatus objects
1671   * @throws IOException if any I/O error occurs when fetching file status
1672   */
1673  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1674      throws IOException {
1675    return new Globber(this, pathPattern, filter).glob();
1676  }
1677  
1678  /**
1679   * List the statuses of the files/directories in the given path if the path is
1680   * a directory. 
1681   * Return the file's status and block locations If the path is a file.
1682   * 
1683   * If a returned status is a file, it contains the file's block locations.
1684   * 
1685   * @param f is the path
1686   *
1687   * @return an iterator that traverses statuses of the files/directories 
1688   *         in the given path
1689   *
1690   * @throws FileNotFoundException If <code>f</code> does not exist
1691   * @throws IOException If an I/O error occurred
1692   */
1693  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1694  throws FileNotFoundException, IOException {
1695    return listLocatedStatus(f, DEFAULT_FILTER);
1696  }
1697
1698  /**
1699   * Listing a directory
1700   * The returned results include its block location if it is a file
1701   * The results are filtered by the given path filter
1702   * @param f a path
1703   * @param filter a path filter
1704   * @return an iterator that traverses statuses of the files/directories 
1705   *         in the given path
1706   * @throws FileNotFoundException if <code>f</code> does not exist
1707   * @throws IOException if any I/O error occurred
1708   */
1709  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1710      final PathFilter filter)
1711  throws FileNotFoundException, IOException {
1712    return new RemoteIterator<LocatedFileStatus>() {
1713      private final FileStatus[] stats = listStatus(f, filter);
1714      private int i = 0;
1715
1716      @Override
1717      public boolean hasNext() {
1718        return i<stats.length;
1719      }
1720
1721      @Override
1722      public LocatedFileStatus next() throws IOException {
1723        if (!hasNext()) {
1724          throw new NoSuchElementException("No more entry in " + f);
1725        }
1726        FileStatus result = stats[i++];
1727        // for files, use getBlockLocations(FileStatus, int, int) to avoid
1728        // calling getFileStatus(Path) to load the FileStatus again
1729        BlockLocation[] locs = result.isFile() ?
1730            getFileBlockLocations(result, 0, result.getLen()) :
1731            null;
1732        return new LocatedFileStatus(result, locs);
1733      }
1734    };
1735  }
1736
1737  /**
1738   * Returns a remote iterator so that followup calls are made on demand
1739   * while consuming the entries. Each file system implementation should
1740   * override this method and provide a more efficient implementation, if
1741   * possible. 
1742   *
1743   * @param p target path
1744   * @return remote iterator
1745   */
1746  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1747  throws FileNotFoundException, IOException {
1748    return new RemoteIterator<FileStatus>() {
1749      private final FileStatus[] stats = listStatus(p);
1750      private int i = 0;
1751
1752      @Override
1753      public boolean hasNext() {
1754        return i<stats.length;
1755      }
1756
1757      @Override
1758      public FileStatus next() throws IOException {
1759        if (!hasNext()) {
1760          throw new NoSuchElementException("No more entry in " + p);
1761        }
1762        return stats[i++];
1763      }
1764    };
1765  }
1766
1767  /**
1768   * List the statuses and block locations of the files in the given path.
1769   * 
1770   * If the path is a directory, 
1771   *   if recursive is false, returns files in the directory;
1772   *   if recursive is true, return files in the subtree rooted at the path.
1773   * If the path is a file, return the file's status and block locations.
1774   * 
1775   * @param f is the path
1776   * @param recursive if the subdirectories need to be traversed recursively
1777   *
1778   * @return an iterator that traverses statuses of the files
1779   *
1780   * @throws FileNotFoundException when the path does not exist;
1781   *         IOException see specific implementation
1782   */
1783  public RemoteIterator<LocatedFileStatus> listFiles(
1784      final Path f, final boolean recursive)
1785  throws FileNotFoundException, IOException {
1786    return new RemoteIterator<LocatedFileStatus>() {
1787      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1788        new Stack<RemoteIterator<LocatedFileStatus>>();
1789      private RemoteIterator<LocatedFileStatus> curItor =
1790        listLocatedStatus(f);
1791      private LocatedFileStatus curFile;
1792     
1793      @Override
1794      public boolean hasNext() throws IOException {
1795        while (curFile == null) {
1796          if (curItor.hasNext()) {
1797            handleFileStat(curItor.next());
1798          } else if (!itors.empty()) {
1799            curItor = itors.pop();
1800          } else {
1801            return false;
1802          }
1803        }
1804        return true;
1805      }
1806
1807      /**
1808       * Process the input stat.
1809       * If it is a file, return the file stat.
1810       * If it is a directory, traverse the directory if recursive is true;
1811       * ignore it if recursive is false.
1812       * @param stat input status
1813       * @throws IOException if any IO error occurs
1814       */
1815      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1816        if (stat.isFile()) { // file
1817          curFile = stat;
1818        } else if (recursive) { // directory
1819          itors.push(curItor);
1820          curItor = listLocatedStatus(stat.getPath());
1821        }
1822      }
1823
1824      @Override
1825      public LocatedFileStatus next() throws IOException {
1826        if (hasNext()) {
1827          LocatedFileStatus result = curFile;
1828          curFile = null;
1829          return result;
1830        } 
1831        throw new java.util.NoSuchElementException("No more entry in " + f);
1832      }
1833    };
1834  }
1835  
1836  /** Return the current user's home directory in this filesystem.
1837   * The default implementation returns "/user/$USER/".
1838   */
1839  public Path getHomeDirectory() {
1840    return this.makeQualified(
1841        new Path("/user/"+System.getProperty("user.name")));
1842  }
1843
1844
1845  /**
1846   * Set the current working directory for the given file system. All relative
1847   * paths will be resolved relative to it.
1848   * 
1849   * @param new_dir
1850   */
1851  public abstract void setWorkingDirectory(Path new_dir);
1852    
1853  /**
1854   * Get the current working directory for the given file system
1855   * @return the directory pathname
1856   */
1857  public abstract Path getWorkingDirectory();
1858  
1859  
1860  /**
1861   * Note: with the new FilesContext class, getWorkingDirectory()
1862   * will be removed. 
1863   * The working directory is implemented in FilesContext.
1864   * 
1865   * Some file systems like LocalFileSystem have an initial workingDir
1866   * that we use as the starting workingDir. For other file systems
1867   * like HDFS there is no built in notion of an initial workingDir.
1868   * 
1869   * @return if there is built in notion of workingDir then it
1870   * is returned; else a null is returned.
1871   */
1872  protected Path getInitialWorkingDirectory() {
1873    return null;
1874  }
1875
1876  /**
1877   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1878   */
1879  public boolean mkdirs(Path f) throws IOException {
1880    return mkdirs(f, FsPermission.getDirDefault());
1881  }
1882
1883  /**
1884   * Make the given file and all non-existent parents into
1885   * directories. Has the semantics of Unix 'mkdir -p'.
1886   * Existence of the directory hierarchy is not an error.
1887   * @param f path to create
1888   * @param permission to apply to f
1889   */
1890  public abstract boolean mkdirs(Path f, FsPermission permission
1891      ) throws IOException;
1892
1893  /**
1894   * The src file is on the local disk.  Add it to FS at
1895   * the given dst name and the source is kept intact afterwards
1896   * @param src path
1897   * @param dst path
1898   */
1899  public void copyFromLocalFile(Path src, Path dst)
1900    throws IOException {
1901    copyFromLocalFile(false, src, dst);
1902  }
1903
1904  /**
1905   * The src files is on the local disk.  Add it to FS at
1906   * the given dst name, removing the source afterwards.
1907   * @param srcs path
1908   * @param dst path
1909   */
1910  public void moveFromLocalFile(Path[] srcs, Path dst)
1911    throws IOException {
1912    copyFromLocalFile(true, true, srcs, dst);
1913  }
1914
1915  /**
1916   * The src file is on the local disk.  Add it to FS at
1917   * the given dst name, removing the source afterwards.
1918   * @param src path
1919   * @param dst path
1920   */
1921  public void moveFromLocalFile(Path src, Path dst)
1922    throws IOException {
1923    copyFromLocalFile(true, src, dst);
1924  }
1925
1926  /**
1927   * The src file is on the local disk.  Add it to FS at
1928   * the given dst name.
1929   * delSrc indicates if the source should be removed
1930   * @param delSrc whether to delete the src
1931   * @param src path
1932   * @param dst path
1933   */
1934  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1935    throws IOException {
1936    copyFromLocalFile(delSrc, true, src, dst);
1937  }
1938  
1939  /**
1940   * The src files are on the local disk.  Add it to FS at
1941   * the given dst name.
1942   * delSrc indicates if the source should be removed
1943   * @param delSrc whether to delete the src
1944   * @param overwrite whether to overwrite an existing file
1945   * @param srcs array of paths which are source
1946   * @param dst path
1947   */
1948  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1949                                Path[] srcs, Path dst)
1950    throws IOException {
1951    Configuration conf = getConf();
1952    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1953  }
1954  
1955  /**
1956   * The src file is on the local disk.  Add it to FS at
1957   * the given dst name.
1958   * delSrc indicates if the source should be removed
1959   * @param delSrc whether to delete the src
1960   * @param overwrite whether to overwrite an existing file
1961   * @param src path
1962   * @param dst path
1963   */
1964  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1965                                Path src, Path dst)
1966    throws IOException {
1967    Configuration conf = getConf();
1968    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1969  }
1970    
1971  /**
1972   * The src file is under FS, and the dst is on the local disk.
1973   * Copy it from FS control to the local dst name.
1974   * @param src path
1975   * @param dst path
1976   */
1977  public void copyToLocalFile(Path src, Path dst) throws IOException {
1978    copyToLocalFile(false, src, dst);
1979  }
1980    
1981  /**
1982   * The src file is under FS, and the dst is on the local disk.
1983   * Copy it from FS control to the local dst name.
1984   * Remove the source afterwards
1985   * @param src path
1986   * @param dst path
1987   */
1988  public void moveToLocalFile(Path src, Path dst) throws IOException {
1989    copyToLocalFile(true, src, dst);
1990  }
1991
1992  /**
1993   * The src file is under FS, and the dst is on the local disk.
1994   * Copy it from FS control to the local dst name.
1995   * delSrc indicates if the src will be removed or not.
1996   * @param delSrc whether to delete the src
1997   * @param src path
1998   * @param dst path
1999   */   
2000  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
2001    throws IOException {
2002    copyToLocalFile(delSrc, src, dst, false);
2003  }
2004  
2005    /**
2006   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2007   * control to the local dst name. delSrc indicates if the src will be removed
2008   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2009   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2010   * It will not create any crc files at local.
2011   * 
2012   * @param delSrc
2013   *          whether to delete the src
2014   * @param src
2015   *          path
2016   * @param dst
2017   *          path
2018   * @param useRawLocalFileSystem
2019   *          whether to use RawLocalFileSystem as local file system or not.
2020   * 
2021   * @throws IOException
2022   *           - if any IO error
2023   */
2024  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2025      boolean useRawLocalFileSystem) throws IOException {
2026    Configuration conf = getConf();
2027    FileSystem local = null;
2028    if (useRawLocalFileSystem) {
2029      local = getLocal(conf).getRawFileSystem();
2030    } else {
2031      local = getLocal(conf);
2032    }
2033    FileUtil.copy(this, src, local, dst, delSrc, conf);
2034  }
2035
2036  /**
2037   * Returns a local File that the user can write output to.  The caller
2038   * provides both the eventual FS target name and the local working
2039   * file.  If the FS is local, we write directly into the target.  If
2040   * the FS is remote, we write into the tmp local area.
2041   * @param fsOutputFile path of output file
2042   * @param tmpLocalFile path of local tmp file
2043   */
2044  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2045    throws IOException {
2046    return tmpLocalFile;
2047  }
2048
2049  /**
2050   * Called when we're all done writing to the target.  A local FS will
2051   * do nothing, because we've written to exactly the right place.  A remote
2052   * FS will copy the contents of tmpLocalFile to the correct target at
2053   * fsOutputFile.
2054   * @param fsOutputFile path of output file
2055   * @param tmpLocalFile path to local tmp file
2056   */
2057  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2058    throws IOException {
2059    moveFromLocalFile(tmpLocalFile, fsOutputFile);
2060  }
2061
2062  /**
2063   * No more filesystem operations are needed.  Will
2064   * release any held locks.
2065   */
2066  @Override
2067  public void close() throws IOException {
2068    // delete all files that were marked as delete-on-exit.
2069    processDeleteOnExit();
2070    CACHE.remove(this.key, this);
2071  }
2072
2073  /** Return the total size of all files in the filesystem.*/
2074  public long getUsed() throws IOException{
2075    long used = 0;
2076    FileStatus[] files = listStatus(new Path("/"));
2077    for(FileStatus file:files){
2078      used += file.getLen();
2079    }
2080    return used;
2081  }
2082  
2083  /**
2084   * Get the block size for a particular file.
2085   * @param f the filename
2086   * @return the number of bytes in a block
2087   */
2088  /** @deprecated Use getFileStatus() instead */
2089  @Deprecated
2090  public long getBlockSize(Path f) throws IOException {
2091    return getFileStatus(f).getBlockSize();
2092  }
2093
2094  /**
2095   * Return the number of bytes that large input files should be optimally
2096   * be split into to minimize i/o time.
2097   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2098   */
2099  @Deprecated
2100  public long getDefaultBlockSize() {
2101    // default to 32MB: large enough to minimize the impact of seeks
2102    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2103  }
2104    
2105  /** Return the number of bytes that large input files should be optimally
2106   * be split into to minimize i/o time.  The given path will be used to
2107   * locate the actual filesystem.  The full path does not have to exist.
2108   * @param f path of file
2109   * @return the default block size for the path's filesystem
2110   */
2111  public long getDefaultBlockSize(Path f) {
2112    return getDefaultBlockSize();
2113  }
2114
2115  /**
2116   * Get the default replication.
2117   * @deprecated use {@link #getDefaultReplication(Path)} instead
2118   */
2119  @Deprecated
2120  public short getDefaultReplication() { return 1; }
2121
2122  /**
2123   * Get the default replication for a path.   The given path will be used to
2124   * locate the actual filesystem.  The full path does not have to exist.
2125   * @param path of the file
2126   * @return default replication for the path's filesystem 
2127   */
2128  public short getDefaultReplication(Path path) {
2129    return getDefaultReplication();
2130  }
2131  
2132  /**
2133   * Return a file status object that represents the path.
2134   * @param f The path we want information from
2135   * @return a FileStatus object
2136   * @throws FileNotFoundException when the path does not exist;
2137   *         IOException see specific implementation
2138   */
2139  public abstract FileStatus getFileStatus(Path f) throws IOException;
2140
2141  /**
2142   * Checks if the user can access a path.  The mode specifies which access
2143   * checks to perform.  If the requested permissions are granted, then the
2144   * method returns normally.  If access is denied, then the method throws an
2145   * {@link AccessControlException}.
2146   * <p/>
2147   * The default implementation of this method calls {@link #getFileStatus(Path)}
2148   * and checks the returned permissions against the requested permissions.
2149   * Note that the getFileStatus call will be subject to authorization checks.
2150   * Typically, this requires search (execute) permissions on each directory in
2151   * the path's prefix, but this is implementation-defined.  Any file system
2152   * that provides a richer authorization model (such as ACLs) may override the
2153   * default implementation so that it checks against that model instead.
2154   * <p>
2155   * In general, applications should avoid using this method, due to the risk of
2156   * time-of-check/time-of-use race conditions.  The permissions on a file may
2157   * change immediately after the access call returns.  Most applications should
2158   * prefer running specific file system actions as the desired user represented
2159   * by a {@link UserGroupInformation}.
2160   *
2161   * @param path Path to check
2162   * @param mode type of access to check
2163   * @throws AccessControlException if access is denied
2164   * @throws FileNotFoundException if the path does not exist
2165   * @throws IOException see specific implementation
2166   */
2167  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2168  public void access(Path path, FsAction mode) throws AccessControlException,
2169      FileNotFoundException, IOException {
2170    checkAccessPermissions(this.getFileStatus(path), mode);
2171  }
2172
2173  /**
2174   * This method provides the default implementation of
2175   * {@link #access(Path, FsAction)}.
2176   *
2177   * @param stat FileStatus to check
2178   * @param mode type of access to check
2179   * @throws IOException for any error
2180   */
2181  @InterfaceAudience.Private
2182  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2183      throws IOException {
2184    FsPermission perm = stat.getPermission();
2185    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2186    String user = ugi.getShortUserName();
2187    if (user.equals(stat.getOwner())) {
2188      if (perm.getUserAction().implies(mode)) {
2189        return;
2190      }
2191    } else if (ugi.getGroups().contains(stat.getGroup())) {
2192      if (perm.getGroupAction().implies(mode)) {
2193        return;
2194      }
2195    } else {
2196      if (perm.getOtherAction().implies(mode)) {
2197        return;
2198      }
2199    }
2200    throw new AccessControlException(String.format(
2201      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2202      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2203  }
2204
2205  /**
2206   * See {@link FileContext#fixRelativePart}
2207   */
2208  protected Path fixRelativePart(Path p) {
2209    if (p.isUriPathAbsolute()) {
2210      return p;
2211    } else {
2212      return new Path(getWorkingDirectory(), p);
2213    }
2214  }
2215
2216  /**
2217   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2218   */
2219  public void createSymlink(final Path target, final Path link,
2220      final boolean createParent) throws AccessControlException,
2221      FileAlreadyExistsException, FileNotFoundException,
2222      ParentNotDirectoryException, UnsupportedFileSystemException, 
2223      IOException {
2224    // Supporting filesystems should override this method
2225    throw new UnsupportedOperationException(
2226        "Filesystem does not support symlinks!");
2227  }
2228
2229  /**
2230   * See {@link FileContext#getFileLinkStatus(Path)}
2231   */
2232  public FileStatus getFileLinkStatus(final Path f)
2233      throws AccessControlException, FileNotFoundException,
2234      UnsupportedFileSystemException, IOException {
2235    // Supporting filesystems should override this method
2236    return getFileStatus(f);
2237  }
2238
2239  /**
2240   * See {@link AbstractFileSystem#supportsSymlinks()}
2241   */
2242  public boolean supportsSymlinks() {
2243    return false;
2244  }
2245
2246  /**
2247   * See {@link FileContext#getLinkTarget(Path)}
2248   */
2249  public Path getLinkTarget(Path f) throws IOException {
2250    // Supporting filesystems should override this method
2251    throw new UnsupportedOperationException(
2252        "Filesystem does not support symlinks!");
2253  }
2254
2255  /**
2256   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2257   */
2258  protected Path resolveLink(Path f) throws IOException {
2259    // Supporting filesystems should override this method
2260    throw new UnsupportedOperationException(
2261        "Filesystem does not support symlinks!");
2262  }
2263
2264  /**
2265   * Get the checksum of a file.
2266   *
2267   * @param f The file path
2268   * @return The file checksum.  The default return value is null,
2269   *  which indicates that no checksum algorithm is implemented
2270   *  in the corresponding FileSystem.
2271   */
2272  public FileChecksum getFileChecksum(Path f) throws IOException {
2273    return getFileChecksum(f, Long.MAX_VALUE);
2274  }
2275
2276  /**
2277   * Get the checksum of a file, from the beginning of the file till the
2278   * specific length.
2279   * @param f The file path
2280   * @param length The length of the file range for checksum calculation
2281   * @return The file checksum.
2282   */
2283  public FileChecksum getFileChecksum(Path f, final long length)
2284      throws IOException {
2285    return null;
2286  }
2287
2288  /**
2289   * Set the verify checksum flag. This is only applicable if the 
2290   * corresponding FileSystem supports checksum. By default doesn't do anything.
2291   * @param verifyChecksum
2292   */
2293  public void setVerifyChecksum(boolean verifyChecksum) {
2294    //doesn't do anything
2295  }
2296
2297  /**
2298   * Set the write checksum flag. This is only applicable if the 
2299   * corresponding FileSystem supports checksum. By default doesn't do anything.
2300   * @param writeChecksum
2301   */
2302  public void setWriteChecksum(boolean writeChecksum) {
2303    //doesn't do anything
2304  }
2305
2306  /**
2307   * Returns a status object describing the use and capacity of the
2308   * file system. If the file system has multiple partitions, the
2309   * use and capacity of the root partition is reflected.
2310   * 
2311   * @return a FsStatus object
2312   * @throws IOException
2313   *           see specific implementation
2314   */
2315  public FsStatus getStatus() throws IOException {
2316    return getStatus(null);
2317  }
2318
2319  /**
2320   * Returns a status object describing the use and capacity of the
2321   * file system. If the file system has multiple partitions, the
2322   * use and capacity of the partition pointed to by the specified
2323   * path is reflected.
2324   * @param p Path for which status should be obtained. null means
2325   * the default partition. 
2326   * @return a FsStatus object
2327   * @throws IOException
2328   *           see specific implementation
2329   */
2330  public FsStatus getStatus(Path p) throws IOException {
2331    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2332  }
2333
2334  /**
2335   * Set permission of a path.
2336   * @param p
2337   * @param permission
2338   */
2339  public void setPermission(Path p, FsPermission permission
2340      ) throws IOException {
2341  }
2342
2343  /**
2344   * Set owner of a path (i.e. a file or a directory).
2345   * The parameters username and groupname cannot both be null.
2346   * @param p The path
2347   * @param username If it is null, the original username remains unchanged.
2348   * @param groupname If it is null, the original groupname remains unchanged.
2349   */
2350  public void setOwner(Path p, String username, String groupname
2351      ) throws IOException {
2352  }
2353
2354  /**
2355   * Set access time of a file
2356   * @param p The path
2357   * @param mtime Set the modification time of this file.
2358   *              The number of milliseconds since Jan 1, 1970. 
2359   *              A value of -1 means that this call should not set modification time.
2360   * @param atime Set the access time of this file.
2361   *              The number of milliseconds since Jan 1, 1970. 
2362   *              A value of -1 means that this call should not set access time.
2363   */
2364  public void setTimes(Path p, long mtime, long atime
2365      ) throws IOException {
2366  }
2367
2368  /**
2369   * Create a snapshot with a default name.
2370   * @param path The directory where snapshots will be taken.
2371   * @return the snapshot path.
2372   */
2373  public final Path createSnapshot(Path path) throws IOException {
2374    return createSnapshot(path, null);
2375  }
2376
2377  /**
2378   * Create a snapshot
2379   * @param path The directory where snapshots will be taken.
2380   * @param snapshotName The name of the snapshot
2381   * @return the snapshot path.
2382   */
2383  public Path createSnapshot(Path path, String snapshotName)
2384      throws IOException {
2385    throw new UnsupportedOperationException(getClass().getSimpleName()
2386        + " doesn't support createSnapshot");
2387  }
2388  
2389  /**
2390   * Rename a snapshot
2391   * @param path The directory path where the snapshot was taken
2392   * @param snapshotOldName Old name of the snapshot
2393   * @param snapshotNewName New name of the snapshot
2394   * @throws IOException
2395   */
2396  public void renameSnapshot(Path path, String snapshotOldName,
2397      String snapshotNewName) throws IOException {
2398    throw new UnsupportedOperationException(getClass().getSimpleName()
2399        + " doesn't support renameSnapshot");
2400  }
2401  
2402  /**
2403   * Delete a snapshot of a directory
2404   * @param path  The directory that the to-be-deleted snapshot belongs to
2405   * @param snapshotName The name of the snapshot
2406   */
2407  public void deleteSnapshot(Path path, String snapshotName)
2408      throws IOException {
2409    throw new UnsupportedOperationException(getClass().getSimpleName()
2410        + " doesn't support deleteSnapshot");
2411  }
2412  
2413  /**
2414   * Modifies ACL entries of files and directories.  This method can add new ACL
2415   * entries or modify the permissions on existing ACL entries.  All existing
2416   * ACL entries that are not specified in this call are retained without
2417   * changes.  (Modifications are merged into the current ACL.)
2418   *
2419   * @param path Path to modify
2420   * @param aclSpec List<AclEntry> describing modifications
2421   * @throws IOException if an ACL could not be modified
2422   */
2423  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2424      throws IOException {
2425    throw new UnsupportedOperationException(getClass().getSimpleName()
2426        + " doesn't support modifyAclEntries");
2427  }
2428
2429  /**
2430   * Removes ACL entries from files and directories.  Other ACL entries are
2431   * retained.
2432   *
2433   * @param path Path to modify
2434   * @param aclSpec List<AclEntry> describing entries to remove
2435   * @throws IOException if an ACL could not be modified
2436   */
2437  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2438      throws IOException {
2439    throw new UnsupportedOperationException(getClass().getSimpleName()
2440        + " doesn't support removeAclEntries");
2441  }
2442
2443  /**
2444   * Removes all default ACL entries from files and directories.
2445   *
2446   * @param path Path to modify
2447   * @throws IOException if an ACL could not be modified
2448   */
2449  public void removeDefaultAcl(Path path)
2450      throws IOException {
2451    throw new UnsupportedOperationException(getClass().getSimpleName()
2452        + " doesn't support removeDefaultAcl");
2453  }
2454
2455  /**
2456   * Removes all but the base ACL entries of files and directories.  The entries
2457   * for user, group, and others are retained for compatibility with permission
2458   * bits.
2459   *
2460   * @param path Path to modify
2461   * @throws IOException if an ACL could not be removed
2462   */
2463  public void removeAcl(Path path)
2464      throws IOException {
2465    throw new UnsupportedOperationException(getClass().getSimpleName()
2466        + " doesn't support removeAcl");
2467  }
2468
2469  /**
2470   * Fully replaces ACL of files and directories, discarding all existing
2471   * entries.
2472   *
2473   * @param path Path to modify
2474   * @param aclSpec List<AclEntry> describing modifications, must include entries
2475   *   for user, group, and others for compatibility with permission bits.
2476   * @throws IOException if an ACL could not be modified
2477   */
2478  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2479    throw new UnsupportedOperationException(getClass().getSimpleName()
2480        + " doesn't support setAcl");
2481  }
2482
2483  /**
2484   * Gets the ACL of a file or directory.
2485   *
2486   * @param path Path to get
2487   * @return AclStatus describing the ACL of the file or directory
2488   * @throws IOException if an ACL could not be read
2489   */
2490  public AclStatus getAclStatus(Path path) throws IOException {
2491    throw new UnsupportedOperationException(getClass().getSimpleName()
2492        + " doesn't support getAclStatus");
2493  }
2494
2495  /**
2496   * Set an xattr of a file or directory.
2497   * The name must be prefixed with the namespace followed by ".". For example,
2498   * "user.attr".
2499   * <p/>
2500   * Refer to the HDFS extended attributes user documentation for details.
2501   *
2502   * @param path Path to modify
2503   * @param name xattr name.
2504   * @param value xattr value.
2505   * @throws IOException
2506   */
2507  public void setXAttr(Path path, String name, byte[] value)
2508      throws IOException {
2509    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2510        XAttrSetFlag.REPLACE));
2511  }
2512
2513  /**
2514   * Set an xattr of a file or directory.
2515   * The name must be prefixed with the namespace followed by ".". For example,
2516   * "user.attr".
2517   * <p/>
2518   * Refer to the HDFS extended attributes user documentation for details.
2519   *
2520   * @param path Path to modify
2521   * @param name xattr name.
2522   * @param value xattr value.
2523   * @param flag xattr set flag
2524   * @throws IOException
2525   */
2526  public void setXAttr(Path path, String name, byte[] value,
2527      EnumSet<XAttrSetFlag> flag) throws IOException {
2528    throw new UnsupportedOperationException(getClass().getSimpleName()
2529        + " doesn't support setXAttr");
2530  }
2531
2532  /**
2533   * Get an xattr name and value for a file or directory.
2534   * The name must be prefixed with the namespace followed by ".". For example,
2535   * "user.attr".
2536   * <p/>
2537   * Refer to the HDFS extended attributes user documentation for details.
2538   *
2539   * @param path Path to get extended attribute
2540   * @param name xattr name.
2541   * @return byte[] xattr value.
2542   * @throws IOException
2543   */
2544  public byte[] getXAttr(Path path, String name) throws IOException {
2545    throw new UnsupportedOperationException(getClass().getSimpleName()
2546        + " doesn't support getXAttr");
2547  }
2548
2549  /**
2550   * Get all of the xattr name/value pairs for a file or directory.
2551   * Only those xattrs which the logged-in user has permissions to view
2552   * are returned.
2553   * <p/>
2554   * Refer to the HDFS extended attributes user documentation for details.
2555   *
2556   * @param path Path to get extended attributes
2557   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2558   * @throws IOException
2559   */
2560  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2561    throw new UnsupportedOperationException(getClass().getSimpleName()
2562        + " doesn't support getXAttrs");
2563  }
2564
2565  /**
2566   * Get all of the xattrs name/value pairs for a file or directory.
2567   * Only those xattrs which the logged-in user has permissions to view
2568   * are returned.
2569   * <p/>
2570   * Refer to the HDFS extended attributes user documentation for details.
2571   *
2572   * @param path Path to get extended attributes
2573   * @param names XAttr names.
2574   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2575   * @throws IOException
2576   */
2577  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2578      throws IOException {
2579    throw new UnsupportedOperationException(getClass().getSimpleName()
2580        + " doesn't support getXAttrs");
2581  }
2582
2583  /**
2584   * Get all of the xattr names for a file or directory.
2585   * Only those xattr names which the logged-in user has permissions to view
2586   * are returned.
2587   * <p/>
2588   * Refer to the HDFS extended attributes user documentation for details.
2589   *
2590   * @param path Path to get extended attributes
2591   * @return List<String> of the XAttr names of the file or directory
2592   * @throws IOException
2593   */
2594  public List<String> listXAttrs(Path path) throws IOException {
2595    throw new UnsupportedOperationException(getClass().getSimpleName()
2596            + " doesn't support listXAttrs");
2597  }
2598
2599  /**
2600   * Remove an xattr of a file or directory.
2601   * The name must be prefixed with the namespace followed by ".". For example,
2602   * "user.attr".
2603   * <p/>
2604   * Refer to the HDFS extended attributes user documentation for details.
2605   *
2606   * @param path Path to remove extended attribute
2607   * @param name xattr name
2608   * @throws IOException
2609   */
2610  public void removeXAttr(Path path, String name) throws IOException {
2611    throw new UnsupportedOperationException(getClass().getSimpleName()
2612        + " doesn't support removeXAttr");
2613  }
2614
2615  // making it volatile to be able to do a double checked locking
2616  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2617
2618  private static final Map<String, Class<? extends FileSystem>>
2619    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2620
2621  private static void loadFileSystems() {
2622    synchronized (FileSystem.class) {
2623      if (!FILE_SYSTEMS_LOADED) {
2624        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2625        Iterator<FileSystem> it = serviceLoader.iterator();
2626        while (it.hasNext()) {
2627          FileSystem fs = null;
2628          try {
2629            fs = it.next();
2630            try {
2631              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2632            } catch (Exception e) {
2633              LOG.warn("Cannot load: " + fs + " from " +
2634                  ClassUtil.findContainingJar(fs.getClass()), e);
2635            }
2636          } catch (ServiceConfigurationError ee) {
2637            LOG.warn("Cannot load filesystem", ee);
2638          }
2639        }
2640        FILE_SYSTEMS_LOADED = true;
2641      }
2642    }
2643  }
2644
2645  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2646      Configuration conf) throws IOException {
2647    if (!FILE_SYSTEMS_LOADED) {
2648      loadFileSystems();
2649    }
2650    Class<? extends FileSystem> clazz = null;
2651    if (conf != null) {
2652      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2653    }
2654    if (clazz == null) {
2655      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2656    }
2657    if (clazz == null) {
2658      throw new IOException("No FileSystem for scheme: " + scheme);
2659    }
2660    return clazz;
2661  }
2662
2663  private static FileSystem createFileSystem(URI uri, Configuration conf
2664      ) throws IOException {
2665    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2666    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2667    fs.initialize(uri, conf);
2668    return fs;
2669  }
2670
2671  /** Caching FileSystem objects */
2672  static class Cache {
2673    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2674
2675    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2676    private final Set<Key> toAutoClose = new HashSet<Key>();
2677
2678    /** A variable that makes all objects in the cache unique */
2679    private static AtomicLong unique = new AtomicLong(1);
2680
2681    FileSystem get(URI uri, Configuration conf) throws IOException{
2682      Key key = new Key(uri, conf);
2683      return getInternal(uri, conf, key);
2684    }
2685
2686    /** The objects inserted into the cache using this method are all unique */
2687    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2688      Key key = new Key(uri, conf, unique.getAndIncrement());
2689      return getInternal(uri, conf, key);
2690    }
2691
2692    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2693      FileSystem fs;
2694      synchronized (this) {
2695        fs = map.get(key);
2696      }
2697      if (fs != null) {
2698        return fs;
2699      }
2700
2701      fs = createFileSystem(uri, conf);
2702      synchronized (this) { // refetch the lock again
2703        FileSystem oldfs = map.get(key);
2704        if (oldfs != null) { // a file system is created while lock is releasing
2705          fs.close(); // close the new file system
2706          return oldfs;  // return the old file system
2707        }
2708        
2709        // now insert the new file system into the map
2710        if (map.isEmpty()
2711                && !ShutdownHookManager.get().isShutdownInProgress()) {
2712          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2713        }
2714        fs.key = key;
2715        map.put(key, fs);
2716        if (conf.getBoolean("fs.automatic.close", true)) {
2717          toAutoClose.add(key);
2718        }
2719        return fs;
2720      }
2721    }
2722
2723    synchronized void remove(Key key, FileSystem fs) {
2724      if (map.containsKey(key) && fs == map.get(key)) {
2725        map.remove(key);
2726        toAutoClose.remove(key);
2727        }
2728    }
2729
2730    synchronized void closeAll() throws IOException {
2731      closeAll(false);
2732    }
2733
2734    /**
2735     * Close all FileSystem instances in the Cache.
2736     * @param onlyAutomatic only close those that are marked for automatic closing
2737     */
2738    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2739      List<IOException> exceptions = new ArrayList<IOException>();
2740
2741      // Make a copy of the keys in the map since we'll be modifying
2742      // the map while iterating over it, which isn't safe.
2743      List<Key> keys = new ArrayList<Key>();
2744      keys.addAll(map.keySet());
2745
2746      for (Key key : keys) {
2747        final FileSystem fs = map.get(key);
2748
2749        if (onlyAutomatic && !toAutoClose.contains(key)) {
2750          continue;
2751        }
2752
2753        //remove from cache
2754        remove(key, fs);
2755
2756        if (fs != null) {
2757          try {
2758            fs.close();
2759          }
2760          catch(IOException ioe) {
2761            exceptions.add(ioe);
2762          }
2763        }
2764      }
2765
2766      if (!exceptions.isEmpty()) {
2767        throw MultipleIOException.createIOException(exceptions);
2768      }
2769    }
2770
2771    private class ClientFinalizer implements Runnable {
2772      @Override
2773      public synchronized void run() {
2774        try {
2775          closeAll(true);
2776        } catch (IOException e) {
2777          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2778        }
2779      }
2780    }
2781
2782    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2783      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2784      //Make a pass over the list and collect the filesystems to close
2785      //we cannot close inline since close() removes the entry from the Map
2786      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2787        final Key key = entry.getKey();
2788        final FileSystem fs = entry.getValue();
2789        if (ugi.equals(key.ugi) && fs != null) {
2790          targetFSList.add(fs);   
2791        }
2792      }
2793      List<IOException> exceptions = new ArrayList<IOException>();
2794      //now make a pass over the target list and close each
2795      for (FileSystem fs : targetFSList) {
2796        try {
2797          fs.close();
2798        }
2799        catch(IOException ioe) {
2800          exceptions.add(ioe);
2801        }
2802      }
2803      if (!exceptions.isEmpty()) {
2804        throw MultipleIOException.createIOException(exceptions);
2805      }
2806    }
2807
2808    /** FileSystem.Cache.Key */
2809    static class Key {
2810      final String scheme;
2811      final String authority;
2812      final UserGroupInformation ugi;
2813      final long unique;   // an artificial way to make a key unique
2814
2815      Key(URI uri, Configuration conf) throws IOException {
2816        this(uri, conf, 0);
2817      }
2818
2819      Key(URI uri, Configuration conf, long unique) throws IOException {
2820        scheme = uri.getScheme()==null ?
2821            "" : StringUtils.toLowerCase(uri.getScheme());
2822        authority = uri.getAuthority()==null ?
2823            "" : StringUtils.toLowerCase(uri.getAuthority());
2824        this.unique = unique;
2825        
2826        this.ugi = UserGroupInformation.getCurrentUser();
2827      }
2828
2829      @Override
2830      public int hashCode() {
2831        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2832      }
2833
2834      static boolean isEqual(Object a, Object b) {
2835        return a == b || (a != null && a.equals(b));        
2836      }
2837
2838      @Override
2839      public boolean equals(Object obj) {
2840        if (obj == this) {
2841          return true;
2842        }
2843        if (obj != null && obj instanceof Key) {
2844          Key that = (Key)obj;
2845          return isEqual(this.scheme, that.scheme)
2846                 && isEqual(this.authority, that.authority)
2847                 && isEqual(this.ugi, that.ugi)
2848                 && (this.unique == that.unique);
2849        }
2850        return false;        
2851      }
2852
2853      @Override
2854      public String toString() {
2855        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2856      }
2857    }
2858  }
2859  
2860  /**
2861   * Tracks statistics about how many reads, writes, and so forth have been
2862   * done in a FileSystem.
2863   * 
2864   * Since there is only one of these objects per FileSystem, there will 
2865   * typically be many threads writing to this object.  Almost every operation
2866   * on an open file will involve a write to this object.  In contrast, reading
2867   * statistics is done infrequently by most programs, and not at all by others.
2868   * Hence, this is optimized for writes.
2869   * 
2870   * Each thread writes to its own thread-local area of memory.  This removes 
2871   * contention and allows us to scale up to many, many threads.  To read
2872   * statistics, the reader thread totals up the contents of all of the 
2873   * thread-local data areas.
2874   */
2875  public static final class Statistics {
2876    /**
2877     * Statistics data.
2878     * 
2879     * There is only a single writer to thread-local StatisticsData objects.
2880     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2881     * to prevent lost updates.
2882     * The Java specification guarantees that updates to volatile longs will
2883     * be perceived as atomic with respect to other threads, which is all we
2884     * need.
2885     */
2886    public static class StatisticsData {
2887      volatile long bytesRead;
2888      volatile long bytesWritten;
2889      volatile int readOps;
2890      volatile int largeReadOps;
2891      volatile int writeOps;
2892
2893      /**
2894       * Add another StatisticsData object to this one.
2895       */
2896      void add(StatisticsData other) {
2897        this.bytesRead += other.bytesRead;
2898        this.bytesWritten += other.bytesWritten;
2899        this.readOps += other.readOps;
2900        this.largeReadOps += other.largeReadOps;
2901        this.writeOps += other.writeOps;
2902      }
2903
2904      /**
2905       * Negate the values of all statistics.
2906       */
2907      void negate() {
2908        this.bytesRead = -this.bytesRead;
2909        this.bytesWritten = -this.bytesWritten;
2910        this.readOps = -this.readOps;
2911        this.largeReadOps = -this.largeReadOps;
2912        this.writeOps = -this.writeOps;
2913      }
2914
2915      @Override
2916      public String toString() {
2917        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2918            + readOps + " read ops, " + largeReadOps + " large read ops, "
2919            + writeOps + " write ops";
2920      }
2921      
2922      public long getBytesRead() {
2923        return bytesRead;
2924      }
2925      
2926      public long getBytesWritten() {
2927        return bytesWritten;
2928      }
2929      
2930      public int getReadOps() {
2931        return readOps;
2932      }
2933      
2934      public int getLargeReadOps() {
2935        return largeReadOps;
2936      }
2937      
2938      public int getWriteOps() {
2939        return writeOps;
2940      }
2941    }
2942
2943    private interface StatisticsAggregator<T> {
2944      void accept(StatisticsData data);
2945      T aggregate();
2946    }
2947
2948    private final String scheme;
2949
2950    /**
2951     * rootData is data that doesn't belong to any thread, but will be added
2952     * to the totals.  This is useful for making copies of Statistics objects,
2953     * and for storing data that pertains to threads that have been garbage
2954     * collected.  Protected by the Statistics lock.
2955     */
2956    private final StatisticsData rootData;
2957
2958    /**
2959     * Thread-local data.
2960     */
2961    private final ThreadLocal<StatisticsData> threadData;
2962
2963    /**
2964     * Set of all thread-local data areas.  Protected by the Statistics lock.
2965     * The references to the statistics data are kept using weak references
2966     * to the associated threads. Proper clean-up is performed by the cleaner
2967     * thread when the threads are garbage collected.
2968     */
2969    private final Set<StatisticsDataReference> allData;
2970
2971    /**
2972     * Global reference queue and a cleaner thread that manage statistics data
2973     * references from all filesystem instances.
2974     */
2975    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
2976    private static final Thread STATS_DATA_CLEANER;
2977
2978    static {
2979      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
2980      // start a single daemon cleaner thread
2981      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
2982      STATS_DATA_CLEANER.
2983          setName(StatisticsDataReferenceCleaner.class.getName());
2984      STATS_DATA_CLEANER.setDaemon(true);
2985      STATS_DATA_CLEANER.start();
2986    }
2987
2988    public Statistics(String scheme) {
2989      this.scheme = scheme;
2990      this.rootData = new StatisticsData();
2991      this.threadData = new ThreadLocal<StatisticsData>();
2992      this.allData = new HashSet<StatisticsDataReference>();
2993    }
2994
2995    /**
2996     * Copy constructor.
2997     * 
2998     * @param other    The input Statistics object which is cloned.
2999     */
3000    public Statistics(Statistics other) {
3001      this.scheme = other.scheme;
3002      this.rootData = new StatisticsData();
3003      other.visitAll(new StatisticsAggregator<Void>() {
3004        @Override
3005        public void accept(StatisticsData data) {
3006          rootData.add(data);
3007        }
3008
3009        public Void aggregate() {
3010          return null;
3011        }
3012      });
3013      this.threadData = new ThreadLocal<StatisticsData>();
3014      this.allData = new HashSet<StatisticsDataReference>();
3015    }
3016
3017    /**
3018     * A weak reference to a thread that also includes the data associated
3019     * with that thread. On the thread being garbage collected, it is enqueued
3020     * to the reference queue for clean-up.
3021     */
3022    private class StatisticsDataReference extends WeakReference<Thread> {
3023      private final StatisticsData data;
3024
3025      public StatisticsDataReference(StatisticsData data, Thread thread) {
3026        super(thread, STATS_DATA_REF_QUEUE);
3027        this.data = data;
3028      }
3029
3030      public StatisticsData getData() {
3031        return data;
3032      }
3033
3034      /**
3035       * Performs clean-up action when the associated thread is garbage
3036       * collected.
3037       */
3038      public void cleanUp() {
3039        // use the statistics lock for safety
3040        synchronized (Statistics.this) {
3041          /*
3042           * If the thread that created this thread-local data no longer exists,
3043           * remove the StatisticsData from our list and fold the values into
3044           * rootData.
3045           */
3046          rootData.add(data);
3047          allData.remove(this);
3048        }
3049      }
3050    }
3051
3052    /**
3053     * Background action to act on references being removed.
3054     */
3055    private static class StatisticsDataReferenceCleaner implements Runnable {
3056      @Override
3057      public void run() {
3058        while (true) {
3059          try {
3060            StatisticsDataReference ref =
3061                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
3062            ref.cleanUp();
3063          } catch (Throwable th) {
3064            // the cleaner thread should continue to run even if there are
3065            // exceptions, including InterruptedException
3066            LOG.warn("exception in the cleaner thread but it will continue to "
3067                + "run", th);
3068          }
3069        }
3070      }
3071    }
3072
3073    /**
3074     * Get or create the thread-local data associated with the current thread.
3075     */
3076    public StatisticsData getThreadStatistics() {
3077      StatisticsData data = threadData.get();
3078      if (data == null) {
3079        data = new StatisticsData();
3080        threadData.set(data);
3081        StatisticsDataReference ref =
3082            new StatisticsDataReference(data, Thread.currentThread());
3083        synchronized(this) {
3084          allData.add(ref);
3085        }
3086      }
3087      return data;
3088    }
3089
3090    /**
3091     * Increment the bytes read in the statistics
3092     * @param newBytes the additional bytes read
3093     */
3094    public void incrementBytesRead(long newBytes) {
3095      getThreadStatistics().bytesRead += newBytes;
3096    }
3097    
3098    /**
3099     * Increment the bytes written in the statistics
3100     * @param newBytes the additional bytes written
3101     */
3102    public void incrementBytesWritten(long newBytes) {
3103      getThreadStatistics().bytesWritten += newBytes;
3104    }
3105    
3106    /**
3107     * Increment the number of read operations
3108     * @param count number of read operations
3109     */
3110    public void incrementReadOps(int count) {
3111      getThreadStatistics().readOps += count;
3112    }
3113
3114    /**
3115     * Increment the number of large read operations
3116     * @param count number of large read operations
3117     */
3118    public void incrementLargeReadOps(int count) {
3119      getThreadStatistics().largeReadOps += count;
3120    }
3121
3122    /**
3123     * Increment the number of write operations
3124     * @param count number of write operations
3125     */
3126    public void incrementWriteOps(int count) {
3127      getThreadStatistics().writeOps += count;
3128    }
3129
3130    /**
3131     * Apply the given aggregator to all StatisticsData objects associated with
3132     * this Statistics object.
3133     *
3134     * For each StatisticsData object, we will call accept on the visitor.
3135     * Finally, at the end, we will call aggregate to get the final total. 
3136     *
3137     * @param         The visitor to use.
3138     * @return        The total.
3139     */
3140    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3141      visitor.accept(rootData);
3142      for (StatisticsDataReference ref: allData) {
3143        StatisticsData data = ref.getData();
3144        visitor.accept(data);
3145      }
3146      return visitor.aggregate();
3147    }
3148
3149    /**
3150     * Get the total number of bytes read
3151     * @return the number of bytes
3152     */
3153    public long getBytesRead() {
3154      return visitAll(new StatisticsAggregator<Long>() {
3155        private long bytesRead = 0;
3156
3157        @Override
3158        public void accept(StatisticsData data) {
3159          bytesRead += data.bytesRead;
3160        }
3161
3162        public Long aggregate() {
3163          return bytesRead;
3164        }
3165      });
3166    }
3167    
3168    /**
3169     * Get the total number of bytes written
3170     * @return the number of bytes
3171     */
3172    public long getBytesWritten() {
3173      return visitAll(new StatisticsAggregator<Long>() {
3174        private long bytesWritten = 0;
3175
3176        @Override
3177        public void accept(StatisticsData data) {
3178          bytesWritten += data.bytesWritten;
3179        }
3180
3181        public Long aggregate() {
3182          return bytesWritten;
3183        }
3184      });
3185    }
3186    
3187    /**
3188     * Get the number of file system read operations such as list files
3189     * @return number of read operations
3190     */
3191    public int getReadOps() {
3192      return visitAll(new StatisticsAggregator<Integer>() {
3193        private int readOps = 0;
3194
3195        @Override
3196        public void accept(StatisticsData data) {
3197          readOps += data.readOps;
3198          readOps += data.largeReadOps;
3199        }
3200
3201        public Integer aggregate() {
3202          return readOps;
3203        }
3204      });
3205    }
3206
3207    /**
3208     * Get the number of large file system read operations such as list files
3209     * under a large directory
3210     * @return number of large read operations
3211     */
3212    public int getLargeReadOps() {
3213      return visitAll(new StatisticsAggregator<Integer>() {
3214        private int largeReadOps = 0;
3215
3216        @Override
3217        public void accept(StatisticsData data) {
3218          largeReadOps += data.largeReadOps;
3219        }
3220
3221        public Integer aggregate() {
3222          return largeReadOps;
3223        }
3224      });
3225    }
3226
3227    /**
3228     * Get the number of file system write operations such as create, append 
3229     * rename etc.
3230     * @return number of write operations
3231     */
3232    public int getWriteOps() {
3233      return visitAll(new StatisticsAggregator<Integer>() {
3234        private int writeOps = 0;
3235
3236        @Override
3237        public void accept(StatisticsData data) {
3238          writeOps += data.writeOps;
3239        }
3240
3241        public Integer aggregate() {
3242          return writeOps;
3243        }
3244      });
3245    }
3246
3247
3248    @Override
3249    public String toString() {
3250      return visitAll(new StatisticsAggregator<String>() {
3251        private StatisticsData total = new StatisticsData();
3252
3253        @Override
3254        public void accept(StatisticsData data) {
3255          total.add(data);
3256        }
3257
3258        public String aggregate() {
3259          return total.toString();
3260        }
3261      });
3262    }
3263
3264    /**
3265     * Resets all statistics to 0.
3266     *
3267     * In order to reset, we add up all the thread-local statistics data, and
3268     * set rootData to the negative of that.
3269     *
3270     * This may seem like a counterintuitive way to reset the statsitics.  Why
3271     * can't we just zero out all the thread-local data?  Well, thread-local
3272     * data can only be modified by the thread that owns it.  If we tried to
3273     * modify the thread-local data from this thread, our modification might get
3274     * interleaved with a read-modify-write operation done by the thread that
3275     * owns the data.  That would result in our update getting lost.
3276     *
3277     * The approach used here avoids this problem because it only ever reads
3278     * (not writes) the thread-local data.  Both reads and writes to rootData
3279     * are done under the lock, so we're free to modify rootData from any thread
3280     * that holds the lock.
3281     */
3282    public void reset() {
3283      visitAll(new StatisticsAggregator<Void>() {
3284        private StatisticsData total = new StatisticsData();
3285
3286        @Override
3287        public void accept(StatisticsData data) {
3288          total.add(data);
3289        }
3290
3291        public Void aggregate() {
3292          total.negate();
3293          rootData.add(total);
3294          return null;
3295        }
3296      });
3297    }
3298    
3299    /**
3300     * Get the uri scheme associated with this statistics object.
3301     * @return the schema associated with this set of statistics
3302     */
3303    public String getScheme() {
3304      return scheme;
3305    }
3306
3307    @VisibleForTesting
3308    synchronized int getAllThreadLocalDataSize() {
3309      return allData.size();
3310    }
3311  }
3312  
3313  /**
3314   * Get the Map of Statistics object indexed by URI Scheme.
3315   * @return a Map having a key as URI scheme and value as Statistics object
3316   * @deprecated use {@link #getAllStatistics} instead
3317   */
3318  @Deprecated
3319  public static synchronized Map<String, Statistics> getStatistics() {
3320    Map<String, Statistics> result = new HashMap<String, Statistics>();
3321    for(Statistics stat: statisticsTable.values()) {
3322      result.put(stat.getScheme(), stat);
3323    }
3324    return result;
3325  }
3326
3327  /**
3328   * Return the FileSystem classes that have Statistics
3329   */
3330  public static synchronized List<Statistics> getAllStatistics() {
3331    return new ArrayList<Statistics>(statisticsTable.values());
3332  }
3333  
3334  /**
3335   * Get the statistics for a particular file system
3336   * @param cls the class to lookup
3337   * @return a statistics object
3338   */
3339  public static synchronized 
3340  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3341    Statistics result = statisticsTable.get(cls);
3342    if (result == null) {
3343      result = new Statistics(scheme);
3344      statisticsTable.put(cls, result);
3345    }
3346    return result;
3347  }
3348  
3349  /**
3350   * Reset all statistics for all file systems
3351   */
3352  public static synchronized void clearStatistics() {
3353    for(Statistics stat: statisticsTable.values()) {
3354      stat.reset();
3355    }
3356  }
3357
3358  /**
3359   * Print all statistics for all file systems
3360   */
3361  public static synchronized
3362  void printStatistics() throws IOException {
3363    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3364            statisticsTable.entrySet()) {
3365      System.out.println("  FileSystem " + pair.getKey().getName() + 
3366                         ": " + pair.getValue());
3367    }
3368  }
3369
3370  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
3371  private static boolean symlinksEnabled = false;
3372
3373  private static Configuration conf = null;
3374
3375  @VisibleForTesting
3376  public static boolean areSymlinksEnabled() {
3377    return symlinksEnabled;
3378  }
3379
3380  @VisibleForTesting
3381  public static void enableSymlinks() {
3382    symlinksEnabled = true;
3383  }
3384}