001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs;
019    
020    import java.io.Closeable;
021    import java.io.FileNotFoundException;
022    import java.io.IOException;
023    import java.lang.ref.WeakReference;
024    import java.net.URI;
025    import java.net.URISyntaxException;
026    import java.security.PrivilegedExceptionAction;
027    import java.util.ArrayList;
028    import java.util.EnumSet;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.IdentityHashMap;
032    import java.util.Iterator;
033    import java.util.LinkedList;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.NoSuchElementException;
037    import java.util.ServiceLoader;
038    import java.util.Set;
039    import java.util.Stack;
040    import java.util.TreeSet;
041    import java.util.concurrent.atomic.AtomicLong;
042    
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    import org.apache.hadoop.classification.InterfaceAudience;
046    import org.apache.hadoop.classification.InterfaceStability;
047    import org.apache.hadoop.conf.Configuration;
048    import org.apache.hadoop.conf.Configured;
049    import org.apache.hadoop.fs.Options.ChecksumOpt;
050    import org.apache.hadoop.fs.Options.Rename;
051    import org.apache.hadoop.fs.permission.AclEntry;
052    import org.apache.hadoop.fs.permission.AclStatus;
053    import org.apache.hadoop.fs.permission.FsPermission;
054    import org.apache.hadoop.io.MultipleIOException;
055    import org.apache.hadoop.io.Text;
056    import org.apache.hadoop.net.NetUtils;
057    import org.apache.hadoop.security.AccessControlException;
058    import org.apache.hadoop.security.Credentials;
059    import org.apache.hadoop.security.SecurityUtil;
060    import org.apache.hadoop.security.UserGroupInformation;
061    import org.apache.hadoop.security.token.Token;
062    import org.apache.hadoop.util.DataChecksum;
063    import org.apache.hadoop.util.Progressable;
064    import org.apache.hadoop.util.ReflectionUtils;
065    import org.apache.hadoop.util.ShutdownHookManager;
066    
067    import com.google.common.annotations.VisibleForTesting;
068    
069    /****************************************************************
070     * An abstract base class for a fairly generic filesystem.  It
071     * may be implemented as a distributed filesystem, or as a "local"
072     * one that reflects the locally-connected disk.  The local version
073     * exists for small Hadoop instances and for testing.
074     *
075     * <p>
076     *
077     * All user code that may potentially use the Hadoop Distributed
078     * File System should be written to use a FileSystem object.  The
079     * Hadoop DFS is a multi-machine system that appears as a single
080     * disk.  It's useful because of its fault tolerance and potentially
081     * very large capacity.
082     * 
083     * <p>
084     * The local implementation is {@link LocalFileSystem} and distributed
085     * implementation is DistributedFileSystem.
086     *****************************************************************/
087    @InterfaceAudience.Public
088    @InterfaceStability.Stable
089    public abstract class FileSystem extends Configured implements Closeable {
090      public static final String FS_DEFAULT_NAME_KEY = 
091                       CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
092      public static final String DEFAULT_FS = 
093                       CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
094    
095      public static final Log LOG = LogFactory.getLog(FileSystem.class);
096    
097      /**
098       * Priority of the FileSystem shutdown hook.
099       */
100      public static final int SHUTDOWN_HOOK_PRIORITY = 10;
101    
102      /** FileSystem cache */
103      static final Cache CACHE = new Cache();
104    
105      /** The key this instance is stored under in the cache. */
106      private Cache.Key key;
107    
108      /** Recording statistics per a FileSystem class */
109      private static final Map<Class<? extends FileSystem>, Statistics> 
110        statisticsTable =
111          new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
112      
113      /**
114       * The statistics for this file system.
115       */
116      protected Statistics statistics;
117    
118      /**
119       * A cache of files that should be deleted when filsystem is closed
120       * or the JVM is exited.
121       */
122      private Set<Path> deleteOnExit = new TreeSet<Path>();
123      
124      boolean resolveSymlinks;
125      /**
126       * This method adds a file system for testing so that we can find it later. It
127       * is only for testing.
128       * @param uri the uri to store it under
129       * @param conf the configuration to store it under
130       * @param fs the file system to store
131       * @throws IOException
132       */
133      static void addFileSystemForTesting(URI uri, Configuration conf,
134          FileSystem fs) throws IOException {
135        CACHE.map.put(new Cache.Key(uri, conf), fs);
136      }
137    
138      /**
139       * Get a filesystem instance based on the uri, the passed
140       * configuration and the user
141       * @param uri of the filesystem
142       * @param conf the configuration to use
143       * @param user to perform the get as
144       * @return the filesystem instance
145       * @throws IOException
146       * @throws InterruptedException
147       */
148      public static FileSystem get(final URI uri, final Configuration conf,
149            final String user) throws IOException, InterruptedException {
150        String ticketCachePath =
151          conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
152        UserGroupInformation ugi =
153            UserGroupInformation.getBestUGI(ticketCachePath, user);
154        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
155          @Override
156          public FileSystem run() throws IOException {
157            return get(uri, conf);
158          }
159        });
160      }
161    
162      /**
163       * Returns the configured filesystem implementation.
164       * @param conf the configuration to use
165       */
166      public static FileSystem get(Configuration conf) throws IOException {
167        return get(getDefaultUri(conf), conf);
168      }
169      
170      /** Get the default filesystem URI from a configuration.
171       * @param conf the configuration to use
172       * @return the uri of the default filesystem
173       */
174      public static URI getDefaultUri(Configuration conf) {
175        return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
176      }
177    
178      /** Set the default filesystem URI in a configuration.
179       * @param conf the configuration to alter
180       * @param uri the new default filesystem uri
181       */
182      public static void setDefaultUri(Configuration conf, URI uri) {
183        conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
184      }
185    
186      /** Set the default filesystem URI in a configuration.
187       * @param conf the configuration to alter
188       * @param uri the new default filesystem uri
189       */
190      public static void setDefaultUri(Configuration conf, String uri) {
191        setDefaultUri(conf, URI.create(fixName(uri)));
192      }
193    
194      /** Called after a new FileSystem instance is constructed.
195       * @param name a uri whose authority section names the host, port, etc.
196       *   for this FileSystem
197       * @param conf the configuration
198       */
199      public void initialize(URI name, Configuration conf) throws IOException {
200        statistics = getStatistics(name.getScheme(), getClass());    
201        resolveSymlinks = conf.getBoolean(
202            CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
203            CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
204      }
205    
206      /**
207       * Return the protocol scheme for the FileSystem.
208       * <p/>
209       * This implementation throws an <code>UnsupportedOperationException</code>.
210       *
211       * @return the protocol scheme for the FileSystem.
212       */
213      public String getScheme() {
214        throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
215      }
216    
217      /** Returns a URI whose scheme and authority identify this FileSystem.*/
218      public abstract URI getUri();
219      
220      /**
221       * Return a canonicalized form of this FileSystem's URI.
222       * 
223       * The default implementation simply calls {@link #canonicalizeUri(URI)}
224       * on the filesystem's own URI, so subclasses typically only need to
225       * implement that method.
226       *
227       * @see #canonicalizeUri(URI)
228       */
229      protected URI getCanonicalUri() {
230        return canonicalizeUri(getUri());
231      }
232      
233      /**
234       * Canonicalize the given URI.
235       * 
236       * This is filesystem-dependent, but may for example consist of
237       * canonicalizing the hostname using DNS and adding the default
238       * port if not specified.
239       * 
240       * The default implementation simply fills in the default port if
241       * not specified and if the filesystem has a default port.
242       *
243       * @return URI
244       * @see NetUtils#getCanonicalUri(URI, int)
245       */
246      protected URI canonicalizeUri(URI uri) {
247        if (uri.getPort() == -1 && getDefaultPort() > 0) {
248          // reconstruct the uri with the default port set
249          try {
250            uri = new URI(uri.getScheme(), uri.getUserInfo(),
251                uri.getHost(), getDefaultPort(),
252                uri.getPath(), uri.getQuery(), uri.getFragment());
253          } catch (URISyntaxException e) {
254            // Should never happen!
255            throw new AssertionError("Valid URI became unparseable: " +
256                uri);
257          }
258        }
259        
260        return uri;
261      }
262      
263      /**
264       * Get the default port for this file system.
265       * @return the default port or 0 if there isn't one
266       */
267      protected int getDefaultPort() {
268        return 0;
269      }
270    
271      protected static FileSystem getFSofPath(final Path absOrFqPath,
272          final Configuration conf)
273          throws UnsupportedFileSystemException, IOException {
274        absOrFqPath.checkNotSchemeWithRelative();
275        absOrFqPath.checkNotRelative();
276    
277        // Uses the default file system if not fully qualified
278        return get(absOrFqPath.toUri(), conf);
279      }
280    
281      /**
282       * Get a canonical service name for this file system.  The token cache is
283       * the only user of the canonical service name, and uses it to lookup this
284       * filesystem's service tokens.
285       * If file system provides a token of its own then it must have a canonical
286       * name, otherwise canonical name can be null.
287       * 
288       * Default Impl: If the file system has child file systems 
289       * (such as an embedded file system) then it is assumed that the fs has no
290       * tokens of its own and hence returns a null name; otherwise a service
291       * name is built using Uri and port.
292       * 
293       * @return a service string that uniquely identifies this file system, null
294       *         if the filesystem does not implement tokens
295       * @see SecurityUtil#buildDTServiceName(URI, int) 
296       */
297      @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
298      public String getCanonicalServiceName() {
299        return (getChildFileSystems() == null)
300          ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
301          : null;
302      }
303    
304      /** @deprecated call #getUri() instead.*/
305      @Deprecated
306      public String getName() { return getUri().toString(); }
307    
308      /** @deprecated call #get(URI,Configuration) instead. */
309      @Deprecated
310      public static FileSystem getNamed(String name, Configuration conf)
311        throws IOException {
312        return get(URI.create(fixName(name)), conf);
313      }
314      
315      /** Update old-format filesystem names, for back-compatibility.  This should
316       * eventually be replaced with a checkName() method that throws an exception
317       * for old-format names. */ 
318      private static String fixName(String name) {
319        // convert old-format name to new-format name
320        if (name.equals("local")) {         // "local" is now "file:///".
321          LOG.warn("\"local\" is a deprecated filesystem name."
322                   +" Use \"file:///\" instead.");
323          name = "file:///";
324        } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
325          LOG.warn("\""+name+"\" is a deprecated filesystem name."
326                   +" Use \"hdfs://"+name+"/\" instead.");
327          name = "hdfs://"+name;
328        }
329        return name;
330      }
331    
332      /**
333       * Get the local file system.
334       * @param conf the configuration to configure the file system with
335       * @return a LocalFileSystem
336       */
337      public static LocalFileSystem getLocal(Configuration conf)
338        throws IOException {
339        return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
340      }
341    
342      /** Returns the FileSystem for this URI's scheme and authority.  The scheme
343       * of the URI determines a configuration property name,
344       * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
345       * The entire URI is passed to the FileSystem instance's initialize method.
346       */
347      public static FileSystem get(URI uri, Configuration conf) throws IOException {
348        String scheme = uri.getScheme();
349        String authority = uri.getAuthority();
350    
351        if (scheme == null && authority == null) {     // use default FS
352          return get(conf);
353        }
354    
355        if (scheme != null && authority == null) {     // no authority
356          URI defaultUri = getDefaultUri(conf);
357          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
358              && defaultUri.getAuthority() != null) {  // & default has authority
359            return get(defaultUri, conf);              // return default
360          }
361        }
362        
363        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
364        if (conf.getBoolean(disableCacheName, false)) {
365          return createFileSystem(uri, conf);
366        }
367    
368        return CACHE.get(uri, conf);
369      }
370    
371      /**
372       * Returns the FileSystem for this URI's scheme and authority and the 
373       * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
374       * @param uri of the filesystem
375       * @param conf the configuration to use
376       * @param user to perform the get as
377       * @return filesystem instance
378       * @throws IOException
379       * @throws InterruptedException
380       */
381      public static FileSystem newInstance(final URI uri, final Configuration conf,
382          final String user) throws IOException, InterruptedException {
383        String ticketCachePath =
384          conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
385        UserGroupInformation ugi =
386            UserGroupInformation.getBestUGI(ticketCachePath, user);
387        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
388          @Override
389          public FileSystem run() throws IOException {
390            return newInstance(uri,conf); 
391          }
392        });
393      }
394      /** Returns the FileSystem for this URI's scheme and authority.  The scheme
395       * of the URI determines a configuration property name,
396       * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
397       * The entire URI is passed to the FileSystem instance's initialize method.
398       * This always returns a new FileSystem object.
399       */
400      public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
401        String scheme = uri.getScheme();
402        String authority = uri.getAuthority();
403    
404        if (scheme == null) {                       // no scheme: use default FS
405          return newInstance(conf);
406        }
407    
408        if (authority == null) {                       // no authority
409          URI defaultUri = getDefaultUri(conf);
410          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
411              && defaultUri.getAuthority() != null) {  // & default has authority
412            return newInstance(defaultUri, conf);              // return default
413          }
414        }
415        return CACHE.getUnique(uri, conf);
416      }
417    
418      /** Returns a unique configured filesystem implementation.
419       * This always returns a new FileSystem object.
420       * @param conf the configuration to use
421       */
422      public static FileSystem newInstance(Configuration conf) throws IOException {
423        return newInstance(getDefaultUri(conf), conf);
424      }
425    
426      /**
427       * Get a unique local file system object
428       * @param conf the configuration to configure the file system with
429       * @return a LocalFileSystem
430       * This always returns a new FileSystem object.
431       */
432      public static LocalFileSystem newInstanceLocal(Configuration conf)
433        throws IOException {
434        return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
435      }
436    
437      /**
438       * Close all cached filesystems. Be sure those filesystems are not
439       * used anymore.
440       * 
441       * @throws IOException
442       */
443      public static void closeAll() throws IOException {
444        CACHE.closeAll();
445      }
446    
447      /**
448       * Close all cached filesystems for a given UGI. Be sure those filesystems 
449       * are not used anymore.
450       * @param ugi user group info to close
451       * @throws IOException
452       */
453      public static void closeAllForUGI(UserGroupInformation ugi) 
454      throws IOException {
455        CACHE.closeAll(ugi);
456      }
457    
458      /** 
459       * Make sure that a path specifies a FileSystem.
460       * @param path to use
461       */
462      public Path makeQualified(Path path) {
463        checkPath(path);
464        return path.makeQualified(this.getUri(), this.getWorkingDirectory());
465      }
466        
467      /**
468       * Get a new delegation token for this file system.
469       * This is an internal method that should have been declared protected
470       * but wasn't historically.
471       * Callers should use {@link #addDelegationTokens(String, Credentials)}
472       * 
473       * @param renewer the account name that is allowed to renew the token.
474       * @return a new delegation token
475       * @throws IOException
476       */
477      @InterfaceAudience.Private()
478      public Token<?> getDelegationToken(String renewer) throws IOException {
479        return null;
480      }
481      
482      /**
483       * Obtain all delegation tokens used by this FileSystem that are not
484       * already present in the given Credentials.  Existing tokens will neither
485       * be verified as valid nor having the given renewer.  Missing tokens will
486       * be acquired and added to the given Credentials.
487       * 
488       * Default Impl: works for simple fs with its own token
489       * and also for an embedded fs whose tokens are those of its
490       * children file system (i.e. the embedded fs has not tokens of its
491       * own).
492       * 
493       * @param renewer the user allowed to renew the delegation tokens
494       * @param credentials cache in which to add new delegation tokens
495       * @return list of new delegation tokens
496       * @throws IOException
497       */
498      @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
499      public Token<?>[] addDelegationTokens(
500          final String renewer, Credentials credentials) throws IOException {
501        if (credentials == null) {
502          credentials = new Credentials();
503        }
504        final List<Token<?>> tokens = new ArrayList<Token<?>>();
505        collectDelegationTokens(renewer, credentials, tokens);
506        return tokens.toArray(new Token<?>[tokens.size()]);
507      }
508      
509      /**
510       * Recursively obtain the tokens for this FileSystem and all descended
511       * FileSystems as determined by getChildFileSystems().
512       * @param renewer the user allowed to renew the delegation tokens
513       * @param credentials cache in which to add the new delegation tokens
514       * @param tokens list in which to add acquired tokens
515       * @throws IOException
516       */
517      private void collectDelegationTokens(final String renewer,
518                                           final Credentials credentials,
519                                           final List<Token<?>> tokens)
520                                               throws IOException {
521        final String serviceName = getCanonicalServiceName();
522        // Collect token of the this filesystem and then of its embedded children
523        if (serviceName != null) { // fs has token, grab it
524          final Text service = new Text(serviceName);
525          Token<?> token = credentials.getToken(service);
526          if (token == null) {
527            token = getDelegationToken(renewer);
528            if (token != null) {
529              tokens.add(token);
530              credentials.addToken(service, token);
531            }
532          }
533        }
534        // Now collect the tokens from the children
535        final FileSystem[] children = getChildFileSystems();
536        if (children != null) {
537          for (final FileSystem fs : children) {
538            fs.collectDelegationTokens(renewer, credentials, tokens);
539          }
540        }
541      }
542    
543      /**
544       * Get all the immediate child FileSystems embedded in this FileSystem.
545       * It does not recurse and get grand children.  If a FileSystem
546       * has multiple child FileSystems, then it should return a unique list
547       * of those FileSystems.  Default is to return null to signify no children.
548       * 
549       * @return FileSystems used by this FileSystem
550       */
551      @InterfaceAudience.LimitedPrivate({ "HDFS" })
552      @VisibleForTesting
553      public FileSystem[] getChildFileSystems() {
554        return null;
555      }
556      
557      /** create a file with the provided permission
558       * The permission of the file is set to be the provided permission as in
559       * setPermission, not permission&~umask
560       * 
561       * It is implemented using two RPCs. It is understood that it is inefficient,
562       * but the implementation is thread-safe. The other option is to change the
563       * value of umask in configuration to be 0, but it is not thread-safe.
564       * 
565       * @param fs file system handle
566       * @param file the name of the file to be created
567       * @param permission the permission of the file
568       * @return an output stream
569       * @throws IOException
570       */
571      public static FSDataOutputStream create(FileSystem fs,
572          Path file, FsPermission permission) throws IOException {
573        // create the file with default permission
574        FSDataOutputStream out = fs.create(file);
575        // set its permission to the supplied one
576        fs.setPermission(file, permission);
577        return out;
578      }
579    
580      /** create a directory with the provided permission
581       * The permission of the directory is set to be the provided permission as in
582       * setPermission, not permission&~umask
583       * 
584       * @see #create(FileSystem, Path, FsPermission)
585       * 
586       * @param fs file system handle
587       * @param dir the name of the directory to be created
588       * @param permission the permission of the directory
589       * @return true if the directory creation succeeds; false otherwise
590       * @throws IOException
591       */
592      public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
593      throws IOException {
594        // create the directory using the default permission
595        boolean result = fs.mkdirs(dir);
596        // set its permission to be the supplied one
597        fs.setPermission(dir, permission);
598        return result;
599      }
600    
601      ///////////////////////////////////////////////////////////////
602      // FileSystem
603      ///////////////////////////////////////////////////////////////
604    
605      protected FileSystem() {
606        super(null);
607      }
608    
609      /** 
610       * Check that a Path belongs to this FileSystem.
611       * @param path to check
612       */
613      protected void checkPath(Path path) {
614        URI uri = path.toUri();
615        String thatScheme = uri.getScheme();
616        if (thatScheme == null)                // fs is relative
617          return;
618        URI thisUri = getCanonicalUri();
619        String thisScheme = thisUri.getScheme();
620        //authority and scheme are not case sensitive
621        if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
622          String thisAuthority = thisUri.getAuthority();
623          String thatAuthority = uri.getAuthority();
624          if (thatAuthority == null &&                // path's authority is null
625              thisAuthority != null) {                // fs has an authority
626            URI defaultUri = getDefaultUri(getConf());
627            if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
628              uri = defaultUri; // schemes match, so use this uri instead
629            } else {
630              uri = null; // can't determine auth of the path
631            }
632          }
633          if (uri != null) {
634            // canonicalize uri before comparing with this fs
635            uri = canonicalizeUri(uri);
636            thatAuthority = uri.getAuthority();
637            if (thisAuthority == thatAuthority ||       // authorities match
638                (thisAuthority != null &&
639                 thisAuthority.equalsIgnoreCase(thatAuthority)))
640              return;
641          }
642        }
643        throw new IllegalArgumentException("Wrong FS: "+path+
644                                           ", expected: "+this.getUri());
645      }
646    
647      /**
648       * Return an array containing hostnames, offset and size of 
649       * portions of the given file.  For a nonexistent 
650       * file or regions, null will be returned.
651       *
652       * This call is most helpful with DFS, where it returns 
653       * hostnames of machines that contain the given file.
654       *
655       * The FileSystem will simply return an elt containing 'localhost'.
656       *
657       * @param file FilesStatus to get data from
658       * @param start offset into the given file
659       * @param len length for which to get locations for
660       */
661      public BlockLocation[] getFileBlockLocations(FileStatus file, 
662          long start, long len) throws IOException {
663        if (file == null) {
664          return null;
665        }
666    
667        if (start < 0 || len < 0) {
668          throw new IllegalArgumentException("Invalid start or len parameter");
669        }
670    
671        if (file.getLen() <= start) {
672          return new BlockLocation[0];
673    
674        }
675        String[] name = { "localhost:50010" };
676        String[] host = { "localhost" };
677        return new BlockLocation[] {
678          new BlockLocation(name, host, 0, file.getLen()) };
679      }
680     
681    
682      /**
683       * Return an array containing hostnames, offset and size of 
684       * portions of the given file.  For a nonexistent 
685       * file or regions, null will be returned.
686       *
687       * This call is most helpful with DFS, where it returns 
688       * hostnames of machines that contain the given file.
689       *
690       * The FileSystem will simply return an elt containing 'localhost'.
691       *
692       * @param p path is used to identify an FS since an FS could have
693       *          another FS that it could be delegating the call to
694       * @param start offset into the given file
695       * @param len length for which to get locations for
696       */
697      public BlockLocation[] getFileBlockLocations(Path p, 
698          long start, long len) throws IOException {
699        if (p == null) {
700          throw new NullPointerException();
701        }
702        FileStatus file = getFileStatus(p);
703        return getFileBlockLocations(file, start, len);
704      }
705      
706      /**
707       * Return a set of server default configuration values
708       * @return server default configuration values
709       * @throws IOException
710       * @deprecated use {@link #getServerDefaults(Path)} instead
711       */
712      @Deprecated
713      public FsServerDefaults getServerDefaults() throws IOException {
714        Configuration conf = getConf();
715        // CRC32 is chosen as default as it is available in all 
716        // releases that support checksum.
717        // The client trash configuration is ignored.
718        return new FsServerDefaults(getDefaultBlockSize(), 
719            conf.getInt("io.bytes.per.checksum", 512), 
720            64 * 1024, 
721            getDefaultReplication(),
722            conf.getInt("io.file.buffer.size", 4096),
723            false,
724            CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
725            DataChecksum.Type.CRC32);
726      }
727    
728      /**
729       * Return a set of server default configuration values
730       * @param p path is used to identify an FS since an FS could have
731       *          another FS that it could be delegating the call to
732       * @return server default configuration values
733       * @throws IOException
734       */
735      public FsServerDefaults getServerDefaults(Path p) throws IOException {
736        return getServerDefaults();
737      }
738    
739      /**
740       * Return the fully-qualified path of path f resolving the path
741       * through any symlinks or mount point
742       * @param p path to be resolved
743       * @return fully qualified path 
744       * @throws FileNotFoundException
745       */
746       public Path resolvePath(final Path p) throws IOException {
747         checkPath(p);
748         return getFileStatus(p).getPath();
749       }
750    
751      /**
752       * Opens an FSDataInputStream at the indicated Path.
753       * @param f the file name to open
754       * @param bufferSize the size of the buffer to be used.
755       */
756      public abstract FSDataInputStream open(Path f, int bufferSize)
757        throws IOException;
758        
759      /**
760       * Opens an FSDataInputStream at the indicated Path.
761       * @param f the file to open
762       */
763      public FSDataInputStream open(Path f) throws IOException {
764        return open(f, getConf().getInt("io.file.buffer.size", 4096));
765      }
766    
767      /**
768       * Create an FSDataOutputStream at the indicated Path.
769       * Files are overwritten by default.
770       * @param f the file to create
771       */
772      public FSDataOutputStream create(Path f) throws IOException {
773        return create(f, true);
774      }
775    
776      /**
777       * Create an FSDataOutputStream at the indicated Path.
778       * @param f the file to create
779       * @param overwrite if a file with this name already exists, then if true,
780       *   the file will be overwritten, and if false an exception will be thrown.
781       */
782      public FSDataOutputStream create(Path f, boolean overwrite)
783          throws IOException {
784        return create(f, overwrite, 
785                      getConf().getInt("io.file.buffer.size", 4096),
786                      getDefaultReplication(f),
787                      getDefaultBlockSize(f));
788      }
789    
790      /**
791       * Create an FSDataOutputStream at the indicated Path with write-progress
792       * reporting.
793       * Files are overwritten by default.
794       * @param f the file to create
795       * @param progress to report progress
796       */
797      public FSDataOutputStream create(Path f, Progressable progress) 
798          throws IOException {
799        return create(f, true, 
800                      getConf().getInt("io.file.buffer.size", 4096),
801                      getDefaultReplication(f),
802                      getDefaultBlockSize(f), progress);
803      }
804    
805      /**
806       * Create an FSDataOutputStream at the indicated Path.
807       * Files are overwritten by default.
808       * @param f the file to create
809       * @param replication the replication factor
810       */
811      public FSDataOutputStream create(Path f, short replication)
812          throws IOException {
813        return create(f, true, 
814                      getConf().getInt("io.file.buffer.size", 4096),
815                      replication,
816                      getDefaultBlockSize(f));
817      }
818    
819      /**
820       * Create an FSDataOutputStream at the indicated Path with write-progress
821       * reporting.
822       * Files are overwritten by default.
823       * @param f the file to create
824       * @param replication the replication factor
825       * @param progress to report progress
826       */
827      public FSDataOutputStream create(Path f, short replication, 
828          Progressable progress) throws IOException {
829        return create(f, true, 
830                      getConf().getInt(
831                          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
832                          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
833                      replication,
834                      getDefaultBlockSize(f), progress);
835      }
836    
837        
838      /**
839       * Create an FSDataOutputStream at the indicated Path.
840       * @param f the file name to create
841       * @param overwrite if a file with this name already exists, then if true,
842       *   the file will be overwritten, and if false an error will be thrown.
843       * @param bufferSize the size of the buffer to be used.
844       */
845      public FSDataOutputStream create(Path f, 
846                                       boolean overwrite,
847                                       int bufferSize
848                                       ) throws IOException {
849        return create(f, overwrite, bufferSize, 
850                      getDefaultReplication(f),
851                      getDefaultBlockSize(f));
852      }
853        
854      /**
855       * Create an FSDataOutputStream at the indicated Path with write-progress
856       * reporting.
857       * @param f the path of the file to open
858       * @param overwrite if a file with this name already exists, then if true,
859       *   the file will be overwritten, and if false an error will be thrown.
860       * @param bufferSize the size of the buffer to be used.
861       */
862      public FSDataOutputStream create(Path f, 
863                                       boolean overwrite,
864                                       int bufferSize,
865                                       Progressable progress
866                                       ) throws IOException {
867        return create(f, overwrite, bufferSize, 
868                      getDefaultReplication(f),
869                      getDefaultBlockSize(f), progress);
870      }
871        
872        
873      /**
874       * Create an FSDataOutputStream at the indicated Path.
875       * @param f the file name to open
876       * @param overwrite if a file with this name already exists, then if true,
877       *   the file will be overwritten, and if false an error will be thrown.
878       * @param bufferSize the size of the buffer to be used.
879       * @param replication required block replication for the file. 
880       */
881      public FSDataOutputStream create(Path f, 
882                                       boolean overwrite,
883                                       int bufferSize,
884                                       short replication,
885                                       long blockSize
886                                       ) throws IOException {
887        return create(f, overwrite, bufferSize, replication, blockSize, null);
888      }
889    
890      /**
891       * Create an FSDataOutputStream at the indicated Path with write-progress
892       * reporting.
893       * @param f the file name to open
894       * @param overwrite if a file with this name already exists, then if true,
895       *   the file will be overwritten, and if false an error will be thrown.
896       * @param bufferSize the size of the buffer to be used.
897       * @param replication required block replication for the file. 
898       */
899      public FSDataOutputStream create(Path f,
900                                                boolean overwrite,
901                                                int bufferSize,
902                                                short replication,
903                                                long blockSize,
904                                                Progressable progress
905                                                ) throws IOException {
906        return this.create(f, FsPermission.getFileDefault().applyUMask(
907            FsPermission.getUMask(getConf())), overwrite, bufferSize,
908            replication, blockSize, progress);
909      }
910    
911      /**
912       * Create an FSDataOutputStream at the indicated Path with write-progress
913       * reporting.
914       * @param f the file name to open
915       * @param permission
916       * @param overwrite if a file with this name already exists, then if true,
917       *   the file will be overwritten, and if false an error will be thrown.
918       * @param bufferSize the size of the buffer to be used.
919       * @param replication required block replication for the file.
920       * @param blockSize
921       * @param progress
922       * @throws IOException
923       * @see #setPermission(Path, FsPermission)
924       */
925      public abstract FSDataOutputStream create(Path f,
926          FsPermission permission,
927          boolean overwrite,
928          int bufferSize,
929          short replication,
930          long blockSize,
931          Progressable progress) throws IOException;
932      
933      /**
934       * Create an FSDataOutputStream at the indicated Path with write-progress
935       * reporting.
936       * @param f the file name to open
937       * @param permission
938       * @param flags {@link CreateFlag}s to use for this stream.
939       * @param bufferSize the size of the buffer to be used.
940       * @param replication required block replication for the file.
941       * @param blockSize
942       * @param progress
943       * @throws IOException
944       * @see #setPermission(Path, FsPermission)
945       */
946      public FSDataOutputStream create(Path f,
947          FsPermission permission,
948          EnumSet<CreateFlag> flags,
949          int bufferSize,
950          short replication,
951          long blockSize,
952          Progressable progress) throws IOException {
953        return create(f, permission, flags, bufferSize, replication,
954            blockSize, progress, null);
955      }
956      
957      /**
958       * Create an FSDataOutputStream at the indicated Path with a custom
959       * checksum option
960       * @param f the file name to open
961       * @param permission
962       * @param flags {@link CreateFlag}s to use for this stream.
963       * @param bufferSize the size of the buffer to be used.
964       * @param replication required block replication for the file.
965       * @param blockSize
966       * @param progress
967       * @param checksumOpt checksum parameter. If null, the values
968       *        found in conf will be used.
969       * @throws IOException
970       * @see #setPermission(Path, FsPermission)
971       */
972      public FSDataOutputStream create(Path f,
973          FsPermission permission,
974          EnumSet<CreateFlag> flags,
975          int bufferSize,
976          short replication,
977          long blockSize,
978          Progressable progress,
979          ChecksumOpt checksumOpt) throws IOException {
980        // Checksum options are ignored by default. The file systems that
981        // implement checksum need to override this method. The full
982        // support is currently only available in DFS.
983        return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
984            bufferSize, replication, blockSize, progress);
985      }
986    
987      /*.
988       * This create has been added to support the FileContext that processes
989       * the permission
990       * with umask before calling this method.
991       * This a temporary method added to support the transition from FileSystem
992       * to FileContext for user applications.
993       */
994      @Deprecated
995      protected FSDataOutputStream primitiveCreate(Path f,
996         FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
997         short replication, long blockSize, Progressable progress,
998         ChecksumOpt checksumOpt) throws IOException {
999    
1000        boolean pathExists = exists(f);
1001        CreateFlag.validate(f, pathExists, flag);
1002        
1003        // Default impl  assumes that permissions do not matter and 
1004        // nor does the bytesPerChecksum  hence
1005        // calling the regular create is good enough.
1006        // FSs that implement permissions should override this.
1007    
1008        if (pathExists && flag.contains(CreateFlag.APPEND)) {
1009          return append(f, bufferSize, progress);
1010        }
1011        
1012        return this.create(f, absolutePermission,
1013            flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1014            blockSize, progress);
1015      }
1016      
1017      /**
1018       * This version of the mkdirs method assumes that the permission is absolute.
1019       * It has been added to support the FileContext that processes the permission
1020       * with umask before calling this method.
1021       * This a temporary method added to support the transition from FileSystem
1022       * to FileContext for user applications.
1023       */
1024      @Deprecated
1025      protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1026        throws IOException {
1027        // Default impl is to assume that permissions do not matter and hence
1028        // calling the regular mkdirs is good enough.
1029        // FSs that implement permissions should override this.
1030       return this.mkdirs(f, absolutePermission);
1031      }
1032    
1033    
1034      /**
1035       * This version of the mkdirs method assumes that the permission is absolute.
1036       * It has been added to support the FileContext that processes the permission
1037       * with umask before calling this method.
1038       * This a temporary method added to support the transition from FileSystem
1039       * to FileContext for user applications.
1040       */
1041      @Deprecated
1042      protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1043                        boolean createParent)
1044        throws IOException {
1045        
1046        if (!createParent) { // parent must exist.
1047          // since the this.mkdirs makes parent dirs automatically
1048          // we must throw exception if parent does not exist.
1049          final FileStatus stat = getFileStatus(f.getParent());
1050          if (stat == null) {
1051            throw new FileNotFoundException("Missing parent:" + f);
1052          }
1053          if (!stat.isDirectory()) {
1054            throw new ParentNotDirectoryException("parent is not a dir");
1055          }
1056          // parent does exist - go ahead with mkdir of leaf
1057        }
1058        // Default impl is to assume that permissions do not matter and hence
1059        // calling the regular mkdirs is good enough.
1060        // FSs that implement permissions should override this.
1061        if (!this.mkdirs(f, absolutePermission)) {
1062          throw new IOException("mkdir of "+ f + " failed");
1063        }
1064      }
1065    
1066      /**
1067       * Opens an FSDataOutputStream at the indicated Path with write-progress
1068       * reporting. Same as create(), except fails if parent directory doesn't
1069       * already exist.
1070       * @param f the file name to open
1071       * @param overwrite if a file with this name already exists, then if true,
1072       * the file will be overwritten, and if false an error will be thrown.
1073       * @param bufferSize the size of the buffer to be used.
1074       * @param replication required block replication for the file.
1075       * @param blockSize
1076       * @param progress
1077       * @throws IOException
1078       * @see #setPermission(Path, FsPermission)
1079       * @deprecated API only for 0.20-append
1080       */
1081      @Deprecated
1082      public FSDataOutputStream createNonRecursive(Path f,
1083          boolean overwrite,
1084          int bufferSize, short replication, long blockSize,
1085          Progressable progress) throws IOException {
1086        return this.createNonRecursive(f, FsPermission.getFileDefault(),
1087            overwrite, bufferSize, replication, blockSize, progress);
1088      }
1089    
1090      /**
1091       * Opens an FSDataOutputStream at the indicated Path with write-progress
1092       * reporting. Same as create(), except fails if parent directory doesn't
1093       * already exist.
1094       * @param f the file name to open
1095       * @param permission
1096       * @param overwrite if a file with this name already exists, then if true,
1097       * the file will be overwritten, and if false an error will be thrown.
1098       * @param bufferSize the size of the buffer to be used.
1099       * @param replication required block replication for the file.
1100       * @param blockSize
1101       * @param progress
1102       * @throws IOException
1103       * @see #setPermission(Path, FsPermission)
1104       * @deprecated API only for 0.20-append
1105       */
1106       @Deprecated
1107       public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1108           boolean overwrite, int bufferSize, short replication, long blockSize,
1109           Progressable progress) throws IOException {
1110         return createNonRecursive(f, permission,
1111             overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1112                 : EnumSet.of(CreateFlag.CREATE), bufferSize,
1113                 replication, blockSize, progress);
1114       }
1115    
1116       /**
1117        * Opens an FSDataOutputStream at the indicated Path with write-progress
1118        * reporting. Same as create(), except fails if parent directory doesn't
1119        * already exist.
1120        * @param f the file name to open
1121        * @param permission
1122        * @param flags {@link CreateFlag}s to use for this stream.
1123        * @param bufferSize the size of the buffer to be used.
1124        * @param replication required block replication for the file.
1125        * @param blockSize
1126        * @param progress
1127        * @throws IOException
1128        * @see #setPermission(Path, FsPermission)
1129        * @deprecated API only for 0.20-append
1130        */
1131        @Deprecated
1132        public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1133            EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1134            Progressable progress) throws IOException {
1135          throw new IOException("createNonRecursive unsupported for this filesystem "
1136              + this.getClass());
1137        }
1138    
1139      /**
1140       * Creates the given Path as a brand-new zero-length file.  If
1141       * create fails, or if it already existed, return false.
1142       *
1143       * @param f path to use for create
1144       */
1145      public boolean createNewFile(Path f) throws IOException {
1146        if (exists(f)) {
1147          return false;
1148        } else {
1149          create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1150          return true;
1151        }
1152      }
1153    
1154      /**
1155       * Append to an existing file (optional operation).
1156       * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1157       * @param f the existing file to be appended.
1158       * @throws IOException
1159       */
1160      public FSDataOutputStream append(Path f) throws IOException {
1161        return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1162      }
1163      /**
1164       * Append to an existing file (optional operation).
1165       * Same as append(f, bufferSize, null).
1166       * @param f the existing file to be appended.
1167       * @param bufferSize the size of the buffer to be used.
1168       * @throws IOException
1169       */
1170      public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1171        return append(f, bufferSize, null);
1172      }
1173    
1174      /**
1175       * Append to an existing file (optional operation).
1176       * @param f the existing file to be appended.
1177       * @param bufferSize the size of the buffer to be used.
1178       * @param progress for reporting progress if it is not null.
1179       * @throws IOException
1180       */
1181      public abstract FSDataOutputStream append(Path f, int bufferSize,
1182          Progressable progress) throws IOException;
1183    
1184      /**
1185       * Concat existing files together.
1186       * @param trg the path to the target destination.
1187       * @param psrcs the paths to the sources to use for the concatenation.
1188       * @throws IOException
1189       */
1190      public void concat(final Path trg, final Path [] psrcs) throws IOException {
1191        throw new UnsupportedOperationException("Not implemented by the " + 
1192            getClass().getSimpleName() + " FileSystem implementation");
1193      }
1194    
1195     /**
1196       * Get replication.
1197       * 
1198       * @deprecated Use getFileStatus() instead
1199       * @param src file name
1200       * @return file replication
1201       * @throws IOException
1202       */ 
1203      @Deprecated
1204      public short getReplication(Path src) throws IOException {
1205        return getFileStatus(src).getReplication();
1206      }
1207    
1208      /**
1209       * Set replication for an existing file.
1210       * 
1211       * @param src file name
1212       * @param replication new replication
1213       * @throws IOException
1214       * @return true if successful;
1215       *         false if file does not exist or is a directory
1216       */
1217      public boolean setReplication(Path src, short replication)
1218        throws IOException {
1219        return true;
1220      }
1221    
1222      /**
1223       * Renames Path src to Path dst.  Can take place on local fs
1224       * or remote DFS.
1225       * @param src path to be renamed
1226       * @param dst new path after rename
1227       * @throws IOException on failure
1228       * @return true if rename is successful
1229       */
1230      public abstract boolean rename(Path src, Path dst) throws IOException;
1231    
1232      /**
1233       * Renames Path src to Path dst
1234       * <ul>
1235       * <li
1236       * <li>Fails if src is a file and dst is a directory.
1237       * <li>Fails if src is a directory and dst is a file.
1238       * <li>Fails if the parent of dst does not exist or is a file.
1239       * </ul>
1240       * <p>
1241       * If OVERWRITE option is not passed as an argument, rename fails
1242       * if the dst already exists.
1243       * <p>
1244       * If OVERWRITE option is passed as an argument, rename overwrites
1245       * the dst if it is a file or an empty directory. Rename fails if dst is
1246       * a non-empty directory.
1247       * <p>
1248       * Note that atomicity of rename is dependent on the file system
1249       * implementation. Please refer to the file system documentation for
1250       * details. This default implementation is non atomic.
1251       * <p>
1252       * This method is deprecated since it is a temporary method added to 
1253       * support the transition from FileSystem to FileContext for user 
1254       * applications.
1255       * 
1256       * @param src path to be renamed
1257       * @param dst new path after rename
1258       * @throws IOException on failure
1259       */
1260      @Deprecated
1261      protected void rename(final Path src, final Path dst,
1262          final Rename... options) throws IOException {
1263        // Default implementation
1264        final FileStatus srcStatus = getFileLinkStatus(src);
1265        if (srcStatus == null) {
1266          throw new FileNotFoundException("rename source " + src + " not found.");
1267        }
1268    
1269        boolean overwrite = false;
1270        if (null != options) {
1271          for (Rename option : options) {
1272            if (option == Rename.OVERWRITE) {
1273              overwrite = true;
1274            }
1275          }
1276        }
1277    
1278        FileStatus dstStatus;
1279        try {
1280          dstStatus = getFileLinkStatus(dst);
1281        } catch (IOException e) {
1282          dstStatus = null;
1283        }
1284        if (dstStatus != null) {
1285          if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1286            throw new IOException("Source " + src + " Destination " + dst
1287                + " both should be either file or directory");
1288          }
1289          if (!overwrite) {
1290            throw new FileAlreadyExistsException("rename destination " + dst
1291                + " already exists.");
1292          }
1293          // Delete the destination that is a file or an empty directory
1294          if (dstStatus.isDirectory()) {
1295            FileStatus[] list = listStatus(dst);
1296            if (list != null && list.length != 0) {
1297              throw new IOException(
1298                  "rename cannot overwrite non empty destination directory " + dst);
1299            }
1300          }
1301          delete(dst, false);
1302        } else {
1303          final Path parent = dst.getParent();
1304          final FileStatus parentStatus = getFileStatus(parent);
1305          if (parentStatus == null) {
1306            throw new FileNotFoundException("rename destination parent " + parent
1307                + " not found.");
1308          }
1309          if (!parentStatus.isDirectory()) {
1310            throw new ParentNotDirectoryException("rename destination parent " + parent
1311                + " is a file.");
1312          }
1313        }
1314        if (!rename(src, dst)) {
1315          throw new IOException("rename from " + src + " to " + dst + " failed.");
1316        }
1317      }
1318      
1319      /**
1320       * Delete a file 
1321       * @deprecated Use {@link #delete(Path, boolean)} instead.
1322       */
1323      @Deprecated
1324      public boolean delete(Path f) throws IOException {
1325        return delete(f, true);
1326      }
1327      
1328      /** Delete a file.
1329       *
1330       * @param f the path to delete.
1331       * @param recursive if path is a directory and set to 
1332       * true, the directory is deleted else throws an exception. In
1333       * case of a file the recursive can be set to either true or false. 
1334       * @return  true if delete is successful else false. 
1335       * @throws IOException
1336       */
1337      public abstract boolean delete(Path f, boolean recursive) throws IOException;
1338    
1339      /**
1340       * Mark a path to be deleted when FileSystem is closed.
1341       * When the JVM shuts down,
1342       * all FileSystem objects will be closed automatically.
1343       * Then,
1344       * the marked path will be deleted as a result of closing the FileSystem.
1345       *
1346       * The path has to exist in the file system.
1347       * 
1348       * @param f the path to delete.
1349       * @return  true if deleteOnExit is successful, otherwise false.
1350       * @throws IOException
1351       */
1352      public boolean deleteOnExit(Path f) throws IOException {
1353        if (!exists(f)) {
1354          return false;
1355        }
1356        synchronized (deleteOnExit) {
1357          deleteOnExit.add(f);
1358        }
1359        return true;
1360      }
1361      
1362      /**
1363       * Cancel the deletion of the path when the FileSystem is closed
1364       * @param f the path to cancel deletion
1365       */
1366      public boolean cancelDeleteOnExit(Path f) {
1367        synchronized (deleteOnExit) {
1368          return deleteOnExit.remove(f);
1369        }
1370      }
1371    
1372      /**
1373       * Delete all files that were marked as delete-on-exit. This recursively
1374       * deletes all files in the specified paths.
1375       */
1376      protected void processDeleteOnExit() {
1377        synchronized (deleteOnExit) {
1378          for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1379            Path path = iter.next();
1380            try {
1381              if (exists(path)) {
1382                delete(path, true);
1383              }
1384            }
1385            catch (IOException e) {
1386              LOG.info("Ignoring failure to deleteOnExit for path " + path);
1387            }
1388            iter.remove();
1389          }
1390        }
1391      }
1392      
1393      /** Check if exists.
1394       * @param f source file
1395       */
1396      public boolean exists(Path f) throws IOException {
1397        try {
1398          return getFileStatus(f) != null;
1399        } catch (FileNotFoundException e) {
1400          return false;
1401        }
1402      }
1403    
1404      /** True iff the named path is a directory.
1405       * Note: Avoid using this method. Instead reuse the FileStatus 
1406       * returned by getFileStatus() or listStatus() methods.
1407       * @param f path to check
1408       */
1409      public boolean isDirectory(Path f) throws IOException {
1410        try {
1411          return getFileStatus(f).isDirectory();
1412        } catch (FileNotFoundException e) {
1413          return false;               // f does not exist
1414        }
1415      }
1416    
1417      /** True iff the named path is a regular file.
1418       * Note: Avoid using this method. Instead reuse the FileStatus 
1419       * returned by getFileStatus() or listStatus() methods.
1420       * @param f path to check
1421       */
1422      public boolean isFile(Path f) throws IOException {
1423        try {
1424          return getFileStatus(f).isFile();
1425        } catch (FileNotFoundException e) {
1426          return false;               // f does not exist
1427        }
1428      }
1429      
1430      /** The number of bytes in a file. */
1431      /** @deprecated Use getFileStatus() instead */
1432      @Deprecated
1433      public long getLength(Path f) throws IOException {
1434        return getFileStatus(f).getLen();
1435      }
1436        
1437      /** Return the {@link ContentSummary} of a given {@link Path}.
1438      * @param f path to use
1439      */
1440      public ContentSummary getContentSummary(Path f) throws IOException {
1441        FileStatus status = getFileStatus(f);
1442        if (status.isFile()) {
1443          // f is a file
1444          return new ContentSummary(status.getLen(), 1, 0);
1445        }
1446        // f is a directory
1447        long[] summary = {0, 0, 1};
1448        for(FileStatus s : listStatus(f)) {
1449          ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1450                                         new ContentSummary(s.getLen(), 1, 0);
1451          summary[0] += c.getLength();
1452          summary[1] += c.getFileCount();
1453          summary[2] += c.getDirectoryCount();
1454        }
1455        return new ContentSummary(summary[0], summary[1], summary[2]);
1456      }
1457    
1458      final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1459          @Override
1460          public boolean accept(Path file) {
1461            return true;
1462          }     
1463        };
1464        
1465      /**
1466       * List the statuses of the files/directories in the given path if the path is
1467       * a directory.
1468       * 
1469       * @param f given path
1470       * @return the statuses of the files/directories in the given patch
1471       * @throws FileNotFoundException when the path does not exist;
1472       *         IOException see specific implementation
1473       */
1474      public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1475                                                             IOException;
1476        
1477      /*
1478       * Filter files/directories in the given path using the user-supplied path
1479       * filter. Results are added to the given array <code>results</code>.
1480       */
1481      private void listStatus(ArrayList<FileStatus> results, Path f,
1482          PathFilter filter) throws FileNotFoundException, IOException {
1483        FileStatus listing[] = listStatus(f);
1484        if (listing == null) {
1485          throw new IOException("Error accessing " + f);
1486        }
1487    
1488        for (int i = 0; i < listing.length; i++) {
1489          if (filter.accept(listing[i].getPath())) {
1490            results.add(listing[i]);
1491          }
1492        }
1493      }
1494    
1495      /**
1496       * @return an iterator over the corrupt files under the given path
1497       * (may contain duplicates if a file has more than one corrupt block)
1498       * @throws IOException
1499       */
1500      public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1501        throws IOException {
1502        throw new UnsupportedOperationException(getClass().getCanonicalName() +
1503                                                " does not support" +
1504                                                " listCorruptFileBlocks");
1505      }
1506    
1507      /**
1508       * Filter files/directories in the given path using the user-supplied path
1509       * filter.
1510       * 
1511       * @param f
1512       *          a path name
1513       * @param filter
1514       *          the user-supplied path filter
1515       * @return an array of FileStatus objects for the files under the given path
1516       *         after applying the filter
1517       * @throws FileNotFoundException when the path does not exist;
1518       *         IOException see specific implementation   
1519       */
1520      public FileStatus[] listStatus(Path f, PathFilter filter) 
1521                                       throws FileNotFoundException, IOException {
1522        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1523        listStatus(results, f, filter);
1524        return results.toArray(new FileStatus[results.size()]);
1525      }
1526    
1527      /**
1528       * Filter files/directories in the given list of paths using default
1529       * path filter.
1530       * 
1531       * @param files
1532       *          a list of paths
1533       * @return a list of statuses for the files under the given paths after
1534       *         applying the filter default Path filter
1535       * @throws FileNotFoundException when the path does not exist;
1536       *         IOException see specific implementation
1537       */
1538      public FileStatus[] listStatus(Path[] files)
1539          throws FileNotFoundException, IOException {
1540        return listStatus(files, DEFAULT_FILTER);
1541      }
1542    
1543      /**
1544       * Filter files/directories in the given list of paths using user-supplied
1545       * path filter.
1546       * 
1547       * @param files
1548       *          a list of paths
1549       * @param filter
1550       *          the user-supplied path filter
1551       * @return a list of statuses for the files under the given paths after
1552       *         applying the filter
1553       * @throws FileNotFoundException when the path does not exist;
1554       *         IOException see specific implementation
1555       */
1556      public FileStatus[] listStatus(Path[] files, PathFilter filter)
1557          throws FileNotFoundException, IOException {
1558        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1559        for (int i = 0; i < files.length; i++) {
1560          listStatus(results, files[i], filter);
1561        }
1562        return results.toArray(new FileStatus[results.size()]);
1563      }
1564    
1565      /**
1566       * <p>Return all the files that match filePattern and are not checksum
1567       * files. Results are sorted by their names.
1568       * 
1569       * <p>
1570       * A filename pattern is composed of <i>regular</i> characters and
1571       * <i>special pattern matching</i> characters, which are:
1572       *
1573       * <dl>
1574       *  <dd>
1575       *   <dl>
1576       *    <p>
1577       *    <dt> <tt> ? </tt>
1578       *    <dd> Matches any single character.
1579       *
1580       *    <p>
1581       *    <dt> <tt> * </tt>
1582       *    <dd> Matches zero or more characters.
1583       *
1584       *    <p>
1585       *    <dt> <tt> [<i>abc</i>] </tt>
1586       *    <dd> Matches a single character from character set
1587       *     <tt>{<i>a,b,c</i>}</tt>.
1588       *
1589       *    <p>
1590       *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1591       *    <dd> Matches a single character from the character range
1592       *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1593       *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1594       *
1595       *    <p>
1596       *    <dt> <tt> [^<i>a</i>] </tt>
1597       *    <dd> Matches a single character that is not from character set or range
1598       *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1599       *     immediately to the right of the opening bracket.
1600       *
1601       *    <p>
1602       *    <dt> <tt> \<i>c</i> </tt>
1603       *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1604       *
1605       *    <p>
1606       *    <dt> <tt> {ab,cd} </tt>
1607       *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1608       *    
1609       *    <p>
1610       *    <dt> <tt> {ab,c{de,fh}} </tt>
1611       *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1612       *
1613       *   </dl>
1614       *  </dd>
1615       * </dl>
1616       *
1617       * @param pathPattern a regular expression specifying a pth pattern
1618    
1619       * @return an array of paths that match the path pattern
1620       * @throws IOException
1621       */
1622      public FileStatus[] globStatus(Path pathPattern) throws IOException {
1623        return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1624      }
1625      
1626      /**
1627       * Return an array of FileStatus objects whose path names match pathPattern
1628       * and is accepted by the user-supplied path filter. Results are sorted by
1629       * their path names.
1630       * Return null if pathPattern has no glob and the path does not exist.
1631       * Return an empty array if pathPattern has a glob and no path matches it. 
1632       * 
1633       * @param pathPattern
1634       *          a regular expression specifying the path pattern
1635       * @param filter
1636       *          a user-supplied path filter
1637       * @return an array of FileStatus objects
1638       * @throws IOException if any I/O error occurs when fetching file status
1639       */
1640      public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1641          throws IOException {
1642        return new Globber(this, pathPattern, filter).glob();
1643      }
1644      
1645      /**
1646       * List the statuses of the files/directories in the given path if the path is
1647       * a directory. 
1648       * Return the file's status and block locations If the path is a file.
1649       * 
1650       * If a returned status is a file, it contains the file's block locations.
1651       * 
1652       * @param f is the path
1653       *
1654       * @return an iterator that traverses statuses of the files/directories 
1655       *         in the given path
1656       *
1657       * @throws FileNotFoundException If <code>f</code> does not exist
1658       * @throws IOException If an I/O error occurred
1659       */
1660      public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1661      throws FileNotFoundException, IOException {
1662        return listLocatedStatus(f, DEFAULT_FILTER);
1663      }
1664    
1665      /**
1666       * Listing a directory
1667       * The returned results include its block location if it is a file
1668       * The results are filtered by the given path filter
1669       * @param f a path
1670       * @param filter a path filter
1671       * @return an iterator that traverses statuses of the files/directories 
1672       *         in the given path
1673       * @throws FileNotFoundException if <code>f</code> does not exist
1674       * @throws IOException if any I/O error occurred
1675       */
1676      protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1677          final PathFilter filter)
1678      throws FileNotFoundException, IOException {
1679        return new RemoteIterator<LocatedFileStatus>() {
1680          private final FileStatus[] stats = listStatus(f, filter);
1681          private int i = 0;
1682    
1683          @Override
1684          public boolean hasNext() {
1685            return i<stats.length;
1686          }
1687    
1688          @Override
1689          public LocatedFileStatus next() throws IOException {
1690            if (!hasNext()) {
1691              throw new NoSuchElementException("No more entry in " + f);
1692            }
1693            FileStatus result = stats[i++];
1694            BlockLocation[] locs = result.isFile() ?
1695                getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1696                null;
1697            return new LocatedFileStatus(result, locs);
1698          }
1699        };
1700      }
1701    
1702      /**
1703       * List the statuses and block locations of the files in the given path.
1704       * 
1705       * If the path is a directory, 
1706       *   if recursive is false, returns files in the directory;
1707       *   if recursive is true, return files in the subtree rooted at the path.
1708       * If the path is a file, return the file's status and block locations.
1709       * 
1710       * @param f is the path
1711       * @param recursive if the subdirectories need to be traversed recursively
1712       *
1713       * @return an iterator that traverses statuses of the files
1714       *
1715       * @throws FileNotFoundException when the path does not exist;
1716       *         IOException see specific implementation
1717       */
1718      public RemoteIterator<LocatedFileStatus> listFiles(
1719          final Path f, final boolean recursive)
1720      throws FileNotFoundException, IOException {
1721        return new RemoteIterator<LocatedFileStatus>() {
1722          private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1723            new Stack<RemoteIterator<LocatedFileStatus>>();
1724          private RemoteIterator<LocatedFileStatus> curItor =
1725            listLocatedStatus(f);
1726          private LocatedFileStatus curFile;
1727         
1728          @Override
1729          public boolean hasNext() throws IOException {
1730            while (curFile == null) {
1731              if (curItor.hasNext()) {
1732                handleFileStat(curItor.next());
1733              } else if (!itors.empty()) {
1734                curItor = itors.pop();
1735              } else {
1736                return false;
1737              }
1738            }
1739            return true;
1740          }
1741    
1742          /**
1743           * Process the input stat.
1744           * If it is a file, return the file stat.
1745           * If it is a directory, traverse the directory if recursive is true;
1746           * ignore it if recursive is false.
1747           * @param stat input status
1748           * @throws IOException if any IO error occurs
1749           */
1750          private void handleFileStat(LocatedFileStatus stat) throws IOException {
1751            if (stat.isFile()) { // file
1752              curFile = stat;
1753            } else if (recursive) { // directory
1754              itors.push(curItor);
1755              curItor = listLocatedStatus(stat.getPath());
1756            }
1757          }
1758    
1759          @Override
1760          public LocatedFileStatus next() throws IOException {
1761            if (hasNext()) {
1762              LocatedFileStatus result = curFile;
1763              curFile = null;
1764              return result;
1765            } 
1766            throw new java.util.NoSuchElementException("No more entry in " + f);
1767          }
1768        };
1769      }
1770      
1771      /** Return the current user's home directory in this filesystem.
1772       * The default implementation returns "/user/$USER/".
1773       */
1774      public Path getHomeDirectory() {
1775        return this.makeQualified(
1776            new Path("/user/"+System.getProperty("user.name")));
1777      }
1778    
1779    
1780      /**
1781       * Set the current working directory for the given file system. All relative
1782       * paths will be resolved relative to it.
1783       * 
1784       * @param new_dir
1785       */
1786      public abstract void setWorkingDirectory(Path new_dir);
1787        
1788      /**
1789       * Get the current working directory for the given file system
1790       * @return the directory pathname
1791       */
1792      public abstract Path getWorkingDirectory();
1793      
1794      
1795      /**
1796       * Note: with the new FilesContext class, getWorkingDirectory()
1797       * will be removed. 
1798       * The working directory is implemented in FilesContext.
1799       * 
1800       * Some file systems like LocalFileSystem have an initial workingDir
1801       * that we use as the starting workingDir. For other file systems
1802       * like HDFS there is no built in notion of an initial workingDir.
1803       * 
1804       * @return if there is built in notion of workingDir then it
1805       * is returned; else a null is returned.
1806       */
1807      protected Path getInitialWorkingDirectory() {
1808        return null;
1809      }
1810    
1811      /**
1812       * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1813       */
1814      public boolean mkdirs(Path f) throws IOException {
1815        return mkdirs(f, FsPermission.getDirDefault());
1816      }
1817    
1818      /**
1819       * Make the given file and all non-existent parents into
1820       * directories. Has the semantics of Unix 'mkdir -p'.
1821       * Existence of the directory hierarchy is not an error.
1822       * @param f path to create
1823       * @param permission to apply to f
1824       */
1825      public abstract boolean mkdirs(Path f, FsPermission permission
1826          ) throws IOException;
1827    
1828      /**
1829       * The src file is on the local disk.  Add it to FS at
1830       * the given dst name and the source is kept intact afterwards
1831       * @param src path
1832       * @param dst path
1833       */
1834      public void copyFromLocalFile(Path src, Path dst)
1835        throws IOException {
1836        copyFromLocalFile(false, src, dst);
1837      }
1838    
1839      /**
1840       * The src files is on the local disk.  Add it to FS at
1841       * the given dst name, removing the source afterwards.
1842       * @param srcs path
1843       * @param dst path
1844       */
1845      public void moveFromLocalFile(Path[] srcs, Path dst)
1846        throws IOException {
1847        copyFromLocalFile(true, true, srcs, dst);
1848      }
1849    
1850      /**
1851       * The src file is on the local disk.  Add it to FS at
1852       * the given dst name, removing the source afterwards.
1853       * @param src path
1854       * @param dst path
1855       */
1856      public void moveFromLocalFile(Path src, Path dst)
1857        throws IOException {
1858        copyFromLocalFile(true, src, dst);
1859      }
1860    
1861      /**
1862       * The src file is on the local disk.  Add it to FS at
1863       * the given dst name.
1864       * delSrc indicates if the source should be removed
1865       * @param delSrc whether to delete the src
1866       * @param src path
1867       * @param dst path
1868       */
1869      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1870        throws IOException {
1871        copyFromLocalFile(delSrc, true, src, dst);
1872      }
1873      
1874      /**
1875       * The src files are on the local disk.  Add it to FS at
1876       * the given dst name.
1877       * delSrc indicates if the source should be removed
1878       * @param delSrc whether to delete the src
1879       * @param overwrite whether to overwrite an existing file
1880       * @param srcs array of paths which are source
1881       * @param dst path
1882       */
1883      public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1884                                    Path[] srcs, Path dst)
1885        throws IOException {
1886        Configuration conf = getConf();
1887        FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1888      }
1889      
1890      /**
1891       * The src file is on the local disk.  Add it to FS at
1892       * the given dst name.
1893       * delSrc indicates if the source should be removed
1894       * @param delSrc whether to delete the src
1895       * @param overwrite whether to overwrite an existing file
1896       * @param src path
1897       * @param dst path
1898       */
1899      public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1900                                    Path src, Path dst)
1901        throws IOException {
1902        Configuration conf = getConf();
1903        FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1904      }
1905        
1906      /**
1907       * The src file is under FS, and the dst is on the local disk.
1908       * Copy it from FS control to the local dst name.
1909       * @param src path
1910       * @param dst path
1911       */
1912      public void copyToLocalFile(Path src, Path dst) throws IOException {
1913        copyToLocalFile(false, src, dst);
1914      }
1915        
1916      /**
1917       * The src file is under FS, and the dst is on the local disk.
1918       * Copy it from FS control to the local dst name.
1919       * Remove the source afterwards
1920       * @param src path
1921       * @param dst path
1922       */
1923      public void moveToLocalFile(Path src, Path dst) throws IOException {
1924        copyToLocalFile(true, src, dst);
1925      }
1926    
1927      /**
1928       * The src file is under FS, and the dst is on the local disk.
1929       * Copy it from FS control to the local dst name.
1930       * delSrc indicates if the src will be removed or not.
1931       * @param delSrc whether to delete the src
1932       * @param src path
1933       * @param dst path
1934       */   
1935      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1936        throws IOException {
1937        copyToLocalFile(delSrc, src, dst, false);
1938      }
1939      
1940        /**
1941       * The src file is under FS, and the dst is on the local disk. Copy it from FS
1942       * control to the local dst name. delSrc indicates if the src will be removed
1943       * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
1944       * as local file system or not. RawLocalFileSystem is non crc file system.So,
1945       * It will not create any crc files at local.
1946       * 
1947       * @param delSrc
1948       *          whether to delete the src
1949       * @param src
1950       *          path
1951       * @param dst
1952       *          path
1953       * @param useRawLocalFileSystem
1954       *          whether to use RawLocalFileSystem as local file system or not.
1955       * 
1956       * @throws IOException
1957       *           - if any IO error
1958       */
1959      public void copyToLocalFile(boolean delSrc, Path src, Path dst,
1960          boolean useRawLocalFileSystem) throws IOException {
1961        Configuration conf = getConf();
1962        FileSystem local = null;
1963        if (useRawLocalFileSystem) {
1964          local = getLocal(conf).getRawFileSystem();
1965        } else {
1966          local = getLocal(conf);
1967        }
1968        FileUtil.copy(this, src, local, dst, delSrc, conf);
1969      }
1970    
1971      /**
1972       * Returns a local File that the user can write output to.  The caller
1973       * provides both the eventual FS target name and the local working
1974       * file.  If the FS is local, we write directly into the target.  If
1975       * the FS is remote, we write into the tmp local area.
1976       * @param fsOutputFile path of output file
1977       * @param tmpLocalFile path of local tmp file
1978       */
1979      public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1980        throws IOException {
1981        return tmpLocalFile;
1982      }
1983    
1984      /**
1985       * Called when we're all done writing to the target.  A local FS will
1986       * do nothing, because we've written to exactly the right place.  A remote
1987       * FS will copy the contents of tmpLocalFile to the correct target at
1988       * fsOutputFile.
1989       * @param fsOutputFile path of output file
1990       * @param tmpLocalFile path to local tmp file
1991       */
1992      public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1993        throws IOException {
1994        moveFromLocalFile(tmpLocalFile, fsOutputFile);
1995      }
1996    
1997      /**
1998       * No more filesystem operations are needed.  Will
1999       * release any held locks.
2000       */
2001      @Override
2002      public void close() throws IOException {
2003        // delete all files that were marked as delete-on-exit.
2004        processDeleteOnExit();
2005        CACHE.remove(this.key, this);
2006      }
2007    
2008      /** Return the total size of all files in the filesystem.*/
2009      public long getUsed() throws IOException{
2010        long used = 0;
2011        FileStatus[] files = listStatus(new Path("/"));
2012        for(FileStatus file:files){
2013          used += file.getLen();
2014        }
2015        return used;
2016      }
2017      
2018      /**
2019       * Get the block size for a particular file.
2020       * @param f the filename
2021       * @return the number of bytes in a block
2022       */
2023      /** @deprecated Use getFileStatus() instead */
2024      @Deprecated
2025      public long getBlockSize(Path f) throws IOException {
2026        return getFileStatus(f).getBlockSize();
2027      }
2028    
2029      /**
2030       * Return the number of bytes that large input files should be optimally
2031       * be split into to minimize i/o time.
2032       * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2033       */
2034      @Deprecated
2035      public long getDefaultBlockSize() {
2036        // default to 32MB: large enough to minimize the impact of seeks
2037        return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2038      }
2039        
2040      /** Return the number of bytes that large input files should be optimally
2041       * be split into to minimize i/o time.  The given path will be used to
2042       * locate the actual filesystem.  The full path does not have to exist.
2043       * @param f path of file
2044       * @return the default block size for the path's filesystem
2045       */
2046      public long getDefaultBlockSize(Path f) {
2047        return getDefaultBlockSize();
2048      }
2049    
2050      /**
2051       * Get the default replication.
2052       * @deprecated use {@link #getDefaultReplication(Path)} instead
2053       */
2054      @Deprecated
2055      public short getDefaultReplication() { return 1; }
2056    
2057      /**
2058       * Get the default replication for a path.   The given path will be used to
2059       * locate the actual filesystem.  The full path does not have to exist.
2060       * @param path of the file
2061       * @return default replication for the path's filesystem 
2062       */
2063      public short getDefaultReplication(Path path) {
2064        return getDefaultReplication();
2065      }
2066      
2067      /**
2068       * Return a file status object that represents the path.
2069       * @param f The path we want information from
2070       * @return a FileStatus object
2071       * @throws FileNotFoundException when the path does not exist;
2072       *         IOException see specific implementation
2073       */
2074      public abstract FileStatus getFileStatus(Path f) throws IOException;
2075    
2076      /**
2077       * See {@link FileContext#fixRelativePart}
2078       */
2079      protected Path fixRelativePart(Path p) {
2080        if (p.isUriPathAbsolute()) {
2081          return p;
2082        } else {
2083          return new Path(getWorkingDirectory(), p);
2084        }
2085      }
2086    
2087      /**
2088       * See {@link FileContext#createSymlink(Path, Path, boolean)}
2089       */
2090      public void createSymlink(final Path target, final Path link,
2091          final boolean createParent) throws AccessControlException,
2092          FileAlreadyExistsException, FileNotFoundException,
2093          ParentNotDirectoryException, UnsupportedFileSystemException, 
2094          IOException {
2095        // Supporting filesystems should override this method
2096        throw new UnsupportedOperationException(
2097            "Filesystem does not support symlinks!");
2098      }
2099    
2100      /**
2101       * See {@link FileContext#getFileLinkStatus(Path)}
2102       */
2103      public FileStatus getFileLinkStatus(final Path f)
2104          throws AccessControlException, FileNotFoundException,
2105          UnsupportedFileSystemException, IOException {
2106        // Supporting filesystems should override this method
2107        return getFileStatus(f);
2108      }
2109    
2110      /**
2111       * See {@link AbstractFileSystem#supportsSymlinks()}
2112       */
2113      public boolean supportsSymlinks() {
2114        return false;
2115      }
2116    
2117      /**
2118       * See {@link FileContext#getLinkTarget(Path)}
2119       */
2120      public Path getLinkTarget(Path f) throws IOException {
2121        // Supporting filesystems should override this method
2122        throw new UnsupportedOperationException(
2123            "Filesystem does not support symlinks!");
2124      }
2125    
2126      /**
2127       * See {@link AbstractFileSystem#getLinkTarget(Path)}
2128       */
2129      protected Path resolveLink(Path f) throws IOException {
2130        // Supporting filesystems should override this method
2131        throw new UnsupportedOperationException(
2132            "Filesystem does not support symlinks!");
2133      }
2134    
2135      /**
2136       * Get the checksum of a file.
2137       *
2138       * @param f The file path
2139       * @return The file checksum.  The default return value is null,
2140       *  which indicates that no checksum algorithm is implemented
2141       *  in the corresponding FileSystem.
2142       */
2143      public FileChecksum getFileChecksum(Path f) throws IOException {
2144        return null;
2145      }
2146      
2147      /**
2148       * Set the verify checksum flag. This is only applicable if the 
2149       * corresponding FileSystem supports checksum. By default doesn't do anything.
2150       * @param verifyChecksum
2151       */
2152      public void setVerifyChecksum(boolean verifyChecksum) {
2153        //doesn't do anything
2154      }
2155    
2156      /**
2157       * Set the write checksum flag. This is only applicable if the 
2158       * corresponding FileSystem supports checksum. By default doesn't do anything.
2159       * @param writeChecksum
2160       */
2161      public void setWriteChecksum(boolean writeChecksum) {
2162        //doesn't do anything
2163      }
2164    
2165      /**
2166       * Returns a status object describing the use and capacity of the
2167       * file system. If the file system has multiple partitions, the
2168       * use and capacity of the root partition is reflected.
2169       * 
2170       * @return a FsStatus object
2171       * @throws IOException
2172       *           see specific implementation
2173       */
2174      public FsStatus getStatus() throws IOException {
2175        return getStatus(null);
2176      }
2177    
2178      /**
2179       * Returns a status object describing the use and capacity of the
2180       * file system. If the file system has multiple partitions, the
2181       * use and capacity of the partition pointed to by the specified
2182       * path is reflected.
2183       * @param p Path for which status should be obtained. null means
2184       * the default partition. 
2185       * @return a FsStatus object
2186       * @throws IOException
2187       *           see specific implementation
2188       */
2189      public FsStatus getStatus(Path p) throws IOException {
2190        return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2191      }
2192    
2193      /**
2194       * Set permission of a path.
2195       * @param p
2196       * @param permission
2197       */
2198      public void setPermission(Path p, FsPermission permission
2199          ) throws IOException {
2200      }
2201    
2202      /**
2203       * Set owner of a path (i.e. a file or a directory).
2204       * The parameters username and groupname cannot both be null.
2205       * @param p The path
2206       * @param username If it is null, the original username remains unchanged.
2207       * @param groupname If it is null, the original groupname remains unchanged.
2208       */
2209      public void setOwner(Path p, String username, String groupname
2210          ) throws IOException {
2211      }
2212    
2213      /**
2214       * Set access time of a file
2215       * @param p The path
2216       * @param mtime Set the modification time of this file.
2217       *              The number of milliseconds since Jan 1, 1970. 
2218       *              A value of -1 means that this call should not set modification time.
2219       * @param atime Set the access time of this file.
2220       *              The number of milliseconds since Jan 1, 1970. 
2221       *              A value of -1 means that this call should not set access time.
2222       */
2223      public void setTimes(Path p, long mtime, long atime
2224          ) throws IOException {
2225      }
2226    
2227      /**
2228       * Create a snapshot with a default name.
2229       * @param path The directory where snapshots will be taken.
2230       * @return the snapshot path.
2231       */
2232      public final Path createSnapshot(Path path) throws IOException {
2233        return createSnapshot(path, null);
2234      }
2235    
2236      /**
2237       * Create a snapshot
2238       * @param path The directory where snapshots will be taken.
2239       * @param snapshotName The name of the snapshot
2240       * @return the snapshot path.
2241       */
2242      public Path createSnapshot(Path path, String snapshotName)
2243          throws IOException {
2244        throw new UnsupportedOperationException(getClass().getSimpleName()
2245            + " doesn't support createSnapshot");
2246      }
2247      
2248      /**
2249       * Rename a snapshot
2250       * @param path The directory path where the snapshot was taken
2251       * @param snapshotOldName Old name of the snapshot
2252       * @param snapshotNewName New name of the snapshot
2253       * @throws IOException
2254       */
2255      public void renameSnapshot(Path path, String snapshotOldName,
2256          String snapshotNewName) throws IOException {
2257        throw new UnsupportedOperationException(getClass().getSimpleName()
2258            + " doesn't support renameSnapshot");
2259      }
2260      
2261      /**
2262       * Delete a snapshot of a directory
2263       * @param path  The directory that the to-be-deleted snapshot belongs to
2264       * @param snapshotName The name of the snapshot
2265       */
2266      public void deleteSnapshot(Path path, String snapshotName)
2267          throws IOException {
2268        throw new UnsupportedOperationException(getClass().getSimpleName()
2269            + " doesn't support deleteSnapshot");
2270      }
2271      
2272      /**
2273       * Modifies ACL entries of files and directories.  This method can add new ACL
2274       * entries or modify the permissions on existing ACL entries.  All existing
2275       * ACL entries that are not specified in this call are retained without
2276       * changes.  (Modifications are merged into the current ACL.)
2277       *
2278       * @param path Path to modify
2279       * @param aclSpec List<AclEntry> describing modifications
2280       * @throws IOException if an ACL could not be modified
2281       */
2282      public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2283          throws IOException {
2284        throw new UnsupportedOperationException(getClass().getSimpleName()
2285            + " doesn't support modifyAclEntries");
2286      }
2287    
2288      /**
2289       * Removes ACL entries from files and directories.  Other ACL entries are
2290       * retained.
2291       *
2292       * @param path Path to modify
2293       * @param aclSpec List<AclEntry> describing entries to remove
2294       * @throws IOException if an ACL could not be modified
2295       */
2296      public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2297          throws IOException {
2298        throw new UnsupportedOperationException(getClass().getSimpleName()
2299            + " doesn't support removeAclEntries");
2300      }
2301    
2302      /**
2303       * Removes all default ACL entries from files and directories.
2304       *
2305       * @param path Path to modify
2306       * @throws IOException if an ACL could not be modified
2307       */
2308      public void removeDefaultAcl(Path path)
2309          throws IOException {
2310        throw new UnsupportedOperationException(getClass().getSimpleName()
2311            + " doesn't support removeDefaultAcl");
2312      }
2313    
2314      /**
2315       * Removes all but the base ACL entries of files and directories.  The entries
2316       * for user, group, and others are retained for compatibility with permission
2317       * bits.
2318       *
2319       * @param path Path to modify
2320       * @throws IOException if an ACL could not be removed
2321       */
2322      public void removeAcl(Path path)
2323          throws IOException {
2324        throw new UnsupportedOperationException(getClass().getSimpleName()
2325            + " doesn't support removeAcl");
2326      }
2327    
2328      /**
2329       * Fully replaces ACL of files and directories, discarding all existing
2330       * entries.
2331       *
2332       * @param path Path to modify
2333       * @param aclSpec List<AclEntry> describing modifications, must include entries
2334       *   for user, group, and others for compatibility with permission bits.
2335       * @throws IOException if an ACL could not be modified
2336       */
2337      public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2338        throw new UnsupportedOperationException(getClass().getSimpleName()
2339            + " doesn't support setAcl");
2340      }
2341    
2342      /**
2343       * Gets the ACL of a file or directory.
2344       *
2345       * @param path Path to get
2346       * @return AclStatus describing the ACL of the file or directory
2347       * @throws IOException if an ACL could not be read
2348       */
2349      public AclStatus getAclStatus(Path path) throws IOException {
2350        throw new UnsupportedOperationException(getClass().getSimpleName()
2351            + " doesn't support getAclStatus");
2352      }
2353    
2354      // making it volatile to be able to do a double checked locking
2355      private volatile static boolean FILE_SYSTEMS_LOADED = false;
2356    
2357      private static final Map<String, Class<? extends FileSystem>>
2358        SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2359    
2360      private static void loadFileSystems() {
2361        synchronized (FileSystem.class) {
2362          if (!FILE_SYSTEMS_LOADED) {
2363            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2364            for (FileSystem fs : serviceLoader) {
2365              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2366            }
2367            FILE_SYSTEMS_LOADED = true;
2368          }
2369        }
2370      }
2371    
2372      public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2373          Configuration conf) throws IOException {
2374        if (!FILE_SYSTEMS_LOADED) {
2375          loadFileSystems();
2376        }
2377        Class<? extends FileSystem> clazz = null;
2378        if (conf != null) {
2379          clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2380        }
2381        if (clazz == null) {
2382          clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2383        }
2384        if (clazz == null) {
2385          throw new IOException("No FileSystem for scheme: " + scheme);
2386        }
2387        return clazz;
2388      }
2389    
2390      private static FileSystem createFileSystem(URI uri, Configuration conf
2391          ) throws IOException {
2392        Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2393        if (clazz == null) {
2394          throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2395        }
2396        FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2397        fs.initialize(uri, conf);
2398        return fs;
2399      }
2400    
2401      /** Caching FileSystem objects */
2402      static class Cache {
2403        private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2404    
2405        private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2406        private final Set<Key> toAutoClose = new HashSet<Key>();
2407    
2408        /** A variable that makes all objects in the cache unique */
2409        private static AtomicLong unique = new AtomicLong(1);
2410    
2411        FileSystem get(URI uri, Configuration conf) throws IOException{
2412          Key key = new Key(uri, conf);
2413          return getInternal(uri, conf, key);
2414        }
2415    
2416        /** The objects inserted into the cache using this method are all unique */
2417        FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2418          Key key = new Key(uri, conf, unique.getAndIncrement());
2419          return getInternal(uri, conf, key);
2420        }
2421    
2422        private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2423          FileSystem fs;
2424          synchronized (this) {
2425            fs = map.get(key);
2426          }
2427          if (fs != null) {
2428            return fs;
2429          }
2430    
2431          fs = createFileSystem(uri, conf);
2432          synchronized (this) { // refetch the lock again
2433            FileSystem oldfs = map.get(key);
2434            if (oldfs != null) { // a file system is created while lock is releasing
2435              fs.close(); // close the new file system
2436              return oldfs;  // return the old file system
2437            }
2438            
2439            // now insert the new file system into the map
2440            if (map.isEmpty()
2441                    && !ShutdownHookManager.get().isShutdownInProgress()) {
2442              ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2443            }
2444            fs.key = key;
2445            map.put(key, fs);
2446            if (conf.getBoolean("fs.automatic.close", true)) {
2447              toAutoClose.add(key);
2448            }
2449            return fs;
2450          }
2451        }
2452    
2453        synchronized void remove(Key key, FileSystem fs) {
2454          if (map.containsKey(key) && fs == map.get(key)) {
2455            map.remove(key);
2456            toAutoClose.remove(key);
2457            }
2458        }
2459    
2460        synchronized void closeAll() throws IOException {
2461          closeAll(false);
2462        }
2463    
2464        /**
2465         * Close all FileSystem instances in the Cache.
2466         * @param onlyAutomatic only close those that are marked for automatic closing
2467         */
2468        synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2469          List<IOException> exceptions = new ArrayList<IOException>();
2470    
2471          // Make a copy of the keys in the map since we'll be modifying
2472          // the map while iterating over it, which isn't safe.
2473          List<Key> keys = new ArrayList<Key>();
2474          keys.addAll(map.keySet());
2475    
2476          for (Key key : keys) {
2477            final FileSystem fs = map.get(key);
2478    
2479            if (onlyAutomatic && !toAutoClose.contains(key)) {
2480              continue;
2481            }
2482    
2483            //remove from cache
2484            remove(key, fs);
2485    
2486            if (fs != null) {
2487              try {
2488                fs.close();
2489              }
2490              catch(IOException ioe) {
2491                exceptions.add(ioe);
2492              }
2493            }
2494          }
2495    
2496          if (!exceptions.isEmpty()) {
2497            throw MultipleIOException.createIOException(exceptions);
2498          }
2499        }
2500    
2501        private class ClientFinalizer implements Runnable {
2502          @Override
2503          public synchronized void run() {
2504            try {
2505              closeAll(true);
2506            } catch (IOException e) {
2507              LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2508            }
2509          }
2510        }
2511    
2512        synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2513          List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2514          //Make a pass over the list and collect the filesystems to close
2515          //we cannot close inline since close() removes the entry from the Map
2516          for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2517            final Key key = entry.getKey();
2518            final FileSystem fs = entry.getValue();
2519            if (ugi.equals(key.ugi) && fs != null) {
2520              targetFSList.add(fs);   
2521            }
2522          }
2523          List<IOException> exceptions = new ArrayList<IOException>();
2524          //now make a pass over the target list and close each
2525          for (FileSystem fs : targetFSList) {
2526            try {
2527              fs.close();
2528            }
2529            catch(IOException ioe) {
2530              exceptions.add(ioe);
2531            }
2532          }
2533          if (!exceptions.isEmpty()) {
2534            throw MultipleIOException.createIOException(exceptions);
2535          }
2536        }
2537    
2538        /** FileSystem.Cache.Key */
2539        static class Key {
2540          final String scheme;
2541          final String authority;
2542          final UserGroupInformation ugi;
2543          final long unique;   // an artificial way to make a key unique
2544    
2545          Key(URI uri, Configuration conf) throws IOException {
2546            this(uri, conf, 0);
2547          }
2548    
2549          Key(URI uri, Configuration conf, long unique) throws IOException {
2550            scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2551            authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2552            this.unique = unique;
2553            
2554            this.ugi = UserGroupInformation.getCurrentUser();
2555          }
2556    
2557          @Override
2558          public int hashCode() {
2559            return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2560          }
2561    
2562          static boolean isEqual(Object a, Object b) {
2563            return a == b || (a != null && a.equals(b));        
2564          }
2565    
2566          @Override
2567          public boolean equals(Object obj) {
2568            if (obj == this) {
2569              return true;
2570            }
2571            if (obj != null && obj instanceof Key) {
2572              Key that = (Key)obj;
2573              return isEqual(this.scheme, that.scheme)
2574                     && isEqual(this.authority, that.authority)
2575                     && isEqual(this.ugi, that.ugi)
2576                     && (this.unique == that.unique);
2577            }
2578            return false;        
2579          }
2580    
2581          @Override
2582          public String toString() {
2583            return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2584          }
2585        }
2586      }
2587      
2588      /**
2589       * Tracks statistics about how many reads, writes, and so forth have been
2590       * done in a FileSystem.
2591       * 
2592       * Since there is only one of these objects per FileSystem, there will 
2593       * typically be many threads writing to this object.  Almost every operation
2594       * on an open file will involve a write to this object.  In contrast, reading
2595       * statistics is done infrequently by most programs, and not at all by others.
2596       * Hence, this is optimized for writes.
2597       * 
2598       * Each thread writes to its own thread-local area of memory.  This removes 
2599       * contention and allows us to scale up to many, many threads.  To read
2600       * statistics, the reader thread totals up the contents of all of the 
2601       * thread-local data areas.
2602       */
2603      public static final class Statistics {
2604        /**
2605         * Statistics data.
2606         * 
2607         * There is only a single writer to thread-local StatisticsData objects.
2608         * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2609         * to prevent lost updates.
2610         * The Java specification guarantees that updates to volatile longs will
2611         * be perceived as atomic with respect to other threads, which is all we
2612         * need.
2613         */
2614        private static class StatisticsData {
2615          volatile long bytesRead;
2616          volatile long bytesWritten;
2617          volatile int readOps;
2618          volatile int largeReadOps;
2619          volatile int writeOps;
2620          /**
2621           * Stores a weak reference to the thread owning this StatisticsData.
2622           * This allows us to remove StatisticsData objects that pertain to
2623           * threads that no longer exist.
2624           */
2625          final WeakReference<Thread> owner;
2626    
2627          StatisticsData(WeakReference<Thread> owner) {
2628            this.owner = owner;
2629          }
2630    
2631          /**
2632           * Add another StatisticsData object to this one.
2633           */
2634          void add(StatisticsData other) {
2635            this.bytesRead += other.bytesRead;
2636            this.bytesWritten += other.bytesWritten;
2637            this.readOps += other.readOps;
2638            this.largeReadOps += other.largeReadOps;
2639            this.writeOps += other.writeOps;
2640          }
2641    
2642          /**
2643           * Negate the values of all statistics.
2644           */
2645          void negate() {
2646            this.bytesRead = -this.bytesRead;
2647            this.bytesWritten = -this.bytesWritten;
2648            this.readOps = -this.readOps;
2649            this.largeReadOps = -this.largeReadOps;
2650            this.writeOps = -this.writeOps;
2651          }
2652    
2653          @Override
2654          public String toString() {
2655            return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2656                + readOps + " read ops, " + largeReadOps + " large read ops, "
2657                + writeOps + " write ops";
2658          }
2659        }
2660    
2661        private interface StatisticsAggregator<T> {
2662          void accept(StatisticsData data);
2663          T aggregate();
2664        }
2665    
2666        private final String scheme;
2667    
2668        /**
2669         * rootData is data that doesn't belong to any thread, but will be added
2670         * to the totals.  This is useful for making copies of Statistics objects,
2671         * and for storing data that pertains to threads that have been garbage
2672         * collected.  Protected by the Statistics lock.
2673         */
2674        private final StatisticsData rootData;
2675    
2676        /**
2677         * Thread-local data.
2678         */
2679        private final ThreadLocal<StatisticsData> threadData;
2680        
2681        /**
2682         * List of all thread-local data areas.  Protected by the Statistics lock.
2683         */
2684        private LinkedList<StatisticsData> allData;
2685    
2686        public Statistics(String scheme) {
2687          this.scheme = scheme;
2688          this.rootData = new StatisticsData(null);
2689          this.threadData = new ThreadLocal<StatisticsData>();
2690          this.allData = null;
2691        }
2692    
2693        /**
2694         * Copy constructor.
2695         * 
2696         * @param other    The input Statistics object which is cloned.
2697         */
2698        public Statistics(Statistics other) {
2699          this.scheme = other.scheme;
2700          this.rootData = new StatisticsData(null);
2701          other.visitAll(new StatisticsAggregator<Void>() {
2702            @Override
2703            public void accept(StatisticsData data) {
2704              rootData.add(data);
2705            }
2706    
2707            public Void aggregate() {
2708              return null;
2709            }
2710          });
2711          this.threadData = new ThreadLocal<StatisticsData>();
2712        }
2713    
2714        /**
2715         * Get or create the thread-local data associated with the current thread.
2716         */
2717        private StatisticsData getThreadData() {
2718          StatisticsData data = threadData.get();
2719          if (data == null) {
2720            data = new StatisticsData(
2721                new WeakReference<Thread>(Thread.currentThread()));
2722            threadData.set(data);
2723            synchronized(this) {
2724              if (allData == null) {
2725                allData = new LinkedList<StatisticsData>();
2726              }
2727              allData.add(data);
2728            }
2729          }
2730          return data;
2731        }
2732    
2733        /**
2734         * Increment the bytes read in the statistics
2735         * @param newBytes the additional bytes read
2736         */
2737        public void incrementBytesRead(long newBytes) {
2738          getThreadData().bytesRead += newBytes;
2739        }
2740        
2741        /**
2742         * Increment the bytes written in the statistics
2743         * @param newBytes the additional bytes written
2744         */
2745        public void incrementBytesWritten(long newBytes) {
2746          getThreadData().bytesWritten += newBytes;
2747        }
2748        
2749        /**
2750         * Increment the number of read operations
2751         * @param count number of read operations
2752         */
2753        public void incrementReadOps(int count) {
2754          getThreadData().readOps += count;
2755        }
2756    
2757        /**
2758         * Increment the number of large read operations
2759         * @param count number of large read operations
2760         */
2761        public void incrementLargeReadOps(int count) {
2762          getThreadData().largeReadOps += count;
2763        }
2764    
2765        /**
2766         * Increment the number of write operations
2767         * @param count number of write operations
2768         */
2769        public void incrementWriteOps(int count) {
2770          getThreadData().writeOps += count;
2771        }
2772    
2773        /**
2774         * Apply the given aggregator to all StatisticsData objects associated with
2775         * this Statistics object.
2776         *
2777         * For each StatisticsData object, we will call accept on the visitor.
2778         * Finally, at the end, we will call aggregate to get the final total. 
2779         *
2780         * @param         The visitor to use.
2781         * @return        The total.
2782         */
2783        private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
2784          visitor.accept(rootData);
2785          if (allData != null) {
2786            for (Iterator<StatisticsData> iter = allData.iterator();
2787                iter.hasNext(); ) {
2788              StatisticsData data = iter.next();
2789              visitor.accept(data);
2790              if (data.owner.get() == null) {
2791                /*
2792                 * If the thread that created this thread-local data no
2793                 * longer exists, remove the StatisticsData from our list
2794                 * and fold the values into rootData.
2795                 */
2796                rootData.add(data);
2797                iter.remove();
2798              }
2799            }
2800          }
2801          return visitor.aggregate();
2802        }
2803    
2804        /**
2805         * Get the total number of bytes read
2806         * @return the number of bytes
2807         */
2808        public long getBytesRead() {
2809          return visitAll(new StatisticsAggregator<Long>() {
2810            private long bytesRead = 0;
2811    
2812            @Override
2813            public void accept(StatisticsData data) {
2814              bytesRead += data.bytesRead;
2815            }
2816    
2817            public Long aggregate() {
2818              return bytesRead;
2819            }
2820          });
2821        }
2822        
2823        /**
2824         * Get the total number of bytes written
2825         * @return the number of bytes
2826         */
2827        public long getBytesWritten() {
2828          return visitAll(new StatisticsAggregator<Long>() {
2829            private long bytesWritten = 0;
2830    
2831            @Override
2832            public void accept(StatisticsData data) {
2833              bytesWritten += data.bytesWritten;
2834            }
2835    
2836            public Long aggregate() {
2837              return bytesWritten;
2838            }
2839          });
2840        }
2841        
2842        /**
2843         * Get the number of file system read operations such as list files
2844         * @return number of read operations
2845         */
2846        public int getReadOps() {
2847          return visitAll(new StatisticsAggregator<Integer>() {
2848            private int readOps = 0;
2849    
2850            @Override
2851            public void accept(StatisticsData data) {
2852              readOps += data.readOps;
2853              readOps += data.largeReadOps;
2854            }
2855    
2856            public Integer aggregate() {
2857              return readOps;
2858            }
2859          });
2860        }
2861    
2862        /**
2863         * Get the number of large file system read operations such as list files
2864         * under a large directory
2865         * @return number of large read operations
2866         */
2867        public int getLargeReadOps() {
2868          return visitAll(new StatisticsAggregator<Integer>() {
2869            private int largeReadOps = 0;
2870    
2871            @Override
2872            public void accept(StatisticsData data) {
2873              largeReadOps += data.largeReadOps;
2874            }
2875    
2876            public Integer aggregate() {
2877              return largeReadOps;
2878            }
2879          });
2880        }
2881    
2882        /**
2883         * Get the number of file system write operations such as create, append 
2884         * rename etc.
2885         * @return number of write operations
2886         */
2887        public int getWriteOps() {
2888          return visitAll(new StatisticsAggregator<Integer>() {
2889            private int writeOps = 0;
2890    
2891            @Override
2892            public void accept(StatisticsData data) {
2893              writeOps += data.writeOps;
2894            }
2895    
2896            public Integer aggregate() {
2897              return writeOps;
2898            }
2899          });
2900        }
2901    
2902    
2903        @Override
2904        public String toString() {
2905          return visitAll(new StatisticsAggregator<String>() {
2906            private StatisticsData total = new StatisticsData(null);
2907    
2908            @Override
2909            public void accept(StatisticsData data) {
2910              total.add(data);
2911            }
2912    
2913            public String aggregate() {
2914              return total.toString();
2915            }
2916          });
2917        }
2918    
2919        /**
2920         * Resets all statistics to 0.
2921         *
2922         * In order to reset, we add up all the thread-local statistics data, and
2923         * set rootData to the negative of that.
2924         *
2925         * This may seem like a counterintuitive way to reset the statsitics.  Why
2926         * can't we just zero out all the thread-local data?  Well, thread-local
2927         * data can only be modified by the thread that owns it.  If we tried to
2928         * modify the thread-local data from this thread, our modification might get
2929         * interleaved with a read-modify-write operation done by the thread that
2930         * owns the data.  That would result in our update getting lost.
2931         *
2932         * The approach used here avoids this problem because it only ever reads
2933         * (not writes) the thread-local data.  Both reads and writes to rootData
2934         * are done under the lock, so we're free to modify rootData from any thread
2935         * that holds the lock.
2936         */
2937        public void reset() {
2938          visitAll(new StatisticsAggregator<Void>() {
2939            private StatisticsData total = new StatisticsData(null);
2940    
2941            @Override
2942            public void accept(StatisticsData data) {
2943              total.add(data);
2944            }
2945    
2946            public Void aggregate() {
2947              total.negate();
2948              rootData.add(total);
2949              return null;
2950            }
2951          });
2952        }
2953        
2954        /**
2955         * Get the uri scheme associated with this statistics object.
2956         * @return the schema associated with this set of statistics
2957         */
2958        public String getScheme() {
2959          return scheme;
2960        }
2961      }
2962      
2963      /**
2964       * Get the Map of Statistics object indexed by URI Scheme.
2965       * @return a Map having a key as URI scheme and value as Statistics object
2966       * @deprecated use {@link #getAllStatistics} instead
2967       */
2968      @Deprecated
2969      public static synchronized Map<String, Statistics> getStatistics() {
2970        Map<String, Statistics> result = new HashMap<String, Statistics>();
2971        for(Statistics stat: statisticsTable.values()) {
2972          result.put(stat.getScheme(), stat);
2973        }
2974        return result;
2975      }
2976    
2977      /**
2978       * Return the FileSystem classes that have Statistics
2979       */
2980      public static synchronized List<Statistics> getAllStatistics() {
2981        return new ArrayList<Statistics>(statisticsTable.values());
2982      }
2983      
2984      /**
2985       * Get the statistics for a particular file system
2986       * @param cls the class to lookup
2987       * @return a statistics object
2988       */
2989      public static synchronized 
2990      Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
2991        Statistics result = statisticsTable.get(cls);
2992        if (result == null) {
2993          result = new Statistics(scheme);
2994          statisticsTable.put(cls, result);
2995        }
2996        return result;
2997      }
2998      
2999      /**
3000       * Reset all statistics for all file systems
3001       */
3002      public static synchronized void clearStatistics() {
3003        for(Statistics stat: statisticsTable.values()) {
3004          stat.reset();
3005        }
3006      }
3007    
3008      /**
3009       * Print all statistics for all file systems
3010       */
3011      public static synchronized
3012      void printStatistics() throws IOException {
3013        for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3014                statisticsTable.entrySet()) {
3015          System.out.println("  FileSystem " + pair.getKey().getName() + 
3016                             ": " + pair.getValue());
3017        }
3018      }
3019    
3020      // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
3021      private static boolean symlinksEnabled = false;
3022    
3023      private static Configuration conf = null;
3024    
3025      @VisibleForTesting
3026      public static boolean areSymlinksEnabled() {
3027        return symlinksEnabled;
3028      }
3029    
3030      @VisibleForTesting
3031      public static void enableSymlinks() {
3032        symlinksEnabled = true;
3033      }
3034    }