001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.URI;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.EnumSet;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.IdentityHashMap;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.NoSuchElementException;
036import java.util.Set;
037import java.util.Stack;
038import java.util.TreeSet;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicLong;
041
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.apache.hadoop.classification.InterfaceAudience;
045import org.apache.hadoop.classification.InterfaceStability;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.conf.Configured;
048import org.apache.hadoop.fs.Options.ChecksumOpt;
049import org.apache.hadoop.fs.Options.Rename;
050import org.apache.hadoop.fs.permission.FsPermission;
051import org.apache.hadoop.io.MultipleIOException;
052import org.apache.hadoop.io.Text;
053import org.apache.hadoop.net.NetUtils;
054import org.apache.hadoop.security.Credentials;
055import org.apache.hadoop.security.SecurityUtil;
056import org.apache.hadoop.security.UserGroupInformation;
057import org.apache.hadoop.security.token.Token;
058import org.apache.hadoop.util.DataChecksum;
059import org.apache.hadoop.util.Progressable;
060import org.apache.hadoop.util.ReflectionUtils;
061import org.apache.hadoop.util.ShutdownHookManager;
062
063import com.google.common.annotations.VisibleForTesting;
064
065/****************************************************************
066 * An abstract base class for a fairly generic filesystem.  It
067 * may be implemented as a distributed filesystem, or as a "local"
068 * one that reflects the locally-connected disk.  The local version
069 * exists for small Hadoop instances and for testing.
070 *
071 * <p>
072 *
073 * All user code that may potentially use the Hadoop Distributed
074 * File System should be written to use a FileSystem object.  The
075 * Hadoop DFS is a multi-machine system that appears as a single
076 * disk.  It's useful because of its fault tolerance and potentially
077 * very large capacity.
078 * 
079 * <p>
080 * The local implementation is {@link LocalFileSystem} and distributed
081 * implementation is DistributedFileSystem.
082 *****************************************************************/
083@InterfaceAudience.Public
084@InterfaceStability.Stable
085public abstract class FileSystem extends Configured implements Closeable {
  /** Configuration key under which the default filesystem URI is stored. */
  public static final String FS_DEFAULT_NAME_KEY = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  /** Value used for the default filesystem when the key above is not set. */
  public static final String DEFAULT_FS = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;

  /** Logger for the FileSystem class. */
  public static final Log LOG = LogFactory.getLog(FileSystem.class);

  /**
   * Priority of the FileSystem shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 10;

  /** FileSystem cache */
  static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /** Statistics recorded per FileSystem class (keys compared by identity). */
  private static final Map<Class<? extends FileSystem>, Statistics> 
    statisticsTable =
      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
  
  /**
   * The statistics for this file system.
   */
  protected Statistics statistics;

  /**
   * A cache of files that should be deleted when the filesystem is closed
   * or the JVM is exited.
   */
  private Set<Path> deleteOnExit = new TreeSet<Path>();
119  
120  /**
121   * This method adds a file system for testing so that we can find it later. It
122   * is only for testing.
123   * @param uri the uri to store it under
124   * @param conf the configuration to store it under
125   * @param fs the file system to store
126   * @throws IOException
127   */
128  static void addFileSystemForTesting(URI uri, Configuration conf,
129      FileSystem fs) throws IOException {
130    CACHE.map.put(new Cache.Key(uri, conf), fs);
131  }
132
133  /**
134   * Get a filesystem instance based on the uri, the passed
135   * configuration and the user
136   * @param uri of the filesystem
137   * @param conf the configuration to use
138   * @param user to perform the get as
139   * @return the filesystem instance
140   * @throws IOException
141   * @throws InterruptedException
142   */
143  public static FileSystem get(final URI uri, final Configuration conf,
144        final String user) throws IOException, InterruptedException {
145    UserGroupInformation ugi;
146    if (user == null) {
147      ugi = UserGroupInformation.getCurrentUser();
148    } else {
149      ugi = UserGroupInformation.createRemoteUser(user);
150    }
151    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
152      public FileSystem run() throws IOException {
153        return get(uri, conf);
154      }
155    });
156  }
157
158  /**
159   * Returns the configured filesystem implementation.
160   * @param conf the configuration to use
161   */
162  public static FileSystem get(Configuration conf) throws IOException {
163    return get(getDefaultUri(conf), conf);
164  }
165  
166  /** Get the default filesystem URI from a configuration.
167   * @param conf the configuration to use
168   * @return the uri of the default filesystem
169   */
170  public static URI getDefaultUri(Configuration conf) {
171    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
172  }
173
174  /** Set the default filesystem URI in a configuration.
175   * @param conf the configuration to alter
176   * @param uri the new default filesystem uri
177   */
178  public static void setDefaultUri(Configuration conf, URI uri) {
179    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
180  }
181
182  /** Set the default filesystem URI in a configuration.
183   * @param conf the configuration to alter
184   * @param uri the new default filesystem uri
185   */
186  public static void setDefaultUri(Configuration conf, String uri) {
187    setDefaultUri(conf, URI.create(fixName(uri)));
188  }
189
190  /** Called after a new FileSystem instance is constructed.
191   * @param name a uri whose authority section names the host, port, etc.
192   *   for this FileSystem
193   * @param conf the configuration
194   */
195  public void initialize(URI name, Configuration conf) throws IOException {
196    statistics = getStatistics(name.getScheme(), getClass());    
197  }
198
  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   * @return the URI identifying this FileSystem
   */
  public abstract URI getUri();
201  
202  /**
203   * Resolve the uri's hostname and add the default port if not in the uri
204   * @return URI
205   * @see NetUtils#getCanonicalUri(URI, int)
206   */
207  protected URI getCanonicalUri() {
208    return NetUtils.getCanonicalUri(getUri(), getDefaultPort());
209  }
210  
211  /**
212   * Get the default port for this file system.
213   * @return the default port or 0 if there isn't one
214   */
215  protected int getDefaultPort() {
216    return 0;
217  }
218
219  /**
220   * Get a canonical service name for this file system.  The token cache is
221   * the only user of the canonical service name, and uses it to lookup this
222   * filesystem's service tokens.
223   * If file system provides a token of its own then it must have a canonical
224   * name, otherwise canonical name can be null.
225   * 
226   * Default Impl: If the file system has child file systems 
227   * (such as an embedded file system) then it is assumed that the fs has no
228   * tokens of its own and hence returns a null name; otherwise a service
229   * name is built using Uri and port.
230   * 
231   * @return a service string that uniquely identifies this file system, null
232   *         if the filesystem does not implement tokens
233   * @see SecurityUtil#buildDTServiceName(URI, int) 
234   */
235  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
236  public String getCanonicalServiceName() {
237    return (getChildFileSystems() == null)
238      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
239      : null;
240  }
241
242  /** @deprecated call #getUri() instead.*/
243  @Deprecated
244  public String getName() { return getUri().toString(); }
245
246  /** @deprecated call #get(URI,Configuration) instead. */
247  @Deprecated
248  public static FileSystem getNamed(String name, Configuration conf)
249    throws IOException {
250    return get(URI.create(fixName(name)), conf);
251  }
252  
253  /** Update old-format filesystem names, for back-compatibility.  This should
254   * eventually be replaced with a checkName() method that throws an exception
255   * for old-format names. */ 
256  private static String fixName(String name) {
257    // convert old-format name to new-format name
258    if (name.equals("local")) {         // "local" is now "file:///".
259      LOG.warn("\"local\" is a deprecated filesystem name."
260               +" Use \"file:///\" instead.");
261      name = "file:///";
262    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
263      LOG.warn("\""+name+"\" is a deprecated filesystem name."
264               +" Use \"hdfs://"+name+"/\" instead.");
265      name = "hdfs://"+name;
266    }
267    return name;
268  }
269
270  /**
271   * Get the local file system.
272   * @param conf the configuration to configure the file system with
273   * @return a LocalFileSystem
274   */
275  public static LocalFileSystem getLocal(Configuration conf)
276    throws IOException {
277    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
278  }
279
280  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
281   * of the URI determines a configuration property name,
282   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
283   * The entire URI is passed to the FileSystem instance's initialize method.
284   */
285  public static FileSystem get(URI uri, Configuration conf) throws IOException {
286    String scheme = uri.getScheme();
287    String authority = uri.getAuthority();
288
289    if (scheme == null && authority == null) {     // use default FS
290      return get(conf);
291    }
292
293    if (scheme != null && authority == null) {     // no authority
294      URI defaultUri = getDefaultUri(conf);
295      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
296          && defaultUri.getAuthority() != null) {  // & default has authority
297        return get(defaultUri, conf);              // return default
298      }
299    }
300    
301    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
302    if (conf.getBoolean(disableCacheName, false)) {
303      return createFileSystem(uri, conf);
304    }
305
306    return CACHE.get(uri, conf);
307  }
308
309  /**
310   * Returns the FileSystem for this URI's scheme and authority and the 
311   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
312   * @param uri of the filesystem
313   * @param conf the configuration to use
314   * @param user to perform the get as
315   * @return filesystem instance
316   * @throws IOException
317   * @throws InterruptedException
318   */
319  public static FileSystem newInstance(final URI uri, final Configuration conf,
320      final String user) throws IOException, InterruptedException {
321    UserGroupInformation ugi;
322    if (user == null) {
323      ugi = UserGroupInformation.getCurrentUser();
324    } else {
325      ugi = UserGroupInformation.createRemoteUser(user);
326    }
327    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
328      public FileSystem run() throws IOException {
329        return newInstance(uri,conf); 
330      }
331    });
332  }
333  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
334   * of the URI determines a configuration property name,
335   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
336   * The entire URI is passed to the FileSystem instance's initialize method.
337   * This always returns a new FileSystem object.
338   */
339  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
340    String scheme = uri.getScheme();
341    String authority = uri.getAuthority();
342
343    if (scheme == null) {                       // no scheme: use default FS
344      return newInstance(conf);
345    }
346
347    if (authority == null) {                       // no authority
348      URI defaultUri = getDefaultUri(conf);
349      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
350          && defaultUri.getAuthority() != null) {  // & default has authority
351        return newInstance(defaultUri, conf);              // return default
352      }
353    }
354    return CACHE.getUnique(uri, conf);
355  }
356
357  /** Returns a unique configured filesystem implementation.
358   * This always returns a new FileSystem object.
359   * @param conf the configuration to use
360   */
361  public static FileSystem newInstance(Configuration conf) throws IOException {
362    return newInstance(getDefaultUri(conf), conf);
363  }
364
365  /**
366   * Get a unique local file system object
367   * @param conf the configuration to configure the file system with
368   * @return a LocalFileSystem
369   * This always returns a new FileSystem object.
370   */
371  public static LocalFileSystem newInstanceLocal(Configuration conf)
372    throws IOException {
373    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
374  }
375
376  /**
377   * Close all cached filesystems. Be sure those filesystems are not
378   * used anymore.
379   * 
380   * @throws IOException
381   */
382  public static void closeAll() throws IOException {
383    CACHE.closeAll();
384  }
385
386  /**
387   * Close all cached filesystems for a given UGI. Be sure those filesystems 
388   * are not used anymore.
389   * @param ugi user group info to close
390   * @throws IOException
391   */
392  public static void closeAllForUGI(UserGroupInformation ugi) 
393  throws IOException {
394    CACHE.closeAll(ugi);
395  }
396
397  /** 
398   * Make sure that a path specifies a FileSystem.
399   * @param path to use
400   */
401  public Path makeQualified(Path path) {
402    checkPath(path);
403    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
404  }
405    
406  /**
407   * Get a new delegation token for this file system.
408   * This is an internal method that should have been declared protected
409   * but wasn't historically.
410   * Callers should use {@link #addDelegationTokens(String, Credentials)}
411   * 
412   * @param renewer the account name that is allowed to renew the token.
413   * @return a new delegation token
414   * @throws IOException
415   */
416  @InterfaceAudience.Private()
417  public Token<?> getDelegationToken(String renewer) throws IOException {
418    return null;
419  }
420  
421  /**
422   * Obtain all delegation tokens used by this FileSystem that are not
423   * already present in the given Credentials.  Existing tokens will neither
424   * be verified as valid nor having the given renewer.  Missing tokens will
425   * be acquired and added to the given Credentials.
426   * 
427   * Default Impl: works for simple fs with its own token
428   * and also for an embedded fs whose tokens are those of its
429   * children file system (i.e. the embedded fs has not tokens of its
430   * own).
431   * 
432   * @param renewer the user allowed to renew the delegation tokens
433   * @param credentials cache in which to add new delegation tokens
434   * @return list of new delegation tokens
435   * @throws IOException
436   */
437  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
438  public Token<?>[] addDelegationTokens(
439      final String renewer, Credentials credentials) throws IOException {
440    if (credentials == null) {
441      credentials = new Credentials();
442    }
443    final List<Token<?>> tokens = new ArrayList<Token<?>>();
444    collectDelegationTokens(renewer, credentials, tokens);
445    return tokens.toArray(new Token<?>[tokens.size()]);
446  }
447  
448  /**
449   * Recursively obtain the tokens for this FileSystem and all descended
450   * FileSystems as determined by getChildFileSystems().
451   * @param renewer the user allowed to renew the delegation tokens
452   * @param credentials cache in which to add the new delegation tokens
453   * @param tokens list in which to add acquired tokens
454   * @throws IOException
455   */
456  private void collectDelegationTokens(final String renewer,
457                                       final Credentials credentials,
458                                       final List<Token<?>> tokens)
459                                           throws IOException {
460    final String serviceName = getCanonicalServiceName();
461    // Collect token of the this filesystem and then of its embedded children
462    if (serviceName != null) { // fs has token, grab it
463      final Text service = new Text(serviceName);
464      Token<?> token = credentials.getToken(service);
465      if (token == null) {
466        token = getDelegationToken(renewer);
467        if (token != null) {
468          tokens.add(token);
469          credentials.addToken(service, token);
470        }
471      }
472    }
473    // Now collect the tokens from the children
474    final FileSystem[] children = getChildFileSystems();
475    if (children != null) {
476      for (final FileSystem fs : children) {
477        fs.collectDelegationTokens(renewer, credentials, tokens);
478      }
479    }
480  }
481
482  /**
483   * Get all the immediate child FileSystems embedded in this FileSystem.
484   * It does not recurse and get grand children.  If a FileSystem
485   * has multiple child FileSystems, then it should return a unique list
486   * of those FileSystems.  Default is to return null to signify no children.
487   * 
488   * @return FileSystems used by this FileSystem
489   */
490  @InterfaceAudience.LimitedPrivate({ "HDFS" })
491  @VisibleForTesting
492  public FileSystem[] getChildFileSystems() {
493    return null;
494  }
495  
496  /** create a file with the provided permission
497   * The permission of the file is set to be the provided permission as in
498   * setPermission, not permission&~umask
499   * 
500   * It is implemented using two RPCs. It is understood that it is inefficient,
501   * but the implementation is thread-safe. The other option is to change the
502   * value of umask in configuration to be 0, but it is not thread-safe.
503   * 
504   * @param fs file system handle
505   * @param file the name of the file to be created
506   * @param permission the permission of the file
507   * @return an output stream
508   * @throws IOException
509   */
510  public static FSDataOutputStream create(FileSystem fs,
511      Path file, FsPermission permission) throws IOException {
512    // create the file with default permission
513    FSDataOutputStream out = fs.create(file);
514    // set its permission to the supplied one
515    fs.setPermission(file, permission);
516    return out;
517  }
518
519  /** create a directory with the provided permission
520   * The permission of the directory is set to be the provided permission as in
521   * setPermission, not permission&~umask
522   * 
523   * @see #create(FileSystem, Path, FsPermission)
524   * 
525   * @param fs file system handle
526   * @param dir the name of the directory to be created
527   * @param permission the permission of the directory
528   * @return true if the directory creation succeeds; false otherwise
529   * @throws IOException
530   */
531  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
532  throws IOException {
533    // create the directory using the default permission
534    boolean result = fs.mkdirs(dir);
535    // set its permission to be the supplied one
536    fs.setPermission(dir, permission);
537    return result;
538  }
539
540  ///////////////////////////////////////////////////////////////
541  // FileSystem
542  ///////////////////////////////////////////////////////////////
543
  /**
   * Constructs a FileSystem, passing null to the superclass constructor.
   */
  protected FileSystem() {
    super(null);
  }
547
548  /** 
549   * Check that a Path belongs to this FileSystem.
550   * @param path to check
551   */
552  protected void checkPath(Path path) {
553    URI uri = path.toUri();
554    String thatScheme = uri.getScheme();
555    if (thatScheme == null)                // fs is relative
556      return;
557    URI thisUri = getCanonicalUri();
558    String thisScheme = thisUri.getScheme();
559    //authority and scheme are not case sensitive
560    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
561      String thisAuthority = thisUri.getAuthority();
562      String thatAuthority = uri.getAuthority();
563      if (thatAuthority == null &&                // path's authority is null
564          thisAuthority != null) {                // fs has an authority
565        URI defaultUri = getDefaultUri(getConf());
566        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
567          uri = defaultUri; // schemes match, so use this uri instead
568        } else {
569          uri = null; // can't determine auth of the path
570        }
571      }
572      if (uri != null) {
573        // canonicalize uri before comparing with this fs
574        uri = NetUtils.getCanonicalUri(uri, getDefaultPort());
575        thatAuthority = uri.getAuthority();
576        if (thisAuthority == thatAuthority ||       // authorities match
577            (thisAuthority != null &&
578             thisAuthority.equalsIgnoreCase(thatAuthority)))
579          return;
580      }
581    }
582    throw new IllegalArgumentException("Wrong FS: "+path+
583                                       ", expected: "+this.getUri());
584  }
585
586  /**
587   * Return an array containing hostnames, offset and size of 
588   * portions of the given file.  For a nonexistent 
589   * file or regions, null will be returned.
590   *
591   * This call is most helpful with DFS, where it returns 
592   * hostnames of machines that contain the given file.
593   *
594   * The FileSystem will simply return an elt containing 'localhost'.
595   *
596   * @param file FilesStatus to get data from
597   * @param start offset into the given file
598   * @param len length for which to get locations for
599   */
600  public BlockLocation[] getFileBlockLocations(FileStatus file, 
601      long start, long len) throws IOException {
602    if (file == null) {
603      return null;
604    }
605
606    if (start < 0 || len < 0) {
607      throw new IllegalArgumentException("Invalid start or len parameter");
608    }
609
610    if (file.getLen() <= start) {
611      return new BlockLocation[0];
612
613    }
614    String[] name = { "localhost:50010" };
615    String[] host = { "localhost" };
616    return new BlockLocation[] {
617      new BlockLocation(name, host, 0, file.getLen()) };
618  }
619 
620
621  /**
622   * Return an array containing hostnames, offset and size of 
623   * portions of the given file.  For a nonexistent 
624   * file or regions, null will be returned.
625   *
626   * This call is most helpful with DFS, where it returns 
627   * hostnames of machines that contain the given file.
628   *
629   * The FileSystem will simply return an elt containing 'localhost'.
630   *
631   * @param p path is used to identify an FS since an FS could have
632   *          another FS that it could be delegating the call to
633   * @param start offset into the given file
634   * @param len length for which to get locations for
635   */
636  public BlockLocation[] getFileBlockLocations(Path p, 
637      long start, long len) throws IOException {
638    if (p == null) {
639      throw new NullPointerException();
640    }
641    FileStatus file = getFileStatus(p);
642    return getFileBlockLocations(file, start, len);
643  }
644  
645  /**
646   * Return a set of server default configuration values
647   * @return server default configuration values
648   * @throws IOException
649   */
650  public FsServerDefaults getServerDefaults() throws IOException {
651    Configuration conf = getConf();
652    // CRC32 is chosen as default as it is available in all 
653    // releases that support checksum.
654    return new FsServerDefaults(getDefaultBlockSize(), 
655        conf.getInt("io.bytes.per.checksum", 512), 
656        64 * 1024, 
657        getDefaultReplication(),
658        conf.getInt("io.file.buffer.size", 4096),
659        DataChecksum.Type.CRC32);
660  }
661
662  /**
663   * Return a set of server default configuration values
664   * @param p path is used to identify an FS since an FS could have
665   *          another FS that it could be delegating the call to
666   * @return server default configuration values
667   * @throws IOException
668   */
669  public FsServerDefaults getServerDefaults(Path p) throws IOException {
670    return getServerDefaults();
671  }
672
673  /**
674   * Return the fully-qualified path of path f resolving the path
675   * through any symlinks or mount point
676   * @param p path to be resolved
677   * @return fully qualified path 
678   * @throws FileNotFoundException
679   */
680   public Path resolvePath(final Path p) throws IOException {
681     checkPath(p);
682     return getFileStatus(p).getPath();
683   }
684
  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return the input stream opened at the path
   * @throws IOException
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
692    
693  /**
694   * Opens an FSDataInputStream at the indicated Path.
695   * @param f the file to open
696   */
697  public FSDataInputStream open(Path f) throws IOException {
698    return open(f, getConf().getInt("io.file.buffer.size", 4096));
699  }
700
701  /**
702   * Create an FSDataOutputStream at the indicated Path.
703   * Files are overwritten by default.
704   * @param f the file to create
705   */
706  public FSDataOutputStream create(Path f) throws IOException {
707    return create(f, true);
708  }
709
710  /**
711   * Create an FSDataOutputStream at the indicated Path.
712   * @param f the file to create
713   * @param overwrite if a file with this name already exists, then if true,
714   *   the file will be overwritten, and if false an exception will be thrown.
715   */
716  public FSDataOutputStream create(Path f, boolean overwrite)
717      throws IOException {
718    return create(f, overwrite, 
719                  getConf().getInt("io.file.buffer.size", 4096),
720                  getDefaultReplication(f),
721                  getDefaultBlockSize(f));
722  }
723
724  /**
725   * Create an FSDataOutputStream at the indicated Path with write-progress
726   * reporting.
727   * Files are overwritten by default.
728   * @param f the file to create
729   * @param progress to report progress
730   */
731  public FSDataOutputStream create(Path f, Progressable progress) 
732      throws IOException {
733    return create(f, true, 
734                  getConf().getInt("io.file.buffer.size", 4096),
735                  getDefaultReplication(f),
736                  getDefaultBlockSize(f), progress);
737  }
738
739  /**
740   * Create an FSDataOutputStream at the indicated Path.
741   * Files are overwritten by default.
742   * @param f the file to create
743   * @param replication the replication factor
744   */
745  public FSDataOutputStream create(Path f, short replication)
746      throws IOException {
747    return create(f, true, 
748                  getConf().getInt("io.file.buffer.size", 4096),
749                  replication,
750                  getDefaultBlockSize(f));
751  }
752
753  /**
754   * Create an FSDataOutputStream at the indicated Path with write-progress
755   * reporting.
756   * Files are overwritten by default.
757   * @param f the file to create
758   * @param replication the replication factor
759   * @param progress to report progress
760   */
761  public FSDataOutputStream create(Path f, short replication, 
762      Progressable progress) throws IOException {
763    return create(f, true, 
764                  getConf().getInt("io.file.buffer.size", 4096),
765                  replication,
766                  getDefaultBlockSize(f), progress);
767  }
768
769    
770  /**
771   * Create an FSDataOutputStream at the indicated Path.
772   * @param f the file name to create
773   * @param overwrite if a file with this name already exists, then if true,
774   *   the file will be overwritten, and if false an error will be thrown.
775   * @param bufferSize the size of the buffer to be used.
776   */
777  public FSDataOutputStream create(Path f, 
778                                   boolean overwrite,
779                                   int bufferSize
780                                   ) throws IOException {
781    return create(f, overwrite, bufferSize, 
782                  getDefaultReplication(f),
783                  getDefaultBlockSize(f));
784  }
785    
786  /**
787   * Create an FSDataOutputStream at the indicated Path with write-progress
788   * reporting.
789   * @param f the path of the file to open
790   * @param overwrite if a file with this name already exists, then if true,
791   *   the file will be overwritten, and if false an error will be thrown.
792   * @param bufferSize the size of the buffer to be used.
793   */
794  public FSDataOutputStream create(Path f, 
795                                   boolean overwrite,
796                                   int bufferSize,
797                                   Progressable progress
798                                   ) throws IOException {
799    return create(f, overwrite, bufferSize, 
800                  getDefaultReplication(f),
801                  getDefaultBlockSize(f), progress);
802  }
803    
804    
805  /**
806   * Create an FSDataOutputStream at the indicated Path.
807   * @param f the file name to open
808   * @param overwrite if a file with this name already exists, then if true,
809   *   the file will be overwritten, and if false an error will be thrown.
810   * @param bufferSize the size of the buffer to be used.
811   * @param replication required block replication for the file. 
812   */
813  public FSDataOutputStream create(Path f, 
814                                   boolean overwrite,
815                                   int bufferSize,
816                                   short replication,
817                                   long blockSize
818                                   ) throws IOException {
819    return create(f, overwrite, bufferSize, replication, blockSize, null);
820  }
821
822  /**
823   * Create an FSDataOutputStream at the indicated Path with write-progress
824   * reporting.
825   * @param f the file name to open
826   * @param overwrite if a file with this name already exists, then if true,
827   *   the file will be overwritten, and if false an error will be thrown.
828   * @param bufferSize the size of the buffer to be used.
829   * @param replication required block replication for the file. 
830   */
831  public FSDataOutputStream create(Path f,
832                                            boolean overwrite,
833                                            int bufferSize,
834                                            short replication,
835                                            long blockSize,
836                                            Progressable progress
837                                            ) throws IOException {
838    return this.create(f, FsPermission.getFileDefault().applyUMask(
839        FsPermission.getUMask(getConf())), overwrite, bufferSize,
840        replication, blockSize, progress);
841  }
842
843  /**
844   * Create an FSDataOutputStream at the indicated Path with write-progress
845   * reporting.
846   * @param f the file name to open
847   * @param permission
848   * @param overwrite if a file with this name already exists, then if true,
849   *   the file will be overwritten, and if false an error will be thrown.
850   * @param bufferSize the size of the buffer to be used.
851   * @param replication required block replication for the file.
852   * @param blockSize
853   * @param progress
854   * @throws IOException
855   * @see #setPermission(Path, FsPermission)
856   */
857  public abstract FSDataOutputStream create(Path f,
858      FsPermission permission,
859      boolean overwrite,
860      int bufferSize,
861      short replication,
862      long blockSize,
863      Progressable progress) throws IOException;
864
865   /**
866    * Create an FSDataOutputStream at the indicated Path with a custom
867    * checksum option. This create method is the common method to be
868    * used to specify ChecksumOpt in both 0.23.x and 2.x.
869    *
870    * @param f the file name to open
871    * @param permission
872    * @param flags {@link CreateFlag}s to use for this stream.
873    * @param bufferSize the size of the buffer to be used.
874    * @param replication required block replication for the file.
875    * @param blockSize
876    * @param progress
877    * @param checksumOpt checksum parameter. If null, the values
878    *        found in conf will be used.
879    * @throws IOException
880    * @see #setPermission(Path, FsPermission)
881    */
882   public FSDataOutputStream create(Path f,
883       FsPermission permission,
884       EnumSet<CreateFlag> flags,
885       int bufferSize,
886       short replication,
887       long blockSize,
888       Progressable progress,
889       ChecksumOpt checksumOpt) throws IOException {
890     // Checksum options are ignored by default. The file systems that
891     // implement checksum need to override this method. The full
892     // support is currently only available in DFS.
893     return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
894         bufferSize, replication, blockSize, progress);
895   }
896  
897  /*.
898   * This create has been added to support the FileContext that processes
899   * the permission
900   * with umask before calling this method.
901   * This a temporary method added to support the transition from FileSystem
902   * to FileContext for user applications.
903   */
904  @Deprecated
905  protected FSDataOutputStream primitiveCreate(Path f,
906     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
907     short replication, long blockSize, Progressable progress,
908     ChecksumOpt checksumOpt) throws IOException {
909
910    boolean pathExists = exists(f);
911    CreateFlag.validate(f, pathExists, flag);
912    
913    // Default impl  assumes that permissions do not matter and 
914    // nor does the bytesPerChecksum  hence
915    // calling the regular create is good enough.
916    // FSs that implement permissions should override this.
917
918    if (pathExists && flag.contains(CreateFlag.APPEND)) {
919      return append(f, bufferSize, progress);
920    }
921    
922    return this.create(f, absolutePermission,
923        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
924        blockSize, progress);
925  }
926  
927  /**
928   * This version of the mkdirs method assumes that the permission is absolute.
929   * It has been added to support the FileContext that processes the permission
930   * with umask before calling this method.
931   * This a temporary method added to support the transition from FileSystem
932   * to FileContext for user applications.
933   */
934  @Deprecated
935  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
936    throws IOException {
937    // Default impl is to assume that permissions do not matter and hence
938    // calling the regular mkdirs is good enough.
939    // FSs that implement permissions should override this.
940   return this.mkdirs(f, absolutePermission);
941  }
942
943
944  /**
945   * This version of the mkdirs method assumes that the permission is absolute.
946   * It has been added to support the FileContext that processes the permission
947   * with umask before calling this method.
948   * This a temporary method added to support the transition from FileSystem
949   * to FileContext for user applications.
950   */
951  @Deprecated
952  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
953                    boolean createParent)
954    throws IOException {
955    
956    if (!createParent) { // parent must exist.
957      // since the this.mkdirs makes parent dirs automatically
958      // we must throw exception if parent does not exist.
959      final FileStatus stat = getFileStatus(f.getParent());
960      if (stat == null) {
961        throw new FileNotFoundException("Missing parent:" + f);
962      }
963      if (!stat.isDirectory()) {
964        throw new ParentNotDirectoryException("parent is not a dir");
965      }
966      // parent does exist - go ahead with mkdir of leaf
967    }
968    // Default impl is to assume that permissions do not matter and hence
969    // calling the regular mkdirs is good enough.
970    // FSs that implement permissions should override this.
971    if (!this.mkdirs(f, absolutePermission)) {
972      throw new IOException("mkdir of "+ f + " failed");
973    }
974  }
975
976  /**
977   * Opens an FSDataOutputStream at the indicated Path with write-progress
978   * reporting. Same as create(), except fails if parent directory doesn't
979   * already exist.
980   * @param f the file name to open
981   * @param overwrite if a file with this name already exists, then if true,
982   * the file will be overwritten, and if false an error will be thrown.
983   * @param bufferSize the size of the buffer to be used.
984   * @param replication required block replication for the file.
985   * @param blockSize
986   * @param progress
987   * @throws IOException
988   * @see #setPermission(Path, FsPermission)
989   * @deprecated API only for 0.20-append
990   */
991  @Deprecated
992  public FSDataOutputStream createNonRecursive(Path f,
993      boolean overwrite,
994      int bufferSize, short replication, long blockSize,
995      Progressable progress) throws IOException {
996    return this.createNonRecursive(f, FsPermission.getFileDefault(),
997        overwrite, bufferSize, replication, blockSize, progress);
998  }
999
1000  /**
1001   * Opens an FSDataOutputStream at the indicated Path with write-progress
1002   * reporting. Same as create(), except fails if parent directory doesn't
1003   * already exist.
1004   * @param f the file name to open
1005   * @param permission
1006   * @param overwrite if a file with this name already exists, then if true,
1007   * the file will be overwritten, and if false an error will be thrown.
1008   * @param bufferSize the size of the buffer to be used.
1009   * @param replication required block replication for the file.
1010   * @param blockSize
1011   * @param progress
1012   * @throws IOException
1013   * @see #setPermission(Path, FsPermission)
1014   * @deprecated API only for 0.20-append
1015   */
1016   @Deprecated
1017   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1018       boolean overwrite, int bufferSize, short replication, long blockSize,
1019       Progressable progress) throws IOException {
1020     throw new IOException("createNonRecursive unsupported for this filesystem "
1021         + this.getClass());
1022   }
1023
1024  /**
1025   * Creates the given Path as a brand-new zero-length file.  If
1026   * create fails, or if it already existed, return false.
1027   *
1028   * @param f path to use for create
1029   */
1030  public boolean createNewFile(Path f) throws IOException {
1031    if (exists(f)) {
1032      return false;
1033    } else {
1034      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1035      return true;
1036    }
1037  }
1038
1039  /**
1040   * Append to an existing file (optional operation).
1041   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1042   * @param f the existing file to be appended.
1043   * @throws IOException
1044   */
1045  public FSDataOutputStream append(Path f) throws IOException {
1046    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1047  }
1048  /**
1049   * Append to an existing file (optional operation).
1050   * Same as append(f, bufferSize, null).
1051   * @param f the existing file to be appended.
1052   * @param bufferSize the size of the buffer to be used.
1053   * @throws IOException
1054   */
1055  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1056    return append(f, bufferSize, null);
1057  }
1058
1059  /**
1060   * Append to an existing file (optional operation).
1061   * @param f the existing file to be appended.
1062   * @param bufferSize the size of the buffer to be used.
1063   * @param progress for reporting progress if it is not null.
1064   * @throws IOException
1065   */
1066  public abstract FSDataOutputStream append(Path f, int bufferSize,
1067      Progressable progress) throws IOException;
1068
1069 /**
1070   * Get replication.
1071   * 
1072   * @deprecated Use getFileStatus() instead
1073   * @param src file name
1074   * @return file replication
1075   * @throws IOException
1076   */ 
1077  @Deprecated
1078  public short getReplication(Path src) throws IOException {
1079    return getFileStatus(src).getReplication();
1080  }
1081
1082  /**
1083   * Set replication for an existing file.
1084   * 
1085   * @param src file name
1086   * @param replication new replication
1087   * @throws IOException
1088   * @return true if successful;
1089   *         false if file does not exist or is a directory
1090   */
1091  public boolean setReplication(Path src, short replication)
1092    throws IOException {
1093    return true;
1094  }
1095
1096  /**
1097   * Renames Path src to Path dst.  Can take place on local fs
1098   * or remote DFS.
1099   * @param src path to be renamed
1100   * @param dst new path after rename
1101   * @throws IOException on failure
1102   * @return true if rename is successful
1103   */
1104  public abstract boolean rename(Path src, Path dst) throws IOException;
1105
1106  /**
1107   * Renames Path src to Path dst
1108   * <ul>
1109   * <li
1110   * <li>Fails if src is a file and dst is a directory.
1111   * <li>Fails if src is a directory and dst is a file.
1112   * <li>Fails if the parent of dst does not exist or is a file.
1113   * </ul>
1114   * <p>
1115   * If OVERWRITE option is not passed as an argument, rename fails
1116   * if the dst already exists.
1117   * <p>
1118   * If OVERWRITE option is passed as an argument, rename overwrites
1119   * the dst if it is a file or an empty directory. Rename fails if dst is
1120   * a non-empty directory.
1121   * <p>
1122   * Note that atomicity of rename is dependent on the file system
1123   * implementation. Please refer to the file system documentation for
1124   * details. This default implementation is non atomic.
1125   * <p>
1126   * This method is deprecated since it is a temporary method added to 
1127   * support the transition from FileSystem to FileContext for user 
1128   * applications.
1129   * 
1130   * @param src path to be renamed
1131   * @param dst new path after rename
1132   * @throws IOException on failure
1133   */
1134  @Deprecated
1135  protected void rename(final Path src, final Path dst,
1136      final Rename... options) throws IOException {
1137    // Default implementation
1138    final FileStatus srcStatus = getFileStatus(src);
1139    if (srcStatus == null) {
1140      throw new FileNotFoundException("rename source " + src + " not found.");
1141    }
1142
1143    boolean overwrite = false;
1144    if (null != options) {
1145      for (Rename option : options) {
1146        if (option == Rename.OVERWRITE) {
1147          overwrite = true;
1148        }
1149      }
1150    }
1151
1152    FileStatus dstStatus;
1153    try {
1154      dstStatus = getFileStatus(dst);
1155    } catch (IOException e) {
1156      dstStatus = null;
1157    }
1158    if (dstStatus != null) {
1159      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1160        throw new IOException("Source " + src + " Destination " + dst
1161            + " both should be either file or directory");
1162      }
1163      if (!overwrite) {
1164        throw new FileAlreadyExistsException("rename destination " + dst
1165            + " already exists.");
1166      }
1167      // Delete the destination that is a file or an empty directory
1168      if (dstStatus.isDirectory()) {
1169        FileStatus[] list = listStatus(dst);
1170        if (list != null && list.length != 0) {
1171          throw new IOException(
1172              "rename cannot overwrite non empty destination directory " + dst);
1173        }
1174      }
1175      delete(dst, false);
1176    } else {
1177      final Path parent = dst.getParent();
1178      final FileStatus parentStatus = getFileStatus(parent);
1179      if (parentStatus == null) {
1180        throw new FileNotFoundException("rename destination parent " + parent
1181            + " not found.");
1182      }
1183      if (!parentStatus.isDirectory()) {
1184        throw new ParentNotDirectoryException("rename destination parent " + parent
1185            + " is a file.");
1186      }
1187    }
1188    if (!rename(src, dst)) {
1189      throw new IOException("rename from " + src + " to " + dst + " failed.");
1190    }
1191  }
1192  
1193  /**
1194   * Delete a file 
1195   * @deprecated Use {@link #delete(Path, boolean)} instead.
1196   */
1197  @Deprecated
1198  public boolean delete(Path f) throws IOException {
1199    return delete(f, true);
1200  }
1201  
  /** Delete a file or directory.
   *
   * @param f the path to delete.
   * @param recursive if the path is a directory and this is set to
   * true, the directory is deleted; if it is set to false, an exception
   * is thrown. For a file, recursive can be set to either true or false.
   * @return  true if delete is successful else false. 
   * @throws IOException
   */
  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1212
1213  /**
1214   * Mark a path to be deleted when FileSystem is closed.
1215   * When the JVM shuts down,
1216   * all FileSystem objects will be closed automatically.
1217   * Then,
1218   * the marked path will be deleted as a result of closing the FileSystem.
1219   *
1220   * The path has to exist in the file system.
1221   * 
1222   * @param f the path to delete.
1223   * @return  true if deleteOnExit is successful, otherwise false.
1224   * @throws IOException
1225   */
1226  public boolean deleteOnExit(Path f) throws IOException {
1227    if (!exists(f)) {
1228      return false;
1229    }
1230    synchronized (deleteOnExit) {
1231      deleteOnExit.add(f);
1232    }
1233    return true;
1234  }
1235  
1236  /**
1237   * Cancel the deletion of the path when the FileSystem is closed
1238   * @param f the path to cancel deletion
1239   */
1240  public boolean cancelDeleteOnExit(Path f) {
1241    synchronized (deleteOnExit) {
1242      return deleteOnExit.remove(f);
1243    }
1244  }
1245
1246  /**
1247   * Delete all files that were marked as delete-on-exit. This recursively
1248   * deletes all files in the specified paths.
1249   */
1250  protected void processDeleteOnExit() {
1251    synchronized (deleteOnExit) {
1252      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1253        Path path = iter.next();
1254        try {
1255          if (exists(path)) {
1256            delete(path, true);
1257          }
1258        }
1259        catch (IOException e) {
1260          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1261        }
1262        iter.remove();
1263      }
1264    }
1265  }
1266  
1267  /** Check if exists.
1268   * @param f source file
1269   */
1270  public boolean exists(Path f) throws IOException {
1271    try {
1272      return getFileStatus(f) != null;
1273    } catch (FileNotFoundException e) {
1274      return false;
1275    }
1276  }
1277
1278  /** True iff the named path is a directory.
1279   * Note: Avoid using this method. Instead reuse the FileStatus 
1280   * returned by getFileStatus() or listStatus() methods.
1281   * @param f path to check
1282   */
1283  public boolean isDirectory(Path f) throws IOException {
1284    try {
1285      return getFileStatus(f).isDirectory();
1286    } catch (FileNotFoundException e) {
1287      return false;               // f does not exist
1288    }
1289  }
1290
1291  /** True iff the named path is a regular file.
1292   * Note: Avoid using this method. Instead reuse the FileStatus 
1293   * returned by getFileStatus() or listStatus() methods.
1294   * @param f path to check
1295   */
1296  public boolean isFile(Path f) throws IOException {
1297    try {
1298      return getFileStatus(f).isFile();
1299    } catch (FileNotFoundException e) {
1300      return false;               // f does not exist
1301    }
1302  }
1303  
1304  /** The number of bytes in a file. */
1305  /** @deprecated Use getFileStatus() instead */
1306  @Deprecated
1307  public long getLength(Path f) throws IOException {
1308    return getFileStatus(f).getLen();
1309  }
1310    
1311  /** Return the {@link ContentSummary} of a given {@link Path}.
1312  * @param f path to use
1313  */
1314  public ContentSummary getContentSummary(Path f) throws IOException {
1315    FileStatus status = getFileStatus(f);
1316    if (status.isFile()) {
1317      // f is a file
1318      return new ContentSummary(status.getLen(), 1, 0);
1319    }
1320    // f is a directory
1321    long[] summary = {0, 0, 1};
1322    for(FileStatus s : listStatus(f)) {
1323      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1324                                     new ContentSummary(s.getLen(), 1, 0);
1325      summary[0] += c.getLength();
1326      summary[1] += c.getFileCount();
1327      summary[2] += c.getDirectoryCount();
1328    }
1329    return new ContentSummary(summary[0], summary[1], summary[2]);
1330  }
1331
1332  final protected static PathFilter DEFAULT_FILTER = new PathFilter() {
1333      public boolean accept(Path file) {
1334        return true;
1335      }     
1336    };
1337    
1338  /**
1339   * List the statuses of the files/directories in the given path if the path is
1340   * a directory.
1341   * 
1342   * @param f given path
1343   * @return the statuses of the files/directories in the given patch
1344   * @throws FileNotFoundException when the path does not exist;
1345   *         IOException see specific implementation
1346   */
1347  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1348                                                         IOException;
1349    
1350  /*
1351   * Filter files/directories in the given path using the user-supplied path
1352   * filter. Results are added to the given array <code>results</code>.
1353   */
1354  private void listStatus(ArrayList<FileStatus> results, Path f,
1355      PathFilter filter) throws FileNotFoundException, IOException {
1356    FileStatus listing[] = listStatus(f);
1357    if (listing == null) {
1358      throw new IOException("Error accessing " + f);
1359    }
1360
1361    for (int i = 0; i < listing.length; i++) {
1362      if (filter.accept(listing[i].getPath())) {
1363        results.add(listing[i]);
1364      }
1365    }
1366  }
1367
1368  /**
1369   * @return an iterator over the corrupt files under the given path
1370   * (may contain duplicates if a file has more than one corrupt block)
1371   * @throws IOException
1372   */
1373  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1374    throws IOException {
1375    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1376                                            " does not support" +
1377                                            " listCorruptFileBlocks");
1378  }
1379
1380  /**
1381   * Filter files/directories in the given path using the user-supplied path
1382   * filter.
1383   * 
1384   * @param f
1385   *          a path name
1386   * @param filter
1387   *          the user-supplied path filter
1388   * @return an array of FileStatus objects for the files under the given path
1389   *         after applying the filter
1390   * @throws FileNotFoundException when the path does not exist;
1391   *         IOException see specific implementation   
1392   */
1393  public FileStatus[] listStatus(Path f, PathFilter filter) 
1394                                   throws FileNotFoundException, IOException {
1395    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1396    listStatus(results, f, filter);
1397    return results.toArray(new FileStatus[results.size()]);
1398  }
1399
1400  /**
1401   * Filter files/directories in the given list of paths using default
1402   * path filter.
1403   * 
1404   * @param files
1405   *          a list of paths
1406   * @return a list of statuses for the files under the given paths after
1407   *         applying the filter default Path filter
1408   * @throws FileNotFoundException when the path does not exist;
1409   *         IOException see specific implementation
1410   */
1411  public FileStatus[] listStatus(Path[] files)
1412      throws FileNotFoundException, IOException {
1413    return listStatus(files, DEFAULT_FILTER);
1414  }
1415
1416  /**
1417   * Filter files/directories in the given list of paths using user-supplied
1418   * path filter.
1419   * 
1420   * @param files
1421   *          a list of paths
1422   * @param filter
1423   *          the user-supplied path filter
1424   * @return a list of statuses for the files under the given paths after
1425   *         applying the filter
1426   * @throws FileNotFoundException when the path does not exist;
1427   *         IOException see specific implementation
1428   */
1429  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1430      throws FileNotFoundException, IOException {
1431    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1432    for (int i = 0; i < files.length; i++) {
1433      listStatus(results, files[i], filter);
1434    }
1435    return results.toArray(new FileStatus[results.size()]);
1436  }
1437
1438  /**
1439   * <p>Return all the files that match filePattern and are not checksum
1440   * files. Results are sorted by their names.
1441   * 
1442   * <p>
1443   * A filename pattern is composed of <i>regular</i> characters and
1444   * <i>special pattern matching</i> characters, which are:
1445   *
1446   * <dl>
1447   *  <dd>
1448   *   <dl>
1449   *    <p>
1450   *    <dt> <tt> ? </tt>
1451   *    <dd> Matches any single character.
1452   *
1453   *    <p>
1454   *    <dt> <tt> * </tt>
1455   *    <dd> Matches zero or more characters.
1456   *
1457   *    <p>
1458   *    <dt> <tt> [<i>abc</i>] </tt>
1459   *    <dd> Matches a single character from character set
1460   *     <tt>{<i>a,b,c</i>}</tt>.
1461   *
1462   *    <p>
1463   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1464   *    <dd> Matches a single character from the character range
1465   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1466   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1467   *
1468   *    <p>
1469   *    <dt> <tt> [^<i>a</i>] </tt>
1470   *    <dd> Matches a single character that is not from character set or range
1471   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1472   *     immediately to the right of the opening bracket.
1473   *
1474   *    <p>
1475   *    <dt> <tt> \<i>c</i> </tt>
1476   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1477   *
1478   *    <p>
1479   *    <dt> <tt> {ab,cd} </tt>
1480   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1481   *    
1482   *    <p>
1483   *    <dt> <tt> {ab,c{de,fh}} </tt>
1484   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1485   *
1486   *   </dl>
1487   *  </dd>
1488   * </dl>
1489   *
1490   * @param pathPattern a regular expression specifying a pth pattern
1491
1492   * @return an array of paths that match the path pattern
1493   * @throws IOException
1494   */
1495  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1496    return globStatus(pathPattern, DEFAULT_FILTER);
1497  }
1498  
1499  /**
1500   * Return an array of FileStatus objects whose path names match pathPattern
1501   * and is accepted by the user-supplied path filter. Results are sorted by
1502   * their path names.
1503   * Return null if pathPattern has no glob and the path does not exist.
1504   * Return an empty array if pathPattern has a glob and no path matches it. 
1505   * 
1506   * @param pathPattern
1507   *          a regular expression specifying the path pattern
1508   * @param filter
1509   *          a user-supplied path filter
1510   * @return an array of FileStatus objects
1511   * @throws IOException if any I/O error occurs when fetching file status
1512   */
1513  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1514      throws IOException {
1515    String filename = pathPattern.toUri().getPath();
1516    List<FileStatus> allMatches = null;
1517    
1518    List<String> filePatterns = GlobExpander.expand(filename);
1519    for (String filePattern : filePatterns) {
1520      Path path = new Path(filePattern.isEmpty() ? Path.CUR_DIR : filePattern);
1521      List<FileStatus> matches = globStatusInternal(path, filter);
1522      if (matches != null) {
1523        if (allMatches == null) {
1524          allMatches = matches;
1525        } else {
1526          allMatches.addAll(matches);
1527        }
1528      }
1529    }
1530    
1531    FileStatus[] results = null;
1532    if (allMatches != null) {
1533      results = allMatches.toArray(new FileStatus[allMatches.size()]);
1534    } else if (filePatterns.size() > 1) {
1535      // no matches with multiple expansions is a non-matching glob 
1536      results = new FileStatus[0];
1537    }
1538    return results;
1539  }
1540  
  // sort gripes because FileStatus Comparable isn't parameterized...
  /**
   * Match one path pattern against the file system. Called by globStatus
   * once for each string produced by GlobExpander.expand.
   * <p>
   * The pattern is split on the path separator and matched one component
   * at a time, starting from the root for an absolute pattern or from the
   * current directory otherwise. Components with a glob pattern are
   * resolved by listing the matching children of every directory matched
   * so far; components without one are resolved with a direct status
   * lookup. When no component has a glob, the whole (unquoted) path is
   * looked up in one step.
   *
   * @param pathPattern the pattern to match
   * @param filter applied to the paths that survive pattern matching
   * @return the sorted matches; an empty list if the pattern has a glob
   *         and nothing matched; null if the pattern has no glob and
   *         nothing matched
   * @throws IOException if a status lookup or listing fails for a reason
   *         other than the path not being found
   */
  @SuppressWarnings("unchecked") 
  private List<FileStatus> globStatusInternal(Path pathPattern,
      PathFilter filter) throws IOException {
    boolean patternHasGlob = false;       // pathPattern has any globs
    List<FileStatus> matches = new ArrayList<FileStatus>();

    // determine starting point
    int level = 0;
    String baseDir = Path.CUR_DIR;
    if (pathPattern.isAbsolute()) {
      level = 1; // need to skip empty item at beginning of split list
      baseDir = Path.SEPARATOR;
    }
    
    // parse components and determine if it's a glob
    String[] components = null;
    GlobFilter[] filters = null;
    String filename = pathPattern.toUri().getPath();
    if (!filename.isEmpty() && !Path.SEPARATOR.equals(filename)) {
      components = filename.split(Path.SEPARATOR);
      // filters[i] corresponds to components[i]; entries below level stay null
      filters = new GlobFilter[components.length];
      for (int i=level; i < components.length; i++) {
        filters[i] = new GlobFilter(components[i]);
        patternHasGlob |= filters[i].hasPattern();
      }
      if (!patternHasGlob) {
        // plain path: look the whole thing up directly instead of walking it
        baseDir = unquotePathComponent(filename);
        components = null; // short through to filter check
      }
    }
    
    // seed the parent directory path, return if it doesn't exist
    try {
      matches.add(getFileStatus(new Path(baseDir)));
    } catch (FileNotFoundException e) {
      return patternHasGlob ? matches : null;
    }
    
    // skip if there are no components other than the basedir
    if (components != null) {
      // iterate through each path component
      for (int i=level; (i < components.length) && !matches.isEmpty(); i++) {
        // children collects everything matching component i under the
        // current matches; it becomes the match set for the next component
        List<FileStatus> children = new ArrayList<FileStatus>();
        for (FileStatus match : matches) {
          // don't look for children in a file matched by a glob
          if (!match.isDirectory()) {
            continue;
          }
          try {
            if (filters[i].hasPattern()) {
              // get all children matching the filter
              FileStatus[] statuses = listStatus(match.getPath(), filters[i]);
              children.addAll(Arrays.asList(statuses));
            } else {
              // the component does not have a pattern
              String component = unquotePathComponent(components[i]);
              Path child = new Path(match.getPath(), component);
              children.add(getFileStatus(child));
            }
          } catch (FileNotFoundException e) {
            // don't care
          }
        }
        matches = children;
      }
    }
    // remove anything that didn't match the filter
    if (!matches.isEmpty()) {
      Iterator<FileStatus> iter = matches.iterator();
      while (iter.hasNext()) {
        if (!filter.accept(iter.next().getPath())) {
          iter.remove();
        }
      }
    }
    // no final paths, if there were any globs return empty list
    if (matches.isEmpty()) {
      return patternHasGlob ? matches : null;
    }
    Collections.sort(matches);
    return matches;
  }
1624
1625  /**
1626   * The glob filter builds a regexp per path component.  If the component
1627   * does not contain a shell metachar, then it falls back to appending the
1628   * raw string to the list of built up paths.  This raw path needs to have
1629   * the quoting removed.  Ie. convert all occurances of "\X" to "X"
1630   * @param name of the path component
1631   * @return the unquoted path component
1632   */
1633  private String unquotePathComponent(String name) {
1634    return name.replaceAll("\\\\(.)", "$1");
1635  }
1636  
1637  /**
1638   * List the statuses of the files/directories in the given path if the path is
1639   * a directory. 
1640   * Return the file's status and block locations If the path is a file.
1641   * 
1642   * If a returned status is a file, it contains the file's block locations.
1643   * 
1644   * @param f is the path
1645   *
1646   * @return an iterator that traverses statuses of the files/directories 
1647   *         in the given path
1648   *
1649   * @throws FileNotFoundException If <code>f</code> does not exist
1650   * @throws IOException If an I/O error occurred
1651   */
1652  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1653  throws FileNotFoundException, IOException {
1654    return listLocatedStatus(f, DEFAULT_FILTER);
1655  }
1656
1657  /**
1658   * Listing a directory
1659   * The returned results include its block location if it is a file
1660   * The results are filtered by the given path filter
1661   * @param f a path
1662   * @param filter a path filter
1663   * @return an iterator that traverses statuses of the files/directories 
1664   *         in the given path
1665   * @throws FileNotFoundException if <code>f</code> does not exist
1666   * @throws IOException if any I/O error occurred
1667   */
1668  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1669      final PathFilter filter)
1670  throws FileNotFoundException, IOException {
1671    return new RemoteIterator<LocatedFileStatus>() {
1672      private final FileStatus[] stats = listStatus(f, filter);
1673      private int i = 0;
1674
1675      @Override
1676      public boolean hasNext() {
1677        return i<stats.length;
1678      }
1679
1680      @Override
1681      public LocatedFileStatus next() throws IOException {
1682        if (!hasNext()) {
1683          throw new NoSuchElementException("No more entry in " + f);
1684        }
1685        FileStatus result = stats[i++];
1686        BlockLocation[] locs = result.isFile() ?
1687            getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1688            null;
1689        return new LocatedFileStatus(result, locs);
1690      }
1691    };
1692  }
1693
1694  /**
1695   * List the statuses and block locations of the files in the given path.
1696   * 
1697   * If the path is a directory, 
1698   *   if recursive is false, returns files in the directory;
1699   *   if recursive is true, return files in the subtree rooted at the path.
1700   * If the path is a file, return the file's status and block locations.
1701   * 
1702   * @param f is the path
1703   * @param recursive if the subdirectories need to be traversed recursively
1704   *
1705   * @return an iterator that traverses statuses of the files
1706   *
1707   * @throws FileNotFoundException when the path does not exist;
1708   *         IOException see specific implementation
1709   */
1710  public RemoteIterator<LocatedFileStatus> listFiles(
1711      final Path f, final boolean recursive)
1712  throws FileNotFoundException, IOException {
1713    return new RemoteIterator<LocatedFileStatus>() {
1714      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1715        new Stack<RemoteIterator<LocatedFileStatus>>();
1716      private RemoteIterator<LocatedFileStatus> curItor =
1717        listLocatedStatus(f);
1718      private LocatedFileStatus curFile;
1719     
1720      @Override
1721      public boolean hasNext() throws IOException {
1722        while (curFile == null) {
1723          if (curItor.hasNext()) {
1724            handleFileStat(curItor.next());
1725          } else if (!itors.empty()) {
1726            curItor = itors.pop();
1727          } else {
1728            return false;
1729          }
1730        }
1731        return true;
1732      }
1733
1734      /**
1735       * Process the input stat.
1736       * If it is a file, return the file stat.
1737       * If it is a directory, traverse the directory if recursive is true;
1738       * ignore it if recursive is false.
1739       * @param stat input status
1740       * @throws IOException if any IO error occurs
1741       */
1742      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1743        if (stat.isFile()) { // file
1744          curFile = stat;
1745        } else if (recursive) { // directory
1746          itors.push(curItor);
1747          curItor = listLocatedStatus(stat.getPath());
1748        }
1749      }
1750
1751      @Override
1752      public LocatedFileStatus next() throws IOException {
1753        if (hasNext()) {
1754          LocatedFileStatus result = curFile;
1755          curFile = null;
1756          return result;
1757        } 
1758        throw new java.util.NoSuchElementException("No more entry in " + f);
1759      }
1760    };
1761  }
1762  
1763  /** Return the current user's home directory in this filesystem.
1764   * The default implementation returns "/user/$USER/".
1765   */
1766  public Path getHomeDirectory() {
1767    return this.makeQualified(
1768        new Path("/user/"+System.getProperty("user.name")));
1769  }
1770
1771
1772  /**
1773   * Set the current working directory for the given file system. All relative
1774   * paths will be resolved relative to it.
1775   * 
1776   * @param new_dir
1777   */
1778  public abstract void setWorkingDirectory(Path new_dir);
1779    
1780  /**
1781   * Get the current working directory for the given file system
1782   * @return the directory pathname
1783   */
1784  public abstract Path getWorkingDirectory();
1785  
1786  
1787  /**
1788   * Note: with the new FilesContext class, getWorkingDirectory()
1789   * will be removed. 
1790   * The working directory is implemented in FilesContext.
1791   * 
1792   * Some file systems like LocalFileSystem have an initial workingDir
1793   * that we use as the starting workingDir. For other file systems
1794   * like HDFS there is no built in notion of an inital workingDir.
1795   * 
1796   * @return if there is built in notion of workingDir then it
1797   * is returned; else a null is returned.
1798   */
1799  protected Path getInitialWorkingDirectory() {
1800    return null;
1801  }
1802
1803  /**
1804   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1805   */
1806  public boolean mkdirs(Path f) throws IOException {
1807    return mkdirs(f, FsPermission.getDirDefault());
1808  }
1809
1810  /**
1811   * Make the given file and all non-existent parents into
1812   * directories. Has the semantics of Unix 'mkdir -p'.
1813   * Existence of the directory hierarchy is not an error.
1814   * @param f path to create
1815   * @param permission to apply to f
1816   */
1817  public abstract boolean mkdirs(Path f, FsPermission permission
1818      ) throws IOException;
1819
1820  /**
1821   * The src file is on the local disk.  Add it to FS at
1822   * the given dst name and the source is kept intact afterwards
1823   * @param src path
1824   * @param dst path
1825   */
1826  public void copyFromLocalFile(Path src, Path dst)
1827    throws IOException {
1828    copyFromLocalFile(false, src, dst);
1829  }
1830
1831  /**
1832   * The src files is on the local disk.  Add it to FS at
1833   * the given dst name, removing the source afterwards.
1834   * @param srcs path
1835   * @param dst path
1836   */
1837  public void moveFromLocalFile(Path[] srcs, Path dst)
1838    throws IOException {
1839    copyFromLocalFile(true, true, srcs, dst);
1840  }
1841
1842  /**
1843   * The src file is on the local disk.  Add it to FS at
1844   * the given dst name, removing the source afterwards.
1845   * @param src path
1846   * @param dst path
1847   */
1848  public void moveFromLocalFile(Path src, Path dst)
1849    throws IOException {
1850    copyFromLocalFile(true, src, dst);
1851  }
1852
1853  /**
1854   * The src file is on the local disk.  Add it to FS at
1855   * the given dst name.
1856   * delSrc indicates if the source should be removed
1857   * @param delSrc whether to delete the src
1858   * @param src path
1859   * @param dst path
1860   */
1861  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1862    throws IOException {
1863    copyFromLocalFile(delSrc, true, src, dst);
1864  }
1865  
1866  /**
1867   * The src files are on the local disk.  Add it to FS at
1868   * the given dst name.
1869   * delSrc indicates if the source should be removed
1870   * @param delSrc whether to delete the src
1871   * @param overwrite whether to overwrite an existing file
1872   * @param srcs array of paths which are source
1873   * @param dst path
1874   */
1875  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1876                                Path[] srcs, Path dst)
1877    throws IOException {
1878    Configuration conf = getConf();
1879    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1880  }
1881  
1882  /**
1883   * The src file is on the local disk.  Add it to FS at
1884   * the given dst name.
1885   * delSrc indicates if the source should be removed
1886   * @param delSrc whether to delete the src
1887   * @param overwrite whether to overwrite an existing file
1888   * @param src path
1889   * @param dst path
1890   */
1891  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1892                                Path src, Path dst)
1893    throws IOException {
1894    Configuration conf = getConf();
1895    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1896  }
1897    
1898  /**
1899   * The src file is under FS, and the dst is on the local disk.
1900   * Copy it from FS control to the local dst name.
1901   * @param src path
1902   * @param dst path
1903   */
1904  public void copyToLocalFile(Path src, Path dst) throws IOException {
1905    copyToLocalFile(false, src, dst);
1906  }
1907    
1908  /**
1909   * The src file is under FS, and the dst is on the local disk.
1910   * Copy it from FS control to the local dst name.
1911   * Remove the source afterwards
1912   * @param src path
1913   * @param dst path
1914   */
1915  public void moveToLocalFile(Path src, Path dst) throws IOException {
1916    copyToLocalFile(true, src, dst);
1917  }
1918
1919  /**
1920   * The src file is under FS, and the dst is on the local disk.
1921   * Copy it from FS control to the local dst name.
1922   * delSrc indicates if the src will be removed or not.
1923   * @param delSrc whether to delete the src
1924   * @param src path
1925   * @param dst path
1926   */   
1927  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1928    throws IOException {
1929    copyToLocalFile(delSrc, src, dst, false);
1930  }
1931  
1932    /**
1933   * The src file is under FS, and the dst is on the local disk. Copy it from FS
1934   * control to the local dst name. delSrc indicates if the src will be removed
1935   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
1936   * as local file system or not. RawLocalFileSystem is non crc file system.So,
1937   * It will not create any crc files at local.
1938   * 
1939   * @param delSrc
1940   *          whether to delete the src
1941   * @param src
1942   *          path
1943   * @param dst
1944   *          path
1945   * @param useRawLocalFileSystem
1946   *          whether to use RawLocalFileSystem as local file system or not.
1947   * 
1948   * @throws IOException
1949   *           - if any IO error
1950   */
1951  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
1952      boolean useRawLocalFileSystem) throws IOException {
1953    Configuration conf = getConf();
1954    FileSystem local = null;
1955    if (useRawLocalFileSystem) {
1956      local = getLocal(conf).getRawFileSystem();
1957    } else {
1958      local = getLocal(conf);
1959    }
1960    FileUtil.copy(this, src, local, dst, delSrc, conf);
1961  }
1962
1963  /**
1964   * Returns a local File that the user can write output to.  The caller
1965   * provides both the eventual FS target name and the local working
1966   * file.  If the FS is local, we write directly into the target.  If
1967   * the FS is remote, we write into the tmp local area.
1968   * @param fsOutputFile path of output file
1969   * @param tmpLocalFile path of local tmp file
1970   */
1971  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1972    throws IOException {
1973    return tmpLocalFile;
1974  }
1975
1976  /**
1977   * Called when we're all done writing to the target.  A local FS will
1978   * do nothing, because we've written to exactly the right place.  A remote
1979   * FS will copy the contents of tmpLocalFile to the correct target at
1980   * fsOutputFile.
1981   * @param fsOutputFile path of output file
1982   * @param tmpLocalFile path to local tmp file
1983   */
1984  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
1985    throws IOException {
1986    moveFromLocalFile(tmpLocalFile, fsOutputFile);
1987  }
1988
1989  /**
1990   * No more filesystem operations are needed.  Will
1991   * release any held locks.
1992   */
1993  public void close() throws IOException {
1994    // delete all files that were marked as delete-on-exit.
1995    processDeleteOnExit();
1996    CACHE.remove(this.key, this);
1997  }
1998
1999  /** Return the total size of all files in the filesystem.*/
2000  public long getUsed() throws IOException{
2001    long used = 0;
2002    FileStatus[] files = listStatus(new Path("/"));
2003    for(FileStatus file:files){
2004      used += file.getLen();
2005    }
2006    return used;
2007  }
2008  
2009  /**
2010   * Get the block size for a particular file.
2011   * @param f the filename
2012   * @return the number of bytes in a block
2013   */
2014  /** @deprecated Use getFileStatus() instead */
2015  @Deprecated
2016  public long getBlockSize(Path f) throws IOException {
2017    return getFileStatus(f).getBlockSize();
2018  }
2019
2020  /** Return the number of bytes that large input files should be optimally
2021   * be split into to minimize i/o time. */
2022  public long getDefaultBlockSize() {
2023    // default to 32MB: large enough to minimize the impact of seeks
2024    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2025  }
2026    
2027  /** Return the number of bytes that large input files should be optimally
2028   * be split into to minimize i/o time.  The given path will be used to
2029   * locate the actual filesystem.  The full path does not have to exist.
2030   * @param f path of file
2031   * @return the default block size for the path's filesystem
2032   */
2033  public long getDefaultBlockSize(Path f) {
2034    return getDefaultBlockSize();
2035  }
2036
2037  /**
2038   * Get the default replication.
2039   */
2040  public short getDefaultReplication() { return 1; }
2041
2042  /**
2043   * Get the default replication for a path.   The given path will be used to
2044   * locate the actual filesystem.  The full path does not have to exist.
2045   * @param path of the file
2046   * @return default replication for the path's filesystem 
2047   */
2048  public short getDefaultReplication(Path path) {
2049    return getDefaultReplication();
2050  }
2051  
2052  /**
2053   * Return a file status object that represents the path.
2054   * @param f The path we want information from
2055   * @return a FileStatus object
2056   * @throws FileNotFoundException when the path does not exist;
2057   *         IOException see specific implementation
2058   */
2059  public abstract FileStatus getFileStatus(Path f) throws IOException;
2060
2061  /**
2062   * Get the checksum of a file.
2063   *
2064   * @param f The file path
2065   * @return The file checksum.  The default return value is null,
2066   *  which indicates that no checksum algorithm is implemented
2067   *  in the corresponding FileSystem.
2068   */
2069  public FileChecksum getFileChecksum(Path f) throws IOException {
2070    return null;
2071  }
2072  
2073  /**
2074   * Set the verify checksum flag. This is only applicable if the 
2075   * corresponding FileSystem supports checksum. By default doesn't do anything.
2076   * @param verifyChecksum
2077   */
2078  public void setVerifyChecksum(boolean verifyChecksum) {
2079    //doesn't do anything
2080  }
2081
2082  /**
2083   * Set the write checksum flag. This is only applicable if the 
2084   * corresponding FileSystem supports checksum. By default doesn't do anything.
2085   * @param writeChecksum
2086   */
2087  public void setWriteChecksum(boolean writeChecksum) {
2088    //doesn't do anything
2089  }
2090
2091  /**
2092   * Returns a status object describing the use and capacity of the
2093   * file system. If the file system has multiple partitions, the
2094   * use and capacity of the root partition is reflected.
2095   * 
2096   * @return a FsStatus object
2097   * @throws IOException
2098   *           see specific implementation
2099   */
2100  public FsStatus getStatus() throws IOException {
2101    return getStatus(null);
2102  }
2103
2104  /**
2105   * Returns a status object describing the use and capacity of the
2106   * file system. If the file system has multiple partitions, the
2107   * use and capacity of the partition pointed to by the specified
2108   * path is reflected.
2109   * @param p Path for which status should be obtained. null means
2110   * the default partition. 
2111   * @return a FsStatus object
2112   * @throws IOException
2113   *           see specific implementation
2114   */
2115  public FsStatus getStatus(Path p) throws IOException {
2116    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2117  }
2118
2119  /**
2120   * Set permission of a path.
2121   * @param p
2122   * @param permission
2123   */
2124  public void setPermission(Path p, FsPermission permission
2125      ) throws IOException {
2126  }
2127
2128  /**
2129   * Set owner of a path (i.e. a file or a directory).
2130   * The parameters username and groupname cannot both be null.
2131   * @param p The path
2132   * @param username If it is null, the original username remains unchanged.
2133   * @param groupname If it is null, the original groupname remains unchanged.
2134   */
2135  public void setOwner(Path p, String username, String groupname
2136      ) throws IOException {
2137  }
2138
2139  /**
2140   * Set access time of a file
2141   * @param p The path
2142   * @param mtime Set the modification time of this file.
2143   *              The number of milliseconds since Jan 1, 1970. 
2144   *              A value of -1 means that this call should not set modification time.
2145   * @param atime Set the access time of this file.
2146   *              The number of milliseconds since Jan 1, 1970. 
2147   *              A value of -1 means that this call should not set access time.
2148   */
2149  public void setTimes(Path p, long mtime, long atime
2150      ) throws IOException {
2151  }
2152
2153  private static FileSystem createFileSystem(URI uri, Configuration conf
2154      ) throws IOException {
2155    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
2156    if (clazz == null) {
2157      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2158    }
2159    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2160    fs.initialize(uri, conf);
2161    return fs;
2162  }
2163
2164  /** Caching FileSystem objects */
2165  static class Cache {
2166    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2167
2168    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2169    private final Set<Key> toAutoClose = new HashSet<Key>();
2170
2171    /** A variable that makes all objects in the cache unique */
2172    private static AtomicLong unique = new AtomicLong(1);
2173
2174    FileSystem get(URI uri, Configuration conf) throws IOException{
2175      Key key = new Key(uri, conf);
2176      return getInternal(uri, conf, key);
2177    }
2178
2179    /** The objects inserted into the cache using this method are all unique */
2180    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2181      Key key = new Key(uri, conf, unique.getAndIncrement());
2182      return getInternal(uri, conf, key);
2183    }
2184
2185    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2186      FileSystem fs;
2187      synchronized (this) {
2188        fs = map.get(key);
2189      }
2190      if (fs != null) {
2191        return fs;
2192      }
2193
2194      fs = createFileSystem(uri, conf);
2195      synchronized (this) { // refetch the lock again
2196        FileSystem oldfs = map.get(key);
2197        if (oldfs != null) { // a file system is created while lock is releasing
2198          fs.close(); // close the new file system
2199          return oldfs;  // return the old file system
2200        }
2201        
2202        // now insert the new file system into the map
2203        if (map.isEmpty() ) {
2204          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2205        }
2206        fs.key = key;
2207        map.put(key, fs);
2208        if (conf.getBoolean("fs.automatic.close", true)) {
2209          toAutoClose.add(key);
2210        }
2211        return fs;
2212      }
2213    }
2214
2215    synchronized void remove(Key key, FileSystem fs) {
2216      if (map.containsKey(key) && fs == map.get(key)) {
2217        map.remove(key);
2218        toAutoClose.remove(key);
2219        }
2220    }
2221
2222    synchronized void closeAll() throws IOException {
2223      closeAll(false);
2224    }
2225
2226    /**
2227     * Close all FileSystem instances in the Cache.
2228     * @param onlyAutomatic only close those that are marked for automatic closing
2229     */
2230    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2231      List<IOException> exceptions = new ArrayList<IOException>();
2232
2233      // Make a copy of the keys in the map since we'll be modifying
2234      // the map while iterating over it, which isn't safe.
2235      List<Key> keys = new ArrayList<Key>();
2236      keys.addAll(map.keySet());
2237
2238      for (Key key : keys) {
2239        final FileSystem fs = map.get(key);
2240
2241        if (onlyAutomatic && !toAutoClose.contains(key)) {
2242          continue;
2243        }
2244
2245        //remove from cache
2246        remove(key, fs);
2247
2248        if (fs != null) {
2249          try {
2250            fs.close();
2251          }
2252          catch(IOException ioe) {
2253            exceptions.add(ioe);
2254          }
2255        }
2256      }
2257
2258      if (!exceptions.isEmpty()) {
2259        throw MultipleIOException.createIOException(exceptions);
2260      }
2261    }
2262
2263    private class ClientFinalizer implements Runnable {
2264      public synchronized void run() {
2265        try {
2266          closeAll(true);
2267        } catch (IOException e) {
2268          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2269        }
2270      }
2271    }
2272
2273    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2274      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2275      //Make a pass over the list and collect the filesystems to close
2276      //we cannot close inline since close() removes the entry from the Map
2277      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2278        final Key key = entry.getKey();
2279        final FileSystem fs = entry.getValue();
2280        if (ugi.equals(key.ugi) && fs != null) {
2281          targetFSList.add(fs);   
2282        }
2283      }
2284      List<IOException> exceptions = new ArrayList<IOException>();
2285      //now make a pass over the target list and close each
2286      for (FileSystem fs : targetFSList) {
2287        try {
2288          fs.close();
2289        }
2290        catch(IOException ioe) {
2291          exceptions.add(ioe);
2292        }
2293      }
2294      if (!exceptions.isEmpty()) {
2295        throw MultipleIOException.createIOException(exceptions);
2296      }
2297    }
2298
2299    /** FileSystem.Cache.Key */
2300    static class Key {
2301      final String scheme;
2302      final String authority;
2303      final UserGroupInformation ugi;
2304      final long unique;   // an artificial way to make a key unique
2305
2306      Key(URI uri, Configuration conf) throws IOException {
2307        this(uri, conf, 0);
2308      }
2309
2310      Key(URI uri, Configuration conf, long unique) throws IOException {
2311        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2312        authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2313        this.unique = unique;
2314        
2315        this.ugi = UserGroupInformation.getCurrentUser();
2316      }
2317
2318      /** {@inheritDoc} */
2319      public int hashCode() {
2320        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2321      }
2322
2323      static boolean isEqual(Object a, Object b) {
2324        return a == b || (a != null && a.equals(b));        
2325      }
2326
2327      /** {@inheritDoc} */
2328      public boolean equals(Object obj) {
2329        if (obj == this) {
2330          return true;
2331        }
2332        if (obj != null && obj instanceof Key) {
2333          Key that = (Key)obj;
2334          return isEqual(this.scheme, that.scheme)
2335                 && isEqual(this.authority, that.authority)
2336                 && isEqual(this.ugi, that.ugi)
2337                 && (this.unique == that.unique);
2338        }
2339        return false;        
2340      }
2341
2342      /** {@inheritDoc} */
2343      public String toString() {
2344        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2345      }
2346    }
2347  }
2348  
2349  public static final class Statistics {
2350    private final String scheme;
2351    private AtomicLong bytesRead = new AtomicLong();
2352    private AtomicLong bytesWritten = new AtomicLong();
2353    private AtomicInteger readOps = new AtomicInteger();
2354    private AtomicInteger largeReadOps = new AtomicInteger();
2355    private AtomicInteger writeOps = new AtomicInteger();
2356    
2357    public Statistics(String scheme) {
2358      this.scheme = scheme;
2359    }
2360
2361    /**
2362     * Copy constructor.
2363     * 
2364     * @param st
2365     *          The input Statistics object which is cloned.
2366     */
2367    public Statistics(Statistics st) {
2368      this.scheme = st.scheme;
2369      this.bytesRead = new AtomicLong(st.bytesRead.longValue());
2370      this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
2371    }
2372
2373    /**
2374     * Increment the bytes read in the statistics
2375     * @param newBytes the additional bytes read
2376     */
2377    public void incrementBytesRead(long newBytes) {
2378      bytesRead.getAndAdd(newBytes);
2379    }
2380    
2381    /**
2382     * Increment the bytes written in the statistics
2383     * @param newBytes the additional bytes written
2384     */
2385    public void incrementBytesWritten(long newBytes) {
2386      bytesWritten.getAndAdd(newBytes);
2387    }
2388    
2389    /**
2390     * Increment the number of read operations
2391     * @param count number of read operations
2392     */
2393    public void incrementReadOps(int count) {
2394      readOps.getAndAdd(count);
2395    }
2396
2397    /**
2398     * Increment the number of large read operations
2399     * @param count number of large read operations
2400     */
2401    public void incrementLargeReadOps(int count) {
2402      largeReadOps.getAndAdd(count);
2403    }
2404
2405    /**
2406     * Increment the number of write operations
2407     * @param count number of write operations
2408     */
2409    public void incrementWriteOps(int count) {
2410      writeOps.getAndAdd(count);
2411    }
2412
2413    /**
2414     * Get the total number of bytes read
2415     * @return the number of bytes
2416     */
2417    public long getBytesRead() {
2418      return bytesRead.get();
2419    }
2420    
2421    /**
2422     * Get the total number of bytes written
2423     * @return the number of bytes
2424     */
2425    public long getBytesWritten() {
2426      return bytesWritten.get();
2427    }
2428    
2429    /**
2430     * Get the number of file system read operations such as list files
2431     * @return number of read operations
2432     */
2433    public int getReadOps() {
2434      return readOps.get() + largeReadOps.get();
2435    }
2436
2437    /**
2438     * Get the number of large file system read operations such as list files
2439     * under a large directory
2440     * @return number of large read operations
2441     */
2442    public int getLargeReadOps() {
2443      return largeReadOps.get();
2444    }
2445
2446    /**
2447     * Get the number of file system write operations such as create, append 
2448     * rename etc.
2449     * @return number of write operations
2450     */
2451    public int getWriteOps() {
2452      return writeOps.get();
2453    }
2454
2455    public String toString() {
2456      return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2457          + readOps + " read ops, " + largeReadOps + " large read ops, "
2458          + writeOps + " write ops";
2459    }
2460    
2461    /**
2462     * Reset the counts of bytes to 0.
2463     */
2464    public void reset() {
2465      bytesWritten.set(0);
2466      bytesRead.set(0);
2467    }
2468    
2469    /**
2470     * Get the uri scheme associated with this statistics object.
2471     * @return the schema associated with this set of statistics
2472     */
2473    public String getScheme() {
2474      return scheme;
2475    }
2476  }
2477  
2478  /**
2479   * Get the Map of Statistics object indexed by URI Scheme.
2480   * @return a Map having a key as URI scheme and value as Statistics object
2481   * @deprecated use {@link #getAllStatistics} instead
2482   */
2483  @Deprecated
2484  public static synchronized Map<String, Statistics> getStatistics() {
2485    Map<String, Statistics> result = new HashMap<String, Statistics>();
2486    for(Statistics stat: statisticsTable.values()) {
2487      result.put(stat.getScheme(), stat);
2488    }
2489    return result;
2490  }
2491
2492  /**
2493   * Return the FileSystem classes that have Statistics
2494   */
2495  public static synchronized List<Statistics> getAllStatistics() {
2496    return new ArrayList<Statistics>(statisticsTable.values());
2497  }
2498  
2499  /**
2500   * Get the statistics for a particular file system
2501   * @param cls the class to lookup
2502   * @return a statistics object
2503   */
2504  public static synchronized 
2505  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
2506    Statistics result = statisticsTable.get(cls);
2507    if (result == null) {
2508      result = new Statistics(scheme);
2509      statisticsTable.put(cls, result);
2510    }
2511    return result;
2512  }
2513  
2514  /**
2515   * Reset all statistics for all file systems
2516   */
2517  public static synchronized void clearStatistics() {
2518    for(Statistics stat: statisticsTable.values()) {
2519      stat.reset();
2520    }
2521  }
2522
2523  /**
2524   * Print all statistics for all file systems
2525   */
2526  public static synchronized
2527  void printStatistics() throws IOException {
2528    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
2529            statisticsTable.entrySet()) {
2530      System.out.println("  FileSystem " + pair.getKey().getName() + 
2531                         ": " + pair.getValue());
2532    }
2533  }
2534}