001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019
020package org.apache.hadoop.fs;
021
022import com.google.common.annotations.VisibleForTesting;
023
024import java.io.BufferedOutputStream;
025import java.io.DataOutput;
026import java.io.EOFException;
027import java.io.File;
028import java.io.FileInputStream;
029import java.io.FileNotFoundException;
030import java.io.FileOutputStream;
031import java.io.IOException;
032import java.io.OutputStream;
033import java.io.FileDescriptor;
034import java.net.URI;
035import java.nio.ByteBuffer;
036import java.util.Arrays;
037import java.util.EnumSet;
038import java.util.StringTokenizer;
039
040import org.apache.hadoop.classification.InterfaceAudience;
041import org.apache.hadoop.classification.InterfaceStability;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.io.IOUtils;
045import org.apache.hadoop.io.nativeio.NativeIO;
046import org.apache.hadoop.io.nativeio.NativeIOException;
047import org.apache.hadoop.util.Progressable;
048import org.apache.hadoop.util.Shell;
049import org.apache.hadoop.util.StringUtils;
050
051/****************************************************************
052 * Implement the FileSystem API for the raw local filesystem.
053 *
054 *****************************************************************/
055@InterfaceAudience.Public
056@InterfaceStability.Stable
057public class RawLocalFileSystem extends FileSystem {
058  static final URI NAME = URI.create("file:///");
059  private Path workingDir;
060  // Temporary workaround for HADOOP-9652.
061  private static boolean useDeprecatedFileStatus = true;
062
063  @VisibleForTesting
064  public static void useStatIfAvailable() {
065    useDeprecatedFileStatus = !Stat.isAvailable();
066  }
067  
068  public RawLocalFileSystem() {
069    workingDir = getInitialWorkingDirectory();
070  }
071  
072  private Path makeAbsolute(Path f) {
073    if (f.isAbsolute()) {
074      return f;
075    } else {
076      return new Path(workingDir, f);
077    }
078  }
079  
080  /** Convert a path to a File. */
081  public File pathToFile(Path path) {
082    checkPath(path);
083    if (!path.isAbsolute()) {
084      path = new Path(getWorkingDirectory(), path);
085    }
086    return new File(path.toUri().getPath());
087  }
088
089  @Override
090  public URI getUri() { return NAME; }
091  
092  @Override
093  public void initialize(URI uri, Configuration conf) throws IOException {
094    super.initialize(uri, conf);
095    setConf(conf);
096  }
097  
098  /*******************************************************
099   * For open()'s FSInputStream.
100   *******************************************************/
101  class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
102    private FileInputStream fis;
103    private long position;
104
105    public LocalFSFileInputStream(Path f) throws IOException {
106      fis = new FileInputStream(pathToFile(f));
107    }
108    
109    @Override
110    public void seek(long pos) throws IOException {
111      if (pos < 0) {
112        throw new EOFException(
113          FSExceptionMessages.NEGATIVE_SEEK);
114      }
115      fis.getChannel().position(pos);
116      this.position = pos;
117    }
118    
119    @Override
120    public long getPos() throws IOException {
121      return this.position;
122    }
123    
124    @Override
125    public boolean seekToNewSource(long targetPos) throws IOException {
126      return false;
127    }
128    
129    /*
130     * Just forward to the fis
131     */
132    @Override
133    public int available() throws IOException { return fis.available(); }
134    @Override
135    public void close() throws IOException { fis.close(); }
136    @Override
137    public boolean markSupported() { return false; }
138    
139    @Override
140    public int read() throws IOException {
141      try {
142        int value = fis.read();
143        if (value >= 0) {
144          this.position++;
145          statistics.incrementBytesRead(1);
146        }
147        return value;
148      } catch (IOException e) {                 // unexpected exception
149        throw new FSError(e);                   // assume native fs error
150      }
151    }
152    
153    @Override
154    public int read(byte[] b, int off, int len) throws IOException {
155      try {
156        int value = fis.read(b, off, len);
157        if (value > 0) {
158          this.position += value;
159          statistics.incrementBytesRead(value);
160        }
161        return value;
162      } catch (IOException e) {                 // unexpected exception
163        throw new FSError(e);                   // assume native fs error
164      }
165    }
166    
167    @Override
168    public int read(long position, byte[] b, int off, int len)
169      throws IOException {
170      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
171      try {
172        int value = fis.getChannel().read(bb, position);
173        if (value > 0) {
174          statistics.incrementBytesRead(value);
175        }
176        return value;
177      } catch (IOException e) {
178        throw new FSError(e);
179      }
180    }
181    
182    @Override
183    public long skip(long n) throws IOException {
184      long value = fis.skip(n);
185      if (value > 0) {
186        this.position += value;
187      }
188      return value;
189    }
190
191    @Override
192    public FileDescriptor getFileDescriptor() throws IOException {
193      return fis.getFD();
194    }
195  }
196  
197  @Override
198  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
199    if (!exists(f)) {
200      throw new FileNotFoundException(f.toString());
201    }
202    return new FSDataInputStream(new BufferedFSInputStream(
203        new LocalFSFileInputStream(f), bufferSize));
204  }
205  
206  /*********************************************************
207   * For create()'s FSOutputStream.
208   *********************************************************/
209  class LocalFSFileOutputStream extends OutputStream {
210    private FileOutputStream fos;
211    
212    private LocalFSFileOutputStream(Path f, boolean append,
213        FsPermission permission) throws IOException {
214      File file = pathToFile(f);
215      if (permission == null) {
216        this.fos = new FileOutputStream(file, append);
217      } else {
218        if (Shell.WINDOWS && NativeIO.isAvailable()) {
219          this.fos = NativeIO.Windows.createFileOutputStreamWithMode(file,
220              append, permission.toShort());
221        } else {
222          this.fos = new FileOutputStream(file, append);
223          boolean success = false;
224          try {
225            setPermission(f, permission);
226            success = true;
227          } finally {
228            if (!success) {
229              IOUtils.cleanup(LOG, this.fos);
230            }
231          }
232        }
233      }
234    }
235    
236    /*
237     * Just forward to the fos
238     */
239    @Override
240    public void close() throws IOException { fos.close(); }
241    @Override
242    public void flush() throws IOException { fos.flush(); }
243    @Override
244    public void write(byte[] b, int off, int len) throws IOException {
245      try {
246        fos.write(b, off, len);
247      } catch (IOException e) {                // unexpected exception
248        throw new FSError(e);                  // assume native fs error
249      }
250    }
251    
252    @Override
253    public void write(int b) throws IOException {
254      try {
255        fos.write(b);
256      } catch (IOException e) {              // unexpected exception
257        throw new FSError(e);                // assume native fs error
258      }
259    }
260  }
261
262  @Override
263  public FSDataOutputStream append(Path f, int bufferSize,
264      Progressable progress) throws IOException {
265    if (!exists(f)) {
266      throw new FileNotFoundException("File " + f + " not found");
267    }
268    if (getFileStatus(f).isDirectory()) {
269      throw new IOException("Cannot append to a diretory (=" + f + " )");
270    }
271    return new FSDataOutputStream(new BufferedOutputStream(
272        createOutputStreamWithMode(f, true, null), bufferSize), statistics);
273  }
274
275  @Override
276  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
277    short replication, long blockSize, Progressable progress)
278    throws IOException {
279    return create(f, overwrite, true, bufferSize, replication, blockSize,
280        progress, null);
281  }
282
283  private FSDataOutputStream create(Path f, boolean overwrite,
284      boolean createParent, int bufferSize, short replication, long blockSize,
285      Progressable progress, FsPermission permission) throws IOException {
286    if (exists(f) && !overwrite) {
287      throw new FileAlreadyExistsException("File already exists: " + f);
288    }
289    Path parent = f.getParent();
290    if (parent != null && !mkdirs(parent)) {
291      throw new IOException("Mkdirs failed to create " + parent.toString());
292    }
293    return new FSDataOutputStream(new BufferedOutputStream(
294        createOutputStreamWithMode(f, false, permission), bufferSize),
295        statistics);
296  }
297  
298  protected OutputStream createOutputStream(Path f, boolean append) 
299      throws IOException {
300    return createOutputStreamWithMode(f, append, null);
301  }
302
303  protected OutputStream createOutputStreamWithMode(Path f, boolean append,
304      FsPermission permission) throws IOException {
305    return new LocalFSFileOutputStream(f, append, permission);
306  }
307  
308  @Override
309  @Deprecated
310  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
311      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
312      Progressable progress) throws IOException {
313    if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) {
314      throw new FileAlreadyExistsException("File already exists: " + f);
315    }
316    return new FSDataOutputStream(new BufferedOutputStream(
317        createOutputStreamWithMode(f, false, permission), bufferSize),
318            statistics);
319  }
320
321  @Override
322  public FSDataOutputStream create(Path f, FsPermission permission,
323    boolean overwrite, int bufferSize, short replication, long blockSize,
324    Progressable progress) throws IOException {
325
326    FSDataOutputStream out = create(f, overwrite, true, bufferSize, replication,
327        blockSize, progress, permission);
328    return out;
329  }
330
331  @Override
332  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
333      boolean overwrite,
334      int bufferSize, short replication, long blockSize,
335      Progressable progress) throws IOException {
336    FSDataOutputStream out = create(f, overwrite, false, bufferSize, replication,
337        blockSize, progress, permission);
338    return out;
339  }
340
341  @Override
342  public boolean rename(Path src, Path dst) throws IOException {
343    // Attempt rename using Java API.
344    File srcFile = pathToFile(src);
345    File dstFile = pathToFile(dst);
346    if (srcFile.renameTo(dstFile)) {
347      return true;
348    }
349
350    // Enforce POSIX rename behavior that a source directory replaces an existing
351    // destination if the destination is an empty directory.  On most platforms,
352    // this is already handled by the Java API call above.  Some platforms
353    // (notably Windows) do not provide this behavior, so the Java API call above
354    // fails.  Delete destination and attempt rename again.
355    if (this.exists(dst)) {
356      FileStatus sdst = this.getFileStatus(dst);
357      if (sdst.isDirectory() && dstFile.list().length == 0) {
358        if (LOG.isDebugEnabled()) {
359          LOG.debug("Deleting empty destination and renaming " + src + " to " +
360            dst);
361        }
362        if (this.delete(dst, false) && srcFile.renameTo(dstFile)) {
363          return true;
364        }
365      }
366    }
367
368    // The fallback behavior accomplishes the rename by a full copy.
369    if (LOG.isDebugEnabled()) {
370      LOG.debug("Falling through to a copy of " + src + " to " + dst);
371    }
372    return FileUtil.copy(this, src, this, dst, true, getConf());
373  }
374
375  @Override
376  public boolean truncate(Path f, final long newLength) throws IOException {
377    FileStatus status = getFileStatus(f);
378    if(status == null) {
379      throw new FileNotFoundException("File " + f + " not found");
380    }
381    if(status.isDirectory()) {
382      throw new IOException("Cannot truncate a directory (=" + f + ")");
383    }
384    long oldLength = status.getLen();
385    if(newLength > oldLength) {
386      throw new IllegalArgumentException(
387          "Cannot truncate to a larger file size. Current size: " + oldLength +
388          ", truncate size: " + newLength + ".");
389    }
390    try (FileOutputStream out = new FileOutputStream(pathToFile(f), true)) {
391      try {
392        out.getChannel().truncate(newLength);
393      } catch(IOException e) {
394        throw new FSError(e);
395      }
396    }
397    return true;
398  }
399  
400  /**
401   * Delete the given path to a file or directory.
402   * @param p the path to delete
403   * @param recursive to delete sub-directories
404   * @return true if the file or directory and all its contents were deleted
405   * @throws IOException if p is non-empty and recursive is false 
406   */
407  @Override
408  public boolean delete(Path p, boolean recursive) throws IOException {
409    File f = pathToFile(p);
410    if (!f.exists()) {
411      //no path, return false "nothing to delete"
412      return false;
413    }
414    if (f.isFile()) {
415      return f.delete();
416    } else if (!recursive && f.isDirectory() && 
417        (FileUtil.listFiles(f).length != 0)) {
418      throw new IOException("Directory " + f.toString() + " is not empty");
419    }
420    return FileUtil.fullyDelete(f);
421  }
422 
423  @Override
424  public FileStatus[] listStatus(Path f) throws IOException {
425    File localf = pathToFile(f);
426    FileStatus[] results;
427
428    if (!localf.exists()) {
429      throw new FileNotFoundException("File " + f + " does not exist");
430    }
431
432    if (localf.isDirectory()) {
433      String[] names = localf.list();
434      if (names == null) {
435        return null;
436      }
437      results = new FileStatus[names.length];
438      int j = 0;
439      for (int i = 0; i < names.length; i++) {
440        try {
441          // Assemble the path using the Path 3 arg constructor to make sure
442          // paths with colon are properly resolved on Linux
443          results[j] = getFileStatus(new Path(f, new Path(null, null,
444                                                          names[i])));
445          j++;
446        } catch (FileNotFoundException e) {
447          // ignore the files not found since the dir list may have have
448          // changed since the names[] list was generated.
449        }
450      }
451      if (j == names.length) {
452        return results;
453      }
454      return Arrays.copyOf(results, j);
455    }
456
457    if (!useDeprecatedFileStatus) {
458      return new FileStatus[] { getFileStatus(f) };
459    }
460    return new FileStatus[] {
461        new DeprecatedRawLocalFileStatus(localf,
462        getDefaultBlockSize(f), this) };
463  }
464  
465  protected boolean mkOneDir(File p2f) throws IOException {
466    return mkOneDirWithMode(new Path(p2f.getAbsolutePath()), p2f, null);
467  }
468
469  protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
470      throws IOException {
471    if (permission == null) {
472      return p2f.mkdir();
473    } else {
474      if (Shell.WINDOWS && NativeIO.isAvailable()) {
475        try {
476          NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
477          return true;
478        } catch (IOException e) {
479          if (LOG.isDebugEnabled()) {
480            LOG.debug(String.format(
481                "NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
482                p2f, permission.toShort()), e);
483          }
484          return false;
485        }
486      } else {
487        boolean b = p2f.mkdir();
488        if (b) {
489          setPermission(p, permission);
490        }
491        return b;
492      }
493    }
494  }
495
496  /**
497   * Creates the specified directory hierarchy. Does not
498   * treat existence as an error.
499   */
500  @Override
501  public boolean mkdirs(Path f) throws IOException {
502    return mkdirsWithOptionalPermission(f, null);
503  }
504
505  @Override
506  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
507    return mkdirsWithOptionalPermission(f, permission);
508  }
509
510  private boolean mkdirsWithOptionalPermission(Path f, FsPermission permission)
511      throws IOException {
512    if(f == null) {
513      throw new IllegalArgumentException("mkdirs path arg is null");
514    }
515    Path parent = f.getParent();
516    File p2f = pathToFile(f);
517    File parent2f = null;
518    if(parent != null) {
519      parent2f = pathToFile(parent);
520      if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
521        throw new ParentNotDirectoryException("Parent path is not a directory: "
522            + parent);
523      }
524    }
525    if (p2f.exists() && !p2f.isDirectory()) {
526      throw new FileNotFoundException("Destination exists" +
527              " and is not a directory: " + p2f.getCanonicalPath());
528    }
529    return (parent == null || parent2f.exists() || mkdirs(parent)) &&
530      (mkOneDirWithMode(f, p2f, permission) || p2f.isDirectory());
531  }
532  
533  
534  @Override
535  public Path getHomeDirectory() {
536    return this.makeQualified(new Path(System.getProperty("user.home")));
537  }
538
539  /**
540   * Set the working directory to the given directory.
541   */
542  @Override
543  public void setWorkingDirectory(Path newDir) {
544    workingDir = makeAbsolute(newDir);
545    checkPath(workingDir);
546  }
547  
548  @Override
549  public Path getWorkingDirectory() {
550    return workingDir;
551  }
552  
553  @Override
554  protected Path getInitialWorkingDirectory() {
555    return this.makeQualified(new Path(System.getProperty("user.dir")));
556  }
557
558  @Override
559  public FsStatus getStatus(Path p) throws IOException {
560    File partition = pathToFile(p == null ? new Path("/") : p);
561    //File provides getUsableSpace() and getFreeSpace()
562    //File provides no API to obtain used space, assume used = total - free
563    return new FsStatus(partition.getTotalSpace(), 
564      partition.getTotalSpace() - partition.getFreeSpace(),
565      partition.getFreeSpace());
566  }
567  
568  // In the case of the local filesystem, we can just rename the file.
569  @Override
570  public void moveFromLocalFile(Path src, Path dst) throws IOException {
571    rename(src, dst);
572  }
573  
574  // We can write output directly to the final location
575  @Override
576  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
577    throws IOException {
578    return fsOutputFile;
579  }
580  
581  // It's in the right place - nothing to do.
582  @Override
583  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
584    throws IOException {
585  }
586  
587  @Override
588  public void close() throws IOException {
589    super.close();
590  }
591  
592  @Override
593  public String toString() {
594    return "LocalFS";
595  }
596  
597  @Override
598  public FileStatus getFileStatus(Path f) throws IOException {
599    return getFileLinkStatusInternal(f, true);
600  }
601
602  @Deprecated
603  private FileStatus deprecatedGetFileStatus(Path f) throws IOException {
604    File path = pathToFile(f);
605    if (path.exists()) {
606      return new DeprecatedRawLocalFileStatus(pathToFile(f),
607          getDefaultBlockSize(f), this);
608    } else {
609      throw new FileNotFoundException("File " + f + " does not exist");
610    }
611  }
612
613  @Deprecated
614  static class DeprecatedRawLocalFileStatus extends FileStatus {
615    /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
616     * We recognize if the information is already loaded by check if
617     * onwer.equals("").
618     */
619    private boolean isPermissionLoaded() {
620      return !super.getOwner().isEmpty(); 
621    }
622    
623    DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
624      super(f.length(), f.isDirectory(), 1, defaultBlockSize,
625          f.lastModified(), new Path(f.getPath()).makeQualified(fs.getUri(),
626            fs.getWorkingDirectory()));
627    }
628    
629    @Override
630    public FsPermission getPermission() {
631      if (!isPermissionLoaded()) {
632        loadPermissionInfo();
633      }
634      return super.getPermission();
635    }
636
637    @Override
638    public String getOwner() {
639      if (!isPermissionLoaded()) {
640        loadPermissionInfo();
641      }
642      return super.getOwner();
643    }
644
645    @Override
646    public String getGroup() {
647      if (!isPermissionLoaded()) {
648        loadPermissionInfo();
649      }
650      return super.getGroup();
651    }
652
653    /// loads permissions, owner, and group from `ls -ld`
654    private void loadPermissionInfo() {
655      IOException e = null;
656      try {
657        String output = FileUtil.execCommand(new File(getPath().toUri()), 
658            Shell.getGetPermissionCommand());
659        StringTokenizer t =
660            new StringTokenizer(output, Shell.TOKEN_SEPARATOR_REGEX);
661        //expected format
662        //-rw-------    1 username groupname ...
663        String permission = t.nextToken();
664        if (permission.length() > FsPermission.MAX_PERMISSION_LENGTH) {
665          //files with ACLs might have a '+'
666          permission = permission.substring(0,
667            FsPermission.MAX_PERMISSION_LENGTH);
668        }
669        setPermission(FsPermission.valueOf(permission));
670        t.nextToken();
671
672        String owner = t.nextToken();
673        // If on windows domain, token format is DOMAIN\\user and we want to
674        // extract only the user name
675        if (Shell.WINDOWS) {
676          int i = owner.indexOf('\\');
677          if (i != -1)
678            owner = owner.substring(i + 1);
679        }
680        setOwner(owner);
681
682        setGroup(t.nextToken());
683      } catch (Shell.ExitCodeException ioe) {
684        if (ioe.getExitCode() != 1) {
685          e = ioe;
686        } else {
687          setPermission(null);
688          setOwner(null);
689          setGroup(null);
690        }
691      } catch (IOException ioe) {
692        e = ioe;
693      } finally {
694        if (e != null) {
695          throw new RuntimeException("Error while running command to get " +
696                                     "file permissions : " + 
697                                     StringUtils.stringifyException(e));
698        }
699      }
700    }
701
702    @Override
703    public void write(DataOutput out) throws IOException {
704      if (!isPermissionLoaded()) {
705        loadPermissionInfo();
706      }
707      super.write(out);
708    }
709  }
710
711  /**
712   * Use the command chown to set owner.
713   */
714  @Override
715  public void setOwner(Path p, String username, String groupname)
716    throws IOException {
717    FileUtil.setOwner(pathToFile(p), username, groupname);
718  }
719
720  /**
721   * Use the command chmod to set permission.
722   */
723  @Override
724  public void setPermission(Path p, FsPermission permission)
725    throws IOException {
726    if (NativeIO.isAvailable()) {
727      NativeIO.POSIX.chmod(pathToFile(p).getCanonicalPath(),
728                     permission.toShort());
729    } else {
730      String perm = String.format("%04o", permission.toShort());
731      Shell.execCommand(Shell.getSetPermissionCommand(perm, false,
732        FileUtil.makeShellPath(pathToFile(p), true)));
733    }
734  }
735 
736  /**
737   * Sets the {@link Path}'s last modified time <em>only</em> to the given
738   * valid time.
739   *
740   * @param mtime the modification time to set (only if greater than zero).
741   * @param atime currently ignored.
742   * @throws IOException if setting the last modified time fails.
743   */
744  @Override
745  public void setTimes(Path p, long mtime, long atime) throws IOException {
746    File f = pathToFile(p);
747    if(mtime >= 0) {
748      if(!f.setLastModified(mtime)) {
749        throw new IOException(
750          "couldn't set last-modified time to " +
751          mtime +
752          " for " +
753          f.getAbsolutePath());
754      }
755    }
756  }
757
758  @Override
759  public boolean supportsSymlinks() {
760    return true;
761  }
762
763  @SuppressWarnings("deprecation")
764  @Override
765  public void createSymlink(Path target, Path link, boolean createParent)
766      throws IOException {
767    if (!FileSystem.areSymlinksEnabled()) {
768      throw new UnsupportedOperationException("Symlinks not supported");
769    }
770    final String targetScheme = target.toUri().getScheme();
771    if (targetScheme != null && !"file".equals(targetScheme)) {
772      throw new IOException("Unable to create symlink to non-local file "+
773                            "system: "+target.toString());
774    }
775    if (createParent) {
776      mkdirs(link.getParent());
777    }
778
779    // NB: Use createSymbolicLink in java.nio.file.Path once available
780    int result = FileUtil.symLink(target.toString(),
781        makeAbsolute(link).toString());
782    if (result != 0) {
783      throw new IOException("Error " + result + " creating symlink " +
784          link + " to " + target);
785    }
786  }
787
788  /**
789   * Return a FileStatus representing the given path. If the path refers
790   * to a symlink return a FileStatus representing the link rather than
791   * the object the link refers to.
792   */
793  @Override
794  public FileStatus getFileLinkStatus(final Path f) throws IOException {
795    FileStatus fi = getFileLinkStatusInternal(f, false);
796    // getFileLinkStatus is supposed to return a symlink with a
797    // qualified path
798    if (fi.isSymlink()) {
799      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
800          fi.getPath(), fi.getSymlink());
801      fi.setSymlink(targetQual);
802    }
803    return fi;
804  }
805
806  /**
807   * Public {@link FileStatus} methods delegate to this function, which in turn
808   * either call the new {@link Stat} based implementation or the deprecated
809   * methods based on platform support.
810   * 
811   * @param f Path to stat
812   * @param dereference whether to dereference the final path component if a
813   *          symlink
814   * @return FileStatus of f
815   * @throws IOException
816   */
817  private FileStatus getFileLinkStatusInternal(final Path f,
818      boolean dereference) throws IOException {
819    if (!useDeprecatedFileStatus) {
820      return getNativeFileLinkStatus(f, dereference);
821    } else if (dereference) {
822      return deprecatedGetFileStatus(f);
823    } else {
824      return deprecatedGetFileLinkStatusInternal(f);
825    }
826  }
827
828  /**
829   * Deprecated. Remains for legacy support. Should be removed when {@link Stat}
830   * gains support for Windows and other operating systems.
831   */
832  @Deprecated
833  private FileStatus deprecatedGetFileLinkStatusInternal(final Path f)
834      throws IOException {
835    String target = FileUtil.readLink(new File(f.toString()));
836
837    try {
838      FileStatus fs = getFileStatus(f);
839      // If f refers to a regular file or directory
840      if (target.isEmpty()) {
841        return fs;
842      }
843      // Otherwise f refers to a symlink
844      return new FileStatus(fs.getLen(),
845          false,
846          fs.getReplication(),
847          fs.getBlockSize(),
848          fs.getModificationTime(),
849          fs.getAccessTime(),
850          fs.getPermission(),
851          fs.getOwner(),
852          fs.getGroup(),
853          new Path(target),
854          f);
855    } catch (FileNotFoundException e) {
856      /* The exists method in the File class returns false for dangling
857       * links so we can get a FileNotFoundException for links that exist.
858       * It's also possible that we raced with a delete of the link. Use
859       * the readBasicFileAttributes method in java.nio.file.attributes
860       * when available.
861       */
862      if (!target.isEmpty()) {
863        return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(),
864            "", "", new Path(target), f);
865      }
866      // f refers to a file or directory that does not exist
867      throw e;
868    }
869  }
870  /**
871   * Calls out to platform's native stat(1) implementation to get file metadata
872   * (permissions, user, group, atime, mtime, etc). This works around the lack
873   * of lstat(2) in Java 6.
874   * 
875   *  Currently, the {@link Stat} class used to do this only supports Linux
876   *  and FreeBSD, so the old {@link #deprecatedGetFileLinkStatusInternal(Path)}
877   *  implementation (deprecated) remains further OS support is added.
878   *
879   * @param f File to stat
880   * @param dereference whether to dereference symlinks
881   * @return FileStatus of f
882   * @throws IOException
883   */
884  private FileStatus getNativeFileLinkStatus(final Path f,
885      boolean dereference) throws IOException {
886    checkPath(f);
887    Stat stat = new Stat(f, getDefaultBlockSize(f), dereference, this);
888    FileStatus status = stat.getFileStatus();
889    return status;
890  }
891
892  @Override
893  public Path getLinkTarget(Path f) throws IOException {
894    FileStatus fi = getFileLinkStatusInternal(f, false);
895    // return an unqualified symlink target
896    return fi.getSymlink();
897  }
898}