001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs;
020
021import java.io.BufferedOutputStream;
022import java.io.DataOutput;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileNotFoundException;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.net.URI;
030import java.nio.ByteBuffer;
031import java.util.Arrays;
032import java.util.StringTokenizer;
033
034import org.apache.hadoop.classification.InterfaceAudience;
035import org.apache.hadoop.classification.InterfaceStability;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.permission.FsPermission;
038import org.apache.hadoop.io.nativeio.NativeIO;
039import org.apache.hadoop.util.Progressable;
040import org.apache.hadoop.util.Shell;
041import org.apache.hadoop.util.StringUtils;
042
043/****************************************************************
044 * Implement the FileSystem API for the raw local filesystem.
045 *
046 *****************************************************************/
047@InterfaceAudience.Public
048@InterfaceStability.Stable
049public class RawLocalFileSystem extends FileSystem {
050  static final URI NAME = URI.create("file:///");
051  private Path workingDir;
052  
053  public RawLocalFileSystem() {
054    workingDir = getInitialWorkingDirectory();
055  }
056  
057  private Path makeAbsolute(Path f) {
058    if (f.isAbsolute()) {
059      return f;
060    } else {
061      return new Path(workingDir, f);
062    }
063  }
064  
065  /** Convert a path to a File. */
066  public File pathToFile(Path path) {
067    checkPath(path);
068    if (!path.isAbsolute()) {
069      path = new Path(getWorkingDirectory(), path);
070    }
071    return new File(path.toUri().getPath());
072  }
073
074  public URI getUri() { return NAME; }
075  
076  public void initialize(URI uri, Configuration conf) throws IOException {
077    super.initialize(uri, conf);
078    setConf(conf);
079  }
080  
081  class TrackingFileInputStream extends FileInputStream {
082    public TrackingFileInputStream(File f) throws IOException {
083      super(f);
084    }
085    
086    public int read() throws IOException {
087      int result = super.read();
088      if (result != -1) {
089        statistics.incrementBytesRead(1);
090      }
091      return result;
092    }
093    
094    public int read(byte[] data) throws IOException {
095      int result = super.read(data);
096      if (result != -1) {
097        statistics.incrementBytesRead(result);
098      }
099      return result;
100    }
101    
102    public int read(byte[] data, int offset, int length) throws IOException {
103      int result = super.read(data, offset, length);
104      if (result != -1) {
105        statistics.incrementBytesRead(result);
106      }
107      return result;
108    }
109  }
110
111  /*******************************************************
112   * For open()'s FSInputStream.
113   *******************************************************/
114  class LocalFSFileInputStream extends FSInputStream {
115    private FileInputStream fis;
116    private long position;
117
118    public LocalFSFileInputStream(Path f) throws IOException {
119      this.fis = new TrackingFileInputStream(pathToFile(f));
120    }
121    
122    public void seek(long pos) throws IOException {
123      fis.getChannel().position(pos);
124      this.position = pos;
125    }
126    
127    public long getPos() throws IOException {
128      return this.position;
129    }
130    
131    public boolean seekToNewSource(long targetPos) throws IOException {
132      return false;
133    }
134    
135    /*
136     * Just forward to the fis
137     */
138    public int available() throws IOException { return fis.available(); }
139    public void close() throws IOException { fis.close(); }
140    @Override
141    public boolean markSupported() { return false; }
142    
143    public int read() throws IOException {
144      try {
145        int value = fis.read();
146        if (value >= 0) {
147          this.position++;
148        }
149        return value;
150      } catch (IOException e) {                 // unexpected exception
151        throw new FSError(e);                   // assume native fs error
152      }
153    }
154    
155    public int read(byte[] b, int off, int len) throws IOException {
156      try {
157        int value = fis.read(b, off, len);
158        if (value > 0) {
159          this.position += value;
160        }
161        return value;
162      } catch (IOException e) {                 // unexpected exception
163        throw new FSError(e);                   // assume native fs error
164      }
165    }
166    
167    public int read(long position, byte[] b, int off, int len)
168      throws IOException {
169      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
170      try {
171        return fis.getChannel().read(bb, position);
172      } catch (IOException e) {
173        throw new FSError(e);
174      }
175    }
176    
177    public long skip(long n) throws IOException {
178      long value = fis.skip(n);
179      if (value > 0) {
180        this.position += value;
181      }
182      return value;
183    }
184  }
185  
186  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
187    if (!exists(f)) {
188      throw new FileNotFoundException(f.toString());
189    }
190    return new FSDataInputStream(new BufferedFSInputStream(
191        new LocalFSFileInputStream(f), bufferSize));
192  }
193  
194  /*********************************************************
195   * For create()'s FSOutputStream.
196   *********************************************************/
197  class LocalFSFileOutputStream extends OutputStream {
198    private FileOutputStream fos;
199    
200    private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
201      this.fos = new FileOutputStream(pathToFile(f), append);
202    }
203    
204    /*
205     * Just forward to the fos
206     */
207    public void close() throws IOException { fos.close(); }
208    public void flush() throws IOException { fos.flush(); }
209    public void write(byte[] b, int off, int len) throws IOException {
210      try {
211        fos.write(b, off, len);
212      } catch (IOException e) {                // unexpected exception
213        throw new FSError(e);                  // assume native fs error
214      }
215    }
216    
217    public void write(int b) throws IOException {
218      try {
219        fos.write(b);
220      } catch (IOException e) {              // unexpected exception
221        throw new FSError(e);                // assume native fs error
222      }
223    }
224  }
225
226  /** {@inheritDoc} */
227  public FSDataOutputStream append(Path f, int bufferSize,
228      Progressable progress) throws IOException {
229    if (!exists(f)) {
230      throw new FileNotFoundException("File " + f + " not found");
231    }
232    if (getFileStatus(f).isDirectory()) {
233      throw new IOException("Cannot append to a diretory (=" + f + " )");
234    }
235    return new FSDataOutputStream(new BufferedOutputStream(
236        new LocalFSFileOutputStream(f, true), bufferSize), statistics);
237  }
238
239  /** {@inheritDoc} */
240  @Override
241  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
242    short replication, long blockSize, Progressable progress)
243    throws IOException {
244    return create(f, overwrite, true, bufferSize, replication, blockSize, progress);
245  }
246
247  private FSDataOutputStream create(Path f, boolean overwrite,
248      boolean createParent, int bufferSize, short replication, long blockSize,
249      Progressable progress) throws IOException {
250    if (exists(f) && !overwrite) {
251      throw new IOException("File already exists: "+f);
252    }
253    Path parent = f.getParent();
254    if (parent != null && !mkdirs(parent)) {
255      throw new IOException("Mkdirs failed to create " + parent.toString());
256    }
257    return new FSDataOutputStream(new BufferedOutputStream(
258        new LocalFSFileOutputStream(f, false), bufferSize), statistics);
259  }
260
261  /** {@inheritDoc} */
262  @Override
263  public FSDataOutputStream create(Path f, FsPermission permission,
264    boolean overwrite, int bufferSize, short replication, long blockSize,
265    Progressable progress) throws IOException {
266
267    FSDataOutputStream out = create(f,
268        overwrite, bufferSize, replication, blockSize, progress);
269    setPermission(f, permission);
270    return out;
271  }
272
273  /** {@inheritDoc} */
274  @Override
275  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
276      boolean overwrite,
277      int bufferSize, short replication, long blockSize,
278      Progressable progress) throws IOException {
279    FSDataOutputStream out = create(f,
280        overwrite, false, bufferSize, replication, blockSize, progress);
281    setPermission(f, permission);
282    return out;
283  }
284
285  public boolean rename(Path src, Path dst) throws IOException {
286    if (pathToFile(src).renameTo(pathToFile(dst))) {
287      return true;
288    }
289    return FileUtil.copy(this, src, this, dst, true, getConf());
290  }
291  
292  /**
293   * Delete the given path to a file or directory.
294   * @param p the path to delete
295   * @param recursive to delete sub-directories
296   * @return true if the file or directory and all its contents were deleted
297   * @throws IOException if p is non-empty and recursive is false 
298   */
299  public boolean delete(Path p, boolean recursive) throws IOException {
300    File f = pathToFile(p);
301    if (f.isFile()) {
302      return f.delete();
303    } else if (!recursive && f.isDirectory() && 
304        (FileUtil.listFiles(f).length != 0)) {
305      throw new IOException("Directory " + f.toString() + " is not empty");
306    }
307    return FileUtil.fullyDelete(f);
308  }
309 
310  public FileStatus[] listStatus(Path f) throws IOException {
311    File localf = pathToFile(f);
312    FileStatus[] results;
313
314    if (!localf.exists()) {
315      throw new FileNotFoundException("File " + f + " does not exist");
316    }
317    if (localf.isFile()) {
318      return new FileStatus[] {
319        new RawLocalFileStatus(localf, getDefaultBlockSize(), this) };
320    }
321
322    File[] names = localf.listFiles();
323    if (names == null) {
324      return null;
325    }
326    results = new FileStatus[names.length];
327    int j = 0;
328    for (int i = 0; i < names.length; i++) {
329      try {
330        results[j] = getFileStatus(new Path(names[i].getAbsolutePath()));
331        j++;
332      } catch (FileNotFoundException e) {
333        // ignore the files not found since the dir list may have have changed
334        // since the names[] list was generated.
335      }
336    }
337    if (j == names.length) {
338      return results;
339    }
340    return Arrays.copyOf(results, j);
341  }
342
343  /**
344   * Creates the specified directory hierarchy. Does not
345   * treat existence as an error.
346   */
347  public boolean mkdirs(Path f) throws IOException {
348    if(f == null) {
349      throw new IllegalArgumentException("mkdirs path arg is null");
350    }
351    Path parent = f.getParent();
352    File p2f = pathToFile(f);
353    if(parent != null) {
354      File parent2f = pathToFile(parent);
355      if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
356        throw new FileAlreadyExistsException("Parent path is not a directory: " 
357            + parent);
358      }
359    }
360    return (parent == null || mkdirs(parent)) &&
361      (p2f.mkdir() || p2f.isDirectory());
362  }
363
364  /** {@inheritDoc} */
365  @Override
366  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
367    boolean b = mkdirs(f);
368    if(b) {
369      setPermission(f, permission);
370    }
371    return b;
372  }
373  
374
375  @Override
376  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
377    throws IOException {
378    boolean b = mkdirs(f);
379    setPermission(f, absolutePermission);
380    return b;
381  }
382  
383  
384  @Override
385  public Path getHomeDirectory() {
386    return this.makeQualified(new Path(System.getProperty("user.home")));
387  }
388
389  /**
390   * Set the working directory to the given directory.
391   */
392  @Override
393  public void setWorkingDirectory(Path newDir) {
394    workingDir = makeAbsolute(newDir);
395    checkPath(workingDir);
396    
397  }
398  
399  @Override
400  public Path getWorkingDirectory() {
401    return workingDir;
402  }
403  
404  @Override
405  protected Path getInitialWorkingDirectory() {
406    return this.makeQualified(new Path(System.getProperty("user.dir")));
407  }
408
409  /** {@inheritDoc} */
410  @Override
411  public FsStatus getStatus(Path p) throws IOException {
412    File partition = pathToFile(p == null ? new Path("/") : p);
413    //File provides getUsableSpace() and getFreeSpace()
414    //File provides no API to obtain used space, assume used = total - free
415    return new FsStatus(partition.getTotalSpace(), 
416      partition.getTotalSpace() - partition.getFreeSpace(),
417      partition.getFreeSpace());
418  }
419  
420  // In the case of the local filesystem, we can just rename the file.
421  public void moveFromLocalFile(Path src, Path dst) throws IOException {
422    rename(src, dst);
423  }
424  
425  // We can write output directly to the final location
426  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
427    throws IOException {
428    return fsOutputFile;
429  }
430  
431  // It's in the right place - nothing to do.
432  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
433    throws IOException {
434  }
435  
436  public void close() throws IOException {
437    super.close();
438  }
439  
440  public String toString() {
441    return "LocalFS";
442  }
443  
444  public FileStatus getFileStatus(Path f) throws IOException {
445    File path = pathToFile(f);
446    if (path.exists()) {
447      return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
448    } else {
449      throw new FileNotFoundException("File " + f + " does not exist");
450    }
451  }
452
453  static class RawLocalFileStatus extends FileStatus {
454    /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
455     * We recognize if the information is already loaded by check if
456     * onwer.equals("").
457     */
458    private boolean isPermissionLoaded() {
459      return !super.getOwner().equals(""); 
460    }
461    
462    RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
463      super(f.length(), f.isDirectory(), 1, defaultBlockSize,
464            f.lastModified(), fs.makeQualified(new Path(f.getPath())));
465    }
466    
467    @Override
468    public FsPermission getPermission() {
469      if (!isPermissionLoaded()) {
470        loadPermissionInfo();
471      }
472      return super.getPermission();
473    }
474
475    @Override
476    public String getOwner() {
477      if (!isPermissionLoaded()) {
478        loadPermissionInfo();
479      }
480      return super.getOwner();
481    }
482
483    @Override
484    public String getGroup() {
485      if (!isPermissionLoaded()) {
486        loadPermissionInfo();
487      }
488      return super.getGroup();
489    }
490
491    /// loads permissions, owner, and group from `ls -ld`
492    private void loadPermissionInfo() {
493      IOException e = null;
494      try {
495        StringTokenizer t = new StringTokenizer(
496            execCommand(new File(getPath().toUri()), 
497                        Shell.getGET_PERMISSION_COMMAND()));
498        //expected format
499        //-rw-------    1 username groupname ...
500        String permission = t.nextToken();
501        if (permission.length() > 10) { //files with ACLs might have a '+'
502          permission = permission.substring(0, 10);
503        }
504        setPermission(FsPermission.valueOf(permission));
505        t.nextToken();
506        setOwner(t.nextToken());
507        setGroup(t.nextToken());
508      } catch (Shell.ExitCodeException ioe) {
509        if (ioe.getExitCode() != 1) {
510          e = ioe;
511        } else {
512          setPermission(null);
513          setOwner(null);
514          setGroup(null);
515        }
516      } catch (IOException ioe) {
517        e = ioe;
518      } finally {
519        if (e != null) {
520          throw new RuntimeException("Error while running command to get " +
521                                     "file permissions : " + 
522                                     StringUtils.stringifyException(e));
523        }
524      }
525    }
526
527    @Override
528    public void write(DataOutput out) throws IOException {
529      if (!isPermissionLoaded()) {
530        loadPermissionInfo();
531      }
532      super.write(out);
533    }
534  }
535
536  /**
537   * Use the command chown to set owner.
538   */
539  @Override
540  public void setOwner(Path p, String username, String groupname)
541    throws IOException {
542    if (username == null && groupname == null) {
543      throw new IOException("username == null && groupname == null");
544    }
545
546    if (username == null) {
547      execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 
548    } else {
549      //OWNER[:[GROUP]]
550      String s = username + (groupname == null? "": ":" + groupname);
551      execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
552    }
553  }
554
555  /**
556   * Use the command chmod to set permission.
557   */
558  @Override
559  public void setPermission(Path p, FsPermission permission)
560    throws IOException {
561    if (NativeIO.isAvailable()) {
562      NativeIO.chmod(pathToFile(p).getCanonicalPath(),
563                     permission.toShort());
564    } else {
565      execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
566          String.format("%05o", permission.toShort()));
567    }
568  }
569
570  private static String execCommand(File f, String... cmd) throws IOException {
571    String[] args = new String[cmd.length + 1];
572    System.arraycopy(cmd, 0, args, 0, cmd.length);
573    args[cmd.length] = FileUtil.makeShellPath(f, true);
574    String output = Shell.execCommand(args);
575    return output;
576  }
577
578}