001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs;
019    
020    import java.io.Closeable;
021    import java.io.FileNotFoundException;
022    import java.io.IOException;
023    import java.lang.ref.WeakReference;
024    import java.net.URI;
025    import java.net.URISyntaxException;
026    import java.security.PrivilegedExceptionAction;
027    import java.util.ArrayList;
028    import java.util.EnumSet;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.IdentityHashMap;
032    import java.util.Iterator;
033    import java.util.LinkedList;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.NoSuchElementException;
037    import java.util.ServiceLoader;
038    import java.util.Set;
039    import java.util.Stack;
040    import java.util.TreeSet;
041    import java.util.concurrent.atomic.AtomicLong;
042    
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    import org.apache.hadoop.classification.InterfaceAudience;
046    import org.apache.hadoop.classification.InterfaceStability;
047    import org.apache.hadoop.conf.Configuration;
048    import org.apache.hadoop.conf.Configured;
049    import org.apache.hadoop.fs.Options.ChecksumOpt;
050    import org.apache.hadoop.fs.Options.Rename;
051    import org.apache.hadoop.fs.permission.AclEntry;
052    import org.apache.hadoop.fs.permission.AclStatus;
053    import org.apache.hadoop.fs.permission.FsPermission;
054    import org.apache.hadoop.io.MultipleIOException;
055    import org.apache.hadoop.io.Text;
056    import org.apache.hadoop.net.NetUtils;
057    import org.apache.hadoop.security.AccessControlException;
058    import org.apache.hadoop.security.Credentials;
059    import org.apache.hadoop.security.SecurityUtil;
060    import org.apache.hadoop.security.UserGroupInformation;
061    import org.apache.hadoop.security.token.Token;
062    import org.apache.hadoop.util.DataChecksum;
063    import org.apache.hadoop.util.Progressable;
064    import org.apache.hadoop.util.ReflectionUtils;
065    import org.apache.hadoop.util.ShutdownHookManager;
066    
067    import com.google.common.annotations.VisibleForTesting;
068    
069    /****************************************************************
070     * An abstract base class for a fairly generic filesystem.  It
071     * may be implemented as a distributed filesystem, or as a "local"
072     * one that reflects the locally-connected disk.  The local version
073     * exists for small Hadoop instances and for testing.
074     *
075     * <p>
076     *
077     * All user code that may potentially use the Hadoop Distributed
078     * File System should be written to use a FileSystem object.  The
079     * Hadoop DFS is a multi-machine system that appears as a single
080     * disk.  It's useful because of its fault tolerance and potentially
081     * very large capacity.
082     * 
083     * <p>
084     * The local implementation is {@link LocalFileSystem} and distributed
085     * implementation is DistributedFileSystem.
086     *****************************************************************/
087    @InterfaceAudience.Public
088    @InterfaceStability.Stable
089    public abstract class FileSystem extends Configured implements Closeable {
090      public static final String FS_DEFAULT_NAME_KEY = 
091                       CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
092      public static final String DEFAULT_FS = 
093                       CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
094    
095      public static final Log LOG = LogFactory.getLog(FileSystem.class);
096    
097      /**
098       * Priority of the FileSystem shutdown hook.
099       */
100      public static final int SHUTDOWN_HOOK_PRIORITY = 10;
101    
102      /** FileSystem cache */
103      static final Cache CACHE = new Cache();
104    
105      /** The key this instance is stored under in the cache. */
106      private Cache.Key key;
107    
108      /** Recording statistics per a FileSystem class */
109      private static final Map<Class<? extends FileSystem>, Statistics> 
110        statisticsTable =
111          new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
112      
113      /**
114       * The statistics for this file system.
115       */
116      protected Statistics statistics;
117    
118      /**
119       * A cache of files that should be deleted when filsystem is closed
120       * or the JVM is exited.
121       */
122      private Set<Path> deleteOnExit = new TreeSet<Path>();
123      
124      boolean resolveSymlinks;
125      /**
126       * This method adds a file system for testing so that we can find it later. It
127       * is only for testing.
128       * @param uri the uri to store it under
129       * @param conf the configuration to store it under
130       * @param fs the file system to store
131       * @throws IOException
132       */
133      static void addFileSystemForTesting(URI uri, Configuration conf,
134          FileSystem fs) throws IOException {
135        CACHE.map.put(new Cache.Key(uri, conf), fs);
136      }
137    
138      /**
139       * Get a filesystem instance based on the uri, the passed
140       * configuration and the user
141       * @param uri of the filesystem
142       * @param conf the configuration to use
143       * @param user to perform the get as
144       * @return the filesystem instance
145       * @throws IOException
146       * @throws InterruptedException
147       */
148      public static FileSystem get(final URI uri, final Configuration conf,
149            final String user) throws IOException, InterruptedException {
150        String ticketCachePath =
151          conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
152        UserGroupInformation ugi =
153            UserGroupInformation.getBestUGI(ticketCachePath, user);
154        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
155          @Override
156          public FileSystem run() throws IOException {
157            return get(uri, conf);
158          }
159        });
160      }
161    
162      /**
163       * Returns the configured filesystem implementation.
164       * @param conf the configuration to use
165       */
166      public static FileSystem get(Configuration conf) throws IOException {
167        return get(getDefaultUri(conf), conf);
168      }
169      
170      /** Get the default filesystem URI from a configuration.
171       * @param conf the configuration to use
172       * @return the uri of the default filesystem
173       */
174      public static URI getDefaultUri(Configuration conf) {
175        return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
176      }
177    
178      /** Set the default filesystem URI in a configuration.
179       * @param conf the configuration to alter
180       * @param uri the new default filesystem uri
181       */
182      public static void setDefaultUri(Configuration conf, URI uri) {
183        conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
184      }
185    
186      /** Set the default filesystem URI in a configuration.
187       * @param conf the configuration to alter
188       * @param uri the new default filesystem uri
189       */
190      public static void setDefaultUri(Configuration conf, String uri) {
191        setDefaultUri(conf, URI.create(fixName(uri)));
192      }
193    
194      /** Called after a new FileSystem instance is constructed.
195       * @param name a uri whose authority section names the host, port, etc.
196       *   for this FileSystem
197       * @param conf the configuration
198       */
199      public void initialize(URI name, Configuration conf) throws IOException {
200        statistics = getStatistics(name.getScheme(), getClass());    
201        resolveSymlinks = conf.getBoolean(
202            CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
203            CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
204      }
205    
206      /**
207       * Return the protocol scheme for the FileSystem.
208       * <p/>
209       * This implementation throws an <code>UnsupportedOperationException</code>.
210       *
211       * @return the protocol scheme for the FileSystem.
212       */
213      public String getScheme() {
214        throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
215      }
216    
  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   *
   * @return the URI identifying this FileSystem
   */
  public abstract URI getUri();
219      
220      /**
221       * Return a canonicalized form of this FileSystem's URI.
222       * 
223       * The default implementation simply calls {@link #canonicalizeUri(URI)}
224       * on the filesystem's own URI, so subclasses typically only need to
225       * implement that method.
226       *
227       * @see #canonicalizeUri(URI)
228       */
229      protected URI getCanonicalUri() {
230        return canonicalizeUri(getUri());
231      }
232      
233      /**
234       * Canonicalize the given URI.
235       * 
236       * This is filesystem-dependent, but may for example consist of
237       * canonicalizing the hostname using DNS and adding the default
238       * port if not specified.
239       * 
240       * The default implementation simply fills in the default port if
241       * not specified and if the filesystem has a default port.
242       *
243       * @return URI
244       * @see NetUtils#getCanonicalUri(URI, int)
245       */
246      protected URI canonicalizeUri(URI uri) {
247        if (uri.getPort() == -1 && getDefaultPort() > 0) {
248          // reconstruct the uri with the default port set
249          try {
250            uri = new URI(uri.getScheme(), uri.getUserInfo(),
251                uri.getHost(), getDefaultPort(),
252                uri.getPath(), uri.getQuery(), uri.getFragment());
253          } catch (URISyntaxException e) {
254            // Should never happen!
255            throw new AssertionError("Valid URI became unparseable: " +
256                uri);
257          }
258        }
259        
260        return uri;
261      }
262      
  /**
   * Get the default port for this file system.
   * <p>
   * This base implementation returns 0, i.e. "no default port"; the value is
   * consulted by {@link #canonicalizeUri(URI)} and
   * {@link #getCanonicalServiceName()}.
   *
   * @return the default port or 0 if there isn't one
   */
  protected int getDefaultPort() {
    return 0;
  }
270    
271      protected static FileSystem getFSofPath(final Path absOrFqPath,
272          final Configuration conf)
273          throws UnsupportedFileSystemException, IOException {
274        absOrFqPath.checkNotSchemeWithRelative();
275        absOrFqPath.checkNotRelative();
276    
277        // Uses the default file system if not fully qualified
278        return get(absOrFqPath.toUri(), conf);
279      }
280    
281      /**
282       * Get a canonical service name for this file system.  The token cache is
283       * the only user of the canonical service name, and uses it to lookup this
284       * filesystem's service tokens.
285       * If file system provides a token of its own then it must have a canonical
286       * name, otherwise canonical name can be null.
287       * 
288       * Default Impl: If the file system has child file systems 
289       * (such as an embedded file system) then it is assumed that the fs has no
290       * tokens of its own and hence returns a null name; otherwise a service
291       * name is built using Uri and port.
292       * 
293       * @return a service string that uniquely identifies this file system, null
294       *         if the filesystem does not implement tokens
295       * @see SecurityUtil#buildDTServiceName(URI, int) 
296       */
297      @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
298      public String getCanonicalServiceName() {
299        return (getChildFileSystems() == null)
300          ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
301          : null;
302      }
303    
304      /** @deprecated call #getUri() instead.*/
305      @Deprecated
306      public String getName() { return getUri().toString(); }
307    
308      /** @deprecated call #get(URI,Configuration) instead. */
309      @Deprecated
310      public static FileSystem getNamed(String name, Configuration conf)
311        throws IOException {
312        return get(URI.create(fixName(name)), conf);
313      }
314      
315      /** Update old-format filesystem names, for back-compatibility.  This should
316       * eventually be replaced with a checkName() method that throws an exception
317       * for old-format names. */ 
318      private static String fixName(String name) {
319        // convert old-format name to new-format name
320        if (name.equals("local")) {         // "local" is now "file:///".
321          LOG.warn("\"local\" is a deprecated filesystem name."
322                   +" Use \"file:///\" instead.");
323          name = "file:///";
324        } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
325          LOG.warn("\""+name+"\" is a deprecated filesystem name."
326                   +" Use \"hdfs://"+name+"/\" instead.");
327          name = "hdfs://"+name;
328        }
329        return name;
330      }
331    
332      /**
333       * Get the local file system.
334       * @param conf the configuration to configure the file system with
335       * @return a LocalFileSystem
336       */
337      public static LocalFileSystem getLocal(Configuration conf)
338        throws IOException {
339        return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
340      }
341    
342      /** Returns the FileSystem for this URI's scheme and authority.  The scheme
343       * of the URI determines a configuration property name,
344       * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
345       * The entire URI is passed to the FileSystem instance's initialize method.
346       */
347      public static FileSystem get(URI uri, Configuration conf) throws IOException {
348        String scheme = uri.getScheme();
349        String authority = uri.getAuthority();
350    
351        if (scheme == null && authority == null) {     // use default FS
352          return get(conf);
353        }
354    
355        if (scheme != null && authority == null) {     // no authority
356          URI defaultUri = getDefaultUri(conf);
357          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
358              && defaultUri.getAuthority() != null) {  // & default has authority
359            return get(defaultUri, conf);              // return default
360          }
361        }
362        
363        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
364        if (conf.getBoolean(disableCacheName, false)) {
365          return createFileSystem(uri, conf);
366        }
367    
368        return CACHE.get(uri, conf);
369      }
370    
371      /**
372       * Returns the FileSystem for this URI's scheme and authority and the 
373       * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
374       * @param uri of the filesystem
375       * @param conf the configuration to use
376       * @param user to perform the get as
377       * @return filesystem instance
378       * @throws IOException
379       * @throws InterruptedException
380       */
381      public static FileSystem newInstance(final URI uri, final Configuration conf,
382          final String user) throws IOException, InterruptedException {
383        String ticketCachePath =
384          conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
385        UserGroupInformation ugi =
386            UserGroupInformation.getBestUGI(ticketCachePath, user);
387        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
388          @Override
389          public FileSystem run() throws IOException {
390            return newInstance(uri,conf); 
391          }
392        });
393      }
394      /** Returns the FileSystem for this URI's scheme and authority.  The scheme
395       * of the URI determines a configuration property name,
396       * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
397       * The entire URI is passed to the FileSystem instance's initialize method.
398       * This always returns a new FileSystem object.
399       */
400      public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
401        String scheme = uri.getScheme();
402        String authority = uri.getAuthority();
403    
404        if (scheme == null) {                       // no scheme: use default FS
405          return newInstance(conf);
406        }
407    
408        if (authority == null) {                       // no authority
409          URI defaultUri = getDefaultUri(conf);
410          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
411              && defaultUri.getAuthority() != null) {  // & default has authority
412            return newInstance(defaultUri, conf);              // return default
413          }
414        }
415        return CACHE.getUnique(uri, conf);
416      }
417    
418      /** Returns a unique configured filesystem implementation.
419       * This always returns a new FileSystem object.
420       * @param conf the configuration to use
421       */
422      public static FileSystem newInstance(Configuration conf) throws IOException {
423        return newInstance(getDefaultUri(conf), conf);
424      }
425    
426      /**
427       * Get a unique local file system object
428       * @param conf the configuration to configure the file system with
429       * @return a LocalFileSystem
430       * This always returns a new FileSystem object.
431       */
432      public static LocalFileSystem newInstanceLocal(Configuration conf)
433        throws IOException {
434        return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
435      }
436    
  /**
   * Close all cached filesystems by delegating to the cache's
   * <code>closeAll()</code>. Be sure those filesystems are not used anymore.
   *
   * @throws IOException if the cache reports a failure while closing
   */
  public static void closeAll() throws IOException {
    CACHE.closeAll();
  }
446    
  /**
   * Close all cached filesystems for a given UGI by delegating to the
   * cache's <code>closeAll(ugi)</code>. Be sure those filesystems are not
   * used anymore.
   *
   * @param ugi user group info to close
   * @throws IOException if the cache reports a failure while closing
   */
  public static void closeAllForUGI(UserGroupInformation ugi) 
  throws IOException {
    CACHE.closeAll(ugi);
  }
457    
458      /** 
459       * Make sure that a path specifies a FileSystem.
460       * @param path to use
461       */
462      public Path makeQualified(Path path) {
463        checkPath(path);
464        return path.makeQualified(this.getUri(), this.getWorkingDirectory());
465      }
466        
  /**
   * Get a new delegation token for this file system.
   * This is an internal method that should have been declared protected
   * but wasn't historically.
   * Callers should use {@link #addDelegationTokens(String, Credentials)}
   * <p>
   * This base implementation issues no token and returns null.
   * 
   * @param renewer the account name that is allowed to renew the token.
   * @return a new delegation token, or null if none is issued
   * @throws IOException declared so that implementations may fail while
   *         obtaining a token
   */
  @InterfaceAudience.Private()
  public Token<?> getDelegationToken(String renewer) throws IOException {
    return null;
  }
481      
482      /**
483       * Obtain all delegation tokens used by this FileSystem that are not
484       * already present in the given Credentials.  Existing tokens will neither
485       * be verified as valid nor having the given renewer.  Missing tokens will
486       * be acquired and added to the given Credentials.
487       * 
488       * Default Impl: works for simple fs with its own token
489       * and also for an embedded fs whose tokens are those of its
490       * children file system (i.e. the embedded fs has not tokens of its
491       * own).
492       * 
493       * @param renewer the user allowed to renew the delegation tokens
494       * @param credentials cache in which to add new delegation tokens
495       * @return list of new delegation tokens
496       * @throws IOException
497       */
498      @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
499      public Token<?>[] addDelegationTokens(
500          final String renewer, Credentials credentials) throws IOException {
501        if (credentials == null) {
502          credentials = new Credentials();
503        }
504        final List<Token<?>> tokens = new ArrayList<Token<?>>();
505        collectDelegationTokens(renewer, credentials, tokens);
506        return tokens.toArray(new Token<?>[tokens.size()]);
507      }
508      
509      /**
510       * Recursively obtain the tokens for this FileSystem and all descended
511       * FileSystems as determined by getChildFileSystems().
512       * @param renewer the user allowed to renew the delegation tokens
513       * @param credentials cache in which to add the new delegation tokens
514       * @param tokens list in which to add acquired tokens
515       * @throws IOException
516       */
517      private void collectDelegationTokens(final String renewer,
518                                           final Credentials credentials,
519                                           final List<Token<?>> tokens)
520                                               throws IOException {
521        final String serviceName = getCanonicalServiceName();
522        // Collect token of the this filesystem and then of its embedded children
523        if (serviceName != null) { // fs has token, grab it
524          final Text service = new Text(serviceName);
525          Token<?> token = credentials.getToken(service);
526          if (token == null) {
527            token = getDelegationToken(renewer);
528            if (token != null) {
529              tokens.add(token);
530              credentials.addToken(service, token);
531            }
532          }
533        }
534        // Now collect the tokens from the children
535        final FileSystem[] children = getChildFileSystems();
536        if (children != null) {
537          for (final FileSystem fs : children) {
538            fs.collectDelegationTokens(renewer, credentials, tokens);
539          }
540        }
541      }
542    
  /**
   * Get all the immediate child FileSystems embedded in this FileSystem.
   * It does not recurse and get grand children.  If a FileSystem
   * has multiple child FileSystems, then it should return a unique list
   * of those FileSystems.  Default is to return null to signify no children.
   * <p>
   * Within this class a null result makes
   * {@link #getCanonicalServiceName()} build a service name for this
   * filesystem itself.
   * 
   * @return FileSystems used by this FileSystem, or null if there are none
   */
  @InterfaceAudience.LimitedPrivate({ "HDFS" })
  @VisibleForTesting
  public FileSystem[] getChildFileSystems() {
    return null;
  }
556      
557      /** create a file with the provided permission
558       * The permission of the file is set to be the provided permission as in
559       * setPermission, not permission&~umask
560       * 
561       * It is implemented using two RPCs. It is understood that it is inefficient,
562       * but the implementation is thread-safe. The other option is to change the
563       * value of umask in configuration to be 0, but it is not thread-safe.
564       * 
565       * @param fs file system handle
566       * @param file the name of the file to be created
567       * @param permission the permission of the file
568       * @return an output stream
569       * @throws IOException
570       */
571      public static FSDataOutputStream create(FileSystem fs,
572          Path file, FsPermission permission) throws IOException {
573        // create the file with default permission
574        FSDataOutputStream out = fs.create(file);
575        // set its permission to the supplied one
576        fs.setPermission(file, permission);
577        return out;
578      }
579    
580      /** create a directory with the provided permission
581       * The permission of the directory is set to be the provided permission as in
582       * setPermission, not permission&~umask
583       * 
584       * @see #create(FileSystem, Path, FsPermission)
585       * 
586       * @param fs file system handle
587       * @param dir the name of the directory to be created
588       * @param permission the permission of the directory
589       * @return true if the directory creation succeeds; false otherwise
590       * @throws IOException
591       */
592      public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
593      throws IOException {
594        // create the directory using the default permission
595        boolean result = fs.mkdirs(dir);
596        // set its permission to be the supplied one
597        fs.setPermission(dir, permission);
598        return result;
599      }
600    
601      ///////////////////////////////////////////////////////////////
602      // FileSystem
603      ///////////////////////////////////////////////////////////////
604    
  /**
   * Creates a FileSystem by passing a null configuration to the superclass
   * constructor. Further set-up happens in
   * {@link #initialize(URI, Configuration)}.
   */
  protected FileSystem() {
    super(null);
  }
608    
609      /** 
610       * Check that a Path belongs to this FileSystem.
611       * @param path to check
612       */
613      protected void checkPath(Path path) {
614        URI uri = path.toUri();
615        String thatScheme = uri.getScheme();
616        if (thatScheme == null)                // fs is relative
617          return;
618        URI thisUri = getCanonicalUri();
619        String thisScheme = thisUri.getScheme();
620        //authority and scheme are not case sensitive
621        if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
622          String thisAuthority = thisUri.getAuthority();
623          String thatAuthority = uri.getAuthority();
624          if (thatAuthority == null &&                // path's authority is null
625              thisAuthority != null) {                // fs has an authority
626            URI defaultUri = getDefaultUri(getConf());
627            if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
628              uri = defaultUri; // schemes match, so use this uri instead
629            } else {
630              uri = null; // can't determine auth of the path
631            }
632          }
633          if (uri != null) {
634            // canonicalize uri before comparing with this fs
635            uri = canonicalizeUri(uri);
636            thatAuthority = uri.getAuthority();
637            if (thisAuthority == thatAuthority ||       // authorities match
638                (thisAuthority != null &&
639                 thisAuthority.equalsIgnoreCase(thatAuthority)))
640              return;
641          }
642        }
643        throw new IllegalArgumentException("Wrong FS: "+path+
644                                           ", expected: "+this.getUri());
645      }
646    
647      /**
648       * Return an array containing hostnames, offset and size of 
649       * portions of the given file.  For a nonexistent 
650       * file or regions, null will be returned.
651       *
652       * This call is most helpful with DFS, where it returns 
653       * hostnames of machines that contain the given file.
654       *
655       * The FileSystem will simply return an elt containing 'localhost'.
656       *
657       * @param file FilesStatus to get data from
658       * @param start offset into the given file
659       * @param len length for which to get locations for
660       */
661      public BlockLocation[] getFileBlockLocations(FileStatus file, 
662          long start, long len) throws IOException {
663        if (file == null) {
664          return null;
665        }
666    
667        if (start < 0 || len < 0) {
668          throw new IllegalArgumentException("Invalid start or len parameter");
669        }
670    
671        if (file.getLen() <= start) {
672          return new BlockLocation[0];
673    
674        }
675        String[] name = { "localhost:50010" };
676        String[] host = { "localhost" };
677        return new BlockLocation[] {
678          new BlockLocation(name, host, 0, file.getLen()) };
679      }
680     
681    
682      /**
683       * Return an array containing hostnames, offset and size of 
684       * portions of the given file.  For a nonexistent 
685       * file or regions, null will be returned.
686       *
687       * This call is most helpful with DFS, where it returns 
688       * hostnames of machines that contain the given file.
689       *
690       * The FileSystem will simply return an elt containing 'localhost'.
691       *
692       * @param p path is used to identify an FS since an FS could have
693       *          another FS that it could be delegating the call to
694       * @param start offset into the given file
695       * @param len length for which to get locations for
696       */
697      public BlockLocation[] getFileBlockLocations(Path p, 
698          long start, long len) throws IOException {
699        if (p == null) {
700          throw new NullPointerException();
701        }
702        FileStatus file = getFileStatus(p);
703        return getFileBlockLocations(file, start, len);
704      }
705      
706      /**
707       * Return a set of server default configuration values
708       * @return server default configuration values
709       * @throws IOException
710       * @deprecated use {@link #getServerDefaults(Path)} instead
711       */
712      @Deprecated
713      public FsServerDefaults getServerDefaults() throws IOException {
714        Configuration conf = getConf();
715        // CRC32 is chosen as default as it is available in all 
716        // releases that support checksum.
717        // The client trash configuration is ignored.
718        return new FsServerDefaults(getDefaultBlockSize(), 
719            conf.getInt("io.bytes.per.checksum", 512), 
720            64 * 1024, 
721            getDefaultReplication(),
722            conf.getInt("io.file.buffer.size", 4096),
723            false,
724            CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
725            DataChecksum.Type.CRC32);
726      }
727    
728      /**
729       * Return a set of server default configuration values
730       * @param p path is used to identify an FS since an FS could have
731       *          another FS that it could be delegating the call to
732       * @return server default configuration values
733       * @throws IOException
734       */
735      public FsServerDefaults getServerDefaults(Path p) throws IOException {
736        return getServerDefaults();
737      }
738    
739      /**
740       * Return the fully-qualified path of path f resolving the path
741       * through any symlinks or mount point
742       * @param p path to be resolved
743       * @return fully qualified path 
744       * @throws FileNotFoundException
745       */
746       public Path resolvePath(final Path p) throws IOException {
747         checkPath(p);
748         return getFileStatus(p).getPath();
749       }
750    
751      /**
752       * Opens an FSDataInputStream at the indicated Path.
753       * @param f the file name to open
754       * @param bufferSize the size of the buffer to be used.
755       */
756      public abstract FSDataInputStream open(Path f, int bufferSize)
757        throws IOException;
758        
759      /**
760       * Opens an FSDataInputStream at the indicated Path.
761       * @param f the file to open
762       */
763      public FSDataInputStream open(Path f) throws IOException {
764        return open(f, getConf().getInt("io.file.buffer.size", 4096));
765      }
766    
767      /**
768       * Create an FSDataOutputStream at the indicated Path.
769       * Files are overwritten by default.
770       * @param f the file to create
771       */
772      public FSDataOutputStream create(Path f) throws IOException {
773        return create(f, true);
774      }
775    
776      /**
777       * Create an FSDataOutputStream at the indicated Path.
778       * @param f the file to create
779       * @param overwrite if a file with this name already exists, then if true,
780       *   the file will be overwritten, and if false an exception will be thrown.
781       */
782      public FSDataOutputStream create(Path f, boolean overwrite)
783          throws IOException {
784        return create(f, overwrite, 
785                      getConf().getInt("io.file.buffer.size", 4096),
786                      getDefaultReplication(f),
787                      getDefaultBlockSize(f));
788      }
789    
790      /**
791       * Create an FSDataOutputStream at the indicated Path with write-progress
792       * reporting.
793       * Files are overwritten by default.
794       * @param f the file to create
795       * @param progress to report progress
796       */
797      public FSDataOutputStream create(Path f, Progressable progress) 
798          throws IOException {
799        return create(f, true, 
800                      getConf().getInt("io.file.buffer.size", 4096),
801                      getDefaultReplication(f),
802                      getDefaultBlockSize(f), progress);
803      }
804    
805      /**
806       * Create an FSDataOutputStream at the indicated Path.
807       * Files are overwritten by default.
808       * @param f the file to create
809       * @param replication the replication factor
810       */
811      public FSDataOutputStream create(Path f, short replication)
812          throws IOException {
813        return create(f, true, 
814                      getConf().getInt("io.file.buffer.size", 4096),
815                      replication,
816                      getDefaultBlockSize(f));
817      }
818    
819      /**
820       * Create an FSDataOutputStream at the indicated Path with write-progress
821       * reporting.
822       * Files are overwritten by default.
823       * @param f the file to create
824       * @param replication the replication factor
825       * @param progress to report progress
826       */
827      public FSDataOutputStream create(Path f, short replication, 
828          Progressable progress) throws IOException {
829        return create(f, true, 
830                      getConf().getInt(
831                          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
832                          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
833                      replication,
834                      getDefaultBlockSize(f), progress);
835      }
836    
837        
838      /**
839       * Create an FSDataOutputStream at the indicated Path.
840       * @param f the file name to create
841       * @param overwrite if a file with this name already exists, then if true,
842       *   the file will be overwritten, and if false an error will be thrown.
843       * @param bufferSize the size of the buffer to be used.
844       */
845      public FSDataOutputStream create(Path f, 
846                                       boolean overwrite,
847                                       int bufferSize
848                                       ) throws IOException {
849        return create(f, overwrite, bufferSize, 
850                      getDefaultReplication(f),
851                      getDefaultBlockSize(f));
852      }
853        
854      /**
855       * Create an FSDataOutputStream at the indicated Path with write-progress
856       * reporting.
857       * @param f the path of the file to open
858       * @param overwrite if a file with this name already exists, then if true,
859       *   the file will be overwritten, and if false an error will be thrown.
860       * @param bufferSize the size of the buffer to be used.
861       */
862      public FSDataOutputStream create(Path f, 
863                                       boolean overwrite,
864                                       int bufferSize,
865                                       Progressable progress
866                                       ) throws IOException {
867        return create(f, overwrite, bufferSize, 
868                      getDefaultReplication(f),
869                      getDefaultBlockSize(f), progress);
870      }
871        
872        
873      /**
874       * Create an FSDataOutputStream at the indicated Path.
875       * @param f the file name to open
876       * @param overwrite if a file with this name already exists, then if true,
877       *   the file will be overwritten, and if false an error will be thrown.
878       * @param bufferSize the size of the buffer to be used.
879       * @param replication required block replication for the file. 
880       */
881      public FSDataOutputStream create(Path f, 
882                                       boolean overwrite,
883                                       int bufferSize,
884                                       short replication,
885                                       long blockSize
886                                       ) throws IOException {
887        return create(f, overwrite, bufferSize, replication, blockSize, null);
888      }
889    
890      /**
891       * Create an FSDataOutputStream at the indicated Path with write-progress
892       * reporting.
893       * @param f the file name to open
894       * @param overwrite if a file with this name already exists, then if true,
895       *   the file will be overwritten, and if false an error will be thrown.
896       * @param bufferSize the size of the buffer to be used.
897       * @param replication required block replication for the file. 
898       */
899      public FSDataOutputStream create(Path f,
900                                                boolean overwrite,
901                                                int bufferSize,
902                                                short replication,
903                                                long blockSize,
904                                                Progressable progress
905                                                ) throws IOException {
906        return this.create(f, FsPermission.getFileDefault().applyUMask(
907            FsPermission.getUMask(getConf())), overwrite, bufferSize,
908            replication, blockSize, progress);
909      }
910    
911      /**
912       * Create an FSDataOutputStream at the indicated Path with write-progress
913       * reporting.
914       * @param f the file name to open
915       * @param permission
916       * @param overwrite if a file with this name already exists, then if true,
917       *   the file will be overwritten, and if false an error will be thrown.
918       * @param bufferSize the size of the buffer to be used.
919       * @param replication required block replication for the file.
920       * @param blockSize
921       * @param progress
922       * @throws IOException
923       * @see #setPermission(Path, FsPermission)
924       */
925      public abstract FSDataOutputStream create(Path f,
926          FsPermission permission,
927          boolean overwrite,
928          int bufferSize,
929          short replication,
930          long blockSize,
931          Progressable progress) throws IOException;
932      
933      /**
934       * Create an FSDataOutputStream at the indicated Path with write-progress
935       * reporting.
936       * @param f the file name to open
937       * @param permission
938       * @param flags {@link CreateFlag}s to use for this stream.
939       * @param bufferSize the size of the buffer to be used.
940       * @param replication required block replication for the file.
941       * @param blockSize
942       * @param progress
943       * @throws IOException
944       * @see #setPermission(Path, FsPermission)
945       */
946      public FSDataOutputStream create(Path f,
947          FsPermission permission,
948          EnumSet<CreateFlag> flags,
949          int bufferSize,
950          short replication,
951          long blockSize,
952          Progressable progress) throws IOException {
953        return create(f, permission, flags, bufferSize, replication,
954            blockSize, progress, null);
955      }
956      
957      /**
958       * Create an FSDataOutputStream at the indicated Path with a custom
959       * checksum option
960       * @param f the file name to open
961       * @param permission
962       * @param flags {@link CreateFlag}s to use for this stream.
963       * @param bufferSize the size of the buffer to be used.
964       * @param replication required block replication for the file.
965       * @param blockSize
966       * @param progress
967       * @param checksumOpt checksum parameter. If null, the values
968       *        found in conf will be used.
969       * @throws IOException
970       * @see #setPermission(Path, FsPermission)
971       */
972      public FSDataOutputStream create(Path f,
973          FsPermission permission,
974          EnumSet<CreateFlag> flags,
975          int bufferSize,
976          short replication,
977          long blockSize,
978          Progressable progress,
979          ChecksumOpt checksumOpt) throws IOException {
980        // Checksum options are ignored by default. The file systems that
981        // implement checksum need to override this method. The full
982        // support is currently only available in DFS.
983        return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
984            bufferSize, replication, blockSize, progress);
985      }
986    
987      /*.
988       * This create has been added to support the FileContext that processes
989       * the permission
990       * with umask before calling this method.
991       * This a temporary method added to support the transition from FileSystem
992       * to FileContext for user applications.
993       */
994      @Deprecated
995      protected FSDataOutputStream primitiveCreate(Path f,
996         FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
997         short replication, long blockSize, Progressable progress,
998         ChecksumOpt checksumOpt) throws IOException {
999    
1000        boolean pathExists = exists(f);
1001        CreateFlag.validate(f, pathExists, flag);
1002        
1003        // Default impl  assumes that permissions do not matter and 
1004        // nor does the bytesPerChecksum  hence
1005        // calling the regular create is good enough.
1006        // FSs that implement permissions should override this.
1007    
1008        if (pathExists && flag.contains(CreateFlag.APPEND)) {
1009          return append(f, bufferSize, progress);
1010        }
1011        
1012        return this.create(f, absolutePermission,
1013            flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1014            blockSize, progress);
1015      }
1016      
1017      /**
1018       * This version of the mkdirs method assumes that the permission is absolute.
1019       * It has been added to support the FileContext that processes the permission
1020       * with umask before calling this method.
1021       * This a temporary method added to support the transition from FileSystem
1022       * to FileContext for user applications.
1023       */
1024      @Deprecated
1025      protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1026        throws IOException {
1027        // Default impl is to assume that permissions do not matter and hence
1028        // calling the regular mkdirs is good enough.
1029        // FSs that implement permissions should override this.
1030       return this.mkdirs(f, absolutePermission);
1031      }
1032    
1033    
1034      /**
1035       * This version of the mkdirs method assumes that the permission is absolute.
1036       * It has been added to support the FileContext that processes the permission
1037       * with umask before calling this method.
1038       * This a temporary method added to support the transition from FileSystem
1039       * to FileContext for user applications.
1040       */
1041      @Deprecated
1042      protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1043                        boolean createParent)
1044        throws IOException {
1045        
1046        if (!createParent) { // parent must exist.
1047          // since the this.mkdirs makes parent dirs automatically
1048          // we must throw exception if parent does not exist.
1049          final FileStatus stat = getFileStatus(f.getParent());
1050          if (stat == null) {
1051            throw new FileNotFoundException("Missing parent:" + f);
1052          }
1053          if (!stat.isDirectory()) {
1054            throw new ParentNotDirectoryException("parent is not a dir");
1055          }
1056          // parent does exist - go ahead with mkdir of leaf
1057        }
1058        // Default impl is to assume that permissions do not matter and hence
1059        // calling the regular mkdirs is good enough.
1060        // FSs that implement permissions should override this.
1061        if (!this.mkdirs(f, absolutePermission)) {
1062          throw new IOException("mkdir of "+ f + " failed");
1063        }
1064      }
1065    
1066      /**
1067       * Opens an FSDataOutputStream at the indicated Path with write-progress
1068       * reporting. Same as create(), except fails if parent directory doesn't
1069       * already exist.
1070       * @param f the file name to open
1071       * @param overwrite if a file with this name already exists, then if true,
1072       * the file will be overwritten, and if false an error will be thrown.
1073       * @param bufferSize the size of the buffer to be used.
1074       * @param replication required block replication for the file.
1075       * @param blockSize
1076       * @param progress
1077       * @throws IOException
1078       * @see #setPermission(Path, FsPermission)
1079       * @deprecated API only for 0.20-append
1080       */
1081      @Deprecated
1082      public FSDataOutputStream createNonRecursive(Path f,
1083          boolean overwrite,
1084          int bufferSize, short replication, long blockSize,
1085          Progressable progress) throws IOException {
1086        return this.createNonRecursive(f, FsPermission.getFileDefault(),
1087            overwrite, bufferSize, replication, blockSize, progress);
1088      }
1089    
1090      /**
1091       * Opens an FSDataOutputStream at the indicated Path with write-progress
1092       * reporting. Same as create(), except fails if parent directory doesn't
1093       * already exist.
1094       * @param f the file name to open
1095       * @param permission
1096       * @param overwrite if a file with this name already exists, then if true,
1097       * the file will be overwritten, and if false an error will be thrown.
1098       * @param bufferSize the size of the buffer to be used.
1099       * @param replication required block replication for the file.
1100       * @param blockSize
1101       * @param progress
1102       * @throws IOException
1103       * @see #setPermission(Path, FsPermission)
1104       * @deprecated API only for 0.20-append
1105       */
1106       @Deprecated
1107       public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1108           boolean overwrite, int bufferSize, short replication, long blockSize,
1109           Progressable progress) throws IOException {
1110         return createNonRecursive(f, permission,
1111             overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1112                 : EnumSet.of(CreateFlag.CREATE), bufferSize,
1113                 replication, blockSize, progress);
1114       }
1115    
1116       /**
1117        * Opens an FSDataOutputStream at the indicated Path with write-progress
1118        * reporting. Same as create(), except fails if parent directory doesn't
1119        * already exist.
1120        * @param f the file name to open
1121        * @param permission
1122        * @param flags {@link CreateFlag}s to use for this stream.
1123        * @param bufferSize the size of the buffer to be used.
1124        * @param replication required block replication for the file.
1125        * @param blockSize
1126        * @param progress
1127        * @throws IOException
1128        * @see #setPermission(Path, FsPermission)
1129        * @deprecated API only for 0.20-append
1130        */
1131        @Deprecated
1132        public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1133            EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1134            Progressable progress) throws IOException {
1135          throw new IOException("createNonRecursive unsupported for this filesystem "
1136              + this.getClass());
1137        }
1138    
1139      /**
1140       * Creates the given Path as a brand-new zero-length file.  If
1141       * create fails, or if it already existed, return false.
1142       *
1143       * @param f path to use for create
1144       */
1145      public boolean createNewFile(Path f) throws IOException {
1146        if (exists(f)) {
1147          return false;
1148        } else {
1149          create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1150          return true;
1151        }
1152      }
1153    
1154      /**
1155       * Append to an existing file (optional operation).
1156       * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1157       * @param f the existing file to be appended.
1158       * @throws IOException
1159       */
1160      public FSDataOutputStream append(Path f) throws IOException {
1161        return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1162      }
1163      /**
1164       * Append to an existing file (optional operation).
1165       * Same as append(f, bufferSize, null).
1166       * @param f the existing file to be appended.
1167       * @param bufferSize the size of the buffer to be used.
1168       * @throws IOException
1169       */
1170      public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1171        return append(f, bufferSize, null);
1172      }
1173    
1174      /**
1175       * Append to an existing file (optional operation).
1176       * @param f the existing file to be appended.
1177       * @param bufferSize the size of the buffer to be used.
1178       * @param progress for reporting progress if it is not null.
1179       * @throws IOException
1180       */
1181      public abstract FSDataOutputStream append(Path f, int bufferSize,
1182          Progressable progress) throws IOException;
1183    
1184      /**
1185       * Concat existing files together.
1186       * @param trg the path to the target destination.
1187       * @param psrcs the paths to the sources to use for the concatenation.
1188       * @throws IOException
1189       */
1190      public void concat(final Path trg, final Path [] psrcs) throws IOException {
1191        throw new UnsupportedOperationException("Not implemented by the " + 
1192            getClass().getSimpleName() + " FileSystem implementation");
1193      }
1194    
1195     /**
1196       * Get replication.
1197       * 
1198       * @deprecated Use getFileStatus() instead
1199       * @param src file name
1200       * @return file replication
1201       * @throws IOException
1202       */ 
1203      @Deprecated
1204      public short getReplication(Path src) throws IOException {
1205        return getFileStatus(src).getReplication();
1206      }
1207    
1208      /**
1209       * Set replication for an existing file.
1210       * 
1211       * @param src file name
1212       * @param replication new replication
1213       * @throws IOException
1214       * @return true if successful;
1215       *         false if file does not exist or is a directory
1216       */
1217      public boolean setReplication(Path src, short replication)
1218        throws IOException {
1219        return true;
1220      }
1221    
1222      /**
1223       * Renames Path src to Path dst.  Can take place on local fs
1224       * or remote DFS.
1225       * @param src path to be renamed
1226       * @param dst new path after rename
1227       * @throws IOException on failure
1228       * @return true if rename is successful
1229       */
1230      public abstract boolean rename(Path src, Path dst) throws IOException;
1231    
1232      /**
1233       * Renames Path src to Path dst
1234       * <ul>
1235       * <li
1236       * <li>Fails if src is a file and dst is a directory.
1237       * <li>Fails if src is a directory and dst is a file.
1238       * <li>Fails if the parent of dst does not exist or is a file.
1239       * </ul>
1240       * <p>
1241       * If OVERWRITE option is not passed as an argument, rename fails
1242       * if the dst already exists.
1243       * <p>
1244       * If OVERWRITE option is passed as an argument, rename overwrites
1245       * the dst if it is a file or an empty directory. Rename fails if dst is
1246       * a non-empty directory.
1247       * <p>
1248       * Note that atomicity of rename is dependent on the file system
1249       * implementation. Please refer to the file system documentation for
1250       * details. This default implementation is non atomic.
1251       * <p>
1252       * This method is deprecated since it is a temporary method added to 
1253       * support the transition from FileSystem to FileContext for user 
1254       * applications.
1255       * 
1256       * @param src path to be renamed
1257       * @param dst new path after rename
1258       * @throws IOException on failure
1259       */
1260      @Deprecated
1261      protected void rename(final Path src, final Path dst,
1262          final Rename... options) throws IOException {
1263        // Default implementation
1264        final FileStatus srcStatus = getFileLinkStatus(src);
1265        if (srcStatus == null) {
1266          throw new FileNotFoundException("rename source " + src + " not found.");
1267        }
1268    
1269        boolean overwrite = false;
1270        if (null != options) {
1271          for (Rename option : options) {
1272            if (option == Rename.OVERWRITE) {
1273              overwrite = true;
1274            }
1275          }
1276        }
1277    
1278        FileStatus dstStatus;
1279        try {
1280          dstStatus = getFileLinkStatus(dst);
1281        } catch (IOException e) {
1282          dstStatus = null;
1283        }
1284        if (dstStatus != null) {
1285          if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1286            throw new IOException("Source " + src + " Destination " + dst
1287                + " both should be either file or directory");
1288          }
1289          if (!overwrite) {
1290            throw new FileAlreadyExistsException("rename destination " + dst
1291                + " already exists.");
1292          }
1293          // Delete the destination that is a file or an empty directory
1294          if (dstStatus.isDirectory()) {
1295            FileStatus[] list = listStatus(dst);
1296            if (list != null && list.length != 0) {
1297              throw new IOException(
1298                  "rename cannot overwrite non empty destination directory " + dst);
1299            }
1300          }
1301          delete(dst, false);
1302        } else {
1303          final Path parent = dst.getParent();
1304          final FileStatus parentStatus = getFileStatus(parent);
1305          if (parentStatus == null) {
1306            throw new FileNotFoundException("rename destination parent " + parent
1307                + " not found.");
1308          }
1309          if (!parentStatus.isDirectory()) {
1310            throw new ParentNotDirectoryException("rename destination parent " + parent
1311                + " is a file.");
1312          }
1313        }
1314        if (!rename(src, dst)) {
1315          throw new IOException("rename from " + src + " to " + dst + " failed.");
1316        }
1317      }
1318      
1319      /**
1320       * Delete a file 
1321       * @deprecated Use {@link #delete(Path, boolean)} instead.
1322       */
1323      @Deprecated
1324      public boolean delete(Path f) throws IOException {
1325        return delete(f, true);
1326      }
1327      
1328      /** Delete a file.
1329       *
1330       * @param f the path to delete.
1331       * @param recursive if path is a directory and set to 
1332       * true, the directory is deleted else throws an exception. In
1333       * case of a file the recursive can be set to either true or false. 
1334       * @return  true if delete is successful else false. 
1335       * @throws IOException
1336       */
1337      public abstract boolean delete(Path f, boolean recursive) throws IOException;
1338    
1339      /**
1340       * Mark a path to be deleted when FileSystem is closed.
1341       * When the JVM shuts down,
1342       * all FileSystem objects will be closed automatically.
1343       * Then,
1344       * the marked path will be deleted as a result of closing the FileSystem.
1345       *
1346       * The path has to exist in the file system.
1347       * 
1348       * @param f the path to delete.
1349       * @return  true if deleteOnExit is successful, otherwise false.
1350       * @throws IOException
1351       */
1352      public boolean deleteOnExit(Path f) throws IOException {
1353        if (!exists(f)) {
1354          return false;
1355        }
1356        synchronized (deleteOnExit) {
1357          deleteOnExit.add(f);
1358        }
1359        return true;
1360      }
1361      
1362      /**
1363       * Cancel the deletion of the path when the FileSystem is closed
1364       * @param f the path to cancel deletion
1365       */
1366      public boolean cancelDeleteOnExit(Path f) {
1367        synchronized (deleteOnExit) {
1368          return deleteOnExit.remove(f);
1369        }
1370      }
1371    
1372      /**
1373       * Delete all files that were marked as delete-on-exit. This recursively
1374       * deletes all files in the specified paths.
1375       */
1376      protected void processDeleteOnExit() {
1377        synchronized (deleteOnExit) {
1378          for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1379            Path path = iter.next();
1380            try {
1381              if (exists(path)) {
1382                delete(path, true);
1383              }
1384            }
1385            catch (IOException e) {
1386              LOG.info("Ignoring failure to deleteOnExit for path " + path);
1387            }
1388            iter.remove();
1389          }
1390        }
1391      }
1392      
1393      /** Check if exists.
1394       * @param f source file
1395       */
1396      public boolean exists(Path f) throws IOException {
1397        try {
1398          return getFileStatus(f) != null;
1399        } catch (FileNotFoundException e) {
1400          return false;
1401        }
1402      }
1403    
1404      /** True iff the named path is a directory.
1405       * Note: Avoid using this method. Instead reuse the FileStatus 
1406       * returned by getFileStatus() or listStatus() methods.
1407       * @param f path to check
1408       */
1409      public boolean isDirectory(Path f) throws IOException {
1410        try {
1411          return getFileStatus(f).isDirectory();
1412        } catch (FileNotFoundException e) {
1413          return false;               // f does not exist
1414        }
1415      }
1416    
1417      /** True iff the named path is a regular file.
1418       * Note: Avoid using this method. Instead reuse the FileStatus 
1419       * returned by getFileStatus() or listStatus() methods.
1420       * @param f path to check
1421       */
1422      public boolean isFile(Path f) throws IOException {
1423        try {
1424          return getFileStatus(f).isFile();
1425        } catch (FileNotFoundException e) {
1426          return false;               // f does not exist
1427        }
1428      }
1429      
1430      /** The number of bytes in a file. */
1431      /** @deprecated Use getFileStatus() instead */
1432      @Deprecated
1433      public long getLength(Path f) throws IOException {
1434        return getFileStatus(f).getLen();
1435      }
1436        
1437      /** Return the {@link ContentSummary} of a given {@link Path}.
1438      * @param f path to use
1439      */
1440      public ContentSummary getContentSummary(Path f) throws IOException {
1441        FileStatus status = getFileStatus(f);
1442        if (status.isFile()) {
1443          // f is a file
1444          return new ContentSummary(status.getLen(), 1, 0);
1445        }
1446        // f is a directory
1447        long[] summary = {0, 0, 1};
1448        for(FileStatus s : listStatus(f)) {
1449          ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1450                                         new ContentSummary(s.getLen(), 1, 0);
1451          summary[0] += c.getLength();
1452          summary[1] += c.getFileCount();
1453          summary[2] += c.getDirectoryCount();
1454        }
1455        return new ContentSummary(summary[0], summary[1], summary[2]);
1456      }
1457    
1458      final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1459          @Override
1460          public boolean accept(Path file) {
1461            return true;
1462          }     
1463        };
1464        
1465      /**
1466       * List the statuses of the files/directories in the given path if the path is
1467       * a directory.
1468       * 
1469       * @param f given path
1470       * @return the statuses of the files/directories in the given patch
1471       * @throws FileNotFoundException when the path does not exist;
1472       *         IOException see specific implementation
1473       */
1474      public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1475                                                             IOException;
1476        
1477      /*
1478       * Filter files/directories in the given path using the user-supplied path
1479       * filter. Results are added to the given array <code>results</code>.
1480       */
1481      private void listStatus(ArrayList<FileStatus> results, Path f,
1482          PathFilter filter) throws FileNotFoundException, IOException {
1483        FileStatus listing[] = listStatus(f);
1484        if (listing == null) {
1485          throw new IOException("Error accessing " + f);
1486        }
1487    
1488        for (int i = 0; i < listing.length; i++) {
1489          if (filter.accept(listing[i].getPath())) {
1490            results.add(listing[i]);
1491          }
1492        }
1493      }
1494    
1495      /**
1496       * @return an iterator over the corrupt files under the given path
1497       * (may contain duplicates if a file has more than one corrupt block)
1498       * @throws IOException
1499       */
1500      public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1501        throws IOException {
1502        throw new UnsupportedOperationException(getClass().getCanonicalName() +
1503                                                " does not support" +
1504                                                " listCorruptFileBlocks");
1505      }
1506    
1507      /**
1508       * Filter files/directories in the given path using the user-supplied path
1509       * filter.
1510       * 
1511       * @param f
1512       *          a path name
1513       * @param filter
1514       *          the user-supplied path filter
1515       * @return an array of FileStatus objects for the files under the given path
1516       *         after applying the filter
1517       * @throws FileNotFoundException when the path does not exist;
1518       *         IOException see specific implementation   
1519       */
1520      public FileStatus[] listStatus(Path f, PathFilter filter) 
1521                                       throws FileNotFoundException, IOException {
1522        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1523        listStatus(results, f, filter);
1524        return results.toArray(new FileStatus[results.size()]);
1525      }
1526    
  /**
   * List the given paths using the default path filter, which accepts every
   * path. Delegates to {@link #listStatus(Path[], PathFilter)}.
   * 
   * @param files
   *          a list of paths
   * @return a list of statuses for the files under the given paths after
   *         applying the default path filter
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public FileStatus[] listStatus(Path[] files)
      throws FileNotFoundException, IOException {
    return listStatus(files, DEFAULT_FILTER);
  }
1542    
1543      /**
1544       * Filter files/directories in the given list of paths using user-supplied
1545       * path filter.
1546       * 
1547       * @param files
1548       *          a list of paths
1549       * @param filter
1550       *          the user-supplied path filter
1551       * @return a list of statuses for the files under the given paths after
1552       *         applying the filter
1553       * @throws FileNotFoundException when the path does not exist;
1554       *         IOException see specific implementation
1555       */
1556      public FileStatus[] listStatus(Path[] files, PathFilter filter)
1557          throws FileNotFoundException, IOException {
1558        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1559        for (int i = 0; i < files.length; i++) {
1560          listStatus(results, files[i], filter);
1561        }
1562        return results.toArray(new FileStatus[results.size()]);
1563      }
1564    
1565      /**
1566       * <p>Return all the files that match filePattern and are not checksum
1567       * files. Results are sorted by their names.
1568       * 
1569       * <p>
1570       * A filename pattern is composed of <i>regular</i> characters and
1571       * <i>special pattern matching</i> characters, which are:
1572       *
1573       * <dl>
1574       *  <dd>
1575       *   <dl>
1576       *    <p>
1577       *    <dt> <tt> ? </tt>
1578       *    <dd> Matches any single character.
1579       *
1580       *    <p>
1581       *    <dt> <tt> * </tt>
1582       *    <dd> Matches zero or more characters.
1583       *
1584       *    <p>
1585       *    <dt> <tt> [<i>abc</i>] </tt>
1586       *    <dd> Matches a single character from character set
1587       *     <tt>{<i>a,b,c</i>}</tt>.
1588       *
1589       *    <p>
1590       *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1591       *    <dd> Matches a single character from the character range
1592       *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1593       *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1594       *
1595       *    <p>
1596       *    <dt> <tt> [^<i>a</i>] </tt>
1597       *    <dd> Matches a single character that is not from character set or range
1598       *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1599       *     immediately to the right of the opening bracket.
1600       *
1601       *    <p>
1602       *    <dt> <tt> \<i>c</i> </tt>
1603       *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1604       *
1605       *    <p>
1606       *    <dt> <tt> {ab,cd} </tt>
1607       *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1608       *    
1609       *    <p>
1610       *    <dt> <tt> {ab,c{de,fh}} </tt>
1611       *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1612       *
1613       *   </dl>
1614       *  </dd>
1615       * </dl>
1616       *
1617       * @param pathPattern a regular expression specifying a pth pattern
1618    
1619       * @return an array of paths that match the path pattern
1620       * @throws IOException
1621       */
1622      public FileStatus[] globStatus(Path pathPattern) throws IOException {
1623        return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1624      }
1625      
1626      /**
1627       * Return an array of FileStatus objects whose path names match pathPattern
1628       * and is accepted by the user-supplied path filter. Results are sorted by
1629       * their path names.
1630       * Return null if pathPattern has no glob and the path does not exist.
1631       * Return an empty array if pathPattern has a glob and no path matches it. 
1632       * 
1633       * @param pathPattern
1634       *          a regular expression specifying the path pattern
1635       * @param filter
1636       *          a user-supplied path filter
1637       * @return an array of FileStatus objects
1638       * @throws IOException if any I/O error occurs when fetching file status
1639       */
1640      public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1641          throws IOException {
1642        return new Globber(this, pathPattern, filter).glob();
1643      }
1644      
  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. 
   * Return the file's status and block locations if the path is a file.
   * 
   * If a returned status is a file, it contains the file's block locations.
   * Delegates to {@link #listLocatedStatus(Path, PathFilter)} with the default
   * filter, which accepts every path.
   * 
   * @param f is the path
   *
   * @return an iterator that traverses statuses of the files/directories 
   *         in the given path
   *
   * @throws FileNotFoundException If <code>f</code> does not exist
   * @throws IOException If an I/O error occurred
   */
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
  throws FileNotFoundException, IOException {
    return listLocatedStatus(f, DEFAULT_FILTER);
  }
1664    
1665      /**
1666       * Listing a directory
1667       * The returned results include its block location if it is a file
1668       * The results are filtered by the given path filter
1669       * @param f a path
1670       * @param filter a path filter
1671       * @return an iterator that traverses statuses of the files/directories 
1672       *         in the given path
1673       * @throws FileNotFoundException if <code>f</code> does not exist
1674       * @throws IOException if any I/O error occurred
1675       */
1676      protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1677          final PathFilter filter)
1678      throws FileNotFoundException, IOException {
1679        return new RemoteIterator<LocatedFileStatus>() {
1680          private final FileStatus[] stats = listStatus(f, filter);
1681          private int i = 0;
1682    
1683          @Override
1684          public boolean hasNext() {
1685            return i<stats.length;
1686          }
1687    
1688          @Override
1689          public LocatedFileStatus next() throws IOException {
1690            if (!hasNext()) {
1691              throw new NoSuchElementException("No more entry in " + f);
1692            }
1693            FileStatus result = stats[i++];
1694            BlockLocation[] locs = result.isFile() ?
1695                getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1696                null;
1697            return new LocatedFileStatus(result, locs);
1698          }
1699        };
1700      }
1701    
1702      /**
1703       * List the statuses and block locations of the files in the given path.
1704       * 
1705       * If the path is a directory, 
1706       *   if recursive is false, returns files in the directory;
1707       *   if recursive is true, return files in the subtree rooted at the path.
1708       * If the path is a file, return the file's status and block locations.
1709       * 
1710       * @param f is the path
1711       * @param recursive if the subdirectories need to be traversed recursively
1712       *
1713       * @return an iterator that traverses statuses of the files
1714       *
1715       * @throws FileNotFoundException when the path does not exist;
1716       *         IOException see specific implementation
1717       */
1718      public RemoteIterator<LocatedFileStatus> listFiles(
1719          final Path f, final boolean recursive)
1720      throws FileNotFoundException, IOException {
1721        return new RemoteIterator<LocatedFileStatus>() {
1722          private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1723            new Stack<RemoteIterator<LocatedFileStatus>>();
1724          private RemoteIterator<LocatedFileStatus> curItor =
1725            listLocatedStatus(f);
1726          private LocatedFileStatus curFile;
1727         
1728          @Override
1729          public boolean hasNext() throws IOException {
1730            while (curFile == null) {
1731              if (curItor.hasNext()) {
1732                handleFileStat(curItor.next());
1733              } else if (!itors.empty()) {
1734                curItor = itors.pop();
1735              } else {
1736                return false;
1737              }
1738            }
1739            return true;
1740          }
1741    
1742          /**
1743           * Process the input stat.
1744           * If it is a file, return the file stat.
1745           * If it is a directory, traverse the directory if recursive is true;
1746           * ignore it if recursive is false.
1747           * @param stat input status
1748           * @throws IOException if any IO error occurs
1749           */
1750          private void handleFileStat(LocatedFileStatus stat) throws IOException {
1751            if (stat.isFile()) { // file
1752              curFile = stat;
1753            } else if (recursive) { // directory
1754              itors.push(curItor);
1755              curItor = listLocatedStatus(stat.getPath());
1756            }
1757          }
1758    
1759          @Override
1760          public LocatedFileStatus next() throws IOException {
1761            if (hasNext()) {
1762              LocatedFileStatus result = curFile;
1763              curFile = null;
1764              return result;
1765            } 
1766            throw new java.util.NoSuchElementException("No more entry in " + f);
1767          }
1768        };
1769      }
1770      
1771      /** Return the current user's home directory in this filesystem.
1772       * The default implementation returns "/user/$USER/".
1773       */
1774      public Path getHomeDirectory() {
1775        return this.makeQualified(
1776            new Path("/user/"+System.getProperty("user.name")));
1777      }
1778    
1779    
  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   * 
   * @param new_dir the new working directory
   */
  public abstract void setWorkingDirectory(Path new_dir);
1787        
  /**
   * Get the current working directory for the given file system.
   * @return the directory pathname
   */
  public abstract Path getWorkingDirectory();
1793      
1794      
  /**
   * Note: with the new FileContext class, getWorkingDirectory()
   * will be removed. 
   * The working directory is implemented in FileContext.
   * 
   * Some file systems like LocalFileSystem have an initial workingDir
   * that we use as the starting workingDir. For other file systems
   * like HDFS there is no built in notion of an initial workingDir.
   * 
   * This default implementation returns null.
   * 
   * @return if there is built in notion of workingDir then it
   * is returned; else a null is returned.
   */
  protected Path getInitialWorkingDirectory() {
    return null;
  }
1810    
1811      /**
1812       * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1813       */
1814      public boolean mkdirs(Path f) throws IOException {
1815        return mkdirs(f, FsPermission.getDirDefault());
1816      }
1817    
  /**
   * Make the given file and all non-existent parents into
   * directories. Has the semantics of Unix 'mkdir -p'.
   * Existence of the directory hierarchy is not an error.
   * @param f path to create
   * @param permission to apply to f
   * @return implementation-defined success flag
   * @throws IOException see specific implementation
   */
  public abstract boolean mkdirs(Path f, FsPermission permission
      ) throws IOException;
1827    
1828      /**
1829       * The src file is on the local disk.  Add it to FS at
1830       * the given dst name and the source is kept intact afterwards
1831       * @param src path
1832       * @param dst path
1833       */
1834      public void copyFromLocalFile(Path src, Path dst)
1835        throws IOException {
1836        copyFromLocalFile(false, src, dst);
1837      }
1838    
1839      /**
1840       * The src files is on the local disk.  Add it to FS at
1841       * the given dst name, removing the source afterwards.
1842       * @param srcs path
1843       * @param dst path
1844       */
1845      public void moveFromLocalFile(Path[] srcs, Path dst)
1846        throws IOException {
1847        copyFromLocalFile(true, true, srcs, dst);
1848      }
1849    
1850      /**
1851       * The src file is on the local disk.  Add it to FS at
1852       * the given dst name, removing the source afterwards.
1853       * @param src path
1854       * @param dst path
1855       */
1856      public void moveFromLocalFile(Path src, Path dst)
1857        throws IOException {
1858        copyFromLocalFile(true, src, dst);
1859      }
1860    
1861      /**
1862       * The src file is on the local disk.  Add it to FS at
1863       * the given dst name.
1864       * delSrc indicates if the source should be removed
1865       * @param delSrc whether to delete the src
1866       * @param src path
1867       * @param dst path
1868       */
1869      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1870        throws IOException {
1871        copyFromLocalFile(delSrc, true, src, dst);
1872      }
1873      
1874      /**
1875       * The src files are on the local disk.  Add it to FS at
1876       * the given dst name.
1877       * delSrc indicates if the source should be removed
1878       * @param delSrc whether to delete the src
1879       * @param overwrite whether to overwrite an existing file
1880       * @param srcs array of paths which are source
1881       * @param dst path
1882       */
1883      public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1884                                    Path[] srcs, Path dst)
1885        throws IOException {
1886        Configuration conf = getConf();
1887        FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1888      }
1889      
1890      /**
1891       * The src file is on the local disk.  Add it to FS at
1892       * the given dst name.
1893       * delSrc indicates if the source should be removed
1894       * @param delSrc whether to delete the src
1895       * @param overwrite whether to overwrite an existing file
1896       * @param src path
1897       * @param dst path
1898       */
1899      public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1900                                    Path src, Path dst)
1901        throws IOException {
1902        Configuration conf = getConf();
1903        FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1904      }
1905        
1906      /**
1907       * The src file is under FS, and the dst is on the local disk.
1908       * Copy it from FS control to the local dst name.
1909       * @param src path
1910       * @param dst path
1911       */
1912      public void copyToLocalFile(Path src, Path dst) throws IOException {
1913        copyToLocalFile(false, src, dst);
1914      }
1915        
1916      /**
1917       * The src file is under FS, and the dst is on the local disk.
1918       * Copy it from FS control to the local dst name.
1919       * Remove the source afterwards
1920       * @param src path
1921       * @param dst path
1922       */
1923      public void moveToLocalFile(Path src, Path dst) throws IOException {
1924        copyToLocalFile(true, src, dst);
1925      }
1926    
1927      /**
1928       * The src file is under FS, and the dst is on the local disk.
1929       * Copy it from FS control to the local dst name.
1930       * delSrc indicates if the src will be removed or not.
1931       * @param delSrc whether to delete the src
1932       * @param src path
1933       * @param dst path
1934       */   
1935      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1936        throws IOException {
1937        copyToLocalFile(delSrc, src, dst, false);
1938      }
1939      
1940        /**
1941       * The src file is under FS, and the dst is on the local disk. Copy it from FS
1942       * control to the local dst name. delSrc indicates if the src will be removed
1943       * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
1944       * as local file system or not. RawLocalFileSystem is non crc file system.So,
1945       * It will not create any crc files at local.
1946       * 
1947       * @param delSrc
1948       *          whether to delete the src
1949       * @param src
1950       *          path
1951       * @param dst
1952       *          path
1953       * @param useRawLocalFileSystem
1954       *          whether to use RawLocalFileSystem as local file system or not.
1955       * 
1956       * @throws IOException
1957       *           - if any IO error
1958       */
1959      public void copyToLocalFile(boolean delSrc, Path src, Path dst,
1960          boolean useRawLocalFileSystem) throws IOException {
1961        Configuration conf = getConf();
1962        FileSystem local = null;
1963        if (useRawLocalFileSystem) {
1964          local = getLocal(conf).getRawFileSystem();
1965        } else {
1966          local = getLocal(conf);
1967        }
1968        FileUtil.copy(this, src, local, dst, delSrc, conf);
1969      }
1970    
  /**
   * Returns a local File that the user can write output to.  The caller
   * provides both the eventual FS target name and the local working
   * file.  If the FS is local, we write directly into the target.  If
   * the FS is remote, we write into the tmp local area.
   * This default implementation always returns <code>tmpLocalFile</code>.
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path of local tmp file
   * @return the local path to write to
   * @throws IOException declared for overriding implementations
   */
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }
1983    
  /**
   * Called when we're all done writing to the target.  A local FS will
   * do nothing, because we've written to exactly the right place.  A remote
   * FS will copy the contents of tmpLocalFile to the correct target at
   * fsOutputFile.
   * This default implementation moves tmpLocalFile to fsOutputFile via
   * {@link #moveFromLocalFile(Path, Path)}.
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path to local tmp file
   * @throws IOException if the move fails
   */
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }
1996    
  /**
   * No more filesystem operations are needed.  Will
   * release any held locks.
   * This implementation processes the paths marked delete-on-exit and then
   * removes this instance from the file system cache.
   * @throws IOException if processing delete-on-exit paths fails
   */
  @Override
  public void close() throws IOException {
    // delete all files that were marked as delete-on-exit.
    processDeleteOnExit();
    // drop the cache entry for this instance's key
    CACHE.remove(this.key, this);
  }
2007    
2008      /** Return the total size of all files in the filesystem.*/
2009      public long getUsed() throws IOException{
2010        long used = 0;
2011        FileStatus[] files = listStatus(new Path("/"));
2012        for(FileStatus file:files){
2013          used += file.getLen();
2014        }
2015        return used;
2016      }
2017      
2018      /**
2019       * Get the block size for a particular file.
2020       * @param f the filename
2021       * @return the number of bytes in a block
2022       */
2023      /** @deprecated Use getFileStatus() instead */
2024      @Deprecated
2025      public long getBlockSize(Path f) throws IOException {
2026        return getFileStatus(f).getBlockSize();
2027      }
2028    
2029      /**
2030       * Return the number of bytes that large input files should be optimally
2031       * be split into to minimize i/o time.
2032       * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2033       */
2034      @Deprecated
2035      public long getDefaultBlockSize() {
2036        // default to 32MB: large enough to minimize the impact of seeks
2037        return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2038      }
2039        
  /** Return the number of bytes that large input files should optimally
   * be split into to minimize i/o time.  The given path will be used to
   * locate the actual filesystem.  The full path does not have to exist.
   * This default implementation ignores the path and returns
   * {@link #getDefaultBlockSize()}.
   * @param f path of file
   * @return the default block size for the path's filesystem
   */
  public long getDefaultBlockSize(Path f) {
    return getDefaultBlockSize();
  }
2049    
  /**
   * Get the default replication. This default implementation returns 1.
   * @return the default replication
   * @deprecated use {@link #getDefaultReplication(Path)} instead
   */
  @Deprecated
  public short getDefaultReplication() { return 1; }
2056    
  /**
   * Get the default replication for a path.   The given path will be used to
   * locate the actual filesystem.  The full path does not have to exist.
   * This default implementation ignores the path and returns
   * {@link #getDefaultReplication()}.
   * @param path of the file
   * @return default replication for the path's filesystem 
   */
  public short getDefaultReplication(Path path) {
    return getDefaultReplication();
  }
2066      
  /**
   * Return a file status object that represents the path.
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public abstract FileStatus getFileStatus(Path f) throws IOException;
2075    
2076      /**
2077       * See {@link FileContext#fixRelativePart}
2078       */
2079      protected Path fixRelativePart(Path p) {
2080        if (p.isUriPathAbsolute()) {
2081          return p;
2082        } else {
2083          return new Path(getWorkingDirectory(), p);
2084        }
2085      }
2086    
2087      /**
2088       * See {@link FileContext#createSymlink(Path, Path, boolean)}
2089       */
2090      public void createSymlink(final Path target, final Path link,
2091          final boolean createParent) throws AccessControlException,
2092          FileAlreadyExistsException, FileNotFoundException,
2093          ParentNotDirectoryException, UnsupportedFileSystemException, 
2094          IOException {
2095        // Supporting filesystems should override this method
2096        throw new UnsupportedOperationException(
2097            "Filesystem does not support symlinks!");
2098      }
2099    
  /**
   * See {@link FileContext#getFileLinkStatus(Path)}
   * <p>
   * This default implementation simply returns {@link #getFileStatus(Path)}.
   */
  public FileStatus getFileLinkStatus(final Path f)
      throws AccessControlException, FileNotFoundException,
      UnsupportedFileSystemException, IOException {
    // Supporting filesystems should override this method
    return getFileStatus(f);
  }
2109    
  /**
   * See {@link AbstractFileSystem#supportsSymlinks()}
   *
   * @return false in this default implementation
   */
  public boolean supportsSymlinks() {
    return false;
  }
2116    
2117      /**
2118       * See {@link FileContext#getLinkTarget(Path)}
2119       */
2120      public Path getLinkTarget(Path f) throws IOException {
2121        // Supporting filesystems should override this method
2122        throw new UnsupportedOperationException(
2123            "Filesystem does not support symlinks!");
2124      }
2125    
2126      /**
2127       * See {@link AbstractFileSystem#getLinkTarget(Path)}
2128       */
2129      protected Path resolveLink(Path f) throws IOException {
2130        // Supporting filesystems should override this method
2131        throw new UnsupportedOperationException(
2132            "Filesystem does not support symlinks!");
2133      }
2134    
2135      /**
2136       * Get the checksum of a file.
2137       *
2138       * @param f The file path
2139       * @return The file checksum.  The default return value is null,
2140       *  which indicates that no checksum algorithm is implemented
2141       *  in the corresponding FileSystem.
2142       */
2143      public FileChecksum getFileChecksum(Path f) throws IOException {
2144        return getFileChecksum(f, Long.MAX_VALUE);
2145      }
2146    
  /**
   * Get the checksum of a file, from the beginning of the file till the
   * specific length.
   * This default implementation returns null.
   * @param f The file path
   * @param length The length of the file range for checksum calculation
   * @return The file checksum.
   * @throws IOException declared for overriding implementations
   */
  public FileChecksum getFileChecksum(Path f, final long length)
      throws IOException {
    return null;
  }
2158    
  /**
   * Set the verify checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   * @param verifyChecksum the flag value; ignored by this default implementation
   */
  public void setVerifyChecksum(boolean verifyChecksum) {
    //doesn't do anything
  }
2167    
  /**
   * Set the write checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   * @param writeChecksum the flag value; ignored by this default implementation
   */
  public void setWriteChecksum(boolean writeChecksum) {
    //doesn't do anything
  }
2176    
2177      /**
2178       * Returns a status object describing the use and capacity of the
2179       * file system. If the file system has multiple partitions, the
2180       * use and capacity of the root partition is reflected.
2181       * 
2182       * @return a FsStatus object
2183       * @throws IOException
2184       *           see specific implementation
2185       */
2186      public FsStatus getStatus() throws IOException {
2187        return getStatus(null);
2188      }
2189    
  /**
   * Returns a status object describing the use and capacity of the
   * file system. If the file system has multiple partitions, the
   * use and capacity of the partition pointed to by the specified
   * path is reflected.
   * This default implementation ignores the path and returns a fixed
   * FsStatus built from (Long.MAX_VALUE, 0, Long.MAX_VALUE).
   * @param p Path for which status should be obtained. null means
   * the default partition. 
   * @return a FsStatus object
   * @throws IOException
   *           see specific implementation
   */
  public FsStatus getStatus(Path p) throws IOException {
    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
  }
2204    
  /**
   * Set permission of a path.
   * This default implementation has an empty body and does nothing.
   * @param p the path
   * @param permission the permission to set
   * @throws IOException declared for overriding implementations
   */
  public void setPermission(Path p, FsPermission permission
      ) throws IOException {
  }
2213    
  /**
   * Set owner of a path (i.e. a file or a directory).
   * The parameters username and groupname cannot both be null.
   * This default implementation has an empty body and does nothing.
   * @param p The path
   * @param username If it is null, the original username remains unchanged.
   * @param groupname If it is null, the original groupname remains unchanged.
   * @throws IOException declared for overriding implementations
   */
  public void setOwner(Path p, String username, String groupname
      ) throws IOException {
  }
2224    
  /**
   * Set the modification time and access time of a file.
   * This default implementation has an empty body and does nothing.
   * @param p The path
   * @param mtime Set the modification time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set modification time.
   * @param atime Set the access time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set access time.
   * @throws IOException declared for overriding implementations
   */
  public void setTimes(Path p, long mtime, long atime
      ) throws IOException {
  }
2238    
2239      /**
2240       * Create a snapshot with a default name.
2241       * @param path The directory where snapshots will be taken.
2242       * @return the snapshot path.
2243       */
2244      public final Path createSnapshot(Path path) throws IOException {
2245        return createSnapshot(path, null);
2246      }
2247    
2248      /**
2249       * Create a snapshot
2250       * @param path The directory where snapshots will be taken.
2251       * @param snapshotName The name of the snapshot
2252       * @return the snapshot path.
2253       */
2254      public Path createSnapshot(Path path, String snapshotName)
2255          throws IOException {
2256        throw new UnsupportedOperationException(getClass().getSimpleName()
2257            + " doesn't support createSnapshot");
2258      }
2259      
2260      /**
2261       * Rename a snapshot
2262       * @param path The directory path where the snapshot was taken
2263       * @param snapshotOldName Old name of the snapshot
2264       * @param snapshotNewName New name of the snapshot
2265       * @throws IOException
2266       */
2267      public void renameSnapshot(Path path, String snapshotOldName,
2268          String snapshotNewName) throws IOException {
2269        throw new UnsupportedOperationException(getClass().getSimpleName()
2270            + " doesn't support renameSnapshot");
2271      }
2272      
2273      /**
2274       * Delete a snapshot of a directory
2275       * @param path  The directory that the to-be-deleted snapshot belongs to
2276       * @param snapshotName The name of the snapshot
2277       */
2278      public void deleteSnapshot(Path path, String snapshotName)
2279          throws IOException {
2280        throw new UnsupportedOperationException(getClass().getSimpleName()
2281            + " doesn't support deleteSnapshot");
2282      }
2283      
2284      /**
2285       * Modifies ACL entries of files and directories.  This method can add new ACL
2286       * entries or modify the permissions on existing ACL entries.  All existing
2287       * ACL entries that are not specified in this call are retained without
2288       * changes.  (Modifications are merged into the current ACL.)
2289       *
2290       * @param path Path to modify
2291       * @param aclSpec List<AclEntry> describing modifications
2292       * @throws IOException if an ACL could not be modified
2293       */
2294      public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2295          throws IOException {
2296        throw new UnsupportedOperationException(getClass().getSimpleName()
2297            + " doesn't support modifyAclEntries");
2298      }
2299    
2300      /**
2301       * Removes ACL entries from files and directories.  Other ACL entries are
2302       * retained.
2303       *
2304       * @param path Path to modify
2305       * @param aclSpec List<AclEntry> describing entries to remove
2306       * @throws IOException if an ACL could not be modified
2307       */
2308      public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2309          throws IOException {
2310        throw new UnsupportedOperationException(getClass().getSimpleName()
2311            + " doesn't support removeAclEntries");
2312      }
2313    
2314      /**
2315       * Removes all default ACL entries from files and directories.
2316       *
2317       * @param path Path to modify
2318       * @throws IOException if an ACL could not be modified
2319       */
2320      public void removeDefaultAcl(Path path)
2321          throws IOException {
2322        throw new UnsupportedOperationException(getClass().getSimpleName()
2323            + " doesn't support removeDefaultAcl");
2324      }
2325    
2326      /**
2327       * Removes all but the base ACL entries of files and directories.  The entries
2328       * for user, group, and others are retained for compatibility with permission
2329       * bits.
2330       *
2331       * @param path Path to modify
2332       * @throws IOException if an ACL could not be removed
2333       */
2334      public void removeAcl(Path path)
2335          throws IOException {
2336        throw new UnsupportedOperationException(getClass().getSimpleName()
2337            + " doesn't support removeAcl");
2338      }
2339    
2340      /**
2341       * Fully replaces ACL of files and directories, discarding all existing
2342       * entries.
2343       *
2344       * @param path Path to modify
2345       * @param aclSpec List<AclEntry> describing modifications, must include entries
2346       *   for user, group, and others for compatibility with permission bits.
2347       * @throws IOException if an ACL could not be modified
2348       */
2349      public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2350        throw new UnsupportedOperationException(getClass().getSimpleName()
2351            + " doesn't support setAcl");
2352      }
2353    
2354      /**
2355       * Gets the ACL of a file or directory.
2356       *
2357       * @param path Path to get
2358       * @return AclStatus describing the ACL of the file or directory
2359       * @throws IOException if an ACL could not be read
2360       */
2361      public AclStatus getAclStatus(Path path) throws IOException {
2362        throw new UnsupportedOperationException(getClass().getSimpleName()
2363            + " doesn't support getAclStatus");
2364      }
2365    
2366      /**
2367       * Set an xattr of a file or directory.
2368       * The name must be prefixed with user/trusted/security/system and
2369       * followed by ".". For example, "user.attr".
2370       * <p/>
2371       * A regular user can only set an xattr for the "user" namespace.
2372       * The super user can set an xattr of either the "user" or "trusted" namespaces.
2373       * The xattrs of the "security" and "system" namespaces are only used/exposed 
2374       * internally by/to the FS impl.
2375       * <p/>
2376       * The access permissions of an xattr in the "user" namespace are
2377       * defined by the file and directory permission bits.
2378       * An xattr can only be set when the logged-in user has the correct permissions.
2379       * If the xattr exists, it will be replaced.
2380       * <p/>
2381       * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
2382       * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
2383       *
2384       * @param path Path to modify
2385       * @param name xattr name.
2386       * @param value xattr value.
2387       * @throws IOException
2388       */
2389      public void setXAttr(Path path, String name, byte[] value)
2390          throws IOException {
2391        setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2392            XAttrSetFlag.REPLACE));
2393      }
2394    
2395      /**
2396       * Set an xattr of a file or directory.
2397       * The name must be prefixed with user/trusted/security/system and
2398       * followed by ".". For example, "user.attr".
2399       * <p/>
2400       * A regular user can only set an xattr for the "user" namespace.
2401       * The super user can set an xattr of either the "user" or "trusted" namespaces.
2402       * The xattrs of the "security" and "system" namespaces are only used/exposed 
2403       * internally by/to the FS impl.
2404       * <p/>
2405       * The access permissions of an xattr in the "user" namespace are
2406       * defined by the file and directory permission bits.
2407       * An xattr can only be set if the logged-in user has the correct permissions.
2408       * If the xattr exists, it is replaced.
2409       * <p/>
2410       * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
2411       * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
2412       *
2413       * @param path Path to modify
2414       * @param name xattr name.
2415       * @param value xattr value.
2416       * @param flag xattr set flag
2417       * @throws IOException
2418       */
2419      public void setXAttr(Path path, String name, byte[] value,
2420          EnumSet<XAttrSetFlag> flag) throws IOException {
2421        throw new UnsupportedOperationException(getClass().getSimpleName()
2422            + " doesn't support setXAttr");
2423      }
2424    
2425      /**
2426       * Get an xattr name and value for a file or directory.
2427       * The name must be prefixed with user/trusted/security/system and
2428       * followed by ".". For example, "user.attr".
2429       * <p/>
2430       * 
2431       * A regular user can only get an xattr for the "user" namespace.
2432       * The super user can get an xattr of either the "user" or "trusted" namespaces.
2433       * The xattrs of the "security" and "system" namespaces are only used/exposed 
2434       * internally by/to the FS impl.
2435       * <p/>
2436       * An xattr will only be returned if the logged-in user has the
2437       * correct permissions.
2438       * <p/>
2439       * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
2440       * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
2441       *
2442       * @param path Path to get extended attribute
2443       * @param name xattr name.
2444       * @return byte[] xattr value.
2445       * @throws IOException
2446       */
2447      public byte[] getXAttr(Path path, String name) throws IOException {
2448        throw new UnsupportedOperationException(getClass().getSimpleName()
2449            + " doesn't support getXAttr");
2450      }
2451    
2452      /**
2453       * Get all of the xattr name/value pairs for a file or directory.
2454       * Only those xattrs which the logged-in user has permissions to view
2455       * are returned.
2456       * <p/>
2457       * A regular user can only get xattrs for the "user" namespace.
2458       * The super user can only get xattrs for "user" and "trusted" namespaces.
2459       * The xattrs of the "security" and "system" namespaces are only used/exposed
2460       * internally by/to the FS impl.
2461       * <p/>
2462       * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
2463       * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
2464       *
2465       * @param path Path to get extended attributes
2466       * @return Map<String, byte[]> describing the XAttrs of the file or directory
2467       * @throws IOException
2468       */
2469      public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2470        throw new UnsupportedOperationException(getClass().getSimpleName()
2471            + " doesn't support getXAttrs");
2472      }
2473    
2474      /**
2475       * Get all of the xattrs name/value pairs for a file or directory.
2476       * Only those xattrs which the logged-in user has permissions to view
2477       * are returned.
2478       * <p/>
2479       * A regular user can only get xattrs for the "user" namespace.
2480       * The super user can only get xattrs for "user" and "trusted" namespaces.
2481       * The xattrs of the "security" and "system" namespaces are only used/exposed
2482       * internally by/to the FS impl.
2483       * <p/>
2484       * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
2485       * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
2486       *
2487       * @param path Path to get extended attributes
2488       * @param names XAttr names.
2489       * @return Map<String, byte[]> describing the XAttrs of the file or directory
2490       * @throws IOException
2491       */
2492      public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2493          throws IOException {
2494        throw new UnsupportedOperationException(getClass().getSimpleName()
2495            + " doesn't support getXAttrs");
2496      }
2497    
2498      /**
2499       * Get all of the xattr names for a file or directory.
2500       * Only those xattr names which the logged-in user has permissions to view
2501       * are returned.
2502       * <p/>
2503       * A regular user can only get xattr names for the "user" namespace.
2504       * The super user can only get xattr names for "user" and "trusted"
2505       * namespaces.
2506       * The xattrs of the "security" and "system" namespaces are only
2507       * used/exposed internally by/to the FS impl.
2508       * <p/>
2509       * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
2510       * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
2511       *
2512       * @param path Path to get extended attributes
2513       * @return Map<String, byte[]> describing the XAttrs of the file or directory
2514       * @throws IOException
2515       */
2516      public List<String> listXAttrs(Path path) throws IOException {
2517        throw new UnsupportedOperationException(getClass().getSimpleName()
2518                + " doesn't support listXAttrs");
2519      }
2520    
2521      /**
2522       * Remove an xattr of a file or directory.
2523       * The name must be prefixed with user/trusted/security/system and
2524       * followed by ".". For example, "user.attr".
2525       * <p/>
2526       * A regular user can only remove an xattr for the "user" namespace.
2527       * The super user can remove an xattr of either the "user" or "trusted" namespaces.
2528       * The xattrs of the "security" and "system" namespaces are only used/exposed 
2529       * internally by/to the FS impl.
2530       * <p/>
2531       * The access permissions of an xattr in the "user" namespace are
2532       * defined by the file and directory permission bits.
2533       * An xattr can only be set when the logged-in user has the correct permissions.
2534       * If the xattr exists, it will be replaced.
2535       * <p/>
2536       * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes">
2537       * http://en.wikipedia.org/wiki/Extended_file_attributes</a>
2538       *
2539       * @param path Path to remove extended attribute
2540       * @param name xattr name
2541       * @throws IOException
2542       */
2543      public void removeXAttr(Path path, String name) throws IOException {
2544        throw new UnsupportedOperationException(getClass().getSimpleName()
2545            + " doesn't support removeXAttr");
2546      }
2547    
  // making it volatile to be able to do a double checked locking
  // (set to true at the end of loadFileSystems(), and read without holding
  // the lock in getFileSystemClass())
  private volatile static boolean FILE_SYSTEMS_LOADED = false;

  // scheme -> FileSystem implementation class; filled in by
  // loadFileSystems() from the ServiceLoader results and consulted by
  // getFileSystemClass() when the configuration supplies no class
  private static final Map<String, Class<? extends FileSystem>>
    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2553    
2554      private static void loadFileSystems() {
2555        synchronized (FileSystem.class) {
2556          if (!FILE_SYSTEMS_LOADED) {
2557            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2558            for (FileSystem fs : serviceLoader) {
2559              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2560            }
2561            FILE_SYSTEMS_LOADED = true;
2562          }
2563        }
2564      }
2565    
2566      public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2567          Configuration conf) throws IOException {
2568        if (!FILE_SYSTEMS_LOADED) {
2569          loadFileSystems();
2570        }
2571        Class<? extends FileSystem> clazz = null;
2572        if (conf != null) {
2573          clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2574        }
2575        if (clazz == null) {
2576          clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2577        }
2578        if (clazz == null) {
2579          throw new IOException("No FileSystem for scheme: " + scheme);
2580        }
2581        return clazz;
2582      }
2583    
2584      private static FileSystem createFileSystem(URI uri, Configuration conf
2585          ) throws IOException {
2586        Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2587        if (clazz == null) {
2588          throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2589        }
2590        FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2591        fs.initialize(uri, conf);
2592        return fs;
2593      }
2594    
2595      /** Caching FileSystem objects */
2596      static class Cache {
2597        private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2598    
2599        private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2600        private final Set<Key> toAutoClose = new HashSet<Key>();
2601    
2602        /** A variable that makes all objects in the cache unique */
2603        private static AtomicLong unique = new AtomicLong(1);
2604    
2605        FileSystem get(URI uri, Configuration conf) throws IOException{
2606          Key key = new Key(uri, conf);
2607          return getInternal(uri, conf, key);
2608        }
2609    
2610        /** The objects inserted into the cache using this method are all unique */
2611        FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2612          Key key = new Key(uri, conf, unique.getAndIncrement());
2613          return getInternal(uri, conf, key);
2614        }
2615    
2616        private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2617          FileSystem fs;
2618          synchronized (this) {
2619            fs = map.get(key);
2620          }
2621          if (fs != null) {
2622            return fs;
2623          }
2624    
2625          fs = createFileSystem(uri, conf);
2626          synchronized (this) { // refetch the lock again
2627            FileSystem oldfs = map.get(key);
2628            if (oldfs != null) { // a file system is created while lock is releasing
2629              fs.close(); // close the new file system
2630              return oldfs;  // return the old file system
2631            }
2632            
2633            // now insert the new file system into the map
2634            if (map.isEmpty()
2635                    && !ShutdownHookManager.get().isShutdownInProgress()) {
2636              ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2637            }
2638            fs.key = key;
2639            map.put(key, fs);
2640            if (conf.getBoolean("fs.automatic.close", true)) {
2641              toAutoClose.add(key);
2642            }
2643            return fs;
2644          }
2645        }
2646    
2647        synchronized void remove(Key key, FileSystem fs) {
2648          if (map.containsKey(key) && fs == map.get(key)) {
2649            map.remove(key);
2650            toAutoClose.remove(key);
2651            }
2652        }
2653    
2654        synchronized void closeAll() throws IOException {
2655          closeAll(false);
2656        }
2657    
2658        /**
2659         * Close all FileSystem instances in the Cache.
2660         * @param onlyAutomatic only close those that are marked for automatic closing
2661         */
2662        synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2663          List<IOException> exceptions = new ArrayList<IOException>();
2664    
2665          // Make a copy of the keys in the map since we'll be modifying
2666          // the map while iterating over it, which isn't safe.
2667          List<Key> keys = new ArrayList<Key>();
2668          keys.addAll(map.keySet());
2669    
2670          for (Key key : keys) {
2671            final FileSystem fs = map.get(key);
2672    
2673            if (onlyAutomatic && !toAutoClose.contains(key)) {
2674              continue;
2675            }
2676    
2677            //remove from cache
2678            remove(key, fs);
2679    
2680            if (fs != null) {
2681              try {
2682                fs.close();
2683              }
2684              catch(IOException ioe) {
2685                exceptions.add(ioe);
2686              }
2687            }
2688          }
2689    
2690          if (!exceptions.isEmpty()) {
2691            throw MultipleIOException.createIOException(exceptions);
2692          }
2693        }
2694    
2695        private class ClientFinalizer implements Runnable {
2696          @Override
2697          public synchronized void run() {
2698            try {
2699              closeAll(true);
2700            } catch (IOException e) {
2701              LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2702            }
2703          }
2704        }
2705    
2706        synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2707          List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2708          //Make a pass over the list and collect the filesystems to close
2709          //we cannot close inline since close() removes the entry from the Map
2710          for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2711            final Key key = entry.getKey();
2712            final FileSystem fs = entry.getValue();
2713            if (ugi.equals(key.ugi) && fs != null) {
2714              targetFSList.add(fs);   
2715            }
2716          }
2717          List<IOException> exceptions = new ArrayList<IOException>();
2718          //now make a pass over the target list and close each
2719          for (FileSystem fs : targetFSList) {
2720            try {
2721              fs.close();
2722            }
2723            catch(IOException ioe) {
2724              exceptions.add(ioe);
2725            }
2726          }
2727          if (!exceptions.isEmpty()) {
2728            throw MultipleIOException.createIOException(exceptions);
2729          }
2730        }
2731    
2732        /** FileSystem.Cache.Key */
2733        static class Key {
2734          final String scheme;
2735          final String authority;
2736          final UserGroupInformation ugi;
2737          final long unique;   // an artificial way to make a key unique
2738    
2739          Key(URI uri, Configuration conf) throws IOException {
2740            this(uri, conf, 0);
2741          }
2742    
2743          Key(URI uri, Configuration conf, long unique) throws IOException {
2744            scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2745            authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2746            this.unique = unique;
2747            
2748            this.ugi = UserGroupInformation.getCurrentUser();
2749          }
2750    
2751          @Override
2752          public int hashCode() {
2753            return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2754          }
2755    
2756          static boolean isEqual(Object a, Object b) {
2757            return a == b || (a != null && a.equals(b));        
2758          }
2759    
2760          @Override
2761          public boolean equals(Object obj) {
2762            if (obj == this) {
2763              return true;
2764            }
2765            if (obj != null && obj instanceof Key) {
2766              Key that = (Key)obj;
2767              return isEqual(this.scheme, that.scheme)
2768                     && isEqual(this.authority, that.authority)
2769                     && isEqual(this.ugi, that.ugi)
2770                     && (this.unique == that.unique);
2771            }
2772            return false;        
2773          }
2774    
2775          @Override
2776          public String toString() {
2777            return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2778          }
2779        }
2780      }
2781      
2782      /**
2783       * Tracks statistics about how many reads, writes, and so forth have been
2784       * done in a FileSystem.
2785       * 
2786       * Since there is only one of these objects per FileSystem, there will 
2787       * typically be many threads writing to this object.  Almost every operation
2788       * on an open file will involve a write to this object.  In contrast, reading
2789       * statistics is done infrequently by most programs, and not at all by others.
2790       * Hence, this is optimized for writes.
2791       * 
2792       * Each thread writes to its own thread-local area of memory.  This removes 
2793       * contention and allows us to scale up to many, many threads.  To read
2794       * statistics, the reader thread totals up the contents of all of the 
2795       * thread-local data areas.
2796       */
2797      public static final class Statistics {
2798        /**
2799         * Statistics data.
2800         * 
2801         * There is only a single writer to thread-local StatisticsData objects.
2802         * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2803         * to prevent lost updates.
2804         * The Java specification guarantees that updates to volatile longs will
2805         * be perceived as atomic with respect to other threads, which is all we
2806         * need.
2807         */
2808        public static class StatisticsData {
2809          volatile long bytesRead;
2810          volatile long bytesWritten;
2811          volatile int readOps;
2812          volatile int largeReadOps;
2813          volatile int writeOps;
2814          /**
2815           * Stores a weak reference to the thread owning this StatisticsData.
2816           * This allows us to remove StatisticsData objects that pertain to
2817           * threads that no longer exist.
2818           */
2819          final WeakReference<Thread> owner;
2820    
2821          StatisticsData(WeakReference<Thread> owner) {
2822            this.owner = owner;
2823          }
2824    
2825          /**
2826           * Add another StatisticsData object to this one.
2827           */
2828          void add(StatisticsData other) {
2829            this.bytesRead += other.bytesRead;
2830            this.bytesWritten += other.bytesWritten;
2831            this.readOps += other.readOps;
2832            this.largeReadOps += other.largeReadOps;
2833            this.writeOps += other.writeOps;
2834          }
2835    
2836          /**
2837           * Negate the values of all statistics.
2838           */
2839          void negate() {
2840            this.bytesRead = -this.bytesRead;
2841            this.bytesWritten = -this.bytesWritten;
2842            this.readOps = -this.readOps;
2843            this.largeReadOps = -this.largeReadOps;
2844            this.writeOps = -this.writeOps;
2845          }
2846    
2847          @Override
2848          public String toString() {
2849            return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2850                + readOps + " read ops, " + largeReadOps + " large read ops, "
2851                + writeOps + " write ops";
2852          }
2853          
2854          public long getBytesRead() {
2855            return bytesRead;
2856          }
2857          
2858          public long getBytesWritten() {
2859            return bytesWritten;
2860          }
2861          
2862          public int getReadOps() {
2863            return readOps;
2864          }
2865          
2866          public int getLargeReadOps() {
2867            return largeReadOps;
2868          }
2869          
2870          public int getWriteOps() {
2871            return writeOps;
2872          }
2873        }
2874    
    /**
     * Visitor applied by visitAll to the StatisticsData objects of a
     * Statistics instance.
     *
     * @param <T> type of the result returned by {@link #aggregate()}
     */
    private interface StatisticsAggregator<T> {
      /** Called by visitAll once for each StatisticsData object it visits. */
      void accept(StatisticsData data);
      /** Called by visitAll after the last accept call; its value is returned. */
      T aggregate();
    }
2879    
    /**
     * The scheme string given to the constructor, or copied from the source
     * object by the copy constructor.
     */
    private final String scheme;

    /**
     * rootData is data that doesn't belong to any thread, but will be added
     * to the totals.  This is useful for making copies of Statistics objects,
     * and for storing data that pertains to threads that have been garbage
     * collected.  Protected by the Statistics lock.
     */
    private final StatisticsData rootData;

    /**
     * Thread-local data.  Filled in on demand by getThreadStatistics().
     */
    private final ThreadLocal<StatisticsData> threadData;
    
    /**
     * List of all thread-local data areas.  Protected by the Statistics lock.
     * Remains null until getThreadStatistics() registers the first entry.
     */
    private LinkedList<StatisticsData> allData;
2899    
2900        public Statistics(String scheme) {
2901          this.scheme = scheme;
2902          this.rootData = new StatisticsData(null);
2903          this.threadData = new ThreadLocal<StatisticsData>();
2904          this.allData = null;
2905        }
2906    
2907        /**
2908         * Copy constructor.
2909         * 
2910         * @param other    The input Statistics object which is cloned.
2911         */
2912        public Statistics(Statistics other) {
2913          this.scheme = other.scheme;
2914          this.rootData = new StatisticsData(null);
2915          other.visitAll(new StatisticsAggregator<Void>() {
2916            @Override
2917            public void accept(StatisticsData data) {
2918              rootData.add(data);
2919            }
2920    
2921            public Void aggregate() {
2922              return null;
2923            }
2924          });
2925          this.threadData = new ThreadLocal<StatisticsData>();
2926        }
2927    
2928        /**
2929         * Get or create the thread-local data associated with the current thread.
2930         */
2931        public StatisticsData getThreadStatistics() {
2932          StatisticsData data = threadData.get();
2933          if (data == null) {
2934            data = new StatisticsData(
2935                new WeakReference<Thread>(Thread.currentThread()));
2936            threadData.set(data);
2937            synchronized(this) {
2938              if (allData == null) {
2939                allData = new LinkedList<StatisticsData>();
2940              }
2941              allData.add(data);
2942            }
2943          }
2944          return data;
2945        }
2946    
2947        /**
2948         * Increment the bytes read in the statistics
2949         * @param newBytes the additional bytes read
2950         */
2951        public void incrementBytesRead(long newBytes) {
2952          getThreadStatistics().bytesRead += newBytes;
2953        }
2954        
2955        /**
2956         * Increment the bytes written in the statistics
2957         * @param newBytes the additional bytes written
2958         */
2959        public void incrementBytesWritten(long newBytes) {
2960          getThreadStatistics().bytesWritten += newBytes;
2961        }
2962        
2963        /**
2964         * Increment the number of read operations
2965         * @param count number of read operations
2966         */
2967        public void incrementReadOps(int count) {
2968          getThreadStatistics().readOps += count;
2969        }
2970    
2971        /**
2972         * Increment the number of large read operations
2973         * @param count number of large read operations
2974         */
2975        public void incrementLargeReadOps(int count) {
2976          getThreadStatistics().largeReadOps += count;
2977        }
2978    
2979        /**
2980         * Increment the number of write operations
2981         * @param count number of write operations
2982         */
2983        public void incrementWriteOps(int count) {
2984          getThreadStatistics().writeOps += count;
2985        }
2986    
2987        /**
2988         * Apply the given aggregator to all StatisticsData objects associated with
2989         * this Statistics object.
2990         *
2991         * For each StatisticsData object, we will call accept on the visitor.
2992         * Finally, at the end, we will call aggregate to get the final total. 
2993         *
2994         * @param         The visitor to use.
2995         * @return        The total.
2996         */
2997        private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
2998          visitor.accept(rootData);
2999          if (allData != null) {
3000            for (Iterator<StatisticsData> iter = allData.iterator();
3001                iter.hasNext(); ) {
3002              StatisticsData data = iter.next();
3003              visitor.accept(data);
3004              if (data.owner.get() == null) {
3005                /*
3006                 * If the thread that created this thread-local data no
3007                 * longer exists, remove the StatisticsData from our list
3008                 * and fold the values into rootData.
3009                 */
3010                rootData.add(data);
3011                iter.remove();
3012              }
3013            }
3014          }
3015          return visitor.aggregate();
3016        }
3017    
3018        /**
3019         * Get the total number of bytes read
3020         * @return the number of bytes
3021         */
3022        public long getBytesRead() {
3023          return visitAll(new StatisticsAggregator<Long>() {
3024            private long bytesRead = 0;
3025    
3026            @Override
3027            public void accept(StatisticsData data) {
3028              bytesRead += data.bytesRead;
3029            }
3030    
3031            public Long aggregate() {
3032              return bytesRead;
3033            }
3034          });
3035        }
3036        
3037        /**
3038         * Get the total number of bytes written
3039         * @return the number of bytes
3040         */
3041        public long getBytesWritten() {
3042          return visitAll(new StatisticsAggregator<Long>() {
3043            private long bytesWritten = 0;
3044    
3045            @Override
3046            public void accept(StatisticsData data) {
3047              bytesWritten += data.bytesWritten;
3048            }
3049    
3050            public Long aggregate() {
3051              return bytesWritten;
3052            }
3053          });
3054        }
3055        
3056        /**
3057         * Get the number of file system read operations such as list files
3058         * @return number of read operations
3059         */
3060        public int getReadOps() {
3061          return visitAll(new StatisticsAggregator<Integer>() {
3062            private int readOps = 0;
3063    
3064            @Override
3065            public void accept(StatisticsData data) {
3066              readOps += data.readOps;
3067              readOps += data.largeReadOps;
3068            }
3069    
3070            public Integer aggregate() {
3071              return readOps;
3072            }
3073          });
3074        }
3075    
3076        /**
3077         * Get the number of large file system read operations such as list files
3078         * under a large directory
3079         * @return number of large read operations
3080         */
3081        public int getLargeReadOps() {
3082          return visitAll(new StatisticsAggregator<Integer>() {
3083            private int largeReadOps = 0;
3084    
3085            @Override
3086            public void accept(StatisticsData data) {
3087              largeReadOps += data.largeReadOps;
3088            }
3089    
3090            public Integer aggregate() {
3091              return largeReadOps;
3092            }
3093          });
3094        }
3095    
3096        /**
3097         * Get the number of file system write operations such as create, append 
3098         * rename etc.
3099         * @return number of write operations
3100         */
3101        public int getWriteOps() {
3102          return visitAll(new StatisticsAggregator<Integer>() {
3103            private int writeOps = 0;
3104    
3105            @Override
3106            public void accept(StatisticsData data) {
3107              writeOps += data.writeOps;
3108            }
3109    
3110            public Integer aggregate() {
3111              return writeOps;
3112            }
3113          });
3114        }
3115    
3116    
3117        @Override
3118        public String toString() {
3119          return visitAll(new StatisticsAggregator<String>() {
3120            private StatisticsData total = new StatisticsData(null);
3121    
3122            @Override
3123            public void accept(StatisticsData data) {
3124              total.add(data);
3125            }
3126    
3127            public String aggregate() {
3128              return total.toString();
3129            }
3130          });
3131        }
3132    
3133        /**
3134         * Resets all statistics to 0.
3135         *
3136         * In order to reset, we add up all the thread-local statistics data, and
3137         * set rootData to the negative of that.
3138         *
3139         * This may seem like a counterintuitive way to reset the statsitics.  Why
3140         * can't we just zero out all the thread-local data?  Well, thread-local
3141         * data can only be modified by the thread that owns it.  If we tried to
3142         * modify the thread-local data from this thread, our modification might get
3143         * interleaved with a read-modify-write operation done by the thread that
3144         * owns the data.  That would result in our update getting lost.
3145         *
3146         * The approach used here avoids this problem because it only ever reads
3147         * (not writes) the thread-local data.  Both reads and writes to rootData
3148         * are done under the lock, so we're free to modify rootData from any thread
3149         * that holds the lock.
3150         */
3151        public void reset() {
3152          visitAll(new StatisticsAggregator<Void>() {
3153            private StatisticsData total = new StatisticsData(null);
3154    
3155            @Override
3156            public void accept(StatisticsData data) {
3157              total.add(data);
3158            }
3159    
3160            public Void aggregate() {
3161              total.negate();
3162              rootData.add(total);
3163              return null;
3164            }
3165          });
3166        }
3167        
3168        /**
3169         * Get the uri scheme associated with this statistics object.
3170         * @return the schema associated with this set of statistics
3171         */
3172        public String getScheme() {
3173          return scheme;
3174        }
3175      }
3176      
3177      /**
3178       * Get the Map of Statistics object indexed by URI Scheme.
3179       * @return a Map having a key as URI scheme and value as Statistics object
3180       * @deprecated use {@link #getAllStatistics} instead
3181       */
3182      @Deprecated
3183      public static synchronized Map<String, Statistics> getStatistics() {
3184        Map<String, Statistics> result = new HashMap<String, Statistics>();
3185        for(Statistics stat: statisticsTable.values()) {
3186          result.put(stat.getScheme(), stat);
3187        }
3188        return result;
3189      }
3190    
3191      /**
3192       * Return the FileSystem classes that have Statistics
3193       */
3194      public static synchronized List<Statistics> getAllStatistics() {
3195        return new ArrayList<Statistics>(statisticsTable.values());
3196      }
3197      
3198      /**
3199       * Get the statistics for a particular file system
3200       * @param cls the class to lookup
3201       * @return a statistics object
3202       */
3203      public static synchronized 
3204      Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3205        Statistics result = statisticsTable.get(cls);
3206        if (result == null) {
3207          result = new Statistics(scheme);
3208          statisticsTable.put(cls, result);
3209        }
3210        return result;
3211      }
3212      
3213      /**
3214       * Reset all statistics for all file systems
3215       */
3216      public static synchronized void clearStatistics() {
3217        for(Statistics stat: statisticsTable.values()) {
3218          stat.reset();
3219        }
3220      }
3221    
3222      /**
3223       * Print all statistics for all file systems
3224       */
3225      public static synchronized
3226      void printStatistics() throws IOException {
3227        for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3228                statisticsTable.entrySet()) {
3229          System.out.println("  FileSystem " + pair.getKey().getName() + 
3230                             ": " + pair.getValue());
3231        }
3232      }
3233    
  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
  // Static flag that starts out false; it is read by areSymlinksEnabled()
  // and set to true by enableSymlinks().
  private static boolean symlinksEnabled = false;

  // NOTE(review): this static Configuration is not referenced in the part of
  // the file visible here - confirm where (or whether) it is assigned and used.
  private static Configuration conf = null;
3238    
  /**
   * Report whether symlinks are currently enabled.
   *
   * @return the current value of the static symlinksEnabled flag
   */
  @VisibleForTesting
  public static boolean areSymlinksEnabled() {
    return symlinksEnabled;
  }
3243    
  /**
   * Turn symlink support on by setting the static symlinksEnabled flag to
   * true.  This method only ever sets the flag; it never clears it.
   */
  @VisibleForTesting
  public static void enableSymlinks() {
    symlinksEnabled = true;
  }
3248    }