/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.*;

/**
 * This is an implementation of the Hadoop Archive
 * Filesystem. This archive Filesystem has index files
 * of the form _index* and contents of the form
 * part-*. The index files store the indexes of the
 * real files. The index files are of the form _masterindex
 * and _index. The master index is a level of indirection
 * into the index file to make lookups faster: the index
 * file is sorted by the hash code of the paths that it contains,
 * and the master index contains pointers to the positions in
 * the index for ranges of hash codes.
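 *
 * <p>A HAR archive is accessed through the normal {@link FileSystem} API. A
 * minimal, illustrative example (the archive path and cluster address below
 * are assumptions, not values used by this class):</p>
 *
 * <pre>{@code
 *   Configuration conf = new Configuration();
 *   // har://<underlying scheme>-<host>:<port>/<path to .har>
 *   Path harRoot = new Path("har://hdfs-namenode:8020/user/alice/logs.har");
 *   FileSystem harFs = harRoot.getFileSystem(conf);
 *   for (FileStatus status : harFs.listStatus(harRoot)) {
 *     System.out.println(status.getPath());
 *   }
 * }</pre>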
 */

public class HarFileSystem extends FileSystem {

  private static final Log LOG = LogFactory.getLog(HarFileSystem.class);

  public static final String METADATA_CACHE_ENTRIES_KEY = "fs.har.metadatacache.entries";
  public static final int METADATA_CACHE_ENTRIES_DEFAULT = 10;
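
  // The number of archives whose metadata is cached can be tuned before the
  // file system is initialized. A minimal sketch (the value is illustrative):
  //
  //   Configuration conf = new Configuration();
  //   conf.setInt(METADATA_CACHE_ENTRIES_KEY, 64);
  //   FileSystem harFs = FileSystem.get(URI.create("har://hdfs-host/a.har"), conf);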

  public static final int VERSION = 3;

  private static Map<URI, HarMetaData> harMetaCache;

  // uri representation of this Har filesystem
  private URI uri;
  // the top level path of the archive
  // in the underlying file system
  private Path archivePath;
  // the har auth
  private String harAuth;

  // pointer into the static metadata cache
  private HarMetaData metadata;

  private FileSystem fs;

  /**
   * Public, no-argument constructor.
   */
  public HarFileSystem() {
    // Must call #initialize() method to set the underlying file system
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>har</code>
   */
  @Override
  public String getScheme() {
    return "har";
  }

  /**
   * Constructor to create a HarFileSystem with an
   * underlying filesystem.
   * @param fs underlying file system
   */
  public HarFileSystem(FileSystem fs) {
    this.fs = fs;
    this.statistics = fs.statistics;
  }

  private synchronized void initializeMetadataCache(Configuration conf) {
    if (harMetaCache == null) {
      int cacheSize = conf.getInt(METADATA_CACHE_ENTRIES_KEY, METADATA_CACHE_ENTRIES_DEFAULT);
      harMetaCache = Collections.synchronizedMap(new LruCache<URI, HarMetaData>(cacheSize));
    }
  }

  /**
   * Initialize a Har filesystem per har archive. The
   * archive home directory is the top level directory
   * in the filesystem that contains the HAR archive.
   * Be careful with this method: you do not want to keep
   * creating new FileSystem instances per call to
   * path.getFileSystem().
   * The URI of a Har filesystem is
   * har://underlyingfsscheme-host:port/archivepath
   * or
   * har:///archivepath, in which case the default underlying
   * filesystem from the configuration is used.
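   *
   * For example (illustrative host and paths), the URI
   * har://hdfs-namenode:8020/user/alice/logs.har resolves against the
   * underlying filesystem hdfs://namenode:8020, while
   * har:///user/alice/logs.har resolves against the default filesystem
   * configured in fs.defaultFS.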
   */
  @Override
  public void initialize(URI name, Configuration conf) throws IOException {
    // initialize the metadata cache, if needed
    initializeMetadataCache(conf);

    // decode the name
    URI underLyingURI = decodeHarURI(name, conf);
    // we got the right har path - now check whether this is
    // truly a har filesystem
    Path harPath = archivePath(
      new Path(name.getScheme(), name.getAuthority(), name.getPath()));
    if (harPath == null) {
      throw new IOException("Invalid path for the Har Filesystem. " + 
                           name.toString());
    }
    if (fs == null) {
      fs = FileSystem.get(underLyingURI, conf);
    }
    uri = harPath.toUri();
    archivePath = new Path(uri.getPath());
    harAuth = getHarAuth(underLyingURI);
    // check that the underlying fs contains
    // the index files
    Path masterIndexPath = new Path(archivePath, "_masterindex");
    Path archiveIndexPath = new Path(archivePath, "_index");
    if (!fs.exists(masterIndexPath) || !fs.exists(archiveIndexPath)) {
      throw new IOException("Invalid path for the Har Filesystem. " +
          "No index file in " + harPath);
    }

    metadata = harMetaCache.get(uri);
    if (metadata != null) {
      FileStatus mStat = fs.getFileStatus(masterIndexPath);
      FileStatus aStat = fs.getFileStatus(archiveIndexPath);
      if (mStat.getModificationTime() != metadata.getMasterIndexTimestamp() ||
          aStat.getModificationTime() != metadata.getArchiveIndexTimestamp()) {
        // the archive has been overwritten since we last read it;
        // remove the entry from the metadata cache
        metadata = null;
        harMetaCache.remove(uri);
      }
    }
    if (metadata == null) {
      metadata = new HarMetaData(fs, masterIndexPath, archiveIndexPath);
      metadata.parseMetaData();
      harMetaCache.put(uri, metadata);
    }
  }

  @Override
  public Configuration getConf() {
    return fs.getConf();
  }

  // get the version of the filesystem from the masterindex file
  // the version is currently not useful since it's the first version
  // of archives
  public int getHarVersion() throws IOException {
    if (metadata != null) {
      return metadata.getVersion();
    }
    else {
      throw new IOException("Invalid meta data for the Har Filesystem");
    }
  }

  /*
   * Find the parent path that is the archive path within the given
   * path: the deepest prefix whose last segment ends with .har is
   * the path that will be returned.
   */
  private Path archivePath(Path p) {
    Path retPath = null;
    Path tmp = p;
    for (int i=0; i< p.depth(); i++) {
      if (tmp.toString().endsWith(".har")) {
        retPath = tmp;
        break;
      }
      tmp = tmp.getParent();
    }
    return retPath;
  }

  /**
   * Decode the raw URI to get the URI of the underlying filesystem.
   * @param rawURI raw Har URI
   * @param conf the configuration, used to find the default filesystem
   * @return filtered URI of the underlying fileSystem
   */
  private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException {
    String tmpAuth = rawURI.getAuthority();
    // no authority: we are using the default
    // filesystem in the config,
    // so create an underlying uri and
    // return it
    if (tmpAuth == null) {
      //create a path 
      return FileSystem.getDefaultUri(conf);
    }
    String authority = rawURI.getAuthority();

    int i = authority.indexOf('-');
    if (i < 0) {
      throw new IOException("URI: " + rawURI
          + " is an invalid Har URI since '-' not found."
          + "  Expecting har://<scheme>-<host>/<path>.");
    }

    if (rawURI.getQuery() != null) {
      // query component not allowed
      throw new IOException("query component in Path not supported  " + rawURI);
    }

    URI tmp;
    try {
      // convert <scheme>-<host> to <scheme>://<host>
      URI baseUri = new URI(authority.replaceFirst("-", "://"));

      tmp = new URI(baseUri.getScheme(), baseUri.getAuthority(),
            rawURI.getPath(), rawURI.getQuery(), rawURI.getFragment());
    } catch (URISyntaxException e) {
      throw new IOException("URI: " + rawURI
          + " is an invalid Har URI. Expecting har://<scheme>-<host>/<path>.");
    }
    return tmp;
  }

  private static String decodeString(String str)
    throws UnsupportedEncodingException {
    return URLDecoder.decode(str, "UTF-8");
  }

  private String decodeFileName(String fname)
    throws UnsupportedEncodingException {
    int version = metadata.getVersion();
    if (version == 2 || version == 3){
      return decodeString(fname);
    }
    return fname;
  }

  /**
   * Return the top level archive path.
   */
  @Override
  public Path getWorkingDirectory() {
    return new Path(uri.toString());
  }

  @Override
  public Path getInitialWorkingDirectory() {
    return getWorkingDirectory();
  }

  @Override
  public FsStatus getStatus(Path p) throws IOException {
    return fs.getStatus(p);
  }

  /**
   * Create a har specific auth of the form
   * underlyingscheme-host:port
   * @param underLyingUri the uri of the underlying
   * filesystem
   * @return har specific auth
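   *
   * For example (illustrative), an underlying URI of hdfs://namenode:8020
   * yields the auth "hdfs-namenode:8020", while a host-less URI such as
   * file:/// yields "file-:".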
   */
  private String getHarAuth(URI underLyingUri) {
    String auth = underLyingUri.getScheme() + "-";
    if (underLyingUri.getHost() != null) {
      if (underLyingUri.getUserInfo() != null) {
        auth += underLyingUri.getUserInfo();
        auth += "@";
      }
      auth += underLyingUri.getHost();
      if (underLyingUri.getPort() != -1) {
        auth += ":";
        auth +=  underLyingUri.getPort();
      }
    }
    else {
      auth += ":";
    }
    return auth;
  }

  /**
   * Used for delegation token related functionality. Must delegate to
   * underlying file system.
   */
  @Override
  protected URI getCanonicalUri() {
    return fs.getCanonicalUri();
  }

  @Override
  protected URI canonicalizeUri(URI uri) {
    return fs.canonicalizeUri(uri);
  }

  /**
   * Returns the uri of this filesystem.
   * The uri is of the form
   * har://underlyingfsscheme-host:port/pathintheunderlyingfs
   */
  @Override
  public URI getUri() {
    return this.uri;
  }

  @Override
  protected void checkPath(Path path) {
    fs.checkPath(path);
  }

  @Override
  public Path resolvePath(Path p) throws IOException {
    return fs.resolvePath(p);
  }

  /**
   * This method returns the path inside the har filesystem,
   * i.e. the path relative to the root of the archive.
   * @param path the fully qualified path in the har filesystem.
   * @return relative path in the filesystem.
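   *
   * For example (illustrative), with archivePath /user/alice/logs.har, the
   * path har://hdfs-namenode:8020/user/alice/logs.har/2020/01/part.log
   * maps to /2020/01/part.log inside the archive.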
   */
  private Path getPathInHar(Path path) {
    Path harPath = new Path(path.toUri().getPath());
    if (archivePath.compareTo(harPath) == 0)
      return new Path(Path.SEPARATOR);
    Path tmp = new Path(harPath.getName());
    Path parent = harPath.getParent();
    while (!(parent.compareTo(archivePath) == 0)) {
      if (parent.toString().equals(Path.SEPARATOR)) {
        tmp = null;
        break;
      }
      tmp = new Path(parent.getName(), tmp);
      parent = parent.getParent();
    }
    if (tmp != null)
      tmp = new Path(Path.SEPARATOR, tmp);
    return tmp;
  }

  // the relative path of p: basically
  // getting rid of the leading /. Parsing and doing
  // string manipulation is not good, so
  // just use the path api to do it.
  private Path makeRelative(String initial, Path p) {
    String scheme = this.uri.getScheme();
    String authority = this.uri.getAuthority();
    Path root = new Path(Path.SEPARATOR);
    if (root.compareTo(p) == 0)
      return new Path(scheme, authority, initial);
    Path retPath = new Path(p.getName());
    Path parent = p.getParent();
    for (int i=0; i < p.depth()-1; i++) {
      retPath = new Path(parent.getName(), retPath);
      parent = parent.getParent();
    }
    return new Path(new Path(scheme, authority, initial),
      retPath.toString());
  }

  /* this makes a path qualified in the har filesystem
   * (non-Javadoc)
   * @see org.apache.hadoop.fs.FilterFileSystem#makeQualified(
   * org.apache.hadoop.fs.Path)
   */
  @Override
  public Path makeQualified(Path path) {
    // make sure that we just get the
    // path component
    Path fsPath = path;
    if (!path.isAbsolute()) {
      fsPath = new Path(archivePath, path);
    }

    URI tmpURI = fsPath.toUri();
    // change this to a Har uri
    return new Path(uri.getScheme(), harAuth, tmpURI.getPath());
  }


  /**
   * Fix offset and length of block locations.
   * Note that this method modifies the original array.
   * @param locations block locations of har part file
   * @param start the start of the desired range in the contained file
   * @param len the length of the desired range
   * @param fileOffsetInHar the offset of the desired file in the har part file
   * @return block locations with fixed offset and length
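   *
   * A worked example (illustrative numbers): given a single part-file block
   * at offset 0 with length 128, a contained file starting at
   * fileOffsetInHar = 16, and a requested range with start = 0 and len = 32,
   * the block start relative to the file is -16, so the offset is rewritten
   * to 0; the block end relative to the file (112) exceeds end = 32, so the
   * length is trimmed to 32.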
   */
  static BlockLocation[] fixBlockLocations(BlockLocation[] locations,
                                          long start,
                                          long len,
                                          long fileOffsetInHar) {
    // offset 1 past last byte of desired range
    long end = start + len;

    for (BlockLocation location : locations) {
      // offset of part block relative to beginning of desired file
      // (may be negative if file starts in this part block)
      long harBlockStart = location.getOffset() - fileOffsetInHar;
      // offset 1 past last byte of har block relative to beginning of
      // desired file
      long harBlockEnd = harBlockStart + location.getLength();

      if (start > harBlockStart) {
        // desired range starts after beginning of this har block
        // fix offset to beginning of relevant range (relative to desired file)
        location.setOffset(start);
        // fix length to relevant portion of har block
        location.setLength(location.getLength() - (start - harBlockStart));
      } else {
        // desired range includes beginning of this har block
        location.setOffset(harBlockStart);
      }

      if (harBlockEnd > end) {
        // range ends before end of this har block
        // fix length to remove irrelevant portion at the end
        location.setLength(location.getLength() - (harBlockEnd - end));
      }
    }

    return locations;
  }

  /**
   * Get block locations from the underlying fs and fix their
   * offsets and lengths.
   * @param file the input file status to get block locations
   * @param start the start of the desired range in the contained file
   * @param len the length of the desired range
   * @return block locations for this segment of file
   * @throws IOException
   */
  @Override
  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
                                               long len) throws IOException {
    HarStatus hstatus = getFileHarStatus(file.getPath());
    Path partPath = new Path(archivePath, hstatus.getPartName());
    FileStatus partStatus = metadata.getPartFileStatus(partPath);

    // get all part blocks that overlap with the desired file blocks
    BlockLocation[] locations =
      fs.getFileBlockLocations(partStatus,
                               hstatus.getStartIndex() + start, len);

    return fixBlockLocations(locations, start, len, hstatus.getStartIndex());
  }

  /**
   * Get the hash of the path p inside the har filesystem.
   * @param p the path in the har filesystem
   * @return the hash code of the path.
   */
  public static int getHarHash(Path p) {
    return (p.toString().hashCode() & 0x7fffffff);
  }

  static class Store {
    public Store(long begin, long end) {
      this.begin = begin;
      this.end = end;
    }
    public long begin;
    public long end;
  }

  /**
   * Get filestatuses of all the children of a given directory. This reads
   * through the parsed archive index and collects the statuses of all
   * entries that are direct children of the given directory. It's a brute
   * force way of getting all such filestatuses.
   *
   * @param parent
   *          the parent path directory
   * @param statuses
   *          the list to add the children filestatuses to
   */
  private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses)
          throws IOException {
    String parentString = parent.getName();
    if (!parentString.endsWith(Path.SEPARATOR)){
        parentString += Path.SEPARATOR;
    }
    Path harPath = new Path(parentString);
    int harlen = harPath.depth();
    final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();

    for (HarStatus hstatus : metadata.archive.values()) {
      String child = hstatus.getName();
      if ((child.startsWith(parentString))) {
        Path thisPath = new Path(child);
        if (thisPath.depth() == harlen + 1) {
          statuses.add(toFileStatus(hstatus, cache));
        }
      }
    }
  }

  /**
   * Combine the status stored in the index and the underlying status.
   * @param h status stored in the index
   * @param cache caching the underlying file statuses
   * @return the combined file status
   * @throws IOException
   */
  private FileStatus toFileStatus(HarStatus h,
      Map<String, FileStatus> cache) throws IOException {
    FileStatus underlying = null;
    if (cache != null) {
      underlying = cache.get(h.partName);
    }
    if (underlying == null) {
      final Path p = h.isDir? archivePath: new Path(archivePath, h.partName);
      underlying = fs.getFileStatus(p);
      if (cache != null) {
        cache.put(h.partName, underlying);
      }
    }

    long modTime = 0;
    int version = metadata.getVersion();
    if (version < 3) {
      modTime = underlying.getModificationTime();
    } else if (version == 3) {
      modTime = h.getModificationTime();
    }

    return new FileStatus(
        h.isDir()? 0L: h.getLength(),
        h.isDir(),
        underlying.getReplication(),
        underlying.getBlockSize(),
        modTime,
        underlying.getAccessTime(),
        underlying.getPermission(),
        underlying.getOwner(),
        underlying.getGroup(),
        makeRelative(this.uri.getPath(), new Path(h.name)));
  }

  // A parser for a hadoop archive status entry, which is
  // stored as a single line in the index files.
  // The format is of the form:
  // filename "dir"/"file" partFileName startIndex length
  // <space separated children>
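  //
  // Illustrative (not taken from a real archive) version-1/2 style entries:
  //   /dir1 dir none 0 0 file1 file2
  //   /dir1/file1 file part-0 0 1024
  // In version 3 an additional URL-encoded properties field
  // ("modificationTime permission owner group") is appended, as parsed below.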
  private class HarStatus {
    boolean isDir;
    String name;
    List<String> children;
    String partName;
    long startIndex;
    long length;
    long modificationTime = 0;

    public HarStatus(String harString) throws UnsupportedEncodingException {
      String[] splits = harString.split(" ");
      this.name = decodeFileName(splits[0]);
      this.isDir = "dir".equals(splits[1]);
      // this is equal to "none" if it's a directory
      this.partName = splits[2];
      this.startIndex = Long.parseLong(splits[3]);
      this.length = Long.parseLong(splits[4]);

      int version = metadata.getVersion();
      String[] propSplits = null;
      // propSplits is used to retrieve the metainformation that Har versions
      // 1 & 2 missed (modification time, permission, owner group).
      // These fields are stored in an encoded string placed in different
      // locations depending on whether it's a file or directory entry.
      // If it's a directory, the string will be placed at the partName
      // location (directories have no partName because they don't have data
      // to be stored). This is done because the number of fields in a
      // directory entry is unbounded (all children are listed at the end).
      // If it's a file, the string will be the last field.
      if (isDir) {
        if (version == 3){
          propSplits = decodeString(this.partName).split(" ");
        }
        children = new ArrayList<String>();
        for (int i = 5; i < splits.length; i++) {
          children.add(decodeFileName(splits[i]));
        }
      } else if (version == 3) {
        propSplits = decodeString(splits[5]).split(" ");
      }

      if (propSplits != null && propSplits.length >= 4) {
        modificationTime = Long.parseLong(propSplits[0]);
        // the fields below are stored in the file but are currently not used
        // by HarFileSystem
        // permission = new FsPermission(Short.parseShort(propSplits[1]));
        // owner = decodeString(propSplits[2]);
        // group = decodeString(propSplits[3]);
      }
    }
    public boolean isDir() {
      return isDir;
    }

    public String getName() {
      return name;
    }
    public String getPartName() {
      return partName;
    }
    public long getStartIndex() {
      return startIndex;
    }
    public long getLength() {
      return length;
    }
    public long getModificationTime() {
      return modificationTime;
    }
  }

  /**
   * Return the filestatus of files in the har archive.
   * The permissions returned are those of the archive
   * index files. The permissions are not persisted
   * while creating a hadoop archive.
   * @param f the path in har filesystem
   * @return filestatus.
   * @throws IOException
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    HarStatus hstatus = getFileHarStatus(f);
    return toFileStatus(hstatus, null);
  }

  private HarStatus getFileHarStatus(Path f) throws IOException {
    // get the fs DataInputStream for the underlying file
    // look up the index.
    Path p = makeQualified(f);
    Path harPath = getPathInHar(p);
    if (harPath == null) {
      throw new IOException("Invalid file name: " + f + " in " + uri);
    }
    HarStatus hstatus = metadata.archive.get(harPath);
    if (hstatus == null) {
      throw new FileNotFoundException("File: " +  f + " does not exist in " + uri);
    }
    return hstatus;
  }

  /**
   * @return null since no checksum algorithm is implemented.
   */
  @Override
  public FileChecksum getFileChecksum(Path f, long length) {
    return null;
  }

  /**
   * Returns a har input stream which fakes end of
   * file. It reads the index files to get the part
   * file name and the size and start of the file.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    // get the fs DataInputStream for the underlying file
    HarStatus hstatus = getFileHarStatus(f);
    if (hstatus.isDir()) {
      throw new FileNotFoundException(f + " : not a file in " +
                archivePath);
    }
    return new HarFSDataInputStream(fs, new Path(archivePath, 
        hstatus.getPartName()),
        hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
  }

  /**
   * Used for delegation token related functionality. Must delegate to
   * underlying file system.
   */
  @Override
  public FileSystem[] getChildFileSystems() {
    return new FileSystem[]{fs};
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    throw new IOException("Har: create not allowed.");
  }

  @Override
  public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress)
      throws IOException {
    throw new IOException("Har: create not allowed.");
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
    throw new IOException("Har: append not allowed.");
  }

  @Override
  public void close() throws IOException {
    super.close();
    if (fs != null) {
      try {
        fs.close();
      } catch(IOException ie) {
        // this might already be closed
        // ignore
      }
    }
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean setReplication(Path src, short replication) throws IOException{
    throw new IOException("Har: setReplication not allowed");
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    throw new IOException("Har: rename not allowed");
  }

  @Override
  public FSDataOutputStream append(Path f) throws IOException {
    throw new IOException("Har: append not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean truncate(Path f, long newLength) throws IOException {
    throw new IOException("Har: truncate not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    throw new IOException("Har: delete not allowed");
  }

  /**
   * listStatus returns the children of a directory
   * after looking up the index files.
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    // need to see whether the path is a file or a directory in the index
    // and get the filestatus of the archive directory;
    // we will create fake filestatuses to return
    // to the client
    List<FileStatus> statuses = new ArrayList<FileStatus>();
    Path tmpPath = makeQualified(f);
    Path harPath = getPathInHar(tmpPath);
    HarStatus hstatus = metadata.archive.get(harPath);
    if (hstatus == null) {
      throw new FileNotFoundException("File " + f + " not found in " + archivePath);
    }
    if (hstatus.isDir()) {
      fileStatusesInIndex(hstatus, statuses);
    } else {
      statuses.add(toFileStatus(hstatus, null));
    }

    return statuses.toArray(new FileStatus[statuses.size()]);
  }

  /**
   * return the top level archive path.
   */
  @Override
  public Path getHomeDirectory() {
    return new Path(uri.toString());
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    //does nothing.
  }

  /**
   * not implemented.
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    throw new IOException("Har: mkdirs not allowed");
  }

  /**
   * not implemented.
   */
  @Override
  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
      Path src, Path dst) throws IOException {
    throw new IOException("Har: copyfromlocalfile not allowed");
  }

  @Override
  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
      Path[] srcs, Path dst) throws IOException {
    throw new IOException("Har: copyfromlocalfile not allowed");
  }

  /**
   * copies the file in the har filesystem to a local file.
   */
  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    FileUtil.copy(this, src, getLocal(getConf()), dst, false, getConf());
  }

  /**
   * not implemented.
   */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    throw new IOException("Har: startLocalOutput not allowed");
  }

  /**
   * not implemented.
   */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    throw new IOException("Har: completeLocalOutput not allowed");
  }

  /**
   * not implemented.
   */
  @Override
  public void setOwner(Path p, String username, String groupname)
    throws IOException {
    throw new IOException("Har: setowner not allowed");
  }

  @Override
  public void setTimes(Path p, long mtime, long atime) throws IOException {
    throw new IOException("Har: setTimes not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public void setPermission(Path p, FsPermission permission)
    throws IOException {
    throw new IOException("Har: setPermission not allowed");
  }

  /**
   * Hadoop archives input stream. This input stream fakes EOF
   * since archive files are part of bigger part files.
   */
  private static class HarFSDataInputStream extends FSDataInputStream {
    /**
     * Create an input stream that fakes all the reads/positions/seeking.
     */
    private static class HarFsInputStream extends FSInputStream
        implements CanSetDropBehind, CanSetReadahead {
      private long position, start, end;
      //The underlying data input stream that the
      // underlying filesystem will return.
      private final FSDataInputStream underLyingStream;
      //one byte buffer
      private final byte[] oneBytebuff = new byte[1];

      HarFsInputStream(FileSystem fs, Path path, long start,
          long length, int bufferSize) throws IOException {
        if (length < 0) {
          throw new IllegalArgumentException("Negative length ["+length+"]");
        }
        underLyingStream = fs.open(path, bufferSize);
        underLyingStream.seek(start);
        // the start of this file in the part file
        this.start = start;
        // the position pointer in the part file
        this.position = start;
        // the end pointer in the part file
        this.end = start + length;
      }

      @Override
      public synchronized int available() throws IOException {
        long remaining = end - underLyingStream.getPos();
        if (remaining > Integer.MAX_VALUE) {
          return Integer.MAX_VALUE;
        }
        return (int) remaining;
      }

      @Override
      public synchronized void close() throws IOException {
        underLyingStream.close();
        super.close();
      }

      // not implemented
      @Override
      public void mark(int readLimit) {
        // do nothing
      }

      /**
       * reset is not implemented
       */
      @Override
      public void reset() throws IOException {
        throw new IOException("reset not implemented.");
      }

      @Override
      public synchronized int read() throws IOException {
        int ret = read(oneBytebuff, 0, 1);
        return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
      }

      // NB: currently this method is never actually executed because
      // java.io.DataInputStream.read(byte[]) directly delegates to
      // method java.io.InputStream.read(byte[], int, int).
      // However, potentially it can be invoked, so leave it intact for now.
      @Override
      public synchronized int read(byte[] b) throws IOException {
        final int ret = read(b, 0, b.length);
        return ret;
      }

      /**
       * Read up to len bytes, never reading past the logical end of
       * this file's region within the part file.
       */
      @Override
      public synchronized int read(byte[] b, int offset, int len)
        throws IOException {
        if (len == 0) {
          return 0;
        }
        int newlen = len;
        int ret = -1;
        if (position + len > end) {
          newlen = (int) (end - position);
        }
        // end case
        if (newlen == 0)
          return ret;
        ret = underLyingStream.read(b, offset, newlen);
        position += ret;
        return ret;
      }

      @Override
      public synchronized long skip(long n) throws IOException {
        long tmpN = n;
        if (tmpN > 0) {
          final long actualRemaining = end - position;
          if (tmpN > actualRemaining) {
            tmpN = actualRemaining;
          }
          underLyingStream.seek(tmpN + position);
          position += tmpN;
          return tmpN;
        }
        // NB: the contract is described in java.io.InputStream.skip(long):
        // this method returns the number of bytes actually skipped, so,
        // the return value should never be negative.
        return 0;
      }

      @Override
      public synchronized long getPos() throws IOException {
        return (position - start);
      }

      @Override
      public synchronized void seek(final long pos) throws IOException {
        validatePosition(pos);
        position = start + pos;
        underLyingStream.seek(position);
      }

      private void validatePosition(final long pos) throws IOException {
        if (pos < 0) {
          throw new IOException("Negative position: "+pos);
        }
        final long length = end - start;
        if (pos > length) {
          throw new IOException("Position behind the end " +
              "of the stream (length = "+length+"): " + pos);
        }
      }

      @Override
      public boolean seekToNewSource(long targetPos) throws IOException {
        // do not need to implement this
        // hdfs in itself does seektonewsource
        // while reading.
        return false;
      }

      /**
       * Positioned read.
       */
      @Override
      public int read(long pos, byte[] b, int offset, int length)
      throws IOException {
        int nlength = length;
        if (start + nlength + pos > end) {
          // length corrected to the real remaining length:
          nlength = (int) (end - start - pos);
        }
        if (nlength <= 0) {
          // EOS:
          return -1;
        }
        return underLyingStream.read(pos + start, b, offset, nlength);
      }

      /**
       * Positioned readFully.
       */
      @Override
      public void readFully(long pos, byte[] b, int offset, int length)
      throws IOException {
        validatePositionedReadArgs(pos, b, offset, length);
        if (length == 0) {
          return;
        }
        if (start + length + pos > end) {
          throw new EOFException("Not enough bytes to read.");
        }
        underLyingStream.readFully(pos + start, b, offset, length);
      }

      @Override
      public void setReadahead(Long readahead) throws IOException {
        underLyingStream.setReadahead(readahead);
      }

      @Override
      public void setDropBehind(Boolean dropBehind) throws IOException {
        underLyingStream.setDropBehind(dropBehind);
      }
    }

    /**
     * Constructor for the har input stream.
     * @param fs the underlying filesystem
     * @param p The path in the underlying filesystem
     * @param start the start position in the part file
     * @param length the length of valid data in the part file
     * @param bufsize the buffer size
     * @throws IOException
     */
    public HarFSDataInputStream(FileSystem fs, Path  p, long start,
        long length, int bufsize) throws IOException {
        super(new HarFsInputStream(fs, p, start, length, bufsize));
    }
  }

  private class HarMetaData {
    private FileSystem fs;
    private int version;
    // the masterIndex of the archive
    private Path masterIndexPath;
    // the index file
    private Path archiveIndexPath;

    private long masterIndexTimestamp;
    private long archiveIndexTimestamp;

    List<Store> stores = new ArrayList<Store>();
    Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>();
    private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>();

    public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
      this.fs = fs;
      this.masterIndexPath = masterIndexPath;
      this.archiveIndexPath = archiveIndexPath;
    }

    public FileStatus getPartFileStatus(Path partPath) throws IOException {
      FileStatus status;
      status = partFileStatuses.get(partPath);
      if (status == null) {
        status = fs.getFileStatus(partPath);
        partFileStatuses.put(partPath, status);
      }
      return status;
    }

    public long getMasterIndexTimestamp() {
      return masterIndexTimestamp;
    }

    public long getArchiveIndexTimestamp() {
      return archiveIndexTimestamp;
    }

    private int getVersion() {
      return version;
    }

    private void parseMetaData() throws IOException {
      Text line = new Text();
      long read;
      FSDataInputStream in = null;
      LineReader lin = null;

      try {
        in = fs.open(masterIndexPath);
        FileStatus masterStat = fs.getFileStatus(masterIndexPath);
        masterIndexTimestamp = masterStat.getModificationTime();
        lin = new LineReader(in, getConf());
        read = lin.readLine(line);

        // the first line contains the version of the index file
        String versionLine = line.toString();
        String[] arr = versionLine.split(" ");
        version = Integer.parseInt(arr[0]);
        // make it always backwards-compatible
        if (this.version > HarFileSystem.VERSION) {
          throw new IOException("Invalid version " + 
              this.version + " expected " + HarFileSystem.VERSION);
        }

        // each line contains a hashcode range and the corresponding
        // byte range in the index file
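        // An illustrative master index line (the field layout is assumed
        // from the parsing below; only the third and fourth fields, the
        // byte offsets into the index file, are used here):
        //   0 2147483647 232 746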
        String[] readStr;
        while(read < masterStat.getLen()) {
          int b = lin.readLine(line);
          read += b;
          readStr = line.toString().split(" ");
          stores.add(new Store(Long.parseLong(readStr[2]), 
              Long.parseLong(readStr[3])));
          line.clear();
        }
      } catch (IOException ioe) {
        LOG.warn("Encountered exception ", ioe);
        throw ioe;
      } finally {
        IOUtils.cleanup(LOG, lin, in);
      }

      FSDataInputStream aIn = fs.open(archiveIndexPath);
      try {
        FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
        archiveIndexTimestamp = archiveStat.getModificationTime();
        LineReader aLin;

        // now start reading the real index file
        for (Store s: stores) {
          read = 0;
          aIn.seek(s.begin);
          aLin = new LineReader(aIn, getConf());
          while (read + s.begin < s.end) {
            int tmp = aLin.readLine(line);
            read += tmp;
            String lineFeed = line.toString();
            String[] parsed = lineFeed.split(" ");
            parsed[0] = decodeFileName(parsed[0]);
            archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
            line.clear();
          }
        }
      } finally {
        IOUtils.cleanup(LOG, aIn);
      }
    }
  }

  /*
   * For testing purposes only.
   */
  HarMetaData getMetadata() {
    return metadata;
  }

  private static class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int MAX_ENTRIES;

    public LruCache(int maxEntries) {
        super(maxEntries + 1, 1.0f, true);
        MAX_ENTRIES = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > MAX_ENTRIES;
    }
  }

  @SuppressWarnings("deprecation")
  @Override
  public FsServerDefaults getServerDefaults() throws IOException {
    return fs.getServerDefaults();
  }

  @Override
  public FsServerDefaults getServerDefaults(Path f) throws IOException {
    return fs.getServerDefaults(f);
  }

  @Override
  public long getUsed() throws IOException{
    return fs.getUsed();
  }

  /** Return the total size of all files from a specified path. */
  @Override
  public long getUsed(Path path) throws IOException {
    return fs.getUsed(path);
  }

  @SuppressWarnings("deprecation")
  @Override
  public long getDefaultBlockSize() {
    return fs.getDefaultBlockSize();
  }

  @SuppressWarnings("deprecation")
  @Override
  public long getDefaultBlockSize(Path f) {
    return fs.getDefaultBlockSize(f);
  }

  @SuppressWarnings("deprecation")
  @Override
  public short getDefaultReplication() {
    return fs.getDefaultReplication();
  }

  @Override
  public short getDefaultReplication(Path f) {
    return fs.getDefaultReplication(f);
  }
}