/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.*;

/**
 * This is an implementation of the Hadoop Archive
 * Filesystem. An archive filesystem has index files
 * of the form _index* and data files of the form
 * part-*. The index files store the metadata of the
 * archived files and come in two forms, _masterindex
 * and _index. The master index is a level of indirection
 * into the index file that makes lookups faster: the index
 * file is sorted by the hash code of the paths it contains,
 * and the master index contains pointers to the positions in
 * the index for ranges of hash codes.
 */

public class HarFileSystem extends FileSystem {

  private static final Log LOG = LogFactory.getLog(HarFileSystem.class);

  public static final String METADATA_CACHE_ENTRIES_KEY = "fs.har.metadatacache.entries";
  public static final int METADATA_CACHE_ENTRIES_DEFAULT = 10;

  public static final int VERSION = 3;

  private static Map<URI, HarMetaData> harMetaCache;

  // uri representation of this Har filesystem
  private URI uri;
  // the top level path of the archive
  // in the underlying file system
  private Path archivePath;
  // the har auth
  private String harAuth;

  // pointer into the static metadata cache
  private HarMetaData metadata;

  private FileSystem fs;

  /**
   * Public constructor of HarFileSystem.
   */
  public HarFileSystem() {
    // Must call #initialize() method to set the underlying file system
  }

  /**
   * Return the protocol scheme for the FileSystem.
   * <p/>
   *
   * @return <code>har</code>
   */
  @Override
  public String getScheme() {
    return "har";
  }

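  /*
   * A minimal usage sketch (the archive path below is hypothetical and for
   * illustration only). Clients normally obtain this filesystem through
   * Path.getFileSystem() on a "har://" path rather than constructing it
   * directly, and only read from it; mutating operations throw IOException.
   *
   *   Configuration conf = new Configuration();
   *   Path file = new Path("har:///user/alice/logs.har/2012/app.log");
   *   FileSystem harFs = file.getFileSystem(conf);
   *   FSDataInputStream in = harFs.open(file);
   *   IOUtils.copyBytes(in, System.out, conf, true);
   */
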
  /**
   * Constructor to create a HarFileSystem with an
   * underlying filesystem.
   * @param fs underlying file system
   */
  public HarFileSystem(FileSystem fs) {
    this.fs = fs;
    this.statistics = fs.statistics;
  }

  private synchronized void initializeMetadataCache(Configuration conf) {
    if (harMetaCache == null) {
      int cacheSize = conf.getInt(METADATA_CACHE_ENTRIES_KEY, METADATA_CACHE_ENTRIES_DEFAULT);
      harMetaCache = Collections.synchronizedMap(new LruCache<URI, HarMetaData>(cacheSize));
    }
  }

  /**
   * Initialize a Har filesystem per har archive. The
   * archive home directory is the top level directory
   * in the filesystem that contains the HAR archive.
   * Be careful with this method: you do not want to go
   * on creating new filesystem instances per call to
   * path.getFileSystem().
   * The URI of a Har filesystem is either
   * har://underlyingfsscheme-host:port/archivepath
   * or
   * har:///archivepath, which assumes the default underlying
   * filesystem when none is specified.
   */
  @Override
  public void initialize(URI name, Configuration conf) throws IOException {
    // initialize the metadata cache, if needed
    initializeMetadataCache(conf);

    // decode the name
    URI underLyingURI = decodeHarURI(name, conf);
    // we got the right har Path- now check if this is
    // truly a har filesystem
    Path harPath = archivePath(
        new Path(name.getScheme(), name.getAuthority(), name.getPath()));
    if (harPath == null) {
      throw new IOException("Invalid path for the Har Filesystem. " +
          name.toString());
    }
    if (fs == null) {
      fs = FileSystem.get(underLyingURI, conf);
    }
    uri = harPath.toUri();
    archivePath = new Path(uri.getPath());
    harAuth = getHarAuth(underLyingURI);
    // check for the underlying fs containing
    // the index files
    Path masterIndexPath = new Path(archivePath, "_masterindex");
    Path archiveIndexPath = new Path(archivePath, "_index");
    if (!fs.exists(masterIndexPath) || !fs.exists(archiveIndexPath)) {
      throw new IOException("Invalid path for the Har Filesystem. " +
          "No index file in " + harPath);
    }

    metadata = harMetaCache.get(uri);
    if (metadata != null) {
      FileStatus mStat = fs.getFileStatus(masterIndexPath);
      FileStatus aStat = fs.getFileStatus(archiveIndexPath);
      if (mStat.getModificationTime() != metadata.getMasterIndexTimestamp() ||
          aStat.getModificationTime() != metadata.getArchiveIndexTimestamp()) {
        // the archive has been overwritten since we last read it
        // remove the entry from the metadata cache
        metadata = null;
        harMetaCache.remove(uri);
      }
    }
    if (metadata == null) {
      metadata = new HarMetaData(fs, masterIndexPath, archiveIndexPath);
      metadata.parseMetaData();
      harMetaCache.put(uri, metadata);
    }
  }

  @Override
  public Configuration getConf() {
    return fs.getConf();
  }

  // get the version of the filesystem from the masterindex file
  // the version is currently not useful since it's the first version
  // of archives
  public int getHarVersion() throws IOException {
    if (metadata != null) {
      return metadata.getVersion();
    }
    else {
      throw new IOException("Invalid meta data for the Har Filesystem");
    }
  }

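  /*
   * An illustration of the URI forms accepted by initialize() (host and
   * paths below are hypothetical):
   *
   *   har://hdfs-nn.example.com:8020/user/alice/data.har/dir/file
   *     -> underlying fs:    hdfs://nn.example.com:8020
   *        archive path:     /user/alice/data.har
   *        file in archive:  /dir/file
   *
   *   har:///user/alice/data.har/dir/file
   *     -> underlying fs is taken from the default filesystem
   *        (fs.defaultFS) in the Configuration.
   */
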
  /*
   * Find the parent path that is the
   * archive path within the given path: the deepest
   * path segment that ends with .har is
   * the path that will be returned (null if there is none).
   */
  private Path archivePath(Path p) {
    Path retPath = null;
    Path tmp = p;
    for (int i = 0; i < p.depth(); i++) {
      if (tmp.toString().endsWith(".har")) {
        retPath = tmp;
        break;
      }
      tmp = tmp.getParent();
    }
    return retPath;
  }

  /**
   * decode the raw URI to get the underlying URI
   * @param rawURI raw Har URI
   * @return filtered URI of the underlying fileSystem
   */
  private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException {
    String tmpAuth = rawURI.getAuthority();
    // we are using the default file
    // system in the config
    // so create an underlying uri and
    // return it
    if (tmpAuth == null) {
      //create a path
      return FileSystem.getDefaultUri(conf);
    }
    String authority = rawURI.getAuthority();

    int i = authority.indexOf('-');
    if (i < 0) {
      throw new IOException("URI: " + rawURI
          + " is an invalid Har URI since '-' not found."
          + " Expecting har://<scheme>-<host>/<path>.");
    }

    if (rawURI.getQuery() != null) {
      // query component not allowed
      throw new IOException("query component in Path not supported " + rawURI);
    }

    URI tmp;
    try {
      // convert <scheme>-<host> to <scheme>://<host>
      URI baseUri = new URI(authority.replaceFirst("-", "://"));

      tmp = new URI(baseUri.getScheme(), baseUri.getAuthority(),
          rawURI.getPath(), rawURI.getQuery(), rawURI.getFragment());
    } catch (URISyntaxException e) {
      throw new IOException("URI: " + rawURI
          + " is an invalid Har URI. Expecting har://<scheme>-<host>/<path>.");
    }
    return tmp;
  }

  private static String decodeString(String str)
      throws UnsupportedEncodingException {
    return URLDecoder.decode(str, "UTF-8");
  }

  private String decodeFileName(String fname)
      throws UnsupportedEncodingException {
    int version = metadata.getVersion();
    if (version == 2 || version == 3) {
      return decodeString(fname);
    }
    return fname;
  }

  /**
   * Return the top level archive path.
   */
  @Override
  public Path getWorkingDirectory() {
    return new Path(uri.toString());
  }

  @Override
  public Path getInitialWorkingDirectory() {
    return getWorkingDirectory();
  }

  @Override
  public FsStatus getStatus(Path p) throws IOException {
    return fs.getStatus(p);
  }

  /**
   * Create a har specific auth of the form
   * underlyingfsscheme-host:port.
   * @param underLyingUri the uri of the underlying
   * filesystem
   * @return har specific auth
   */
  private String getHarAuth(URI underLyingUri) {
    String auth = underLyingUri.getScheme() + "-";
    if (underLyingUri.getHost() != null) {
      if (underLyingUri.getUserInfo() != null) {
        auth += underLyingUri.getUserInfo();
        auth += "@";
      }
      auth += underLyingUri.getHost();
      if (underLyingUri.getPort() != -1) {
        auth += ":";
        auth += underLyingUri.getPort();
      }
    }
    else {
      auth += ":";
    }
    return auth;
  }

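  /*
   * For example (hypothetical host), an underlying URI of
   * hdfs://nn.example.com:8020 yields the har auth
   * "hdfs-nn.example.com:8020", so paths in this filesystem render as
   * har://hdfs-nn.example.com:8020/<path-to-archive>/<path-in-archive>.
   * An underlying filesystem without a host (e.g. file:///) yields
   * "<scheme>-:".
   */
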
  /**
   * Used for delegation token related functionality. Must delegate to
   * underlying file system.
   */
  @Override
  protected URI getCanonicalUri() {
    return fs.getCanonicalUri();
  }

  @Override
  protected URI canonicalizeUri(URI uri) {
    return fs.canonicalizeUri(uri);
  }

  /**
   * Returns the uri of this filesystem.
   * The uri is of the form
   * har://underlyingfsscheme-host:port/pathintheunderlyingfs
   */
  @Override
  public URI getUri() {
    return this.uri;
  }

  @Override
  protected void checkPath(Path path) {
    fs.checkPath(path);
  }

  @Override
  public Path resolvePath(Path p) throws IOException {
    return fs.resolvePath(p);
  }

  /**
   * This method returns the path
   * inside the har filesystem, i.e.
   * the path relative to the root
   * of the archive.
   * @param path the fully qualified path in the har filesystem.
   * @return relative path in the filesystem.
   */
  private Path getPathInHar(Path path) {
    Path harPath = new Path(path.toUri().getPath());
    if (archivePath.compareTo(harPath) == 0)
      return new Path(Path.SEPARATOR);
    Path tmp = new Path(harPath.getName());
    Path parent = harPath.getParent();
    while (!(parent.compareTo(archivePath) == 0)) {
      if (parent.toString().equals(Path.SEPARATOR)) {
        tmp = null;
        break;
      }
      tmp = new Path(parent.getName(), tmp);
      parent = parent.getParent();
    }
    if (tmp != null)
      tmp = new Path(Path.SEPARATOR, tmp);
    return tmp;
  }

  // The relative path of p: basically
  // getting rid of the leading "/". Parsing and doing
  // string manipulation is error prone, so
  // just use the Path API to do it.
  private Path makeRelative(String initial, Path p) {
    String scheme = this.uri.getScheme();
    String authority = this.uri.getAuthority();
    Path root = new Path(Path.SEPARATOR);
    if (root.compareTo(p) == 0)
      return new Path(scheme, authority, initial);
    Path retPath = new Path(p.getName());
    Path parent = p.getParent();
    for (int i = 0; i < p.depth() - 1; i++) {
      retPath = new Path(parent.getName(), retPath);
      parent = parent.getParent();
    }
    return new Path(new Path(scheme, authority, initial),
        retPath.toString());
  }

  /* this makes a path qualified in the har filesystem
   * (non-Javadoc)
   * @see org.apache.hadoop.fs.FilterFileSystem#makeQualified(
   * org.apache.hadoop.fs.Path)
   */
  @Override
  public Path makeQualified(Path path) {
    // make sure that we just get the
    // path component
    Path fsPath = path;
    if (!path.isAbsolute()) {
      fsPath = new Path(archivePath, path);
    }

    URI tmpURI = fsPath.toUri();
    //change this to Har uri
    return new Path(uri.getScheme(), harAuth, tmpURI.getPath());
  }

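  /*
   * A worked example of the path mapping above (hypothetical archive at
   * /user/alice/data.har on hdfs://nn.example.com:8020):
   *
   *   makeQualified(new Path("dir/file"))
   *     -> har://hdfs-nn.example.com:8020/user/alice/data.har/dir/file
   *   getPathInHar(har://hdfs-nn.example.com:8020/user/alice/data.har/dir/file)
   *     -> /dir/file   (the path relative to the archive root)
   *   getPathInHar(har://hdfs-nn.example.com:8020/user/alice/data.har)
   *     -> /           (the archive root itself)
   */
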
  /**
   * Fix offset and length of block locations.
   * Note that this method modifies the original array.
   * @param locations block locations of har part file
   * @param start the start of the desired range in the contained file
   * @param len the length of the desired range
   * @param fileOffsetInHar the offset of the desired file in the har part file
   * @return block locations with fixed offset and length
   */
  static BlockLocation[] fixBlockLocations(BlockLocation[] locations,
                                           long start,
                                           long len,
                                           long fileOffsetInHar) {
    // offset 1 past last byte of desired range
    long end = start + len;

    for (BlockLocation location : locations) {
      // offset of part block relative to beginning of desired file
      // (may be negative if file starts in this part block)
      long harBlockStart = location.getOffset() - fileOffsetInHar;
      // offset 1 past last byte of har block relative to beginning of
      // desired file
      long harBlockEnd = harBlockStart + location.getLength();

      if (start > harBlockStart) {
        // desired range starts after beginning of this har block
        // fix offset to beginning of relevant range (relative to desired file)
        location.setOffset(start);
        // fix length to relevant portion of har block
        location.setLength(location.getLength() - (start - harBlockStart));
      } else {
        // desired range includes beginning of this har block
        location.setOffset(harBlockStart);
      }

      if (harBlockEnd > end) {
        // range ends before end of this har block
        // fix length to remove irrelevant portion at the end
        location.setLength(location.getLength() - (harBlockEnd - end));
      }
    }

    return locations;
  }

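  /*
   * A worked example of the arithmetic above (all numbers hypothetical).
   * Suppose the archived file starts at offset 1000 in the part file
   * (fileOffsetInHar = 1000), the part file has a block at offset 1536 with
   * length 512, and the caller asks for start = 100, len = 800 of the
   * archived file (so end = 900):
   *
   *   harBlockStart = 1536 - 1000 = 536   (block start, relative to the file)
   *   harBlockEnd   = 536 + 512   = 1048  (block end, relative to the file)
   *   start (100) <= harBlockStart, so the offset is set to 536
   *   harBlockEnd (1048) > end (900), so the length becomes
   *     512 - (1048 - 900) = 364
   *
   * i.e. the block is reported as covering [536, 900) of the archived file.
   */
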
  /**
   * Get block locations from the underlying fs and fix their
   * offsets and lengths.
   * @param file the input file status to get block locations
   * @param start the start of the desired range in the contained file
   * @param len the length of the desired range
   * @return block locations for this segment of file
   * @throws IOException
   */
  @Override
  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
                                               long len) throws IOException {
    HarStatus hstatus = getFileHarStatus(file.getPath());
    Path partPath = new Path(archivePath, hstatus.getPartName());
    FileStatus partStatus = metadata.getPartFileStatus(partPath);

    // get all part blocks that overlap with the desired file blocks
    BlockLocation[] locations =
        fs.getFileBlockLocations(partStatus,
            hstatus.getStartIndex() + start, len);

    return fixBlockLocations(locations, start, len, hstatus.getStartIndex());
  }

  /**
   * the hash of the path p inside the filesystem
   * @param p the path in the harfilesystem
   * @return the hash code of the path.
   */
  public static int getHarHash(Path p) {
    return (p.toString().hashCode() & 0x7fffffff);
  }

  static class Store {
    public Store(long begin, long end) {
      this.begin = begin;
      this.end = end;
    }
    public long begin;
    public long end;
  }

  /**
   * Get filestatuses of all the children of a given directory. This just reads
   * through the index file, line by line, to get all statuses for children
   * of a directory. It's a brute force way of getting all such filestatuses.
   *
   * @param parent
   *          the parent path directory
   * @param statuses
   *          the list to add the children filestatuses to
   */
  private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses)
      throws IOException {
    String parentString = parent.getName();
    if (!parentString.endsWith(Path.SEPARATOR)) {
      parentString += Path.SEPARATOR;
    }
    Path harPath = new Path(parentString);
    int harlen = harPath.depth();
    final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();

    for (HarStatus hstatus : metadata.archive.values()) {
      String child = hstatus.getName();
      if ((child.startsWith(parentString))) {
        Path thisPath = new Path(child);
        if (thisPath.depth() == harlen + 1) {
          statuses.add(toFileStatus(hstatus, cache));
        }
      }
    }
  }

  /**
   * Combine the status stored in the index and the underlying status.
   * @param h status stored in the index
   * @param cache caching the underlying file statuses
   * @return the combined file status
   * @throws IOException
   */
  private FileStatus toFileStatus(HarStatus h,
      Map<String, FileStatus> cache) throws IOException {
    FileStatus underlying = null;
    if (cache != null) {
      underlying = cache.get(h.partName);
    }
    if (underlying == null) {
      final Path p = h.isDir ? archivePath : new Path(archivePath, h.partName);
      underlying = fs.getFileStatus(p);
      if (cache != null) {
        cache.put(h.partName, underlying);
      }
    }

    long modTime = 0;
    int version = metadata.getVersion();
    if (version < 3) {
      modTime = underlying.getModificationTime();
    } else if (version == 3) {
      modTime = h.getModificationTime();
    }

    return new FileStatus(
        h.isDir() ? 0L : h.getLength(),
        h.isDir(),
        underlying.getReplication(),
        underlying.getBlockSize(),
        modTime,
        underlying.getAccessTime(),
        underlying.getPermission(),
        underlying.getOwner(),
        underlying.getGroup(),
        makeRelative(this.uri.getPath(), new Path(h.name)));
  }

  // a single line parser for hadoop archives status
  // stored in a single line in the index files
  // the format is of the form
  // filename "dir"/"file" partFileName startIndex length
  // <space separated children>
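  //
  // For illustration (hypothetical names, offsets and lengths), version-3
  // entries in the _index file look roughly like:
  //
  //   /dir1 dir <encoded-props> 0 0 file1 file2
  //   /dir1/file1 file part-0 0 1024 <encoded-props>
  //   /dir1/file2 file part-0 1024 2048 <encoded-props>
  //
  // where names are URL-encoded where necessary and <encoded-props> is a
  // URL-encoded "<mtime> <permission> <owner> <group>" string, as described
  // in the HarStatus constructor below.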
  private class HarStatus {
    boolean isDir;
    String name;
    List<String> children;
    String partName;
    long startIndex;
    long length;
    long modificationTime = 0;

    public HarStatus(String harString) throws UnsupportedEncodingException {
      String[] splits = harString.split(" ");
      this.name = decodeFileName(splits[0]);
      this.isDir = "dir".equals(splits[1]);
      // this is equal to "none" if it's a directory
      this.partName = splits[2];
      this.startIndex = Long.parseLong(splits[3]);
      this.length = Long.parseLong(splits[4]);

      int version = metadata.getVersion();
      String[] propSplits = null;
      // propSplits is used to retrieve the metainformation that Har versions
      // 1 & 2 missed (modification time, permission, owner group).
      // These fields are stored in an encoded string placed in different
      // locations depending on whether it's a file or directory entry.
      // If it's a directory, the string will be placed at the partName
      // location (directories have no partName because they don't have data
      // to be stored). This is done because the number of fields in a
      // directory entry is unbounded (all children are listed at the end).
      // If it's a file, the string will be the last field.
      if (isDir) {
        if (version == 3) {
          propSplits = decodeString(this.partName).split(" ");
        }
        children = new ArrayList<String>();
        for (int i = 5; i < splits.length; i++) {
          children.add(decodeFileName(splits[i]));
        }
      } else if (version == 3) {
        propSplits = decodeString(splits[5]).split(" ");
      }

      if (propSplits != null && propSplits.length >= 4) {
        modificationTime = Long.parseLong(propSplits[0]);
        // the fields below are stored in the file but are currently not used
        // by HarFileSystem
        // permission = new FsPermission(Short.parseShort(propSplits[1]));
        // owner = decodeString(propSplits[2]);
        // group = decodeString(propSplits[3]);
      }
    }
    public boolean isDir() {
      return isDir;
    }

    public String getName() {
      return name;
    }
    public String getPartName() {
      return partName;
    }
    public long getStartIndex() {
      return startIndex;
    }
    public long getLength() {
      return length;
    }
    public long getModificationTime() {
      return modificationTime;
    }
  }

  /**
   * Return the filestatus of files in the har archive.
   * The permissions returned are those of the archive
   * index files; permissions are not persisted
   * while creating a hadoop archive.
   * @param f the path in har filesystem
   * @return filestatus.
   * @throws IOException
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    HarStatus hstatus = getFileHarStatus(f);
    return toFileStatus(hstatus, null);
  }

  private HarStatus getFileHarStatus(Path f) throws IOException {
    // get the fs DataInputStream for the underlying file
    // look up the index.
    Path p = makeQualified(f);
    Path harPath = getPathInHar(p);
    if (harPath == null) {
      throw new IOException("Invalid file name: " + f + " in " + uri);
    }
    HarStatus hstatus = metadata.archive.get(harPath);
    if (hstatus == null) {
      throw new FileNotFoundException("File: " + f + " does not exist in " + uri);
    }
    return hstatus;
  }

  /**
   * @return null since no checksum algorithm is implemented.
   */
  @Override
  public FileChecksum getFileChecksum(Path f, long length) {
    return null;
  }

  /**
   * Returns a har input stream which fakes end of
   * file. It reads the index files to get the part
   * file name and the size and start offset of the file.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    // get the fs DataInputStream for the underlying file
    HarStatus hstatus = getFileHarStatus(f);
    if (hstatus.isDir()) {
      throw new FileNotFoundException(f + " : not a file in " +
          archivePath);
    }
    return new HarFSDataInputStream(fs, new Path(archivePath,
        hstatus.getPartName()),
        hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
  }

  /**
   * Used for delegation token related functionality. Must delegate to
   * underlying file system.
   */
  @Override
  public FileSystem[] getChildFileSystems() {
    return new FileSystem[]{fs};
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    throw new IOException("Har: create not allowed.");
  }

  @Override
  public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress)
      throws IOException {
    throw new IOException("Har: create not allowed.");
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Har: append not allowed.");
  }

  @Override
  public void close() throws IOException {
    super.close();
    if (fs != null) {
      try {
        fs.close();
      } catch (IOException ie) {
        // this might already be closed
        // ignore
      }
    }
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean setReplication(Path src, short replication) throws IOException {
    throw new IOException("Har: setReplication not allowed");
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    throw new IOException("Har: rename not allowed");
  }

  @Override
  public FSDataOutputStream append(Path f) throws IOException {
    throw new IOException("Har: append not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean truncate(Path f, long newLength) throws IOException {
    throw new IOException("Har: truncate not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    throw new IOException("Har: delete not allowed");
  }

  /**
   * listStatus returns the children of a directory
   * after looking up the index files.
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    // need to see if the file is an index in file
    // get the filestatus of the archive directory
    // we will create fake filestatuses to return
    // to the client
    List<FileStatus> statuses = new ArrayList<FileStatus>();
    Path tmpPath = makeQualified(f);
    Path harPath = getPathInHar(tmpPath);
    HarStatus hstatus = metadata.archive.get(harPath);
    if (hstatus == null) {
      throw new FileNotFoundException("File " + f + " not found in " + archivePath);
    }
    if (hstatus.isDir()) {
      fileStatusesInIndex(hstatus, statuses);
    } else {
      statuses.add(toFileStatus(hstatus, null));
    }

    return statuses.toArray(new FileStatus[statuses.size()]);
  }

  /**
   * return the top level archive path.
   */
  @Override
  public Path getHomeDirectory() {
    return new Path(uri.toString());
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    //does nothing.
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    throw new IOException("Har: mkdirs not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
      Path src, Path dst) throws IOException {
    throw new IOException("Har: copyfromlocalfile not allowed");
  }

  @Override
  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
      Path[] srcs, Path dst) throws IOException {
    throw new IOException("Har: copyfromlocalfile not allowed");
  }

  /**
   * copies the file in the har filesystem to a local file.
   */
  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
      throws IOException {
    FileUtil.copy(this, src, getLocal(getConf()), dst, false, getConf());
  }

  /**
   * Not implemented.
   */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    throw new IOException("Har: startLocalOutput not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    throw new IOException("Har: completeLocalOutput not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public void setOwner(Path p, String username, String groupname)
      throws IOException {
    throw new IOException("Har: setowner not allowed");
  }

  @Override
  public void setTimes(Path p, long mtime, long atime) throws IOException {
    throw new IOException("Har: setTimes not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public void setPermission(Path p, FsPermission permission)
      throws IOException {
    throw new IOException("Har: setPermission not allowed");
  }

  /**
   * Hadoop archives input stream. This input stream fakes EOF
   * since archive files are part of bigger part files.
   */
  private static class HarFSDataInputStream extends FSDataInputStream {
    /**
     * Create an input stream that fakes all the reads/positions/seeking.
     */
    private static class HarFsInputStream extends FSInputStream
        implements CanSetDropBehind, CanSetReadahead {
      private long position, start, end;
      // The underlying data input stream that the
      // underlying filesystem will return.
      private final FSDataInputStream underLyingStream;
      // one byte buffer
      private final byte[] oneBytebuff = new byte[1];

      HarFsInputStream(FileSystem fs, Path path, long start,
          long length, int bufferSize) throws IOException {
        if (length < 0) {
          throw new IllegalArgumentException("Negative length [" + length + "]");
        }
        underLyingStream = fs.open(path, bufferSize);
        underLyingStream.seek(start);
        // the start of this file in the part file
        this.start = start;
        // the position pointer in the part file
        this.position = start;
        // the end pointer in the part file
        this.end = start + length;
      }

      @Override
      public synchronized int available() throws IOException {
        long remaining = end - underLyingStream.getPos();
        if (remaining > Integer.MAX_VALUE) {
          return Integer.MAX_VALUE;
        }
        return (int) remaining;
      }

      @Override
      public synchronized void close() throws IOException {
        underLyingStream.close();
        super.close();
      }

      // not implemented
      @Override
      public void mark(int readLimit) {
        // do nothing
      }

      /**
       * reset is not implemented
       */
      @Override
      public void reset() throws IOException {
        throw new IOException("reset not implemented.");
      }

      @Override
      public synchronized int read() throws IOException {
        int ret = read(oneBytebuff, 0, 1);
        return (ret <= 0) ? -1 : (oneBytebuff[0] & 0xff);
      }

      // NB: currently this method is actually never executed because
      // java.io.DataInputStream.read(byte[]) directly delegates to
      // method java.io.InputStream.read(byte[], int, int).
      // However, potentially it can be invoked, so leave it intact for now.
      @Override
      public synchronized int read(byte[] b) throws IOException {
        final int ret = read(b, 0, b.length);
        return ret;
      }

      @Override
      public synchronized int read(byte[] b, int offset, int len)
          throws IOException {
        if (len == 0) {
          return 0;
        }
        int newlen = len;
        int ret = -1;
        if (position + len > end) {
          newlen = (int) (end - position);
        }
        // end case
        if (newlen == 0)
          return ret;
        ret = underLyingStream.read(b, offset, newlen);
        position += ret;
        return ret;
      }

      @Override
      public synchronized long skip(long n) throws IOException {
        long tmpN = n;
        if (tmpN > 0) {
          final long actualRemaining = end - position;
          if (tmpN > actualRemaining) {
            tmpN = actualRemaining;
          }
          underLyingStream.seek(tmpN + position);
          position += tmpN;
          return tmpN;
        }
        // NB: the contract is described in java.io.InputStream.skip(long):
        // this method returns the number of bytes actually skipped, so,
        // the return value should never be negative.
        return 0;
      }

      @Override
      public synchronized long getPos() throws IOException {
        return (position - start);
      }

      @Override
      public synchronized void seek(final long pos) throws IOException {
        validatePosition(pos);
        position = start + pos;
        underLyingStream.seek(position);
      }

      private void validatePosition(final long pos) throws IOException {
        if (pos < 0) {
          throw new IOException("Negative position: " + pos);
        }
        final long length = end - start;
        if (pos > length) {
          throw new IOException("Position behind the end " +
              "of the stream (length = " + length + "): " + pos);
        }
      }

      @Override
      public boolean seekToNewSource(long targetPos) throws IOException {
        // no need to implement this here:
        // HDFS itself does seekToNewSource
        // while reading.
        return false;
      }

      /**
       * Implements positioned read.
       */
      @Override
      public int read(long pos, byte[] b, int offset, int length)
          throws IOException {
        int nlength = length;
        if (start + nlength + pos > end) {
          // length corrected to the real remaining length:
          nlength = (int) (end - start - pos);
        }
        if (nlength <= 0) {
          // EOS:
          return -1;
        }
        return underLyingStream.read(pos + start, b, offset, nlength);
      }

      /**
       * Positioned read, filling the whole requested range.
       */
      @Override
      public void readFully(long pos, byte[] b, int offset, int length)
          throws IOException {
        validatePositionedReadArgs(pos, b, offset, length);
        if (length == 0) {
          return;
        }
        if (start + length + pos > end) {
          throw new EOFException("Not enough bytes to read.");
        }
        underLyingStream.readFully(pos + start, b, offset, length);
      }

      @Override
      public void setReadahead(Long readahead) throws IOException {
        underLyingStream.setReadahead(readahead);
      }

      @Override
      public void setDropBehind(Boolean dropBehind) throws IOException {
        underLyingStream.setDropBehind(dropBehind);
      }
    }

    /**
     * Constructor for the har input stream.
     * @param fs the underlying filesystem
     * @param p The path in the underlying filesystem
     * @param start the start position in the part file
     * @param length the length of valid data in the part file
     * @param bufsize the buffer size
     * @throws IOException
     */
    public HarFSDataInputStream(FileSystem fs, Path p, long start,
        long length, int bufsize) throws IOException {
      super(new HarFsInputStream(fs, p, start, length, bufsize));
    }
  }

  private class HarMetaData {
    private FileSystem fs;
    private int version;
    // the masterIndex of the archive
    private Path masterIndexPath;
    // the index file
    private Path archiveIndexPath;

    private long masterIndexTimestamp;
    private long archiveIndexTimestamp;

    List<Store> stores = new ArrayList<Store>();
    Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>();
    private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>();

    public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
      this.fs = fs;
      this.masterIndexPath = masterIndexPath;
      this.archiveIndexPath = archiveIndexPath;
    }

    public FileStatus getPartFileStatus(Path partPath) throws IOException {
      FileStatus status;
      status = partFileStatuses.get(partPath);
      if (status == null) {
        status = fs.getFileStatus(partPath);
        partFileStatuses.put(partPath, status);
      }
      return status;
    }

    public long getMasterIndexTimestamp() {
      return masterIndexTimestamp;
    }

    public long getArchiveIndexTimestamp() {
      return archiveIndexTimestamp;
    }

    private int getVersion() {
      return version;
    }

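    /*
     * For illustration (hypothetical numbers), parseMetaData() below expects
     * the _masterindex to start with a version line followed by one line per
     * hash-code range, where the last two fields of each line are the byte
     * range of the corresponding section of the _index file:
     *
     *   3
     *   0 1073741823 0 2048
     *   1073741824 2147483647 2048 4096
     *
     * Each referenced section of _index is then read line by line and parsed
     * into HarStatus entries.
     */
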
    private void parseMetaData() throws IOException {
      Text line = new Text();
      long read;
      FSDataInputStream in = null;
      LineReader lin = null;

      try {
        in = fs.open(masterIndexPath);
        FileStatus masterStat = fs.getFileStatus(masterIndexPath);
        masterIndexTimestamp = masterStat.getModificationTime();
        lin = new LineReader(in, getConf());
        read = lin.readLine(line);

        // the first line contains the version of the index file
        String versionLine = line.toString();
        String[] arr = versionLine.split(" ");
        version = Integer.parseInt(arr[0]);
        // make it always backwards-compatible
        if (this.version > HarFileSystem.VERSION) {
          throw new IOException("Invalid version " +
              this.version + " expected " + HarFileSystem.VERSION);
        }

        // each line contains a hashcode range and the corresponding
        // byte range in the index file
        String[] readStr;
        while (read < masterStat.getLen()) {
          int b = lin.readLine(line);
          read += b;
          readStr = line.toString().split(" ");
          stores.add(new Store(Long.parseLong(readStr[2]),
              Long.parseLong(readStr[3])));
          line.clear();
        }
      } catch (IOException ioe) {
        LOG.warn("Encountered exception ", ioe);
        throw ioe;
      } finally {
        IOUtils.cleanup(LOG, lin, in);
      }

      FSDataInputStream aIn = fs.open(archiveIndexPath);
      try {
        FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
        archiveIndexTimestamp = archiveStat.getModificationTime();
        LineReader aLin;

        // now start reading the real index file
        for (Store s: stores) {
          read = 0;
          aIn.seek(s.begin);
          aLin = new LineReader(aIn, getConf());
          while (read + s.begin < s.end) {
            int tmp = aLin.readLine(line);
            read += tmp;
            String lineFeed = line.toString();
            String[] parsed = lineFeed.split(" ");
            parsed[0] = decodeFileName(parsed[0]);
            archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
            line.clear();
          }
        }
      } finally {
        IOUtils.cleanup(LOG, aIn);
      }
    }
  }

  /*
   * testing purposes only:
   */
  HarMetaData getMetadata() {
    return metadata;
  }

  private static class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int MAX_ENTRIES;

    public LruCache(int maxEntries) {
      super(maxEntries + 1, 1.0f, true);
      MAX_ENTRIES = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
      return size() > MAX_ENTRIES;
    }
  }

  @SuppressWarnings("deprecation")
  @Override
  public FsServerDefaults getServerDefaults() throws IOException {
    return fs.getServerDefaults();
  }

  @Override
  public FsServerDefaults getServerDefaults(Path f) throws IOException {
    return fs.getServerDefaults(f);
  }

  @Override
  public long getUsed() throws IOException {
    return fs.getUsed();
  }

  /** Return the total size of all files from a specified path. */
  @Override
  public long getUsed(Path path) throws IOException {
    return fs.getUsed(path);
  }

  @SuppressWarnings("deprecation")
  @Override
  public long getDefaultBlockSize() {
    return fs.getDefaultBlockSize();
  }

  @SuppressWarnings("deprecation")
  @Override
  public long getDefaultBlockSize(Path f) {
    return fs.getDefaultBlockSize(f);
  }

  @SuppressWarnings("deprecation")
  @Override
  public short getDefaultReplication() {
    return fs.getDefaultReplication();
  }

  @Override
  public short getDefaultReplication(Path f) {
    return fs.getDefaultReplication(f);
  }
}