001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs;
019    
020    import java.io.Closeable;
021    import java.io.FileNotFoundException;
022    import java.io.IOException;
023    import java.lang.ref.WeakReference;
024    import java.net.URI;
025    import java.net.URISyntaxException;
026    import java.security.PrivilegedExceptionAction;
027    import java.util.ArrayList;
028    import java.util.Arrays;
029    import java.util.EnumSet;
030    import java.util.HashMap;
031    import java.util.HashSet;
032    import java.util.IdentityHashMap;
033    import java.util.Iterator;
034    import java.util.LinkedList;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.NoSuchElementException;
038    import java.util.ServiceLoader;
039    import java.util.Set;
040    import java.util.Stack;
041    import java.util.TreeSet;
042    import java.util.concurrent.atomic.AtomicLong;
043    
044    import org.apache.commons.logging.Log;
045    import org.apache.commons.logging.LogFactory;
046    import org.apache.hadoop.classification.InterfaceAudience;
047    import org.apache.hadoop.classification.InterfaceStability;
048    import org.apache.hadoop.conf.Configuration;
049    import org.apache.hadoop.conf.Configured;
050    import org.apache.hadoop.fs.Options.ChecksumOpt;
051    import org.apache.hadoop.fs.Options.Rename;
052    import org.apache.hadoop.fs.permission.AclEntry;
053    import org.apache.hadoop.fs.permission.AclStatus;
054    import org.apache.hadoop.fs.permission.FsAction;
055    import org.apache.hadoop.fs.permission.FsPermission;
056    import org.apache.hadoop.io.MultipleIOException;
057    import org.apache.hadoop.io.Text;
058    import org.apache.hadoop.net.NetUtils;
059    import org.apache.hadoop.security.AccessControlException;
060    import org.apache.hadoop.security.Credentials;
061    import org.apache.hadoop.security.SecurityUtil;
062    import org.apache.hadoop.security.UserGroupInformation;
063    import org.apache.hadoop.security.token.Token;
064    import org.apache.hadoop.util.DataChecksum;
065    import org.apache.hadoop.util.Progressable;
066    import org.apache.hadoop.util.ReflectionUtils;
067    import org.apache.hadoop.util.ShutdownHookManager;
068    
069    import com.google.common.annotations.VisibleForTesting;
070    
071    /****************************************************************
072     * An abstract base class for a fairly generic filesystem.  It
073     * may be implemented as a distributed filesystem, or as a "local"
074     * one that reflects the locally-connected disk.  The local version
075     * exists for small Hadoop instances and for testing.
076     *
077     * <p>
078     *
079     * All user code that may potentially use the Hadoop Distributed
080     * File System should be written to use a FileSystem object.  The
081     * Hadoop DFS is a multi-machine system that appears as a single
082     * disk.  It's useful because of its fault tolerance and potentially
083     * very large capacity.
084     * 
085     * <p>
086     * The local implementation is {@link LocalFileSystem} and distributed
087     * implementation is DistributedFileSystem.
088     *****************************************************************/
089    @InterfaceAudience.Public
090    @InterfaceStability.Stable
091    public abstract class FileSystem extends Configured implements Closeable {
092      public static final String FS_DEFAULT_NAME_KEY = 
093                       CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
094      public static final String DEFAULT_FS = 
095                       CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
096    
097      public static final Log LOG = LogFactory.getLog(FileSystem.class);
098    
099      /**
100       * Priority of the FileSystem shutdown hook.
101       */
102      public static final int SHUTDOWN_HOOK_PRIORITY = 10;
103    
104      /** FileSystem cache */
105      static final Cache CACHE = new Cache();
106    
107      /** The key this instance is stored under in the cache. */
108      private Cache.Key key;
109    
110      /** Recording statistics per a FileSystem class */
111      private static final Map<Class<? extends FileSystem>, Statistics> 
112        statisticsTable =
113          new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
114      
115      /**
116       * The statistics for this file system.
117       */
118      protected Statistics statistics;
119    
120      /**
121       * A cache of files that should be deleted when filsystem is closed
122       * or the JVM is exited.
123       */
124      private Set<Path> deleteOnExit = new TreeSet<Path>();
125      
126      boolean resolveSymlinks;
127      /**
128       * This method adds a file system for testing so that we can find it later. It
129       * is only for testing.
130       * @param uri the uri to store it under
131       * @param conf the configuration to store it under
132       * @param fs the file system to store
133       * @throws IOException
134       */
135      static void addFileSystemForTesting(URI uri, Configuration conf,
136          FileSystem fs) throws IOException {
137        CACHE.map.put(new Cache.Key(uri, conf), fs);
138      }
139    
140      /**
141       * Get a filesystem instance based on the uri, the passed
142       * configuration and the user
143       * @param uri of the filesystem
144       * @param conf the configuration to use
145       * @param user to perform the get as
146       * @return the filesystem instance
147       * @throws IOException
148       * @throws InterruptedException
149       */
150      public static FileSystem get(final URI uri, final Configuration conf,
151            final String user) throws IOException, InterruptedException {
152        String ticketCachePath =
153          conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
154        UserGroupInformation ugi =
155            UserGroupInformation.getBestUGI(ticketCachePath, user);
156        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
157          @Override
158          public FileSystem run() throws IOException {
159            return get(uri, conf);
160          }
161        });
162      }
163    
164      /**
165       * Returns the configured filesystem implementation.
166       * @param conf the configuration to use
167       */
168      public static FileSystem get(Configuration conf) throws IOException {
169        return get(getDefaultUri(conf), conf);
170      }
171      
172      /** Get the default filesystem URI from a configuration.
173       * @param conf the configuration to use
174       * @return the uri of the default filesystem
175       */
176      public static URI getDefaultUri(Configuration conf) {
177        return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
178      }
179    
180      /** Set the default filesystem URI in a configuration.
181       * @param conf the configuration to alter
182       * @param uri the new default filesystem uri
183       */
184      public static void setDefaultUri(Configuration conf, URI uri) {
185        conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
186      }
187    
188      /** Set the default filesystem URI in a configuration.
189       * @param conf the configuration to alter
190       * @param uri the new default filesystem uri
191       */
192      public static void setDefaultUri(Configuration conf, String uri) {
193        setDefaultUri(conf, URI.create(fixName(uri)));
194      }
195    
196      /** Called after a new FileSystem instance is constructed.
197       * @param name a uri whose authority section names the host, port, etc.
198       *   for this FileSystem
199       * @param conf the configuration
200       */
201      public void initialize(URI name, Configuration conf) throws IOException {
202        statistics = getStatistics(name.getScheme(), getClass());    
203        resolveSymlinks = conf.getBoolean(
204            CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
205            CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
206      }
207    
208      /**
209       * Return the protocol scheme for the FileSystem.
210       * <p/>
211       * This implementation throws an <code>UnsupportedOperationException</code>.
212       *
213       * @return the protocol scheme for the FileSystem.
214       */
215      public String getScheme() {
216        throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
217      }
218    
219      /** Returns a URI whose scheme and authority identify this FileSystem.*/
220      public abstract URI getUri();
221      
222      /**
223       * Return a canonicalized form of this FileSystem's URI.
224       * 
225       * The default implementation simply calls {@link #canonicalizeUri(URI)}
226       * on the filesystem's own URI, so subclasses typically only need to
227       * implement that method.
228       *
229       * @see #canonicalizeUri(URI)
230       */
231      protected URI getCanonicalUri() {
232        return canonicalizeUri(getUri());
233      }
234      
235      /**
236       * Canonicalize the given URI.
237       * 
238       * This is filesystem-dependent, but may for example consist of
239       * canonicalizing the hostname using DNS and adding the default
240       * port if not specified.
241       * 
242       * The default implementation simply fills in the default port if
243       * not specified and if the filesystem has a default port.
244       *
245       * @return URI
246       * @see NetUtils#getCanonicalUri(URI, int)
247       */
248      protected URI canonicalizeUri(URI uri) {
249        if (uri.getPort() == -1 && getDefaultPort() > 0) {
250          // reconstruct the uri with the default port set
251          try {
252            uri = new URI(uri.getScheme(), uri.getUserInfo(),
253                uri.getHost(), getDefaultPort(),
254                uri.getPath(), uri.getQuery(), uri.getFragment());
255          } catch (URISyntaxException e) {
256            // Should never happen!
257            throw new AssertionError("Valid URI became unparseable: " +
258                uri);
259          }
260        }
261        
262        return uri;
263      }
264      
265      /**
266       * Get the default port for this file system.
267       * @return the default port or 0 if there isn't one
268       */
269      protected int getDefaultPort() {
270        return 0;
271      }
272    
273      protected static FileSystem getFSofPath(final Path absOrFqPath,
274          final Configuration conf)
275          throws UnsupportedFileSystemException, IOException {
276        absOrFqPath.checkNotSchemeWithRelative();
277        absOrFqPath.checkNotRelative();
278    
279        // Uses the default file system if not fully qualified
280        return get(absOrFqPath.toUri(), conf);
281      }
282    
283      /**
284       * Get a canonical service name for this file system.  The token cache is
285       * the only user of the canonical service name, and uses it to lookup this
286       * filesystem's service tokens.
287       * If file system provides a token of its own then it must have a canonical
288       * name, otherwise canonical name can be null.
289       * 
290       * Default Impl: If the file system has child file systems 
291       * (such as an embedded file system) then it is assumed that the fs has no
292       * tokens of its own and hence returns a null name; otherwise a service
293       * name is built using Uri and port.
294       * 
295       * @return a service string that uniquely identifies this file system, null
296       *         if the filesystem does not implement tokens
297       * @see SecurityUtil#buildDTServiceName(URI, int) 
298       */
299      @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
300      public String getCanonicalServiceName() {
301        return (getChildFileSystems() == null)
302          ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
303          : null;
304      }
305    
306      /** @deprecated call #getUri() instead.*/
307      @Deprecated
308      public String getName() { return getUri().toString(); }
309    
310      /** @deprecated call #get(URI,Configuration) instead. */
311      @Deprecated
312      public static FileSystem getNamed(String name, Configuration conf)
313        throws IOException {
314        return get(URI.create(fixName(name)), conf);
315      }
316      
317      /** Update old-format filesystem names, for back-compatibility.  This should
318       * eventually be replaced with a checkName() method that throws an exception
319       * for old-format names. */ 
320      private static String fixName(String name) {
321        // convert old-format name to new-format name
322        if (name.equals("local")) {         // "local" is now "file:///".
323          LOG.warn("\"local\" is a deprecated filesystem name."
324                   +" Use \"file:///\" instead.");
325          name = "file:///";
326        } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
327          LOG.warn("\""+name+"\" is a deprecated filesystem name."
328                   +" Use \"hdfs://"+name+"/\" instead.");
329          name = "hdfs://"+name;
330        }
331        return name;
332      }
333    
334      /**
335       * Get the local file system.
336       * @param conf the configuration to configure the file system with
337       * @return a LocalFileSystem
338       */
339      public static LocalFileSystem getLocal(Configuration conf)
340        throws IOException {
341        return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
342      }
343    
344      /** Returns the FileSystem for this URI's scheme and authority.  The scheme
345       * of the URI determines a configuration property name,
346       * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
347       * The entire URI is passed to the FileSystem instance's initialize method.
348       */
349      public static FileSystem get(URI uri, Configuration conf) throws IOException {
350        String scheme = uri.getScheme();
351        String authority = uri.getAuthority();
352    
353        if (scheme == null && authority == null) {     // use default FS
354          return get(conf);
355        }
356    
357        if (scheme != null && authority == null) {     // no authority
358          URI defaultUri = getDefaultUri(conf);
359          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
360              && defaultUri.getAuthority() != null) {  // & default has authority
361            return get(defaultUri, conf);              // return default
362          }
363        }
364        
365        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
366        if (conf.getBoolean(disableCacheName, false)) {
367          return createFileSystem(uri, conf);
368        }
369    
370        return CACHE.get(uri, conf);
371      }
372    
373      /**
374       * Returns the FileSystem for this URI's scheme and authority and the 
375       * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
376       * @param uri of the filesystem
377       * @param conf the configuration to use
378       * @param user to perform the get as
379       * @return filesystem instance
380       * @throws IOException
381       * @throws InterruptedException
382       */
383      public static FileSystem newInstance(final URI uri, final Configuration conf,
384          final String user) throws IOException, InterruptedException {
385        String ticketCachePath =
386          conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
387        UserGroupInformation ugi =
388            UserGroupInformation.getBestUGI(ticketCachePath, user);
389        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
390          @Override
391          public FileSystem run() throws IOException {
392            return newInstance(uri,conf); 
393          }
394        });
395      }
396      /** Returns the FileSystem for this URI's scheme and authority.  The scheme
397       * of the URI determines a configuration property name,
398       * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
399       * The entire URI is passed to the FileSystem instance's initialize method.
400       * This always returns a new FileSystem object.
401       */
402      public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
403        String scheme = uri.getScheme();
404        String authority = uri.getAuthority();
405    
406        if (scheme == null) {                       // no scheme: use default FS
407          return newInstance(conf);
408        }
409    
410        if (authority == null) {                       // no authority
411          URI defaultUri = getDefaultUri(conf);
412          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
413              && defaultUri.getAuthority() != null) {  // & default has authority
414            return newInstance(defaultUri, conf);              // return default
415          }
416        }
417        return CACHE.getUnique(uri, conf);
418      }
419    
420      /** Returns a unique configured filesystem implementation.
421       * This always returns a new FileSystem object.
422       * @param conf the configuration to use
423       */
424      public static FileSystem newInstance(Configuration conf) throws IOException {
425        return newInstance(getDefaultUri(conf), conf);
426      }
427    
428      /**
429       * Get a unique local file system object
430       * @param conf the configuration to configure the file system with
431       * @return a LocalFileSystem
432       * This always returns a new FileSystem object.
433       */
434      public static LocalFileSystem newInstanceLocal(Configuration conf)
435        throws IOException {
436        return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
437      }
438    
439      /**
440       * Close all cached filesystems. Be sure those filesystems are not
441       * used anymore.
442       * 
443       * @throws IOException
444       */
445      public static void closeAll() throws IOException {
446        CACHE.closeAll();
447      }
448    
449      /**
450       * Close all cached filesystems for a given UGI. Be sure those filesystems 
451       * are not used anymore.
452       * @param ugi user group info to close
453       * @throws IOException
454       */
455      public static void closeAllForUGI(UserGroupInformation ugi) 
456      throws IOException {
457        CACHE.closeAll(ugi);
458      }
459    
460      /** 
461       * Make sure that a path specifies a FileSystem.
462       * @param path to use
463       */
464      public Path makeQualified(Path path) {
465        checkPath(path);
466        return path.makeQualified(this.getUri(), this.getWorkingDirectory());
467      }
468        
469      /**
470       * Get a new delegation token for this file system.
471       * This is an internal method that should have been declared protected
472       * but wasn't historically.
473       * Callers should use {@link #addDelegationTokens(String, Credentials)}
474       * 
475       * @param renewer the account name that is allowed to renew the token.
476       * @return a new delegation token
477       * @throws IOException
478       */
479      @InterfaceAudience.Private()
480      public Token<?> getDelegationToken(String renewer) throws IOException {
481        return null;
482      }
483      
484      /**
485       * Obtain all delegation tokens used by this FileSystem that are not
486       * already present in the given Credentials.  Existing tokens will neither
487       * be verified as valid nor having the given renewer.  Missing tokens will
488       * be acquired and added to the given Credentials.
489       * 
490       * Default Impl: works for simple fs with its own token
491       * and also for an embedded fs whose tokens are those of its
492       * children file system (i.e. the embedded fs has not tokens of its
493       * own).
494       * 
495       * @param renewer the user allowed to renew the delegation tokens
496       * @param credentials cache in which to add new delegation tokens
497       * @return list of new delegation tokens
498       * @throws IOException
499       */
500      @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
501      public Token<?>[] addDelegationTokens(
502          final String renewer, Credentials credentials) throws IOException {
503        if (credentials == null) {
504          credentials = new Credentials();
505        }
506        final List<Token<?>> tokens = new ArrayList<Token<?>>();
507        collectDelegationTokens(renewer, credentials, tokens);
508        return tokens.toArray(new Token<?>[tokens.size()]);
509      }
510      
511      /**
512       * Recursively obtain the tokens for this FileSystem and all descended
513       * FileSystems as determined by getChildFileSystems().
514       * @param renewer the user allowed to renew the delegation tokens
515       * @param credentials cache in which to add the new delegation tokens
516       * @param tokens list in which to add acquired tokens
517       * @throws IOException
518       */
519      private void collectDelegationTokens(final String renewer,
520                                           final Credentials credentials,
521                                           final List<Token<?>> tokens)
522                                               throws IOException {
523        final String serviceName = getCanonicalServiceName();
524        // Collect token of the this filesystem and then of its embedded children
525        if (serviceName != null) { // fs has token, grab it
526          final Text service = new Text(serviceName);
527          Token<?> token = credentials.getToken(service);
528          if (token == null) {
529            token = getDelegationToken(renewer);
530            if (token != null) {
531              tokens.add(token);
532              credentials.addToken(service, token);
533            }
534          }
535        }
536        // Now collect the tokens from the children
537        final FileSystem[] children = getChildFileSystems();
538        if (children != null) {
539          for (final FileSystem fs : children) {
540            fs.collectDelegationTokens(renewer, credentials, tokens);
541          }
542        }
543      }
544    
545      /**
546       * Get all the immediate child FileSystems embedded in this FileSystem.
547       * It does not recurse and get grand children.  If a FileSystem
548       * has multiple child FileSystems, then it should return a unique list
549       * of those FileSystems.  Default is to return null to signify no children.
550       * 
551       * @return FileSystems used by this FileSystem
552       */
553      @InterfaceAudience.LimitedPrivate({ "HDFS" })
554      @VisibleForTesting
555      public FileSystem[] getChildFileSystems() {
556        return null;
557      }
558      
559      /** create a file with the provided permission
560       * The permission of the file is set to be the provided permission as in
561       * setPermission, not permission&~umask
562       * 
563       * It is implemented using two RPCs. It is understood that it is inefficient,
564       * but the implementation is thread-safe. The other option is to change the
565       * value of umask in configuration to be 0, but it is not thread-safe.
566       * 
567       * @param fs file system handle
568       * @param file the name of the file to be created
569       * @param permission the permission of the file
570       * @return an output stream
571       * @throws IOException
572       */
573      public static FSDataOutputStream create(FileSystem fs,
574          Path file, FsPermission permission) throws IOException {
575        // create the file with default permission
576        FSDataOutputStream out = fs.create(file);
577        // set its permission to the supplied one
578        fs.setPermission(file, permission);
579        return out;
580      }
581    
582      /** create a directory with the provided permission
583       * The permission of the directory is set to be the provided permission as in
584       * setPermission, not permission&~umask
585       * 
586       * @see #create(FileSystem, Path, FsPermission)
587       * 
588       * @param fs file system handle
589       * @param dir the name of the directory to be created
590       * @param permission the permission of the directory
591       * @return true if the directory creation succeeds; false otherwise
592       * @throws IOException
593       */
594      public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
595      throws IOException {
596        // create the directory using the default permission
597        boolean result = fs.mkdirs(dir);
598        // set its permission to be the supplied one
599        fs.setPermission(dir, permission);
600        return result;
601      }
602    
603      ///////////////////////////////////////////////////////////////
604      // FileSystem
605      ///////////////////////////////////////////////////////////////
606    
607      protected FileSystem() {
608        super(null);
609      }
610    
611      /** 
612       * Check that a Path belongs to this FileSystem.
613       * @param path to check
614       */
615      protected void checkPath(Path path) {
616        URI uri = path.toUri();
617        String thatScheme = uri.getScheme();
618        if (thatScheme == null)                // fs is relative
619          return;
620        URI thisUri = getCanonicalUri();
621        String thisScheme = thisUri.getScheme();
622        //authority and scheme are not case sensitive
623        if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
624          String thisAuthority = thisUri.getAuthority();
625          String thatAuthority = uri.getAuthority();
626          if (thatAuthority == null &&                // path's authority is null
627              thisAuthority != null) {                // fs has an authority
628            URI defaultUri = getDefaultUri(getConf());
629            if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
630              uri = defaultUri; // schemes match, so use this uri instead
631            } else {
632              uri = null; // can't determine auth of the path
633            }
634          }
635          if (uri != null) {
636            // canonicalize uri before comparing with this fs
637            uri = canonicalizeUri(uri);
638            thatAuthority = uri.getAuthority();
639            if (thisAuthority == thatAuthority ||       // authorities match
640                (thisAuthority != null &&
641                 thisAuthority.equalsIgnoreCase(thatAuthority)))
642              return;
643          }
644        }
645        throw new IllegalArgumentException("Wrong FS: "+path+
646                                           ", expected: "+this.getUri());
647      }
648    
649      /**
650       * Return an array containing hostnames, offset and size of 
651       * portions of the given file.  For a nonexistent 
652       * file or regions, null will be returned.
653       *
654       * This call is most helpful with DFS, where it returns 
655       * hostnames of machines that contain the given file.
656       *
657       * The FileSystem will simply return an elt containing 'localhost'.
658       *
659       * @param file FilesStatus to get data from
660       * @param start offset into the given file
661       * @param len length for which to get locations for
662       */
663      public BlockLocation[] getFileBlockLocations(FileStatus file, 
664          long start, long len) throws IOException {
665        if (file == null) {
666          return null;
667        }
668    
669        if (start < 0 || len < 0) {
670          throw new IllegalArgumentException("Invalid start or len parameter");
671        }
672    
673        if (file.getLen() <= start) {
674          return new BlockLocation[0];
675    
676        }
677        String[] name = { "localhost:50010" };
678        String[] host = { "localhost" };
679        return new BlockLocation[] {
680          new BlockLocation(name, host, 0, file.getLen()) };
681      }
682     
683    
684      /**
685       * Return an array containing hostnames, offset and size of 
686       * portions of the given file.  For a nonexistent 
687       * file or regions, null will be returned.
688       *
689       * This call is most helpful with DFS, where it returns 
690       * hostnames of machines that contain the given file.
691       *
692       * The FileSystem will simply return an elt containing 'localhost'.
693       *
694       * @param p path is used to identify an FS since an FS could have
695       *          another FS that it could be delegating the call to
696       * @param start offset into the given file
697       * @param len length for which to get locations for
698       */
699      public BlockLocation[] getFileBlockLocations(Path p, 
700          long start, long len) throws IOException {
701        if (p == null) {
702          throw new NullPointerException();
703        }
704        FileStatus file = getFileStatus(p);
705        return getFileBlockLocations(file, start, len);
706      }
707      
708      /**
709       * Return a set of server default configuration values
710       * @return server default configuration values
711       * @throws IOException
712       * @deprecated use {@link #getServerDefaults(Path)} instead
713       */
714      @Deprecated
715      public FsServerDefaults getServerDefaults() throws IOException {
716        Configuration conf = getConf();
717        // CRC32 is chosen as default as it is available in all 
718        // releases that support checksum.
719        // The client trash configuration is ignored.
720        return new FsServerDefaults(getDefaultBlockSize(), 
721            conf.getInt("io.bytes.per.checksum", 512), 
722            64 * 1024, 
723            getDefaultReplication(),
724            conf.getInt("io.file.buffer.size", 4096),
725            false,
726            CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
727            DataChecksum.Type.CRC32);
728      }
729    
730      /**
731       * Return a set of server default configuration values
732       * @param p path is used to identify an FS since an FS could have
733       *          another FS that it could be delegating the call to
734       * @return server default configuration values
735       * @throws IOException
736       */
737      public FsServerDefaults getServerDefaults(Path p) throws IOException {
738        return getServerDefaults();
739      }
740    
741      /**
742       * Return the fully-qualified path of path f resolving the path
743       * through any symlinks or mount point
744       * @param p path to be resolved
745       * @return fully qualified path 
746       * @throws FileNotFoundException
747       */
748       public Path resolvePath(final Path p) throws IOException {
749         checkPath(p);
750         return getFileStatus(p).getPath();
751       }
752    
753      /**
754       * Opens an FSDataInputStream at the indicated Path.
755       * @param f the file name to open
756       * @param bufferSize the size of the buffer to be used.
757       */
758      public abstract FSDataInputStream open(Path f, int bufferSize)
759        throws IOException;
760        
761      /**
762       * Opens an FSDataInputStream at the indicated Path.
763       * @param f the file to open
764       */
765      public FSDataInputStream open(Path f) throws IOException {
766        return open(f, getConf().getInt("io.file.buffer.size", 4096));
767      }
768    
769      /**
770       * Create an FSDataOutputStream at the indicated Path.
771       * Files are overwritten by default.
772       * @param f the file to create
773       */
774      public FSDataOutputStream create(Path f) throws IOException {
775        return create(f, true);
776      }
777    
778      /**
779       * Create an FSDataOutputStream at the indicated Path.
780       * @param f the file to create
781       * @param overwrite if a file with this name already exists, then if true,
782       *   the file will be overwritten, and if false an exception will be thrown.
783       */
784      public FSDataOutputStream create(Path f, boolean overwrite)
785          throws IOException {
786        return create(f, overwrite, 
787                      getConf().getInt("io.file.buffer.size", 4096),
788                      getDefaultReplication(f),
789                      getDefaultBlockSize(f));
790      }
791    
792      /**
793       * Create an FSDataOutputStream at the indicated Path with write-progress
794       * reporting.
795       * Files are overwritten by default.
796       * @param f the file to create
797       * @param progress to report progress
798       */
799      public FSDataOutputStream create(Path f, Progressable progress) 
800          throws IOException {
801        return create(f, true, 
802                      getConf().getInt("io.file.buffer.size", 4096),
803                      getDefaultReplication(f),
804                      getDefaultBlockSize(f), progress);
805      }
806    
807      /**
808       * Create an FSDataOutputStream at the indicated Path.
809       * Files are overwritten by default.
810       * @param f the file to create
811       * @param replication the replication factor
812       */
813      public FSDataOutputStream create(Path f, short replication)
814          throws IOException {
815        return create(f, true, 
816                      getConf().getInt("io.file.buffer.size", 4096),
817                      replication,
818                      getDefaultBlockSize(f));
819      }
820    
821      /**
822       * Create an FSDataOutputStream at the indicated Path with write-progress
823       * reporting.
824       * Files are overwritten by default.
825       * @param f the file to create
826       * @param replication the replication factor
827       * @param progress to report progress
828       */
829      public FSDataOutputStream create(Path f, short replication, 
830          Progressable progress) throws IOException {
831        return create(f, true, 
832                      getConf().getInt(
833                          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
834                          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
835                      replication,
836                      getDefaultBlockSize(f), progress);
837      }
838    
839        
840      /**
841       * Create an FSDataOutputStream at the indicated Path.
842       * @param f the file name to create
843       * @param overwrite if a file with this name already exists, then if true,
844       *   the file will be overwritten, and if false an error will be thrown.
845       * @param bufferSize the size of the buffer to be used.
846       */
847      public FSDataOutputStream create(Path f, 
848                                       boolean overwrite,
849                                       int bufferSize
850                                       ) throws IOException {
851        return create(f, overwrite, bufferSize, 
852                      getDefaultReplication(f),
853                      getDefaultBlockSize(f));
854      }
855        
856      /**
857       * Create an FSDataOutputStream at the indicated Path with write-progress
858       * reporting.
859       * @param f the path of the file to open
860       * @param overwrite if a file with this name already exists, then if true,
861       *   the file will be overwritten, and if false an error will be thrown.
862       * @param bufferSize the size of the buffer to be used.
863       */
864      public FSDataOutputStream create(Path f, 
865                                       boolean overwrite,
866                                       int bufferSize,
867                                       Progressable progress
868                                       ) throws IOException {
869        return create(f, overwrite, bufferSize, 
870                      getDefaultReplication(f),
871                      getDefaultBlockSize(f), progress);
872      }
873        
874        
875      /**
876       * Create an FSDataOutputStream at the indicated Path.
877       * @param f the file name to open
878       * @param overwrite if a file with this name already exists, then if true,
879       *   the file will be overwritten, and if false an error will be thrown.
880       * @param bufferSize the size of the buffer to be used.
881       * @param replication required block replication for the file. 
882       */
883      public FSDataOutputStream create(Path f, 
884                                       boolean overwrite,
885                                       int bufferSize,
886                                       short replication,
887                                       long blockSize
888                                       ) throws IOException {
889        return create(f, overwrite, bufferSize, replication, blockSize, null);
890      }
891    
892      /**
893       * Create an FSDataOutputStream at the indicated Path with write-progress
894       * reporting.
895       * @param f the file name to open
896       * @param overwrite if a file with this name already exists, then if true,
897       *   the file will be overwritten, and if false an error will be thrown.
898       * @param bufferSize the size of the buffer to be used.
899       * @param replication required block replication for the file. 
900       */
901      public FSDataOutputStream create(Path f,
902                                                boolean overwrite,
903                                                int bufferSize,
904                                                short replication,
905                                                long blockSize,
906                                                Progressable progress
907                                                ) throws IOException {
908        return this.create(f, FsPermission.getFileDefault().applyUMask(
909            FsPermission.getUMask(getConf())), overwrite, bufferSize,
910            replication, blockSize, progress);
911      }
912    
913      /**
914       * Create an FSDataOutputStream at the indicated Path with write-progress
915       * reporting.
916       * @param f the file name to open
917       * @param permission
918       * @param overwrite if a file with this name already exists, then if true,
919       *   the file will be overwritten, and if false an error will be thrown.
920       * @param bufferSize the size of the buffer to be used.
921       * @param replication required block replication for the file.
922       * @param blockSize
923       * @param progress
924       * @throws IOException
925       * @see #setPermission(Path, FsPermission)
926       */
927      public abstract FSDataOutputStream create(Path f,
928          FsPermission permission,
929          boolean overwrite,
930          int bufferSize,
931          short replication,
932          long blockSize,
933          Progressable progress) throws IOException;
934      
935      /**
936       * Create an FSDataOutputStream at the indicated Path with write-progress
937       * reporting.
938       * @param f the file name to open
939       * @param permission
940       * @param flags {@link CreateFlag}s to use for this stream.
941       * @param bufferSize the size of the buffer to be used.
942       * @param replication required block replication for the file.
943       * @param blockSize
944       * @param progress
945       * @throws IOException
946       * @see #setPermission(Path, FsPermission)
947       */
948      public FSDataOutputStream create(Path f,
949          FsPermission permission,
950          EnumSet<CreateFlag> flags,
951          int bufferSize,
952          short replication,
953          long blockSize,
954          Progressable progress) throws IOException {
955        return create(f, permission, flags, bufferSize, replication,
956            blockSize, progress, null);
957      }
958      
959      /**
960       * Create an FSDataOutputStream at the indicated Path with a custom
961       * checksum option
962       * @param f the file name to open
963       * @param permission
964       * @param flags {@link CreateFlag}s to use for this stream.
965       * @param bufferSize the size of the buffer to be used.
966       * @param replication required block replication for the file.
967       * @param blockSize
968       * @param progress
969       * @param checksumOpt checksum parameter. If null, the values
970       *        found in conf will be used.
971       * @throws IOException
972       * @see #setPermission(Path, FsPermission)
973       */
974      public FSDataOutputStream create(Path f,
975          FsPermission permission,
976          EnumSet<CreateFlag> flags,
977          int bufferSize,
978          short replication,
979          long blockSize,
980          Progressable progress,
981          ChecksumOpt checksumOpt) throws IOException {
982        // Checksum options are ignored by default. The file systems that
983        // implement checksum need to override this method. The full
984        // support is currently only available in DFS.
985        return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
986            bufferSize, replication, blockSize, progress);
987      }
988    
989      /*.
990       * This create has been added to support the FileContext that processes
991       * the permission
992       * with umask before calling this method.
993       * This a temporary method added to support the transition from FileSystem
994       * to FileContext for user applications.
995       */
996      @Deprecated
997      protected FSDataOutputStream primitiveCreate(Path f,
998         FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
999         short replication, long blockSize, Progressable progress,
1000         ChecksumOpt checksumOpt) throws IOException {
1001    
1002        boolean pathExists = exists(f);
1003        CreateFlag.validate(f, pathExists, flag);
1004        
1005        // Default impl  assumes that permissions do not matter and 
1006        // nor does the bytesPerChecksum  hence
1007        // calling the regular create is good enough.
1008        // FSs that implement permissions should override this.
1009    
1010        if (pathExists && flag.contains(CreateFlag.APPEND)) {
1011          return append(f, bufferSize, progress);
1012        }
1013        
1014        return this.create(f, absolutePermission,
1015            flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1016            blockSize, progress);
1017      }
1018      
1019      /**
1020       * This version of the mkdirs method assumes that the permission is absolute.
1021       * It has been added to support the FileContext that processes the permission
1022       * with umask before calling this method.
1023       * This a temporary method added to support the transition from FileSystem
1024       * to FileContext for user applications.
1025       */
1026      @Deprecated
1027      protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1028        throws IOException {
1029        // Default impl is to assume that permissions do not matter and hence
1030        // calling the regular mkdirs is good enough.
1031        // FSs that implement permissions should override this.
1032       return this.mkdirs(f, absolutePermission);
1033      }
1034    
1035    
1036      /**
1037       * This version of the mkdirs method assumes that the permission is absolute.
1038       * It has been added to support the FileContext that processes the permission
1039       * with umask before calling this method.
1040       * This a temporary method added to support the transition from FileSystem
1041       * to FileContext for user applications.
1042       */
1043      @Deprecated
1044      protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1045                        boolean createParent)
1046        throws IOException {
1047        
1048        if (!createParent) { // parent must exist.
1049          // since the this.mkdirs makes parent dirs automatically
1050          // we must throw exception if parent does not exist.
1051          final FileStatus stat = getFileStatus(f.getParent());
1052          if (stat == null) {
1053            throw new FileNotFoundException("Missing parent:" + f);
1054          }
1055          if (!stat.isDirectory()) {
1056            throw new ParentNotDirectoryException("parent is not a dir");
1057          }
1058          // parent does exist - go ahead with mkdir of leaf
1059        }
1060        // Default impl is to assume that permissions do not matter and hence
1061        // calling the regular mkdirs is good enough.
1062        // FSs that implement permissions should override this.
1063        if (!this.mkdirs(f, absolutePermission)) {
1064          throw new IOException("mkdir of "+ f + " failed");
1065        }
1066      }
1067    
1068      /**
1069       * Opens an FSDataOutputStream at the indicated Path with write-progress
1070       * reporting. Same as create(), except fails if parent directory doesn't
1071       * already exist.
1072       * @param f the file name to open
1073       * @param overwrite if a file with this name already exists, then if true,
1074       * the file will be overwritten, and if false an error will be thrown.
1075       * @param bufferSize the size of the buffer to be used.
1076       * @param replication required block replication for the file.
1077       * @param blockSize
1078       * @param progress
1079       * @throws IOException
1080       * @see #setPermission(Path, FsPermission)
1081       * @deprecated API only for 0.20-append
1082       */
1083      @Deprecated
1084      public FSDataOutputStream createNonRecursive(Path f,
1085          boolean overwrite,
1086          int bufferSize, short replication, long blockSize,
1087          Progressable progress) throws IOException {
1088        return this.createNonRecursive(f, FsPermission.getFileDefault(),
1089            overwrite, bufferSize, replication, blockSize, progress);
1090      }
1091    
1092      /**
1093       * Opens an FSDataOutputStream at the indicated Path with write-progress
1094       * reporting. Same as create(), except fails if parent directory doesn't
1095       * already exist.
1096       * @param f the file name to open
1097       * @param permission
1098       * @param overwrite if a file with this name already exists, then if true,
1099       * the file will be overwritten, and if false an error will be thrown.
1100       * @param bufferSize the size of the buffer to be used.
1101       * @param replication required block replication for the file.
1102       * @param blockSize
1103       * @param progress
1104       * @throws IOException
1105       * @see #setPermission(Path, FsPermission)
1106       * @deprecated API only for 0.20-append
1107       */
1108       @Deprecated
1109       public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1110           boolean overwrite, int bufferSize, short replication, long blockSize,
1111           Progressable progress) throws IOException {
1112         return createNonRecursive(f, permission,
1113             overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1114                 : EnumSet.of(CreateFlag.CREATE), bufferSize,
1115                 replication, blockSize, progress);
1116       }
1117    
1118       /**
1119        * Opens an FSDataOutputStream at the indicated Path with write-progress
1120        * reporting. Same as create(), except fails if parent directory doesn't
1121        * already exist.
1122        * @param f the file name to open
1123        * @param permission
1124        * @param flags {@link CreateFlag}s to use for this stream.
1125        * @param bufferSize the size of the buffer to be used.
1126        * @param replication required block replication for the file.
1127        * @param blockSize
1128        * @param progress
1129        * @throws IOException
1130        * @see #setPermission(Path, FsPermission)
1131        * @deprecated API only for 0.20-append
1132        */
1133        @Deprecated
1134        public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1135            EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1136            Progressable progress) throws IOException {
1137          throw new IOException("createNonRecursive unsupported for this filesystem "
1138              + this.getClass());
1139        }
1140    
1141      /**
1142       * Creates the given Path as a brand-new zero-length file.  If
1143       * create fails, or if it already existed, return false.
1144       *
1145       * @param f path to use for create
1146       */
1147      public boolean createNewFile(Path f) throws IOException {
1148        if (exists(f)) {
1149          return false;
1150        } else {
1151          create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1152          return true;
1153        }
1154      }
1155    
1156      /**
1157       * Append to an existing file (optional operation).
1158       * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1159       * @param f the existing file to be appended.
1160       * @throws IOException
1161       */
1162      public FSDataOutputStream append(Path f) throws IOException {
1163        return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1164      }
1165      /**
1166       * Append to an existing file (optional operation).
1167       * Same as append(f, bufferSize, null).
1168       * @param f the existing file to be appended.
1169       * @param bufferSize the size of the buffer to be used.
1170       * @throws IOException
1171       */
1172      public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1173        return append(f, bufferSize, null);
1174      }
1175    
1176      /**
1177       * Append to an existing file (optional operation).
1178       * @param f the existing file to be appended.
1179       * @param bufferSize the size of the buffer to be used.
1180       * @param progress for reporting progress if it is not null.
1181       * @throws IOException
1182       */
1183      public abstract FSDataOutputStream append(Path f, int bufferSize,
1184          Progressable progress) throws IOException;
1185    
1186      /**
1187       * Concat existing files together.
1188       * @param trg the path to the target destination.
1189       * @param psrcs the paths to the sources to use for the concatenation.
1190       * @throws IOException
1191       */
1192      public void concat(final Path trg, final Path [] psrcs) throws IOException {
1193        throw new UnsupportedOperationException("Not implemented by the " + 
1194            getClass().getSimpleName() + " FileSystem implementation");
1195      }
1196    
1197     /**
1198       * Get replication.
1199       * 
1200       * @deprecated Use getFileStatus() instead
1201       * @param src file name
1202       * @return file replication
1203       * @throws IOException
1204       */ 
1205      @Deprecated
1206      public short getReplication(Path src) throws IOException {
1207        return getFileStatus(src).getReplication();
1208      }
1209    
1210      /**
1211       * Set replication for an existing file.
1212       * 
1213       * @param src file name
1214       * @param replication new replication
1215       * @throws IOException
1216       * @return true if successful;
1217       *         false if file does not exist or is a directory
1218       */
1219      public boolean setReplication(Path src, short replication)
1220        throws IOException {
1221        return true;
1222      }
1223    
1224      /**
1225       * Renames Path src to Path dst.  Can take place on local fs
1226       * or remote DFS.
1227       * @param src path to be renamed
1228       * @param dst new path after rename
1229       * @throws IOException on failure
1230       * @return true if rename is successful
1231       */
1232      public abstract boolean rename(Path src, Path dst) throws IOException;
1233    
1234      /**
1235       * Renames Path src to Path dst
1236       * <ul>
1237       * <li
1238       * <li>Fails if src is a file and dst is a directory.
1239       * <li>Fails if src is a directory and dst is a file.
1240       * <li>Fails if the parent of dst does not exist or is a file.
1241       * </ul>
1242       * <p>
1243       * If OVERWRITE option is not passed as an argument, rename fails
1244       * if the dst already exists.
1245       * <p>
1246       * If OVERWRITE option is passed as an argument, rename overwrites
1247       * the dst if it is a file or an empty directory. Rename fails if dst is
1248       * a non-empty directory.
1249       * <p>
1250       * Note that atomicity of rename is dependent on the file system
1251       * implementation. Please refer to the file system documentation for
1252       * details. This default implementation is non atomic.
1253       * <p>
1254       * This method is deprecated since it is a temporary method added to 
1255       * support the transition from FileSystem to FileContext for user 
1256       * applications.
1257       * 
1258       * @param src path to be renamed
1259       * @param dst new path after rename
1260       * @throws IOException on failure
1261       */
1262      @Deprecated
1263      protected void rename(final Path src, final Path dst,
1264          final Rename... options) throws IOException {
1265        // Default implementation
1266        final FileStatus srcStatus = getFileLinkStatus(src);
1267        if (srcStatus == null) {
1268          throw new FileNotFoundException("rename source " + src + " not found.");
1269        }
1270    
1271        boolean overwrite = false;
1272        if (null != options) {
1273          for (Rename option : options) {
1274            if (option == Rename.OVERWRITE) {
1275              overwrite = true;
1276            }
1277          }
1278        }
1279    
1280        FileStatus dstStatus;
1281        try {
1282          dstStatus = getFileLinkStatus(dst);
1283        } catch (IOException e) {
1284          dstStatus = null;
1285        }
1286        if (dstStatus != null) {
1287          if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1288            throw new IOException("Source " + src + " Destination " + dst
1289                + " both should be either file or directory");
1290          }
1291          if (!overwrite) {
1292            throw new FileAlreadyExistsException("rename destination " + dst
1293                + " already exists.");
1294          }
1295          // Delete the destination that is a file or an empty directory
1296          if (dstStatus.isDirectory()) {
1297            FileStatus[] list = listStatus(dst);
1298            if (list != null && list.length != 0) {
1299              throw new IOException(
1300                  "rename cannot overwrite non empty destination directory " + dst);
1301            }
1302          }
1303          delete(dst, false);
1304        } else {
1305          final Path parent = dst.getParent();
1306          final FileStatus parentStatus = getFileStatus(parent);
1307          if (parentStatus == null) {
1308            throw new FileNotFoundException("rename destination parent " + parent
1309                + " not found.");
1310          }
1311          if (!parentStatus.isDirectory()) {
1312            throw new ParentNotDirectoryException("rename destination parent " + parent
1313                + " is a file.");
1314          }
1315        }
1316        if (!rename(src, dst)) {
1317          throw new IOException("rename from " + src + " to " + dst + " failed.");
1318        }
1319      }
1320      
1321      /**
1322       * Delete a file 
1323       * @deprecated Use {@link #delete(Path, boolean)} instead.
1324       */
1325      @Deprecated
1326      public boolean delete(Path f) throws IOException {
1327        return delete(f, true);
1328      }
1329      
1330      /** Delete a file.
1331       *
1332       * @param f the path to delete.
1333       * @param recursive if path is a directory and set to 
1334       * true, the directory is deleted else throws an exception. In
1335       * case of a file the recursive can be set to either true or false. 
1336       * @return  true if delete is successful else false. 
1337       * @throws IOException
1338       */
1339      public abstract boolean delete(Path f, boolean recursive) throws IOException;
1340    
1341      /**
1342       * Mark a path to be deleted when FileSystem is closed.
1343       * When the JVM shuts down,
1344       * all FileSystem objects will be closed automatically.
1345       * Then,
1346       * the marked path will be deleted as a result of closing the FileSystem.
1347       *
1348       * The path has to exist in the file system.
1349       * 
1350       * @param f the path to delete.
1351       * @return  true if deleteOnExit is successful, otherwise false.
1352       * @throws IOException
1353       */
1354      public boolean deleteOnExit(Path f) throws IOException {
1355        if (!exists(f)) {
1356          return false;
1357        }
1358        synchronized (deleteOnExit) {
1359          deleteOnExit.add(f);
1360        }
1361        return true;
1362      }
1363      
1364      /**
1365       * Cancel the deletion of the path when the FileSystem is closed
1366       * @param f the path to cancel deletion
1367       */
1368      public boolean cancelDeleteOnExit(Path f) {
1369        synchronized (deleteOnExit) {
1370          return deleteOnExit.remove(f);
1371        }
1372      }
1373    
1374      /**
1375       * Delete all files that were marked as delete-on-exit. This recursively
1376       * deletes all files in the specified paths.
1377       */
1378      protected void processDeleteOnExit() {
1379        synchronized (deleteOnExit) {
1380          for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1381            Path path = iter.next();
1382            try {
1383              if (exists(path)) {
1384                delete(path, true);
1385              }
1386            }
1387            catch (IOException e) {
1388              LOG.info("Ignoring failure to deleteOnExit for path " + path);
1389            }
1390            iter.remove();
1391          }
1392        }
1393      }
1394      
1395      /** Check if exists.
1396       * @param f source file
1397       */
1398      public boolean exists(Path f) throws IOException {
1399        try {
1400          return getFileStatus(f) != null;
1401        } catch (FileNotFoundException e) {
1402          return false;
1403        }
1404      }
1405    
1406      /** True iff the named path is a directory.
1407       * Note: Avoid using this method. Instead reuse the FileStatus 
1408       * returned by getFileStatus() or listStatus() methods.
1409       * @param f path to check
1410       */
1411      public boolean isDirectory(Path f) throws IOException {
1412        try {
1413          return getFileStatus(f).isDirectory();
1414        } catch (FileNotFoundException e) {
1415          return false;               // f does not exist
1416        }
1417      }
1418    
1419      /** True iff the named path is a regular file.
1420       * Note: Avoid using this method. Instead reuse the FileStatus 
1421       * returned by getFileStatus() or listStatus() methods.
1422       * @param f path to check
1423       */
1424      public boolean isFile(Path f) throws IOException {
1425        try {
1426          return getFileStatus(f).isFile();
1427        } catch (FileNotFoundException e) {
1428          return false;               // f does not exist
1429        }
1430      }
1431      
1432      /** The number of bytes in a file. */
1433      /** @deprecated Use getFileStatus() instead */
1434      @Deprecated
1435      public long getLength(Path f) throws IOException {
1436        return getFileStatus(f).getLen();
1437      }
1438        
1439      /** Return the {@link ContentSummary} of a given {@link Path}.
1440      * @param f path to use
1441      */
1442      public ContentSummary getContentSummary(Path f) throws IOException {
1443        FileStatus status = getFileStatus(f);
1444        if (status.isFile()) {
1445          // f is a file
1446          return new ContentSummary(status.getLen(), 1, 0);
1447        }
1448        // f is a directory
1449        long[] summary = {0, 0, 1};
1450        for(FileStatus s : listStatus(f)) {
1451          ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1452                                         new ContentSummary(s.getLen(), 1, 0);
1453          summary[0] += c.getLength();
1454          summary[1] += c.getFileCount();
1455          summary[2] += c.getDirectoryCount();
1456        }
1457        return new ContentSummary(summary[0], summary[1], summary[2]);
1458      }
1459    
1460      final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1461          @Override
1462          public boolean accept(Path file) {
1463            return true;
1464          }     
1465        };
1466        
1467      /**
1468       * List the statuses of the files/directories in the given path if the path is
1469       * a directory.
1470       * 
1471       * @param f given path
1472       * @return the statuses of the files/directories in the given patch
1473       * @throws FileNotFoundException when the path does not exist;
1474       *         IOException see specific implementation
1475       */
1476      public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1477                                                             IOException;
1478        
1479      /*
1480       * Filter files/directories in the given path using the user-supplied path
1481       * filter. Results are added to the given array <code>results</code>.
1482       */
1483      private void listStatus(ArrayList<FileStatus> results, Path f,
1484          PathFilter filter) throws FileNotFoundException, IOException {
1485        FileStatus listing[] = listStatus(f);
1486        if (listing == null) {
1487          throw new IOException("Error accessing " + f);
1488        }
1489    
1490        for (int i = 0; i < listing.length; i++) {
1491          if (filter.accept(listing[i].getPath())) {
1492            results.add(listing[i]);
1493          }
1494        }
1495      }
1496    
1497      /**
1498       * @return an iterator over the corrupt files under the given path
1499       * (may contain duplicates if a file has more than one corrupt block)
1500       * @throws IOException
1501       */
1502      public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1503        throws IOException {
1504        throw new UnsupportedOperationException(getClass().getCanonicalName() +
1505                                                " does not support" +
1506                                                " listCorruptFileBlocks");
1507      }
1508    
1509      /**
1510       * Filter files/directories in the given path using the user-supplied path
1511       * filter.
1512       * 
1513       * @param f
1514       *          a path name
1515       * @param filter
1516       *          the user-supplied path filter
1517       * @return an array of FileStatus objects for the files under the given path
1518       *         after applying the filter
1519       * @throws FileNotFoundException when the path does not exist;
1520       *         IOException see specific implementation   
1521       */
1522      public FileStatus[] listStatus(Path f, PathFilter filter) 
1523                                       throws FileNotFoundException, IOException {
1524        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1525        listStatus(results, f, filter);
1526        return results.toArray(new FileStatus[results.size()]);
1527      }
1528    
1529      /**
1530       * Filter files/directories in the given list of paths using default
1531       * path filter.
1532       * 
1533       * @param files
1534       *          a list of paths
1535       * @return a list of statuses for the files under the given paths after
1536       *         applying the filter default Path filter
1537       * @throws FileNotFoundException when the path does not exist;
1538       *         IOException see specific implementation
1539       */
1540      public FileStatus[] listStatus(Path[] files)
1541          throws FileNotFoundException, IOException {
1542        return listStatus(files, DEFAULT_FILTER);
1543      }
1544    
1545      /**
1546       * Filter files/directories in the given list of paths using user-supplied
1547       * path filter.
1548       * 
1549       * @param files
1550       *          a list of paths
1551       * @param filter
1552       *          the user-supplied path filter
1553       * @return a list of statuses for the files under the given paths after
1554       *         applying the filter
1555       * @throws FileNotFoundException when the path does not exist;
1556       *         IOException see specific implementation
1557       */
1558      public FileStatus[] listStatus(Path[] files, PathFilter filter)
1559          throws FileNotFoundException, IOException {
1560        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1561        for (int i = 0; i < files.length; i++) {
1562          listStatus(results, files[i], filter);
1563        }
1564        return results.toArray(new FileStatus[results.size()]);
1565      }
1566    
1567      /**
1568       * <p>Return all the files that match filePattern and are not checksum
1569       * files. Results are sorted by their names.
1570       * 
1571       * <p>
1572       * A filename pattern is composed of <i>regular</i> characters and
1573       * <i>special pattern matching</i> characters, which are:
1574       *
1575       * <dl>
1576       *  <dd>
1577       *   <dl>
1578       *    <p>
1579       *    <dt> <tt> ? </tt>
1580       *    <dd> Matches any single character.
1581       *
1582       *    <p>
1583       *    <dt> <tt> * </tt>
1584       *    <dd> Matches zero or more characters.
1585       *
1586       *    <p>
1587       *    <dt> <tt> [<i>abc</i>] </tt>
1588       *    <dd> Matches a single character from character set
1589       *     <tt>{<i>a,b,c</i>}</tt>.
1590       *
1591       *    <p>
1592       *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1593       *    <dd> Matches a single character from the character range
1594       *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1595       *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1596       *
1597       *    <p>
1598       *    <dt> <tt> [^<i>a</i>] </tt>
1599       *    <dd> Matches a single character that is not from character set or range
1600       *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1601       *     immediately to the right of the opening bracket.
1602       *
1603       *    <p>
1604       *    <dt> <tt> \<i>c</i> </tt>
1605       *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1606       *
1607       *    <p>
1608       *    <dt> <tt> {ab,cd} </tt>
1609       *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1610       *    
1611       *    <p>
1612       *    <dt> <tt> {ab,c{de,fh}} </tt>
1613       *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1614       *
1615       *   </dl>
1616       *  </dd>
1617       * </dl>
1618       *
1619       * @param pathPattern a regular expression specifying a pth pattern
1620    
1621       * @return an array of paths that match the path pattern
1622       * @throws IOException
1623       */
1624      public FileStatus[] globStatus(Path pathPattern) throws IOException {
1625        return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1626      }
1627      
1628      /**
1629       * Return an array of FileStatus objects whose path names match pathPattern
1630       * and is accepted by the user-supplied path filter. Results are sorted by
1631       * their path names.
1632       * Return null if pathPattern has no glob and the path does not exist.
1633       * Return an empty array if pathPattern has a glob and no path matches it. 
1634       * 
1635       * @param pathPattern
1636       *          a regular expression specifying the path pattern
1637       * @param filter
1638       *          a user-supplied path filter
1639       * @return an array of FileStatus objects
1640       * @throws IOException if any I/O error occurs when fetching file status
1641       */
1642      public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1643          throws IOException {
1644        return new Globber(this, pathPattern, filter).glob();
1645      }
1646      
1647      /**
1648       * List the statuses of the files/directories in the given path if the path is
1649       * a directory. 
1650       * Return the file's status and block locations If the path is a file.
1651       * 
1652       * If a returned status is a file, it contains the file's block locations.
1653       * 
1654       * @param f is the path
1655       *
1656       * @return an iterator that traverses statuses of the files/directories 
1657       *         in the given path
1658       *
1659       * @throws FileNotFoundException If <code>f</code> does not exist
1660       * @throws IOException If an I/O error occurred
1661       */
1662      public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1663      throws FileNotFoundException, IOException {
1664        return listLocatedStatus(f, DEFAULT_FILTER);
1665      }
1666    
1667      /**
1668       * Listing a directory
1669       * The returned results include its block location if it is a file
1670       * The results are filtered by the given path filter
1671       * @param f a path
1672       * @param filter a path filter
1673       * @return an iterator that traverses statuses of the files/directories 
1674       *         in the given path
1675       * @throws FileNotFoundException if <code>f</code> does not exist
1676       * @throws IOException if any I/O error occurred
1677       */
1678      protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1679          final PathFilter filter)
1680      throws FileNotFoundException, IOException {
1681        return new RemoteIterator<LocatedFileStatus>() {
1682          private final FileStatus[] stats = listStatus(f, filter);
1683          private int i = 0;
1684    
1685          @Override
1686          public boolean hasNext() {
1687            return i<stats.length;
1688          }
1689    
1690          @Override
1691          public LocatedFileStatus next() throws IOException {
1692            if (!hasNext()) {
1693              throw new NoSuchElementException("No more entry in " + f);
1694            }
1695            FileStatus result = stats[i++];
1696            BlockLocation[] locs = result.isFile() ?
1697                getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1698                null;
1699            return new LocatedFileStatus(result, locs);
1700          }
1701        };
1702      }
1703    
1704      /**
1705       * List the statuses and block locations of the files in the given path.
1706       * 
1707       * If the path is a directory, 
1708       *   if recursive is false, returns files in the directory;
1709       *   if recursive is true, return files in the subtree rooted at the path.
1710       * If the path is a file, return the file's status and block locations.
1711       * 
1712       * @param f is the path
1713       * @param recursive if the subdirectories need to be traversed recursively
1714       *
1715       * @return an iterator that traverses statuses of the files
1716       *
1717       * @throws FileNotFoundException when the path does not exist;
1718       *         IOException see specific implementation
1719       */
1720      public RemoteIterator<LocatedFileStatus> listFiles(
1721          final Path f, final boolean recursive)
1722      throws FileNotFoundException, IOException {
1723        return new RemoteIterator<LocatedFileStatus>() {
1724          private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1725            new Stack<RemoteIterator<LocatedFileStatus>>();
1726          private RemoteIterator<LocatedFileStatus> curItor =
1727            listLocatedStatus(f);
1728          private LocatedFileStatus curFile;
1729         
1730          @Override
1731          public boolean hasNext() throws IOException {
1732            while (curFile == null) {
1733              if (curItor.hasNext()) {
1734                handleFileStat(curItor.next());
1735              } else if (!itors.empty()) {
1736                curItor = itors.pop();
1737              } else {
1738                return false;
1739              }
1740            }
1741            return true;
1742          }
1743    
1744          /**
1745           * Process the input stat.
1746           * If it is a file, return the file stat.
1747           * If it is a directory, traverse the directory if recursive is true;
1748           * ignore it if recursive is false.
1749           * @param stat input status
1750           * @throws IOException if any IO error occurs
1751           */
1752          private void handleFileStat(LocatedFileStatus stat) throws IOException {
1753            if (stat.isFile()) { // file
1754              curFile = stat;
1755            } else if (recursive) { // directory
1756              itors.push(curItor);
1757              curItor = listLocatedStatus(stat.getPath());
1758            }
1759          }
1760    
1761          @Override
1762          public LocatedFileStatus next() throws IOException {
1763            if (hasNext()) {
1764              LocatedFileStatus result = curFile;
1765              curFile = null;
1766              return result;
1767            } 
1768            throw new java.util.NoSuchElementException("No more entry in " + f);
1769          }
1770        };
1771      }
1772      
1773      /** Return the current user's home directory in this filesystem.
1774       * The default implementation returns "/user/$USER/".
1775       */
1776      public Path getHomeDirectory() {
1777        return this.makeQualified(
1778            new Path("/user/"+System.getProperty("user.name")));
1779      }
1780    
1781    
1782      /**
1783       * Set the current working directory for the given file system. All relative
1784       * paths will be resolved relative to it.
1785       * 
1786       * @param new_dir
1787       */
1788      public abstract void setWorkingDirectory(Path new_dir);
1789        
1790      /**
1791       * Get the current working directory for the given file system
1792       * @return the directory pathname
1793       */
1794      public abstract Path getWorkingDirectory();
1795      
1796      
1797      /**
1798       * Note: with the new FilesContext class, getWorkingDirectory()
1799       * will be removed. 
1800       * The working directory is implemented in FilesContext.
1801       * 
1802       * Some file systems like LocalFileSystem have an initial workingDir
1803       * that we use as the starting workingDir. For other file systems
1804       * like HDFS there is no built in notion of an initial workingDir.
1805       * 
1806       * @return if there is built in notion of workingDir then it
1807       * is returned; else a null is returned.
1808       */
1809      protected Path getInitialWorkingDirectory() {
1810        return null;
1811      }
1812    
1813      /**
1814       * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1815       */
1816      public boolean mkdirs(Path f) throws IOException {
1817        return mkdirs(f, FsPermission.getDirDefault());
1818      }
1819    
1820      /**
1821       * Make the given file and all non-existent parents into
1822       * directories. Has the semantics of Unix 'mkdir -p'.
1823       * Existence of the directory hierarchy is not an error.
1824       * @param f path to create
1825       * @param permission to apply to f
1826       */
1827      public abstract boolean mkdirs(Path f, FsPermission permission
1828          ) throws IOException;
1829    
1830      /**
1831       * The src file is on the local disk.  Add it to FS at
1832       * the given dst name and the source is kept intact afterwards
1833       * @param src path
1834       * @param dst path
1835       */
1836      public void copyFromLocalFile(Path src, Path dst)
1837        throws IOException {
1838        copyFromLocalFile(false, src, dst);
1839      }
1840    
1841      /**
1842       * The src files is on the local disk.  Add it to FS at
1843       * the given dst name, removing the source afterwards.
1844       * @param srcs path
1845       * @param dst path
1846       */
1847      public void moveFromLocalFile(Path[] srcs, Path dst)
1848        throws IOException {
1849        copyFromLocalFile(true, true, srcs, dst);
1850      }
1851    
1852      /**
1853       * The src file is on the local disk.  Add it to FS at
1854       * the given dst name, removing the source afterwards.
1855       * @param src path
1856       * @param dst path
1857       */
1858      public void moveFromLocalFile(Path src, Path dst)
1859        throws IOException {
1860        copyFromLocalFile(true, src, dst);
1861      }
1862    
1863      /**
1864       * The src file is on the local disk.  Add it to FS at
1865       * the given dst name.
1866       * delSrc indicates if the source should be removed
1867       * @param delSrc whether to delete the src
1868       * @param src path
1869       * @param dst path
1870       */
1871      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1872        throws IOException {
1873        copyFromLocalFile(delSrc, true, src, dst);
1874      }
1875      
1876      /**
1877       * The src files are on the local disk.  Add it to FS at
1878       * the given dst name.
1879       * delSrc indicates if the source should be removed
1880       * @param delSrc whether to delete the src
1881       * @param overwrite whether to overwrite an existing file
1882       * @param srcs array of paths which are source
1883       * @param dst path
1884       */
1885      public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1886                                    Path[] srcs, Path dst)
1887        throws IOException {
1888        Configuration conf = getConf();
1889        FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1890      }
1891      
1892      /**
1893       * The src file is on the local disk.  Add it to FS at
1894       * the given dst name.
1895       * delSrc indicates if the source should be removed
1896       * @param delSrc whether to delete the src
1897       * @param overwrite whether to overwrite an existing file
1898       * @param src path
1899       * @param dst path
1900       */
1901      public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1902                                    Path src, Path dst)
1903        throws IOException {
1904        Configuration conf = getConf();
1905        FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1906      }
1907        
1908      /**
1909       * The src file is under FS, and the dst is on the local disk.
1910       * Copy it from FS control to the local dst name.
1911       * @param src path
1912       * @param dst path
1913       */
1914      public void copyToLocalFile(Path src, Path dst) throws IOException {
1915        copyToLocalFile(false, src, dst);
1916      }
1917        
1918      /**
1919       * The src file is under FS, and the dst is on the local disk.
1920       * Copy it from FS control to the local dst name.
1921       * Remove the source afterwards
1922       * @param src path
1923       * @param dst path
1924       */
1925      public void moveToLocalFile(Path src, Path dst) throws IOException {
1926        copyToLocalFile(true, src, dst);
1927      }
1928    
1929      /**
1930       * The src file is under FS, and the dst is on the local disk.
1931       * Copy it from FS control to the local dst name.
1932       * delSrc indicates if the src will be removed or not.
1933       * @param delSrc whether to delete the src
1934       * @param src path
1935       * @param dst path
1936       */   
1937      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1938        throws IOException {
1939        copyToLocalFile(delSrc, src, dst, false);
1940      }
1941      
1942        /**
1943       * The src file is under FS, and the dst is on the local disk. Copy it from FS
1944       * control to the local dst name. delSrc indicates if the src will be removed
1945       * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
1946       * as local file system or not. RawLocalFileSystem is non crc file system.So,
1947       * It will not create any crc files at local.
1948       * 
1949       * @param delSrc
1950       *          whether to delete the src
1951       * @param src
1952       *          path
1953       * @param dst
1954       *          path
1955       * @param useRawLocalFileSystem
1956       *          whether to use RawLocalFileSystem as local file system or not.
1957       * 
1958       * @throws IOException
1959       *           - if any IO error
1960       */
1961      public void copyToLocalFile(boolean delSrc, Path src, Path dst,
1962          boolean useRawLocalFileSystem) throws IOException {
1963        Configuration conf = getConf();
1964        FileSystem local = null;
1965        if (useRawLocalFileSystem) {
1966          local = getLocal(conf).getRawFileSystem();
1967        } else {
1968          local = getLocal(conf);
1969        }
1970        FileUtil.copy(this, src, local, dst, delSrc, conf);
1971      }
1972    
1973      /**
1974       * Returns a local File that the user can write output to.  The caller
1975       * provides both the eventual FS target name and the local working
1976       * file.  If the FS is local, we write directly into the target.  If
1977       * the FS is remote, we write into the tmp local area.
1978       * @param fsOutputFile path of output file
1979       * @param tmpLocalFile path of local tmp file
1980       */
1981      public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1982        throws IOException {
1983        return tmpLocalFile;
1984      }
1985    
1986      /**
1987       * Called when we're all done writing to the target.  A local FS will
1988       * do nothing, because we've written to exactly the right place.  A remote
1989       * FS will copy the contents of tmpLocalFile to the correct target at
1990       * fsOutputFile.
1991       * @param fsOutputFile path of output file
1992       * @param tmpLocalFile path to local tmp file
1993       */
1994      public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1995        throws IOException {
1996        moveFromLocalFile(tmpLocalFile, fsOutputFile);
1997      }
1998    
1999      /**
2000       * No more filesystem operations are needed.  Will
2001       * release any held locks.
2002       */
2003      @Override
2004      public void close() throws IOException {
2005        // delete all files that were marked as delete-on-exit.
2006        processDeleteOnExit();
2007        CACHE.remove(this.key, this);
2008      }
2009    
2010      /** Return the total size of all files in the filesystem.*/
2011      public long getUsed() throws IOException{
2012        long used = 0;
2013        FileStatus[] files = listStatus(new Path("/"));
2014        for(FileStatus file:files){
2015          used += file.getLen();
2016        }
2017        return used;
2018      }
2019      
2020      /**
2021       * Get the block size for a particular file.
2022       * @param f the filename
2023       * @return the number of bytes in a block
2024       */
2025      /** @deprecated Use getFileStatus() instead */
2026      @Deprecated
2027      public long getBlockSize(Path f) throws IOException {
2028        return getFileStatus(f).getBlockSize();
2029      }
2030    
2031      /**
2032       * Return the number of bytes that large input files should be optimally
2033       * be split into to minimize i/o time.
2034       * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2035       */
2036      @Deprecated
2037      public long getDefaultBlockSize() {
2038        // default to 32MB: large enough to minimize the impact of seeks
2039        return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2040      }
2041        
2042      /** Return the number of bytes that large input files should be optimally
2043       * be split into to minimize i/o time.  The given path will be used to
2044       * locate the actual filesystem.  The full path does not have to exist.
2045       * @param f path of file
2046       * @return the default block size for the path's filesystem
2047       */
2048      public long getDefaultBlockSize(Path f) {
2049        return getDefaultBlockSize();
2050      }
2051    
2052      /**
2053       * Get the default replication.
2054       * @deprecated use {@link #getDefaultReplication(Path)} instead
2055       */
2056      @Deprecated
2057      public short getDefaultReplication() { return 1; }
2058    
2059      /**
2060       * Get the default replication for a path.   The given path will be used to
2061       * locate the actual filesystem.  The full path does not have to exist.
2062       * @param path of the file
2063       * @return default replication for the path's filesystem 
2064       */
2065      public short getDefaultReplication(Path path) {
2066        return getDefaultReplication();
2067      }
2068      
2069      /**
2070       * Return a file status object that represents the path.
2071       * @param f The path we want information from
2072       * @return a FileStatus object
2073       * @throws FileNotFoundException when the path does not exist;
2074       *         IOException see specific implementation
2075       */
2076      public abstract FileStatus getFileStatus(Path f) throws IOException;
2077    
2078      /**
2079       * Checks if the user can access a path.  The mode specifies which access
2080       * checks to perform.  If the requested permissions are granted, then the
2081       * method returns normally.  If access is denied, then the method throws an
2082       * {@link AccessControlException}.
2083       * <p/>
2084       * The default implementation of this method calls {@link #getFileStatus(Path)}
2085       * and checks the returned permissions against the requested permissions.
2086       * Note that the getFileStatus call will be subject to authorization checks.
2087       * Typically, this requires search (execute) permissions on each directory in
2088       * the path's prefix, but this is implementation-defined.  Any file system
2089       * that provides a richer authorization model (such as ACLs) may override the
2090       * default implementation so that it checks against that model instead.
2091       * <p>
2092       * In general, applications should avoid using this method, due to the risk of
2093       * time-of-check/time-of-use race conditions.  The permissions on a file may
2094       * change immediately after the access call returns.  Most applications should
2095       * prefer running specific file system actions as the desired user represented
2096       * by a {@link UserGroupInformation}.
2097       *
2098       * @param path Path to check
2099       * @param mode type of access to check
2100       * @throws AccessControlException if access is denied
2101       * @throws FileNotFoundException if the path does not exist
2102       * @throws IOException see specific implementation
2103       */
2104      @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2105      public void access(Path path, FsAction mode) throws AccessControlException,
2106          FileNotFoundException, IOException {
2107        checkAccessPermissions(this.getFileStatus(path), mode);
2108      }
2109    
2110      /**
2111       * This method provides the default implementation of
2112       * {@link #access(Path, FsAction)}.
2113       *
2114       * @param stat FileStatus to check
2115       * @param mode type of access to check
2116       * @throws IOException for any error
2117       */
2118      @InterfaceAudience.Private
2119      static void checkAccessPermissions(FileStatus stat, FsAction mode)
2120          throws IOException {
2121        FsPermission perm = stat.getPermission();
2122        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2123        String user = ugi.getShortUserName();
2124        List<String> groups = Arrays.asList(ugi.getGroupNames());
2125        if (user.equals(stat.getOwner())) {
2126          if (perm.getUserAction().implies(mode)) {
2127            return;
2128          }
2129        } else if (groups.contains(stat.getGroup())) {
2130          if (perm.getGroupAction().implies(mode)) {
2131            return;
2132          }
2133        } else {
2134          if (perm.getOtherAction().implies(mode)) {
2135            return;
2136          }
2137        }
2138        throw new AccessControlException(String.format(
2139          "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2140          stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2141      }
2142    
2143      /**
2144       * See {@link FileContext#fixRelativePart}
2145       */
2146      protected Path fixRelativePart(Path p) {
2147        if (p.isUriPathAbsolute()) {
2148          return p;
2149        } else {
2150          return new Path(getWorkingDirectory(), p);
2151        }
2152      }
2153    
2154      /**
2155       * See {@link FileContext#createSymlink(Path, Path, boolean)}
2156       */
2157      public void createSymlink(final Path target, final Path link,
2158          final boolean createParent) throws AccessControlException,
2159          FileAlreadyExistsException, FileNotFoundException,
2160          ParentNotDirectoryException, UnsupportedFileSystemException, 
2161          IOException {
2162        // Supporting filesystems should override this method
2163        throw new UnsupportedOperationException(
2164            "Filesystem does not support symlinks!");
2165      }
2166    
2167      /**
2168       * See {@link FileContext#getFileLinkStatus(Path)}
2169       */
2170      public FileStatus getFileLinkStatus(final Path f)
2171          throws AccessControlException, FileNotFoundException,
2172          UnsupportedFileSystemException, IOException {
2173        // Supporting filesystems should override this method
2174        return getFileStatus(f);
2175      }
2176    
2177      /**
2178       * See {@link AbstractFileSystem#supportsSymlinks()}
2179       */
2180      public boolean supportsSymlinks() {
2181        return false;
2182      }
2183    
2184      /**
2185       * See {@link FileContext#getLinkTarget(Path)}
2186       */
2187      public Path getLinkTarget(Path f) throws IOException {
2188        // Supporting filesystems should override this method
2189        throw new UnsupportedOperationException(
2190            "Filesystem does not support symlinks!");
2191      }
2192    
2193      /**
2194       * See {@link AbstractFileSystem#getLinkTarget(Path)}
2195       */
2196      protected Path resolveLink(Path f) throws IOException {
2197        // Supporting filesystems should override this method
2198        throw new UnsupportedOperationException(
2199            "Filesystem does not support symlinks!");
2200      }
2201    
2202      /**
2203       * Get the checksum of a file.
2204       *
2205       * @param f The file path
2206       * @return The file checksum.  The default return value is null,
2207       *  which indicates that no checksum algorithm is implemented
2208       *  in the corresponding FileSystem.
2209       */
2210      public FileChecksum getFileChecksum(Path f) throws IOException {
2211        return getFileChecksum(f, Long.MAX_VALUE);
2212      }
2213    
2214      /**
2215       * Get the checksum of a file, from the beginning of the file till the
2216       * specific length.
2217       * @param f The file path
2218       * @param length The length of the file range for checksum calculation
2219       * @return The file checksum.
2220       */
2221      public FileChecksum getFileChecksum(Path f, final long length)
2222          throws IOException {
2223        return null;
2224      }
2225    
2226      /**
2227       * Set the verify checksum flag. This is only applicable if the 
2228       * corresponding FileSystem supports checksum. By default doesn't do anything.
2229       * @param verifyChecksum
2230       */
2231      public void setVerifyChecksum(boolean verifyChecksum) {
2232        //doesn't do anything
2233      }
2234    
2235      /**
2236       * Set the write checksum flag. This is only applicable if the 
2237       * corresponding FileSystem supports checksum. By default doesn't do anything.
2238       * @param writeChecksum
2239       */
2240      public void setWriteChecksum(boolean writeChecksum) {
2241        //doesn't do anything
2242      }
2243    
2244      /**
2245       * Returns a status object describing the use and capacity of the
2246       * file system. If the file system has multiple partitions, the
2247       * use and capacity of the root partition is reflected.
2248       * 
2249       * @return a FsStatus object
2250       * @throws IOException
2251       *           see specific implementation
2252       */
2253      public FsStatus getStatus() throws IOException {
2254        return getStatus(null);
2255      }
2256    
2257      /**
2258       * Returns a status object describing the use and capacity of the
2259       * file system. If the file system has multiple partitions, the
2260       * use and capacity of the partition pointed to by the specified
2261       * path is reflected.
2262       * @param p Path for which status should be obtained. null means
2263       * the default partition. 
2264       * @return a FsStatus object
2265       * @throws IOException
2266       *           see specific implementation
2267       */
2268      public FsStatus getStatus(Path p) throws IOException {
2269        return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2270      }
2271    
2272      /**
2273       * Set permission of a path.
2274       * @param p
2275       * @param permission
2276       */
2277      public void setPermission(Path p, FsPermission permission
2278          ) throws IOException {
2279      }
2280    
2281      /**
2282       * Set owner of a path (i.e. a file or a directory).
2283       * The parameters username and groupname cannot both be null.
2284       * @param p The path
2285       * @param username If it is null, the original username remains unchanged.
2286       * @param groupname If it is null, the original groupname remains unchanged.
2287       */
2288      public void setOwner(Path p, String username, String groupname
2289          ) throws IOException {
2290      }
2291    
2292      /**
2293       * Set access time of a file
2294       * @param p The path
2295       * @param mtime Set the modification time of this file.
2296       *              The number of milliseconds since Jan 1, 1970. 
2297       *              A value of -1 means that this call should not set modification time.
2298       * @param atime Set the access time of this file.
2299       *              The number of milliseconds since Jan 1, 1970. 
2300       *              A value of -1 means that this call should not set access time.
2301       */
2302      public void setTimes(Path p, long mtime, long atime
2303          ) throws IOException {
2304      }
2305    
2306      /**
2307       * Create a snapshot with a default name.
2308       * @param path The directory where snapshots will be taken.
2309       * @return the snapshot path.
2310       */
2311      public final Path createSnapshot(Path path) throws IOException {
2312        return createSnapshot(path, null);
2313      }
2314    
2315      /**
2316       * Create a snapshot
2317       * @param path The directory where snapshots will be taken.
2318       * @param snapshotName The name of the snapshot
2319       * @return the snapshot path.
2320       */
2321      public Path createSnapshot(Path path, String snapshotName)
2322          throws IOException {
2323        throw new UnsupportedOperationException(getClass().getSimpleName()
2324            + " doesn't support createSnapshot");
2325      }
2326      
2327      /**
2328       * Rename a snapshot
2329       * @param path The directory path where the snapshot was taken
2330       * @param snapshotOldName Old name of the snapshot
2331       * @param snapshotNewName New name of the snapshot
2332       * @throws IOException
2333       */
2334      public void renameSnapshot(Path path, String snapshotOldName,
2335          String snapshotNewName) throws IOException {
2336        throw new UnsupportedOperationException(getClass().getSimpleName()
2337            + " doesn't support renameSnapshot");
2338      }
2339      
2340      /**
2341       * Delete a snapshot of a directory
2342       * @param path  The directory that the to-be-deleted snapshot belongs to
2343       * @param snapshotName The name of the snapshot
2344       */
2345      public void deleteSnapshot(Path path, String snapshotName)
2346          throws IOException {
2347        throw new UnsupportedOperationException(getClass().getSimpleName()
2348            + " doesn't support deleteSnapshot");
2349      }
2350      
2351      /**
2352       * Modifies ACL entries of files and directories.  This method can add new ACL
2353       * entries or modify the permissions on existing ACL entries.  All existing
2354       * ACL entries that are not specified in this call are retained without
2355       * changes.  (Modifications are merged into the current ACL.)
2356       *
2357       * @param path Path to modify
2358       * @param aclSpec List<AclEntry> describing modifications
2359       * @throws IOException if an ACL could not be modified
2360       */
2361      public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2362          throws IOException {
2363        throw new UnsupportedOperationException(getClass().getSimpleName()
2364            + " doesn't support modifyAclEntries");
2365      }
2366    
2367      /**
2368       * Removes ACL entries from files and directories.  Other ACL entries are
2369       * retained.
2370       *
2371       * @param path Path to modify
2372       * @param aclSpec List<AclEntry> describing entries to remove
2373       * @throws IOException if an ACL could not be modified
2374       */
2375      public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2376          throws IOException {
2377        throw new UnsupportedOperationException(getClass().getSimpleName()
2378            + " doesn't support removeAclEntries");
2379      }
2380    
2381      /**
2382       * Removes all default ACL entries from files and directories.
2383       *
2384       * @param path Path to modify
2385       * @throws IOException if an ACL could not be modified
2386       */
2387      public void removeDefaultAcl(Path path)
2388          throws IOException {
2389        throw new UnsupportedOperationException(getClass().getSimpleName()
2390            + " doesn't support removeDefaultAcl");
2391      }
2392    
2393      /**
2394       * Removes all but the base ACL entries of files and directories.  The entries
2395       * for user, group, and others are retained for compatibility with permission
2396       * bits.
2397       *
2398       * @param path Path to modify
2399       * @throws IOException if an ACL could not be removed
2400       */
2401      public void removeAcl(Path path)
2402          throws IOException {
2403        throw new UnsupportedOperationException(getClass().getSimpleName()
2404            + " doesn't support removeAcl");
2405      }
2406    
2407      /**
2408       * Fully replaces ACL of files and directories, discarding all existing
2409       * entries.
2410       *
2411       * @param path Path to modify
2412       * @param aclSpec List<AclEntry> describing modifications, must include entries
2413       *   for user, group, and others for compatibility with permission bits.
2414       * @throws IOException if an ACL could not be modified
2415       */
2416      public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2417        throw new UnsupportedOperationException(getClass().getSimpleName()
2418            + " doesn't support setAcl");
2419      }
2420    
2421      /**
2422       * Gets the ACL of a file or directory.
2423       *
2424       * @param path Path to get
2425       * @return AclStatus describing the ACL of the file or directory
2426       * @throws IOException if an ACL could not be read
2427       */
2428      public AclStatus getAclStatus(Path path) throws IOException {
2429        throw new UnsupportedOperationException(getClass().getSimpleName()
2430            + " doesn't support getAclStatus");
2431      }
2432    
2433      /**
2434       * Set an xattr of a file or directory.
2435       * The name must be prefixed with the namespace followed by ".". For example,
2436       * "user.attr".
2437       * <p/>
2438       * Refer to the HDFS extended attributes user documentation for details.
2439       *
2440       * @param path Path to modify
2441       * @param name xattr name.
2442       * @param value xattr value.
2443       * @throws IOException
2444       */
2445      public void setXAttr(Path path, String name, byte[] value)
2446          throws IOException {
2447        setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2448            XAttrSetFlag.REPLACE));
2449      }
2450    
2451      /**
2452       * Set an xattr of a file or directory.
2453       * The name must be prefixed with the namespace followed by ".". For example,
2454       * "user.attr".
2455       * <p/>
2456       * Refer to the HDFS extended attributes user documentation for details.
2457       *
2458       * @param path Path to modify
2459       * @param name xattr name.
2460       * @param value xattr value.
2461       * @param flag xattr set flag
2462       * @throws IOException
2463       */
2464      public void setXAttr(Path path, String name, byte[] value,
2465          EnumSet<XAttrSetFlag> flag) throws IOException {
2466        throw new UnsupportedOperationException(getClass().getSimpleName()
2467            + " doesn't support setXAttr");
2468      }
2469    
2470      /**
2471       * Get an xattr name and value for a file or directory.
2472       * The name must be prefixed with the namespace followed by ".". For example,
2473       * "user.attr".
2474       * <p/>
2475       * Refer to the HDFS extended attributes user documentation for details.
2476       *
2477       * @param path Path to get extended attribute
2478       * @param name xattr name.
2479       * @return byte[] xattr value.
2480       * @throws IOException
2481       */
2482      public byte[] getXAttr(Path path, String name) throws IOException {
2483        throw new UnsupportedOperationException(getClass().getSimpleName()
2484            + " doesn't support getXAttr");
2485      }
2486    
2487      /**
2488       * Get all of the xattr name/value pairs for a file or directory.
2489       * Only those xattrs which the logged-in user has permissions to view
2490       * are returned.
2491       * <p/>
2492       * Refer to the HDFS extended attributes user documentation for details.
2493       *
2494       * @param path Path to get extended attributes
2495       * @return Map<String, byte[]> describing the XAttrs of the file or directory
2496       * @throws IOException
2497       */
2498      public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2499        throw new UnsupportedOperationException(getClass().getSimpleName()
2500            + " doesn't support getXAttrs");
2501      }
2502    
2503      /**
2504       * Get all of the xattrs name/value pairs for a file or directory.
2505       * Only those xattrs which the logged-in user has permissions to view
2506       * are returned.
2507       * <p/>
2508       * Refer to the HDFS extended attributes user documentation for details.
2509       *
2510       * @param path Path to get extended attributes
2511       * @param names XAttr names.
2512       * @return Map<String, byte[]> describing the XAttrs of the file or directory
2513       * @throws IOException
2514       */
2515      public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2516          throws IOException {
2517        throw new UnsupportedOperationException(getClass().getSimpleName()
2518            + " doesn't support getXAttrs");
2519      }
2520    
2521      /**
2522       * Get all of the xattr names for a file or directory.
2523       * Only those xattr names which the logged-in user has permissions to view
2524       * are returned.
2525       * <p/>
2526       * Refer to the HDFS extended attributes user documentation for details.
2527       *
2528       * @param path Path to get extended attributes
2529       * @return List<String> of the XAttr names of the file or directory
2530       * @throws IOException
2531       */
2532      public List<String> listXAttrs(Path path) throws IOException {
2533        throw new UnsupportedOperationException(getClass().getSimpleName()
2534                + " doesn't support listXAttrs");
2535      }
2536    
2537      /**
2538       * Remove an xattr of a file or directory.
2539       * The name must be prefixed with the namespace followed by ".". For example,
2540       * "user.attr".
2541       * <p/>
2542       * Refer to the HDFS extended attributes user documentation for details.
2543       *
2544       * @param path Path to remove extended attribute
2545       * @param name xattr name
2546       * @throws IOException
2547       */
2548      public void removeXAttr(Path path, String name) throws IOException {
2549        throw new UnsupportedOperationException(getClass().getSimpleName()
2550            + " doesn't support removeXAttr");
2551      }
2552    
2553      // making it volatile to be able to do a double checked locking
2554      private volatile static boolean FILE_SYSTEMS_LOADED = false;
2555    
2556      private static final Map<String, Class<? extends FileSystem>>
2557        SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2558    
2559      private static void loadFileSystems() {
2560        synchronized (FileSystem.class) {
2561          if (!FILE_SYSTEMS_LOADED) {
2562            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2563            for (FileSystem fs : serviceLoader) {
2564              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2565            }
2566            FILE_SYSTEMS_LOADED = true;
2567          }
2568        }
2569      }
2570    
2571      public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2572          Configuration conf) throws IOException {
2573        if (!FILE_SYSTEMS_LOADED) {
2574          loadFileSystems();
2575        }
2576        Class<? extends FileSystem> clazz = null;
2577        if (conf != null) {
2578          clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2579        }
2580        if (clazz == null) {
2581          clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2582        }
2583        if (clazz == null) {
2584          throw new IOException("No FileSystem for scheme: " + scheme);
2585        }
2586        return clazz;
2587      }
2588    
2589      private static FileSystem createFileSystem(URI uri, Configuration conf
2590          ) throws IOException {
2591        Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2592        if (clazz == null) {
2593          throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2594        }
2595        FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2596        fs.initialize(uri, conf);
2597        return fs;
2598      }
2599    
2600      /** Caching FileSystem objects */
2601      static class Cache {
2602        private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2603    
2604        private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2605        private final Set<Key> toAutoClose = new HashSet<Key>();
2606    
2607        /** A variable that makes all objects in the cache unique */
2608        private static AtomicLong unique = new AtomicLong(1);
2609    
2610        FileSystem get(URI uri, Configuration conf) throws IOException{
2611          Key key = new Key(uri, conf);
2612          return getInternal(uri, conf, key);
2613        }
2614    
2615        /** The objects inserted into the cache using this method are all unique */
2616        FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2617          Key key = new Key(uri, conf, unique.getAndIncrement());
2618          return getInternal(uri, conf, key);
2619        }
2620    
2621        private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2622          FileSystem fs;
2623          synchronized (this) {
2624            fs = map.get(key);
2625          }
2626          if (fs != null) {
2627            return fs;
2628          }
2629    
2630          fs = createFileSystem(uri, conf);
2631          synchronized (this) { // refetch the lock again
2632            FileSystem oldfs = map.get(key);
2633            if (oldfs != null) { // a file system is created while lock is releasing
2634              fs.close(); // close the new file system
2635              return oldfs;  // return the old file system
2636            }
2637            
2638            // now insert the new file system into the map
2639            if (map.isEmpty()
2640                    && !ShutdownHookManager.get().isShutdownInProgress()) {
2641              ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2642            }
2643            fs.key = key;
2644            map.put(key, fs);
2645            if (conf.getBoolean("fs.automatic.close", true)) {
2646              toAutoClose.add(key);
2647            }
2648            return fs;
2649          }
2650        }
2651    
2652        synchronized void remove(Key key, FileSystem fs) {
2653          if (map.containsKey(key) && fs == map.get(key)) {
2654            map.remove(key);
2655            toAutoClose.remove(key);
2656            }
2657        }
2658    
2659        synchronized void closeAll() throws IOException {
2660          closeAll(false);
2661        }
2662    
2663        /**
2664         * Close all FileSystem instances in the Cache.
2665         * @param onlyAutomatic only close those that are marked for automatic closing
2666         */
2667        synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2668          List<IOException> exceptions = new ArrayList<IOException>();
2669    
2670          // Make a copy of the keys in the map since we'll be modifying
2671          // the map while iterating over it, which isn't safe.
2672          List<Key> keys = new ArrayList<Key>();
2673          keys.addAll(map.keySet());
2674    
2675          for (Key key : keys) {
2676            final FileSystem fs = map.get(key);
2677    
2678            if (onlyAutomatic && !toAutoClose.contains(key)) {
2679              continue;
2680            }
2681    
2682            //remove from cache
2683            remove(key, fs);
2684    
2685            if (fs != null) {
2686              try {
2687                fs.close();
2688              }
2689              catch(IOException ioe) {
2690                exceptions.add(ioe);
2691              }
2692            }
2693          }
2694    
2695          if (!exceptions.isEmpty()) {
2696            throw MultipleIOException.createIOException(exceptions);
2697          }
2698        }
2699    
2700        private class ClientFinalizer implements Runnable {
2701          @Override
2702          public synchronized void run() {
2703            try {
2704              closeAll(true);
2705            } catch (IOException e) {
2706              LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2707            }
2708          }
2709        }
2710    
2711        synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2712          List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2713          //Make a pass over the list and collect the filesystems to close
2714          //we cannot close inline since close() removes the entry from the Map
2715          for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2716            final Key key = entry.getKey();
2717            final FileSystem fs = entry.getValue();
2718            if (ugi.equals(key.ugi) && fs != null) {
2719              targetFSList.add(fs);   
2720            }
2721          }
2722          List<IOException> exceptions = new ArrayList<IOException>();
2723          //now make a pass over the target list and close each
2724          for (FileSystem fs : targetFSList) {
2725            try {
2726              fs.close();
2727            }
2728            catch(IOException ioe) {
2729              exceptions.add(ioe);
2730            }
2731          }
2732          if (!exceptions.isEmpty()) {
2733            throw MultipleIOException.createIOException(exceptions);
2734          }
2735        }
2736    
2737        /** FileSystem.Cache.Key */
2738        static class Key {
2739          final String scheme;
2740          final String authority;
2741          final UserGroupInformation ugi;
2742          final long unique;   // an artificial way to make a key unique
2743    
2744          Key(URI uri, Configuration conf) throws IOException {
2745            this(uri, conf, 0);
2746          }
2747    
2748          Key(URI uri, Configuration conf, long unique) throws IOException {
2749            scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2750            authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2751            this.unique = unique;
2752            
2753            this.ugi = UserGroupInformation.getCurrentUser();
2754          }
2755    
2756          @Override
2757          public int hashCode() {
2758            return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2759          }
2760    
2761          static boolean isEqual(Object a, Object b) {
2762            return a == b || (a != null && a.equals(b));        
2763          }
2764    
2765          @Override
2766          public boolean equals(Object obj) {
2767            if (obj == this) {
2768              return true;
2769            }
2770            if (obj != null && obj instanceof Key) {
2771              Key that = (Key)obj;
2772              return isEqual(this.scheme, that.scheme)
2773                     && isEqual(this.authority, that.authority)
2774                     && isEqual(this.ugi, that.ugi)
2775                     && (this.unique == that.unique);
2776            }
2777            return false;        
2778          }
2779    
2780          @Override
2781          public String toString() {
2782            return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2783          }
2784        }
2785      }
2786      
2787      /**
2788       * Tracks statistics about how many reads, writes, and so forth have been
2789       * done in a FileSystem.
2790       * 
2791       * Since there is only one of these objects per FileSystem, there will 
2792       * typically be many threads writing to this object.  Almost every operation
2793       * on an open file will involve a write to this object.  In contrast, reading
2794       * statistics is done infrequently by most programs, and not at all by others.
2795       * Hence, this is optimized for writes.
2796       * 
2797       * Each thread writes to its own thread-local area of memory.  This removes 
2798       * contention and allows us to scale up to many, many threads.  To read
2799       * statistics, the reader thread totals up the contents of all of the 
2800       * thread-local data areas.
2801       */
2802      public static final class Statistics {
2803        /**
2804         * Statistics data.
2805         * 
2806         * There is only a single writer to thread-local StatisticsData objects.
2807         * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2808         * to prevent lost updates.
2809         * The Java specification guarantees that updates to volatile longs will
2810         * be perceived as atomic with respect to other threads, which is all we
2811         * need.
2812         */
2813        public static class StatisticsData {
2814          volatile long bytesRead;
2815          volatile long bytesWritten;
2816          volatile int readOps;
2817          volatile int largeReadOps;
2818          volatile int writeOps;
2819          /**
2820           * Stores a weak reference to the thread owning this StatisticsData.
2821           * This allows us to remove StatisticsData objects that pertain to
2822           * threads that no longer exist.
2823           */
2824          final WeakReference<Thread> owner;
2825    
2826          StatisticsData(WeakReference<Thread> owner) {
2827            this.owner = owner;
2828          }
2829    
2830          /**
2831           * Add another StatisticsData object to this one.
2832           */
2833          void add(StatisticsData other) {
2834            this.bytesRead += other.bytesRead;
2835            this.bytesWritten += other.bytesWritten;
2836            this.readOps += other.readOps;
2837            this.largeReadOps += other.largeReadOps;
2838            this.writeOps += other.writeOps;
2839          }
2840    
2841          /**
2842           * Negate the values of all statistics.
2843           */
2844          void negate() {
2845            this.bytesRead = -this.bytesRead;
2846            this.bytesWritten = -this.bytesWritten;
2847            this.readOps = -this.readOps;
2848            this.largeReadOps = -this.largeReadOps;
2849            this.writeOps = -this.writeOps;
2850          }
2851    
2852          @Override
2853          public String toString() {
2854            return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2855                + readOps + " read ops, " + largeReadOps + " large read ops, "
2856                + writeOps + " write ops";
2857          }
2858          
2859          public long getBytesRead() {
2860            return bytesRead;
2861          }
2862          
2863          public long getBytesWritten() {
2864            return bytesWritten;
2865          }
2866          
2867          public int getReadOps() {
2868            return readOps;
2869          }
2870          
2871          public int getLargeReadOps() {
2872            return largeReadOps;
2873          }
2874          
2875          public int getWriteOps() {
2876            return writeOps;
2877          }
2878        }
2879    
2880        private interface StatisticsAggregator<T> {
2881          void accept(StatisticsData data);
2882          T aggregate();
2883        }
2884    
2885        private final String scheme;
2886    
2887        /**
2888         * rootData is data that doesn't belong to any thread, but will be added
2889         * to the totals.  This is useful for making copies of Statistics objects,
2890         * and for storing data that pertains to threads that have been garbage
2891         * collected.  Protected by the Statistics lock.
2892         */
2893        private final StatisticsData rootData;
2894    
2895        /**
2896         * Thread-local data.
2897         */
2898        private final ThreadLocal<StatisticsData> threadData;
2899        
2900        /**
2901         * List of all thread-local data areas.  Protected by the Statistics lock.
2902         */
2903        private LinkedList<StatisticsData> allData;
2904    
2905        public Statistics(String scheme) {
2906          this.scheme = scheme;
2907          this.rootData = new StatisticsData(null);
2908          this.threadData = new ThreadLocal<StatisticsData>();
2909          this.allData = null;
2910        }
2911    
2912        /**
2913         * Copy constructor.
2914         * 
2915         * @param other    The input Statistics object which is cloned.
2916         */
2917        public Statistics(Statistics other) {
2918          this.scheme = other.scheme;
2919          this.rootData = new StatisticsData(null);
2920          other.visitAll(new StatisticsAggregator<Void>() {
2921            @Override
2922            public void accept(StatisticsData data) {
2923              rootData.add(data);
2924            }
2925    
2926            public Void aggregate() {
2927              return null;
2928            }
2929          });
2930          this.threadData = new ThreadLocal<StatisticsData>();
2931        }
2932    
2933        /**
2934         * Get or create the thread-local data associated with the current thread.
2935         */
2936        public StatisticsData getThreadStatistics() {
2937          StatisticsData data = threadData.get();
2938          if (data == null) {
2939            data = new StatisticsData(
2940                new WeakReference<Thread>(Thread.currentThread()));
2941            threadData.set(data);
2942            synchronized(this) {
2943              if (allData == null) {
2944                allData = new LinkedList<StatisticsData>();
2945              }
2946              allData.add(data);
2947            }
2948          }
2949          return data;
2950        }
2951    
2952        /**
2953         * Increment the bytes read in the statistics
2954         * @param newBytes the additional bytes read
2955         */
2956        public void incrementBytesRead(long newBytes) {
2957          getThreadStatistics().bytesRead += newBytes;
2958        }
2959        
2960        /**
2961         * Increment the bytes written in the statistics
2962         * @param newBytes the additional bytes written
2963         */
2964        public void incrementBytesWritten(long newBytes) {
2965          getThreadStatistics().bytesWritten += newBytes;
2966        }
2967        
2968        /**
2969         * Increment the number of read operations
2970         * @param count number of read operations
2971         */
2972        public void incrementReadOps(int count) {
2973          getThreadStatistics().readOps += count;
2974        }
2975    
2976        /**
2977         * Increment the number of large read operations
2978         * @param count number of large read operations
2979         */
2980        public void incrementLargeReadOps(int count) {
2981          getThreadStatistics().largeReadOps += count;
2982        }
2983    
2984        /**
2985         * Increment the number of write operations
2986         * @param count number of write operations
2987         */
2988        public void incrementWriteOps(int count) {
2989          getThreadStatistics().writeOps += count;
2990        }
2991    
2992        /**
2993         * Apply the given aggregator to all StatisticsData objects associated with
2994         * this Statistics object.
2995         *
2996         * For each StatisticsData object, we will call accept on the visitor.
2997         * Finally, at the end, we will call aggregate to get the final total. 
2998         *
2999         * @param         The visitor to use.
3000         * @return        The total.
3001         */
3002        private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3003          visitor.accept(rootData);
3004          if (allData != null) {
3005            for (Iterator<StatisticsData> iter = allData.iterator();
3006                iter.hasNext(); ) {
3007              StatisticsData data = iter.next();
3008              visitor.accept(data);
3009              if (data.owner.get() == null) {
3010                /*
3011                 * If the thread that created this thread-local data no
3012                 * longer exists, remove the StatisticsData from our list
3013                 * and fold the values into rootData.
3014                 */
3015                rootData.add(data);
3016                iter.remove();
3017              }
3018            }
3019          }
3020          return visitor.aggregate();
3021        }
3022    
3023        /**
3024         * Get the total number of bytes read
3025         * @return the number of bytes
3026         */
3027        public long getBytesRead() {
3028          return visitAll(new StatisticsAggregator<Long>() {
3029            private long bytesRead = 0;
3030    
3031            @Override
3032            public void accept(StatisticsData data) {
3033              bytesRead += data.bytesRead;
3034            }
3035    
3036            public Long aggregate() {
3037              return bytesRead;
3038            }
3039          });
3040        }
3041        
3042        /**
3043         * Get the total number of bytes written
3044         * @return the number of bytes
3045         */
3046        public long getBytesWritten() {
3047          return visitAll(new StatisticsAggregator<Long>() {
3048            private long bytesWritten = 0;
3049    
3050            @Override
3051            public void accept(StatisticsData data) {
3052              bytesWritten += data.bytesWritten;
3053            }
3054    
3055            public Long aggregate() {
3056              return bytesWritten;
3057            }
3058          });
3059        }
3060        
3061        /**
3062         * Get the number of file system read operations such as list files
3063         * @return number of read operations
3064         */
3065        public int getReadOps() {
3066          return visitAll(new StatisticsAggregator<Integer>() {
3067            private int readOps = 0;
3068    
3069            @Override
3070            public void accept(StatisticsData data) {
3071              readOps += data.readOps;
3072              readOps += data.largeReadOps;
3073            }
3074    
3075            public Integer aggregate() {
3076              return readOps;
3077            }
3078          });
3079        }
3080    
3081        /**
3082         * Get the number of large file system read operations such as list files
3083         * under a large directory
3084         * @return number of large read operations
3085         */
3086        public int getLargeReadOps() {
3087          return visitAll(new StatisticsAggregator<Integer>() {
3088            private int largeReadOps = 0;
3089    
3090            @Override
3091            public void accept(StatisticsData data) {
3092              largeReadOps += data.largeReadOps;
3093            }
3094    
3095            public Integer aggregate() {
3096              return largeReadOps;
3097            }
3098          });
3099        }
3100    
3101        /**
3102         * Get the number of file system write operations such as create, append 
3103         * rename etc.
3104         * @return number of write operations
3105         */
3106        public int getWriteOps() {
3107          return visitAll(new StatisticsAggregator<Integer>() {
3108            private int writeOps = 0;
3109    
3110            @Override
3111            public void accept(StatisticsData data) {
3112              writeOps += data.writeOps;
3113            }
3114    
3115            public Integer aggregate() {
3116              return writeOps;
3117            }
3118          });
3119        }
3120    
3121    
3122        @Override
3123        public String toString() {
3124          return visitAll(new StatisticsAggregator<String>() {
3125            private StatisticsData total = new StatisticsData(null);
3126    
3127            @Override
3128            public void accept(StatisticsData data) {
3129              total.add(data);
3130            }
3131    
3132            public String aggregate() {
3133              return total.toString();
3134            }
3135          });
3136        }
3137    
3138        /**
3139         * Resets all statistics to 0.
3140         *
3141         * In order to reset, we add up all the thread-local statistics data, and
3142         * set rootData to the negative of that.
3143         *
3144         * This may seem like a counterintuitive way to reset the statsitics.  Why
3145         * can't we just zero out all the thread-local data?  Well, thread-local
3146         * data can only be modified by the thread that owns it.  If we tried to
3147         * modify the thread-local data from this thread, our modification might get
3148         * interleaved with a read-modify-write operation done by the thread that
3149         * owns the data.  That would result in our update getting lost.
3150         *
3151         * The approach used here avoids this problem because it only ever reads
3152         * (not writes) the thread-local data.  Both reads and writes to rootData
3153         * are done under the lock, so we're free to modify rootData from any thread
3154         * that holds the lock.
3155         */
3156        public void reset() {
3157          visitAll(new StatisticsAggregator<Void>() {
3158            private StatisticsData total = new StatisticsData(null);
3159    
3160            @Override
3161            public void accept(StatisticsData data) {
3162              total.add(data);
3163            }
3164    
3165            public Void aggregate() {
3166              total.negate();
3167              rootData.add(total);
3168              return null;
3169            }
3170          });
3171        }
3172        
3173        /**
3174         * Get the uri scheme associated with this statistics object.
3175         * @return the schema associated with this set of statistics
3176         */
3177        public String getScheme() {
3178          return scheme;
3179        }
3180      }
3181      
3182      /**
3183       * Get the Map of Statistics object indexed by URI Scheme.
3184       * @return a Map having a key as URI scheme and value as Statistics object
3185       * @deprecated use {@link #getAllStatistics} instead
3186       */
3187      @Deprecated
3188      public static synchronized Map<String, Statistics> getStatistics() {
3189        Map<String, Statistics> result = new HashMap<String, Statistics>();
3190        for(Statistics stat: statisticsTable.values()) {
3191          result.put(stat.getScheme(), stat);
3192        }
3193        return result;
3194      }
3195    
3196      /**
3197       * Return the FileSystem classes that have Statistics
3198       */
3199      public static synchronized List<Statistics> getAllStatistics() {
3200        return new ArrayList<Statistics>(statisticsTable.values());
3201      }
3202      
3203      /**
3204       * Get the statistics for a particular file system
3205       * @param cls the class to lookup
3206       * @return a statistics object
3207       */
3208      public static synchronized 
3209      Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3210        Statistics result = statisticsTable.get(cls);
3211        if (result == null) {
3212          result = new Statistics(scheme);
3213          statisticsTable.put(cls, result);
3214        }
3215        return result;
3216      }
3217      
3218      /**
3219       * Reset all statistics for all file systems
3220       */
3221      public static synchronized void clearStatistics() {
3222        for(Statistics stat: statisticsTable.values()) {
3223          stat.reset();
3224        }
3225      }
3226    
3227      /**
3228       * Print all statistics for all file systems
3229       */
3230      public static synchronized
3231      void printStatistics() throws IOException {
3232        for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3233                statisticsTable.entrySet()) {
3234          System.out.println("  FileSystem " + pair.getKey().getName() + 
3235                             ": " + pair.getValue());
3236        }
3237      }
3238    
3239      // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
3240      private static boolean symlinksEnabled = false;
3241    
3242      private static Configuration conf = null;
3243    
3244      @VisibleForTesting
3245      public static boolean areSymlinksEnabled() {
3246        return symlinksEnabled;
3247      }
3248    
3249      @VisibleForTesting
3250      public static void enableSymlinks() {
3251        symlinksEnabled = true;
3252      }
3253    }