001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019
020package org.apache.hadoop.fs;
021
022import com.google.common.annotations.VisibleForTesting;
023
024import java.io.BufferedOutputStream;
025import java.io.DataOutput;
026import java.io.EOFException;
027import java.io.File;
028import java.io.FileInputStream;
029import java.io.FileNotFoundException;
030import java.io.FileOutputStream;
031import java.io.IOException;
032import java.io.OutputStream;
033import java.io.FileDescriptor;
034import java.net.URI;
035import java.nio.ByteBuffer;
036import java.util.Arrays;
037import java.util.EnumSet;
038import java.util.StringTokenizer;
039
040import org.apache.hadoop.classification.InterfaceAudience;
041import org.apache.hadoop.classification.InterfaceStability;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.io.IOUtils;
045import org.apache.hadoop.io.nativeio.NativeIO;
046import org.apache.hadoop.io.nativeio.NativeIOException;
047import org.apache.hadoop.util.Progressable;
048import org.apache.hadoop.util.Shell;
049import org.apache.hadoop.util.StringUtils;
050
051/****************************************************************
052 * Implement the FileSystem API for the raw local filesystem.
053 *
054 *****************************************************************/
055@InterfaceAudience.Public
056@InterfaceStability.Stable
057public class RawLocalFileSystem extends FileSystem {
058  static final URI NAME = URI.create("file:///");
059  private Path workingDir;
060  // Temporary workaround for HADOOP-9652.
061  private static boolean useDeprecatedFileStatus = true;
062
063  @VisibleForTesting
064  public static void useStatIfAvailable() {
065    useDeprecatedFileStatus = !Stat.isAvailable();
066  }
067  
068  public RawLocalFileSystem() {
069    workingDir = getInitialWorkingDirectory();
070  }
071  
072  private Path makeAbsolute(Path f) {
073    if (f.isAbsolute()) {
074      return f;
075    } else {
076      return new Path(workingDir, f);
077    }
078  }
079  
080  /** Convert a path to a File. */
081  public File pathToFile(Path path) {
082    checkPath(path);
083    if (!path.isAbsolute()) {
084      path = new Path(getWorkingDirectory(), path);
085    }
086    return new File(path.toUri().getPath());
087  }
088
089  @Override
090  public URI getUri() { return NAME; }
091  
092  @Override
093  public void initialize(URI uri, Configuration conf) throws IOException {
094    super.initialize(uri, conf);
095    setConf(conf);
096  }
097  
098  /*******************************************************
099   * For open()'s FSInputStream.
100   *******************************************************/
101  class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
102    private FileInputStream fis;
103    private long position;
104
105    public LocalFSFileInputStream(Path f) throws IOException {
106      fis = new FileInputStream(pathToFile(f));
107    }
108    
109    @Override
110    public void seek(long pos) throws IOException {
111      if (pos < 0) {
112        throw new EOFException(
113          FSExceptionMessages.NEGATIVE_SEEK);
114      }
115      fis.getChannel().position(pos);
116      this.position = pos;
117    }
118    
119    @Override
120    public long getPos() throws IOException {
121      return this.position;
122    }
123    
124    @Override
125    public boolean seekToNewSource(long targetPos) throws IOException {
126      return false;
127    }
128    
129    /*
130     * Just forward to the fis
131     */
132    @Override
133    public int available() throws IOException { return fis.available(); }
134    @Override
135    public void close() throws IOException { fis.close(); }
136    @Override
137    public boolean markSupported() { return false; }
138    
139    @Override
140    public int read() throws IOException {
141      try {
142        int value = fis.read();
143        if (value >= 0) {
144          this.position++;
145          statistics.incrementBytesRead(1);
146        }
147        return value;
148      } catch (IOException e) {                 // unexpected exception
149        throw new FSError(e);                   // assume native fs error
150      }
151    }
152    
153    @Override
154    public int read(byte[] b, int off, int len) throws IOException {
155      try {
156        int value = fis.read(b, off, len);
157        if (value > 0) {
158          this.position += value;
159          statistics.incrementBytesRead(value);
160        }
161        return value;
162      } catch (IOException e) {                 // unexpected exception
163        throw new FSError(e);                   // assume native fs error
164      }
165    }
166    
167    @Override
168    public int read(long position, byte[] b, int off, int len)
169      throws IOException {
170      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
171      try {
172        int value = fis.getChannel().read(bb, position);
173        if (value > 0) {
174          statistics.incrementBytesRead(value);
175        }
176        return value;
177      } catch (IOException e) {
178        throw new FSError(e);
179      }
180    }
181    
182    @Override
183    public long skip(long n) throws IOException {
184      long value = fis.skip(n);
185      if (value > 0) {
186        this.position += value;
187      }
188      return value;
189    }
190
191    @Override
192    public FileDescriptor getFileDescriptor() throws IOException {
193      return fis.getFD();
194    }
195  }
196  
197  @Override
198  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
199    if (!exists(f)) {
200      throw new FileNotFoundException(f.toString());
201    }
202    return new FSDataInputStream(new BufferedFSInputStream(
203        new LocalFSFileInputStream(f), bufferSize));
204  }
205  
206  /*********************************************************
207   * For create()'s FSOutputStream.
208   *********************************************************/
209  class LocalFSFileOutputStream extends OutputStream {
210    private FileOutputStream fos;
211    
212    private LocalFSFileOutputStream(Path f, boolean append,
213        FsPermission permission) throws IOException {
214      File file = pathToFile(f);
215      if (permission == null) {
216        this.fos = new FileOutputStream(file, append);
217      } else {
218        if (Shell.WINDOWS && NativeIO.isAvailable()) {
219          this.fos = NativeIO.Windows.createFileOutputStreamWithMode(file,
220              append, permission.toShort());
221        } else {
222          this.fos = new FileOutputStream(file, append);
223          boolean success = false;
224          try {
225            setPermission(f, permission);
226            success = true;
227          } finally {
228            if (!success) {
229              IOUtils.cleanup(LOG, this.fos);
230            }
231          }
232        }
233      }
234    }
235    
236    /*
237     * Just forward to the fos
238     */
239    @Override
240    public void close() throws IOException { fos.close(); }
241    @Override
242    public void flush() throws IOException { fos.flush(); }
243    @Override
244    public void write(byte[] b, int off, int len) throws IOException {
245      try {
246        fos.write(b, off, len);
247      } catch (IOException e) {                // unexpected exception
248        throw new FSError(e);                  // assume native fs error
249      }
250    }
251    
252    @Override
253    public void write(int b) throws IOException {
254      try {
255        fos.write(b);
256      } catch (IOException e) {              // unexpected exception
257        throw new FSError(e);                // assume native fs error
258      }
259    }
260  }
261
262  @Override
263  public FSDataOutputStream append(Path f, int bufferSize,
264      Progressable progress) throws IOException {
265    if (!exists(f)) {
266      throw new FileNotFoundException("File " + f + " not found");
267    }
268    FileStatus status = getFileStatus(f);
269    if (status.isDirectory()) {
270      throw new IOException("Cannot append to a diretory (=" + f + " )");
271    }
272    return new FSDataOutputStream(new BufferedOutputStream(
273        createOutputStreamWithMode(f, true, null), bufferSize), statistics,
274        status.getLen());
275  }
276
277  @Override
278  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
279    short replication, long blockSize, Progressable progress)
280    throws IOException {
281    return create(f, overwrite, true, bufferSize, replication, blockSize,
282        progress, null);
283  }
284
285  private FSDataOutputStream create(Path f, boolean overwrite,
286      boolean createParent, int bufferSize, short replication, long blockSize,
287      Progressable progress, FsPermission permission) throws IOException {
288    if (exists(f) && !overwrite) {
289      throw new FileAlreadyExistsException("File already exists: " + f);
290    }
291    Path parent = f.getParent();
292    if (parent != null && !mkdirs(parent)) {
293      throw new IOException("Mkdirs failed to create " + parent.toString());
294    }
295    return new FSDataOutputStream(new BufferedOutputStream(
296        createOutputStreamWithMode(f, false, permission), bufferSize),
297        statistics);
298  }
299  
300  protected OutputStream createOutputStream(Path f, boolean append) 
301      throws IOException {
302    return createOutputStreamWithMode(f, append, null);
303  }
304
305  protected OutputStream createOutputStreamWithMode(Path f, boolean append,
306      FsPermission permission) throws IOException {
307    return new LocalFSFileOutputStream(f, append, permission);
308  }
309  
310  @Override
311  @Deprecated
312  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
313      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
314      Progressable progress) throws IOException {
315    if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) {
316      throw new FileAlreadyExistsException("File already exists: " + f);
317    }
318    return new FSDataOutputStream(new BufferedOutputStream(
319        createOutputStreamWithMode(f, false, permission), bufferSize),
320            statistics);
321  }
322
323  @Override
324  public FSDataOutputStream create(Path f, FsPermission permission,
325    boolean overwrite, int bufferSize, short replication, long blockSize,
326    Progressable progress) throws IOException {
327
328    FSDataOutputStream out = create(f, overwrite, true, bufferSize, replication,
329        blockSize, progress, permission);
330    return out;
331  }
332
333  @Override
334  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
335      boolean overwrite,
336      int bufferSize, short replication, long blockSize,
337      Progressable progress) throws IOException {
338    FSDataOutputStream out = create(f, overwrite, false, bufferSize, replication,
339        blockSize, progress, permission);
340    return out;
341  }
342
343  @Override
344  public boolean rename(Path src, Path dst) throws IOException {
345    // Attempt rename using Java API.
346    File srcFile = pathToFile(src);
347    File dstFile = pathToFile(dst);
348    if (srcFile.renameTo(dstFile)) {
349      return true;
350    }
351
352    // Enforce POSIX rename behavior that a source directory replaces an existing
353    // destination if the destination is an empty directory.  On most platforms,
354    // this is already handled by the Java API call above.  Some platforms
355    // (notably Windows) do not provide this behavior, so the Java API call above
356    // fails.  Delete destination and attempt rename again.
357    if (this.exists(dst)) {
358      FileStatus sdst = this.getFileStatus(dst);
359      if (sdst.isDirectory() && dstFile.list().length == 0) {
360        if (LOG.isDebugEnabled()) {
361          LOG.debug("Deleting empty destination and renaming " + src + " to " +
362            dst);
363        }
364        if (this.delete(dst, false) && srcFile.renameTo(dstFile)) {
365          return true;
366        }
367      }
368    }
369
370    // The fallback behavior accomplishes the rename by a full copy.
371    if (LOG.isDebugEnabled()) {
372      LOG.debug("Falling through to a copy of " + src + " to " + dst);
373    }
374    return FileUtil.copy(this, src, this, dst, true, getConf());
375  }
376
377  @Override
378  public boolean truncate(Path f, final long newLength) throws IOException {
379    FileStatus status = getFileStatus(f);
380    if(status == null) {
381      throw new FileNotFoundException("File " + f + " not found");
382    }
383    if(status.isDirectory()) {
384      throw new IOException("Cannot truncate a directory (=" + f + ")");
385    }
386    long oldLength = status.getLen();
387    if(newLength > oldLength) {
388      throw new IllegalArgumentException(
389          "Cannot truncate to a larger file size. Current size: " + oldLength +
390          ", truncate size: " + newLength + ".");
391    }
392    try (FileOutputStream out = new FileOutputStream(pathToFile(f), true)) {
393      try {
394        out.getChannel().truncate(newLength);
395      } catch(IOException e) {
396        throw new FSError(e);
397      }
398    }
399    return true;
400  }
401  
402  /**
403   * Delete the given path to a file or directory.
404   * @param p the path to delete
405   * @param recursive to delete sub-directories
406   * @return true if the file or directory and all its contents were deleted
407   * @throws IOException if p is non-empty and recursive is false 
408   */
409  @Override
410  public boolean delete(Path p, boolean recursive) throws IOException {
411    File f = pathToFile(p);
412    if (!f.exists()) {
413      //no path, return false "nothing to delete"
414      return false;
415    }
416    if (f.isFile()) {
417      return f.delete();
418    } else if (!recursive && f.isDirectory() && 
419        (FileUtil.listFiles(f).length != 0)) {
420      throw new IOException("Directory " + f.toString() + " is not empty");
421    }
422    return FileUtil.fullyDelete(f);
423  }
424 
425  @Override
426  public FileStatus[] listStatus(Path f) throws IOException {
427    File localf = pathToFile(f);
428    FileStatus[] results;
429
430    if (!localf.exists()) {
431      throw new FileNotFoundException("File " + f + " does not exist");
432    }
433
434    if (localf.isDirectory()) {
435      String[] names = localf.list();
436      if (names == null) {
437        return null;
438      }
439      results = new FileStatus[names.length];
440      int j = 0;
441      for (int i = 0; i < names.length; i++) {
442        try {
443          // Assemble the path using the Path 3 arg constructor to make sure
444          // paths with colon are properly resolved on Linux
445          results[j] = getFileStatus(new Path(f, new Path(null, null,
446                                                          names[i])));
447          j++;
448        } catch (FileNotFoundException e) {
449          // ignore the files not found since the dir list may have have
450          // changed since the names[] list was generated.
451        }
452      }
453      if (j == names.length) {
454        return results;
455      }
456      return Arrays.copyOf(results, j);
457    }
458
459    if (!useDeprecatedFileStatus) {
460      return new FileStatus[] { getFileStatus(f) };
461    }
462    return new FileStatus[] {
463        new DeprecatedRawLocalFileStatus(localf,
464        getDefaultBlockSize(f), this) };
465  }
466  
467  protected boolean mkOneDir(File p2f) throws IOException {
468    return mkOneDirWithMode(new Path(p2f.getAbsolutePath()), p2f, null);
469  }
470
471  protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
472      throws IOException {
473    if (permission == null) {
474      return p2f.mkdir();
475    } else {
476      if (Shell.WINDOWS && NativeIO.isAvailable()) {
477        try {
478          NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
479          return true;
480        } catch (IOException e) {
481          if (LOG.isDebugEnabled()) {
482            LOG.debug(String.format(
483                "NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
484                p2f, permission.toShort()), e);
485          }
486          return false;
487        }
488      } else {
489        boolean b = p2f.mkdir();
490        if (b) {
491          setPermission(p, permission);
492        }
493        return b;
494      }
495    }
496  }
497
498  /**
499   * Creates the specified directory hierarchy. Does not
500   * treat existence as an error.
501   */
502  @Override
503  public boolean mkdirs(Path f) throws IOException {
504    return mkdirsWithOptionalPermission(f, null);
505  }
506
507  @Override
508  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
509    return mkdirsWithOptionalPermission(f, permission);
510  }
511
512  private boolean mkdirsWithOptionalPermission(Path f, FsPermission permission)
513      throws IOException {
514    if(f == null) {
515      throw new IllegalArgumentException("mkdirs path arg is null");
516    }
517    Path parent = f.getParent();
518    File p2f = pathToFile(f);
519    File parent2f = null;
520    if(parent != null) {
521      parent2f = pathToFile(parent);
522      if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
523        throw new ParentNotDirectoryException("Parent path is not a directory: "
524            + parent);
525      }
526    }
527    if (p2f.exists() && !p2f.isDirectory()) {
528      throw new FileNotFoundException("Destination exists" +
529              " and is not a directory: " + p2f.getCanonicalPath());
530    }
531    return (parent == null || parent2f.exists() || mkdirs(parent)) &&
532      (mkOneDirWithMode(f, p2f, permission) || p2f.isDirectory());
533  }
534  
535  
536  @Override
537  public Path getHomeDirectory() {
538    return this.makeQualified(new Path(System.getProperty("user.home")));
539  }
540
541  /**
542   * Set the working directory to the given directory.
543   */
544  @Override
545  public void setWorkingDirectory(Path newDir) {
546    workingDir = makeAbsolute(newDir);
547    checkPath(workingDir);
548  }
549  
550  @Override
551  public Path getWorkingDirectory() {
552    return workingDir;
553  }
554  
555  @Override
556  protected Path getInitialWorkingDirectory() {
557    return this.makeQualified(new Path(System.getProperty("user.dir")));
558  }
559
560  @Override
561  public FsStatus getStatus(Path p) throws IOException {
562    File partition = pathToFile(p == null ? new Path("/") : p);
563    //File provides getUsableSpace() and getFreeSpace()
564    //File provides no API to obtain used space, assume used = total - free
565    return new FsStatus(partition.getTotalSpace(), 
566      partition.getTotalSpace() - partition.getFreeSpace(),
567      partition.getFreeSpace());
568  }
569  
570  // In the case of the local filesystem, we can just rename the file.
571  @Override
572  public void moveFromLocalFile(Path src, Path dst) throws IOException {
573    rename(src, dst);
574  }
575  
576  // We can write output directly to the final location
577  @Override
578  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
579    throws IOException {
580    return fsOutputFile;
581  }
582  
583  // It's in the right place - nothing to do.
584  @Override
585  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
586    throws IOException {
587  }
588  
589  @Override
590  public void close() throws IOException {
591    super.close();
592  }
593  
594  @Override
595  public String toString() {
596    return "LocalFS";
597  }
598  
599  @Override
600  public FileStatus getFileStatus(Path f) throws IOException {
601    return getFileLinkStatusInternal(f, true);
602  }
603
604  @Deprecated
605  private FileStatus deprecatedGetFileStatus(Path f) throws IOException {
606    File path = pathToFile(f);
607    if (path.exists()) {
608      return new DeprecatedRawLocalFileStatus(pathToFile(f),
609          getDefaultBlockSize(f), this);
610    } else {
611      throw new FileNotFoundException("File " + f + " does not exist");
612    }
613  }
614
615  @Deprecated
616  static class DeprecatedRawLocalFileStatus extends FileStatus {
617    /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
618     * We recognize if the information is already loaded by check if
619     * onwer.equals("").
620     */
621    private boolean isPermissionLoaded() {
622      return !super.getOwner().isEmpty(); 
623    }
624    
625    DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
626      super(f.length(), f.isDirectory(), 1, defaultBlockSize,
627          f.lastModified(), new Path(f.getPath()).makeQualified(fs.getUri(),
628            fs.getWorkingDirectory()));
629    }
630    
631    @Override
632    public FsPermission getPermission() {
633      if (!isPermissionLoaded()) {
634        loadPermissionInfo();
635      }
636      return super.getPermission();
637    }
638
639    @Override
640    public String getOwner() {
641      if (!isPermissionLoaded()) {
642        loadPermissionInfo();
643      }
644      return super.getOwner();
645    }
646
647    @Override
648    public String getGroup() {
649      if (!isPermissionLoaded()) {
650        loadPermissionInfo();
651      }
652      return super.getGroup();
653    }
654
655    /// loads permissions, owner, and group from `ls -ld`
656    private void loadPermissionInfo() {
657      IOException e = null;
658      try {
659        String output = FileUtil.execCommand(new File(getPath().toUri()), 
660            Shell.getGetPermissionCommand());
661        StringTokenizer t =
662            new StringTokenizer(output, Shell.TOKEN_SEPARATOR_REGEX);
663        //expected format
664        //-rw-------    1 username groupname ...
665        String permission = t.nextToken();
666        if (permission.length() > FsPermission.MAX_PERMISSION_LENGTH) {
667          //files with ACLs might have a '+'
668          permission = permission.substring(0,
669            FsPermission.MAX_PERMISSION_LENGTH);
670        }
671        setPermission(FsPermission.valueOf(permission));
672        t.nextToken();
673
674        String owner = t.nextToken();
675        // If on windows domain, token format is DOMAIN\\user and we want to
676        // extract only the user name
677        if (Shell.WINDOWS) {
678          int i = owner.indexOf('\\');
679          if (i != -1)
680            owner = owner.substring(i + 1);
681        }
682        setOwner(owner);
683
684        setGroup(t.nextToken());
685      } catch (Shell.ExitCodeException ioe) {
686        if (ioe.getExitCode() != 1) {
687          e = ioe;
688        } else {
689          setPermission(null);
690          setOwner(null);
691          setGroup(null);
692        }
693      } catch (IOException ioe) {
694        e = ioe;
695      } finally {
696        if (e != null) {
697          throw new RuntimeException("Error while running command to get " +
698                                     "file permissions : " + 
699                                     StringUtils.stringifyException(e));
700        }
701      }
702    }
703
704    @Override
705    public void write(DataOutput out) throws IOException {
706      if (!isPermissionLoaded()) {
707        loadPermissionInfo();
708      }
709      super.write(out);
710    }
711  }
712
713  /**
714   * Use the command chown to set owner.
715   */
716  @Override
717  public void setOwner(Path p, String username, String groupname)
718    throws IOException {
719    FileUtil.setOwner(pathToFile(p), username, groupname);
720  }
721
722  /**
723   * Use the command chmod to set permission.
724   */
725  @Override
726  public void setPermission(Path p, FsPermission permission)
727    throws IOException {
728    if (NativeIO.isAvailable()) {
729      NativeIO.POSIX.chmod(pathToFile(p).getCanonicalPath(),
730                     permission.toShort());
731    } else {
732      String perm = String.format("%04o", permission.toShort());
733      Shell.execCommand(Shell.getSetPermissionCommand(perm, false,
734        FileUtil.makeShellPath(pathToFile(p), true)));
735    }
736  }
737 
738  /**
739   * Sets the {@link Path}'s last modified time <em>only</em> to the given
740   * valid time.
741   *
742   * @param mtime the modification time to set (only if greater than zero).
743   * @param atime currently ignored.
744   * @throws IOException if setting the last modified time fails.
745   */
746  @Override
747  public void setTimes(Path p, long mtime, long atime) throws IOException {
748    File f = pathToFile(p);
749    if(mtime >= 0) {
750      if(!f.setLastModified(mtime)) {
751        throw new IOException(
752          "couldn't set last-modified time to " +
753          mtime +
754          " for " +
755          f.getAbsolutePath());
756      }
757    }
758  }
759
760  @Override
761  public boolean supportsSymlinks() {
762    return true;
763  }
764
765  @SuppressWarnings("deprecation")
766  @Override
767  public void createSymlink(Path target, Path link, boolean createParent)
768      throws IOException {
769    if (!FileSystem.areSymlinksEnabled()) {
770      throw new UnsupportedOperationException("Symlinks not supported");
771    }
772    final String targetScheme = target.toUri().getScheme();
773    if (targetScheme != null && !"file".equals(targetScheme)) {
774      throw new IOException("Unable to create symlink to non-local file "+
775                            "system: "+target.toString());
776    }
777    if (createParent) {
778      mkdirs(link.getParent());
779    }
780
781    // NB: Use createSymbolicLink in java.nio.file.Path once available
782    int result = FileUtil.symLink(target.toString(),
783        makeAbsolute(link).toString());
784    if (result != 0) {
785      throw new IOException("Error " + result + " creating symlink " +
786          link + " to " + target);
787    }
788  }
789
790  /**
791   * Return a FileStatus representing the given path. If the path refers
792   * to a symlink return a FileStatus representing the link rather than
793   * the object the link refers to.
794   */
795  @Override
796  public FileStatus getFileLinkStatus(final Path f) throws IOException {
797    FileStatus fi = getFileLinkStatusInternal(f, false);
798    // getFileLinkStatus is supposed to return a symlink with a
799    // qualified path
800    if (fi.isSymlink()) {
801      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
802          fi.getPath(), fi.getSymlink());
803      fi.setSymlink(targetQual);
804    }
805    return fi;
806  }
807
808  /**
809   * Public {@link FileStatus} methods delegate to this function, which in turn
810   * either call the new {@link Stat} based implementation or the deprecated
811   * methods based on platform support.
812   * 
813   * @param f Path to stat
814   * @param dereference whether to dereference the final path component if a
815   *          symlink
816   * @return FileStatus of f
817   * @throws IOException
818   */
819  private FileStatus getFileLinkStatusInternal(final Path f,
820      boolean dereference) throws IOException {
821    if (!useDeprecatedFileStatus) {
822      return getNativeFileLinkStatus(f, dereference);
823    } else if (dereference) {
824      return deprecatedGetFileStatus(f);
825    } else {
826      return deprecatedGetFileLinkStatusInternal(f);
827    }
828  }
829
830  /**
831   * Deprecated. Remains for legacy support. Should be removed when {@link Stat}
832   * gains support for Windows and other operating systems.
833   */
834  @Deprecated
835  private FileStatus deprecatedGetFileLinkStatusInternal(final Path f)
836      throws IOException {
837    String target = FileUtil.readLink(new File(f.toString()));
838
839    try {
840      FileStatus fs = getFileStatus(f);
841      // If f refers to a regular file or directory
842      if (target.isEmpty()) {
843        return fs;
844      }
845      // Otherwise f refers to a symlink
846      return new FileStatus(fs.getLen(),
847          false,
848          fs.getReplication(),
849          fs.getBlockSize(),
850          fs.getModificationTime(),
851          fs.getAccessTime(),
852          fs.getPermission(),
853          fs.getOwner(),
854          fs.getGroup(),
855          new Path(target),
856          f);
857    } catch (FileNotFoundException e) {
858      /* The exists method in the File class returns false for dangling
859       * links so we can get a FileNotFoundException for links that exist.
860       * It's also possible that we raced with a delete of the link. Use
861       * the readBasicFileAttributes method in java.nio.file.attributes
862       * when available.
863       */
864      if (!target.isEmpty()) {
865        return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(),
866            "", "", new Path(target), f);
867      }
868      // f refers to a file or directory that does not exist
869      throw e;
870    }
871  }
872  /**
873   * Calls out to platform's native stat(1) implementation to get file metadata
874   * (permissions, user, group, atime, mtime, etc). This works around the lack
875   * of lstat(2) in Java 6.
876   * 
877   *  Currently, the {@link Stat} class used to do this only supports Linux
878   *  and FreeBSD, so the old {@link #deprecatedGetFileLinkStatusInternal(Path)}
879   *  implementation (deprecated) remains further OS support is added.
880   *
881   * @param f File to stat
882   * @param dereference whether to dereference symlinks
883   * @return FileStatus of f
884   * @throws IOException
885   */
886  private FileStatus getNativeFileLinkStatus(final Path f,
887      boolean dereference) throws IOException {
888    checkPath(f);
889    Stat stat = new Stat(f, getDefaultBlockSize(f), dereference, this);
890    FileStatus status = stat.getFileStatus();
891    return status;
892  }
893
894  @Override
895  public Path getLinkTarget(Path f) throws IOException {
896    FileStatus fi = getFileLinkStatusInternal(f, false);
897    // return an unqualified symlink target
898    return fi.getSymlink();
899  }
900}