001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.azure; 020 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.EOFException; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.charset.Charset; 031import java.text.SimpleDateFormat; 032import java.util.ArrayList; 033import java.util.Date; 034import java.util.EnumSet; 035import java.util.Set; 036import java.util.TimeZone; 037import java.util.TreeSet; 038import java.util.UUID; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.regex.Matcher; 041import java.util.regex.Pattern; 042 043import org.apache.commons.lang.StringUtils; 044import org.apache.hadoop.classification.InterfaceAudience; 045import org.apache.hadoop.classification.InterfaceStability; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.BlockLocation; 048import org.apache.hadoop.fs.BufferedFSInputStream; 049import org.apache.hadoop.fs.CreateFlag; 050import org.apache.hadoop.fs.FSDataInputStream; 051import org.apache.hadoop.fs.FSDataOutputStream; 052import org.apache.hadoop.fs.FSExceptionMessages; 053import org.apache.hadoop.fs.FSInputStream; 054import org.apache.hadoop.fs.FileAlreadyExistsException; 055import org.apache.hadoop.fs.FileStatus; 056import org.apache.hadoop.fs.FileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; 059import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; 060import org.apache.hadoop.fs.permission.FsPermission; 061import org.apache.hadoop.fs.permission.PermissionStatus; 062import org.apache.hadoop.fs.azure.AzureException; 063import org.apache.hadoop.security.UserGroupInformation; 064import org.apache.hadoop.util.Progressable; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067import org.codehaus.jackson.JsonNode; 068import org.codehaus.jackson.JsonParseException; 069import org.codehaus.jackson.JsonParser; 070import org.codehaus.jackson.map.JsonMappingException; 071import org.codehaus.jackson.map.ObjectMapper; 072 073import com.google.common.annotations.VisibleForTesting; 074import com.microsoft.azure.storage.StorageException; 075 076 077import org.apache.hadoop.io.IOUtils; 078 079/** 080 * A {@link FileSystem} for reading and writing files stored on <a 081 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 082 * blob-based and stores files on Azure in their native form so they can be read 083 * by other Azure tools. 084 */ 085@InterfaceAudience.Public 086@InterfaceStability.Stable 087public class NativeAzureFileSystem extends FileSystem { 088 private static final int USER_WX_PERMISION = 0300; 089 /** 090 * A description of a folder rename operation, including the source and 091 * destination keys, and descriptions of the files in the source folder. 092 */ 093 public static class FolderRenamePending { 094 private SelfRenewingLease folderLease; 095 private String srcKey; 096 private String dstKey; 097 private FileMetadata[] fileMetadata = null; // descriptions of source files 098 private ArrayList<String> fileStrings = null; 099 private NativeAzureFileSystem fs; 100 private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; 101 private static final int FORMATTING_BUFFER = 10000; 102 private boolean committed; 103 public static final String SUFFIX = "-RenamePending.json"; 104 105 // Prepare in-memory information needed to do or redo a folder rename. 106 public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease, 107 NativeAzureFileSystem fs) throws IOException { 108 this.srcKey = srcKey; 109 this.dstKey = dstKey; 110 this.folderLease = lease; 111 this.fs = fs; 112 ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>(); 113 114 // List all the files in the folder. 115 String priorLastKey = null; 116 do { 117 PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL, 118 AZURE_UNBOUNDED_DEPTH, priorLastKey); 119 for(FileMetadata file : listing.getFiles()) { 120 fileMetadataList.add(file); 121 } 122 priorLastKey = listing.getPriorLastKey(); 123 } while (priorLastKey != null); 124 fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); 125 this.committed = true; 126 } 127 128 // Prepare in-memory information needed to do or redo folder rename from 129 // a -RenamePending.json file read from storage. This constructor is to use during 130 // redo processing. 131 public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs) 132 throws IllegalArgumentException, IOException { 133 134 this.fs = fs; 135 136 // open redo file 137 Path f = redoFile; 138 FSDataInputStream input = fs.open(f); 139 byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE]; 140 int l = input.read(bytes); 141 if (l <= 0) { 142 // Jira HADOOP-12678 -Handle empty rename pending metadata file during 143 // atomic rename in redo path. If during renamepending file is created 144 // but not written yet, then this means that rename operation 145 // has not started yet. So we should delete rename pending metadata file. 146 LOG.error("Deleting empty rename pending file " 147 + redoFile + " -- no data available"); 148 deleteRenamePendingFile(fs, redoFile); 149 return; 150 } 151 if (l == MAX_RENAME_PENDING_FILE_SIZE) { 152 throw new IOException( 153 "Error reading pending rename file contents -- " 154 + "maximum file size exceeded"); 155 } 156 String contents = new String(bytes, 0, l, Charset.forName("UTF-8")); 157 158 // parse the JSON 159 ObjectMapper objMapper = new ObjectMapper(); 160 objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 161 JsonNode json = null; 162 try { 163 json = objMapper.readValue(contents, JsonNode.class); 164 this.committed = true; 165 } catch (JsonMappingException e) { 166 167 // The -RedoPending.json file is corrupted, so we assume it was 168 // not completely written 169 // and the redo operation did not commit. 170 this.committed = false; 171 } catch (JsonParseException e) { 172 this.committed = false; 173 } catch (IOException e) { 174 this.committed = false; 175 } 176 177 if (!this.committed) { 178 LOG.error("Deleting corruped rename pending file {} \n {}", 179 redoFile, contents); 180 181 // delete the -RenamePending.json file 182 deleteRenamePendingFile(fs, redoFile); 183 return; 184 } 185 186 // initialize this object's fields 187 ArrayList<String> fileStrList = new ArrayList<String>(); 188 JsonNode oldFolderName = json.get("OldFolderName"); 189 JsonNode newFolderName = json.get("NewFolderName"); 190 if (oldFolderName == null || newFolderName == null) { 191 this.committed = false; 192 } else { 193 this.srcKey = oldFolderName.getTextValue(); 194 this.dstKey = newFolderName.getTextValue(); 195 if (this.srcKey == null || this.dstKey == null) { 196 this.committed = false; 197 } else { 198 JsonNode fileList = json.get("FileList"); 199 if (fileList == null) { 200 this.committed = false; 201 } else { 202 for (int i = 0; i < fileList.size(); i++) { 203 fileStrList.add(fileList.get(i).getTextValue()); 204 } 205 } 206 } 207 } 208 this.fileStrings = fileStrList; 209 } 210 211 public FileMetadata[] getFiles() { 212 return fileMetadata; 213 } 214 215 public SelfRenewingLease getFolderLease() { 216 return folderLease; 217 } 218 219 /** 220 * Deletes rename pending metadata file 221 * @param fs -- the file system 222 * @param redoFile - rename pending metadata file path 223 * @throws IOException - If deletion fails 224 */ 225 @VisibleForTesting 226 void deleteRenamePendingFile(FileSystem fs, Path redoFile) 227 throws IOException { 228 try { 229 fs.delete(redoFile, false); 230 } catch (IOException e) { 231 // If the rename metadata was not found then somebody probably 232 // raced with us and finished the delete first 233 Throwable t = e.getCause(); 234 if (t != null && t instanceof StorageException 235 && "BlobNotFound".equals(((StorageException) t).getErrorCode())) { 236 LOG.warn("rename pending file " + redoFile + " is already deleted"); 237 } else { 238 throw e; 239 } 240 } 241 } 242 243 /** 244 * Write to disk the information needed to redo folder rename, 245 * in JSON format. The file name will be 246 * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json} 247 * The file format will be: 248 * <pre>{@code 249 * { 250 * FormatVersion: "1.0", 251 * OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>", 252 * OldFolderName: "<key>", 253 * NewFolderName: "<key>", 254 * FileList: [ <string> , <string> , ... ] 255 * } 256 * 257 * Here's a sample: 258 * { 259 * FormatVersion: "1.0", 260 * OperationUTCTime: "2014-07-01 23:50:35.572", 261 * OldFolderName: "user/ehans/folderToRename", 262 * NewFolderName: "user/ehans/renamedFolder", 263 * FileList: [ 264 * "innerFile", 265 * "innerFile2" 266 * ] 267 * } }</pre> 268 * @throws IOException 269 */ 270 public void writeFile(FileSystem fs) throws IOException { 271 Path path = getRenamePendingFilePath(); 272 LOG.debug("Preparing to write atomic rename state to {}", path.toString()); 273 OutputStream output = null; 274 275 String contents = makeRenamePendingFileContents(); 276 277 // Write file. 278 try { 279 output = fs.create(path); 280 output.write(contents.getBytes(Charset.forName("UTF-8"))); 281 } catch (IOException e) { 282 throw new IOException("Unable to write RenamePending file for folder rename from " 283 + srcKey + " to " + dstKey, e); 284 } finally { 285 NativeAzureFileSystemHelper.cleanup(LOG, output); 286 } 287 } 288 289 /** 290 * Return the contents of the JSON file to represent the operations 291 * to be performed for a folder rename. 292 */ 293 public String makeRenamePendingFileContents() { 294 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 295 sdf.setTimeZone(TimeZone.getTimeZone("UTC")); 296 String time = sdf.format(new Date()); 297 298 // Make file list string 299 StringBuilder builder = new StringBuilder(); 300 builder.append("[\n"); 301 for (int i = 0; i != fileMetadata.length; i++) { 302 if (i > 0) { 303 builder.append(",\n"); 304 } 305 builder.append(" "); 306 String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/"); 307 308 // Quote string file names, escaping any possible " characters or other 309 // necessary characters in the name. 310 builder.append(quote(noPrefix)); 311 if (builder.length() >= 312 MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) { 313 314 // Give up now to avoid using too much memory. 315 LOG.error("Internal error: Exceeded maximum rename pending file size of {} bytes.", 316 MAX_RENAME_PENDING_FILE_SIZE); 317 318 // return some bad JSON with an error message to make it human readable 319 return "exceeded maximum rename pending file size"; 320 } 321 } 322 builder.append("\n ]"); 323 String fileList = builder.toString(); 324 325 // Make file contents as a string. Again, quote file names, escaping 326 // characters as appropriate. 327 String contents = "{\n" 328 + " FormatVersion: \"1.0\",\n" 329 + " OperationUTCTime: \"" + time + "\",\n" 330 + " OldFolderName: " + quote(srcKey) + ",\n" 331 + " NewFolderName: " + quote(dstKey) + ",\n" 332 + " FileList: " + fileList + "\n" 333 + "}\n"; 334 335 return contents; 336 } 337 338 /** 339 * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 340 * method. 341 * 342 * Produce a string in double quotes with backslash sequences in all the 343 * right places. A backslash will be inserted within </, allowing JSON 344 * text to be delivered in HTML. In JSON text, a string cannot contain a 345 * control character or an unescaped quote or backslash. 346 * @param string A String 347 * @return A String correctly formatted for insertion in a JSON text. 348 */ 349 private String quote(String string) { 350 if (string == null || string.length() == 0) { 351 return "\"\""; 352 } 353 354 char c = 0; 355 int i; 356 int len = string.length(); 357 StringBuilder sb = new StringBuilder(len + 4); 358 String t; 359 360 sb.append('"'); 361 for (i = 0; i < len; i += 1) { 362 c = string.charAt(i); 363 switch (c) { 364 case '\\': 365 case '"': 366 sb.append('\\'); 367 sb.append(c); 368 break; 369 case '/': 370 sb.append('\\'); 371 sb.append(c); 372 break; 373 case '\b': 374 sb.append("\\b"); 375 break; 376 case '\t': 377 sb.append("\\t"); 378 break; 379 case '\n': 380 sb.append("\\n"); 381 break; 382 case '\f': 383 sb.append("\\f"); 384 break; 385 case '\r': 386 sb.append("\\r"); 387 break; 388 default: 389 if (c < ' ') { 390 t = "000" + Integer.toHexString(c); 391 sb.append("\\u" + t.substring(t.length() - 4)); 392 } else { 393 sb.append(c); 394 } 395 } 396 } 397 sb.append('"'); 398 return sb.toString(); 399 } 400 401 public String getSrcKey() { 402 return srcKey; 403 } 404 405 public String getDstKey() { 406 return dstKey; 407 } 408 409 public FileMetadata getSourceMetadata() throws IOException { 410 return fs.getStoreInterface().retrieveMetadata(srcKey); 411 } 412 413 /** 414 * Execute a folder rename. This is the execution path followed 415 * when everything is working normally. See redo() for the alternate 416 * execution path for the case where we're recovering from a folder rename 417 * failure. 418 * @throws IOException 419 */ 420 public void execute() throws IOException { 421 422 for (FileMetadata file : this.getFiles()) { 423 424 // Rename all materialized entries under the folder to point to the 425 // final destination. 426 if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { 427 String srcName = file.getKey(); 428 String suffix = srcName.substring((this.getSrcKey()).length()); 429 String dstName = this.getDstKey() + suffix; 430 431 // Rename gets exclusive access (via a lease) for files 432 // designated for atomic rename. 433 // The main use case is for HBase write-ahead log (WAL) and data 434 // folder processing correctness. See the rename code for details. 435 boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName); 436 fs.getStoreInterface().rename(srcName, dstName, acquireLease, null); 437 } 438 } 439 440 // Rename the source folder 0-byte root file itself. 441 FileMetadata srcMetadata2 = this.getSourceMetadata(); 442 if (srcMetadata2.getBlobMaterialization() == 443 BlobMaterialization.Explicit) { 444 445 // It already has a lease on it from the "prepare" phase so there's no 446 // need to get one now. Pass in existing lease to allow file delete. 447 fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(), 448 false, folderLease); 449 } 450 451 // Update the last-modified time of the parent folders of both source and 452 // destination. 453 fs.updateParentFolderLastModifiedTime(srcKey); 454 fs.updateParentFolderLastModifiedTime(dstKey); 455 } 456 457 /** Clean up after execution of rename. 458 * @throws IOException */ 459 public void cleanup() throws IOException { 460 461 if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) { 462 463 // Remove RenamePending file 464 fs.delete(getRenamePendingFilePath(), false); 465 466 // Freeing source folder lease is not necessary since the source 467 // folder file was deleted. 468 } 469 } 470 471 private Path getRenamePendingFilePath() { 472 String fileName = srcKey + SUFFIX; 473 Path fileNamePath = keyToPath(fileName); 474 Path path = fs.makeAbsolute(fileNamePath); 475 return path; 476 } 477 478 /** 479 * Recover from a folder rename failure by redoing the intended work, 480 * as recorded in the -RenamePending.json file. 481 * 482 * @throws IOException 483 */ 484 public void redo() throws IOException { 485 486 if (!committed) { 487 488 // Nothing to do. The -RedoPending.json file should have already been 489 // deleted. 490 return; 491 } 492 493 // Try to get a lease on source folder to block concurrent access to it. 494 // It may fail if the folder is already gone. We don't check if the 495 // source exists explicitly because that could recursively trigger redo 496 // and give an infinite recursion. 497 SelfRenewingLease lease = null; 498 boolean sourceFolderGone = false; 499 try { 500 lease = fs.leaseSourceFolder(srcKey); 501 } catch (AzureException e) { 502 503 // If the source folder was not found then somebody probably 504 // raced with us and finished the rename first, or the 505 // first rename failed right before deleting the rename pending 506 // file. 507 String errorCode = ""; 508 try { 509 StorageException se = (StorageException) e.getCause(); 510 errorCode = se.getErrorCode(); 511 } catch (Exception e2) { 512 ; // do nothing -- could not get errorCode 513 } 514 if (errorCode.equals("BlobNotFound")) { 515 sourceFolderGone = true; 516 } else { 517 throw new IOException( 518 "Unexpected error when trying to lease source folder name during " 519 + "folder rename redo", 520 e); 521 } 522 } 523 524 if (!sourceFolderGone) { 525 // Make sure the target folder exists. 526 Path dst = fullPath(dstKey); 527 if (!fs.exists(dst)) { 528 fs.mkdirs(dst); 529 } 530 531 // For each file inside the folder to be renamed, 532 // make sure it has been renamed. 533 for(String fileName : fileStrings) { 534 finishSingleFileRename(fileName); 535 } 536 537 // Remove the source folder. Don't check explicitly if it exists, 538 // to avoid triggering redo recursively. 539 try { 540 fs.getStoreInterface().delete(srcKey, lease); 541 } catch (Exception e) { 542 LOG.info("Unable to delete source folder during folder rename redo. " 543 + "If the source folder is already gone, this is not an error " 544 + "condition. Continuing with redo.", e); 545 } 546 547 // Update the last-modified time of the parent folders of both source 548 // and destination. 549 fs.updateParentFolderLastModifiedTime(srcKey); 550 fs.updateParentFolderLastModifiedTime(dstKey); 551 } 552 553 // Remove the -RenamePending.json file. 554 fs.delete(getRenamePendingFilePath(), false); 555 } 556 557 // See if the source file is still there, and if it is, rename it. 558 private void finishSingleFileRename(String fileName) 559 throws IOException { 560 Path srcFile = fullPath(srcKey, fileName); 561 Path dstFile = fullPath(dstKey, fileName); 562 String srcName = fs.pathToKey(srcFile); 563 String dstName = fs.pathToKey(dstFile); 564 boolean srcExists = fs.getStoreInterface().explicitFileExists(srcName); 565 boolean dstExists = fs.getStoreInterface().explicitFileExists(dstName); 566 if(srcExists) { 567 // Rename gets exclusive access (via a lease) for HBase write-ahead log 568 // (WAL) file processing correctness. See the rename code for details. 569 fs.getStoreInterface().rename(srcName, dstName, true, null); 570 } else if (!srcExists && dstExists) { 571 // The rename already finished, so do nothing. 572 ; 573 } else { 574 throw new IOException( 575 "Attempting to complete rename of file " + srcKey + "/" + fileName 576 + " during folder rename redo, and file was not found in source " 577 + "or destination."); 578 } 579 } 580 581 // Return an absolute path for the specific fileName within the folder 582 // specified by folderKey. 583 private Path fullPath(String folderKey, String fileName) { 584 return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName); 585 } 586 587 private Path fullPath(String fileKey) { 588 return new Path(new Path(fs.getUri()), "/" + fileKey); 589 } 590 } 591 592 private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]"; 593 private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN = 594 Pattern.compile("\\[\\[\\.\\]\\](?=$|/)"); 595 private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)"); 596 597 @Override 598 public String getScheme() { 599 return "wasb"; 600 } 601 602 603 /** 604 * <p> 605 * A {@link FileSystem} for reading and writing files stored on <a 606 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 607 * blob-based and stores files on Azure in their native form so they can be read 608 * by other Azure tools. This implementation uses HTTPS for secure network communication. 609 * </p> 610 */ 611 public static class Secure extends NativeAzureFileSystem { 612 @Override 613 public String getScheme() { 614 return "wasbs"; 615 } 616 } 617 618 public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class); 619 620 static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; 621 /** 622 * The time span in seconds before which we consider a temp blob to be 623 * dangling (not being actively uploaded to) and up for reclamation. 624 * 625 * So e.g. if this is 60, then any temporary blobs more than a minute old 626 * would be considered dangling. 627 */ 628 static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds"; 629 private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600; 630 static final String PATH_DELIMITER = Path.SEPARATOR; 631 static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$"; 632 633 private static final int AZURE_LIST_ALL = -1; 634 private static final int AZURE_UNBOUNDED_DEPTH = -1; 635 636 private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L; 637 638 /** 639 * The configuration property that determines which group owns files created 640 * in WASB. 641 */ 642 private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup"; 643 /** 644 * The default value for fs.azure.permissions.supergroup. Chosen as the same 645 * default as DFS. 646 */ 647 static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup"; 648 649 static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = 650 "fs.azure.block.location.impersonatedhost"; 651 private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = 652 "localhost"; 653 static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME = 654 "fs.azure.ring.buffer.capacity"; 655 static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME = 656 "fs.azure.output.stream.buffer.size"; 657 658 public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics"; 659 660 /* 661 * Property to enable Append API. 662 */ 663 public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support"; 664 665 private class NativeAzureFsInputStream extends FSInputStream { 666 private InputStream in; 667 private final String key; 668 private long pos = 0; 669 private boolean closed = false; 670 private boolean isPageBlob; 671 672 // File length, valid only for streams over block blobs. 673 private long fileLength; 674 675 public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) { 676 this.in = in; 677 this.key = key; 678 this.isPageBlob = store.isPageBlobKey(key); 679 this.fileLength = fileLength; 680 } 681 682 /** 683 * Return the size of the remaining available bytes 684 * if the size is less than or equal to {@link Integer#MAX_VALUE}, 685 * otherwise, return {@link Integer#MAX_VALUE}. 686 * 687 * This is to match the behavior of DFSInputStream.available(), 688 * which some clients may rely on (HBase write-ahead log reading in 689 * particular). 690 */ 691 @Override 692 public synchronized int available() throws IOException { 693 if (isPageBlob) { 694 return in.available(); 695 } else { 696 if (closed) { 697 throw new IOException("Stream closed"); 698 } 699 final long remaining = this.fileLength - pos; 700 return remaining <= Integer.MAX_VALUE ? 701 (int) remaining : Integer.MAX_VALUE; 702 } 703 } 704 705 /* 706 * Reads the next byte of data from the input stream. The value byte is 707 * returned as an integer in the range 0 to 255. If no byte is available 708 * because the end of the stream has been reached, the value -1 is returned. 709 * This method blocks until input data is available, the end of the stream 710 * is detected, or an exception is thrown. 711 * 712 * @returns int An integer corresponding to the byte read. 713 */ 714 @Override 715 public synchronized int read() throws FileNotFoundException, IOException { 716 try { 717 int result = 0; 718 result = in.read(); 719 if (result != -1) { 720 pos++; 721 if (statistics != null) { 722 statistics.incrementBytesRead(1); 723 } 724 } 725 // Return to the caller with the result. 726 // 727 return result; 728 } catch(EOFException e) { 729 return -1; 730 } catch(IOException e) { 731 732 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 733 734 if (innerException instanceof StorageException) { 735 736 LOG.error("Encountered Storage Exception for read on Blob : {}" 737 + " Exception details: {} Error Code : {}", 738 key, e, ((StorageException) innerException).getErrorCode()); 739 740 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 741 throw new FileNotFoundException(String.format("%s is not found", key)); 742 } 743 } 744 745 throw e; 746 } 747 } 748 749 /* 750 * Reads up to len bytes of data from the input stream into an array of 751 * bytes. An attempt is made to read as many as len bytes, but a smaller 752 * number may be read. The number of bytes actually read is returned as an 753 * integer. This method blocks until input data is available, end of file is 754 * detected, or an exception is thrown. If len is zero, then no bytes are 755 * read and 0 is returned; otherwise, there is an attempt to read at least 756 * one byte. If no byte is available because the stream is at end of file, 757 * the value -1 is returned; otherwise, at least one byte is read and stored 758 * into b. 759 * 760 * @param b -- the buffer into which data is read 761 * 762 * @param off -- the start offset in the array b at which data is written 763 * 764 * @param len -- the maximum number of bytes read 765 * 766 * @ returns int The total number of byes read into the buffer, or -1 if 767 * there is no more data because the end of stream is reached. 768 */ 769 @Override 770 public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException { 771 try { 772 int result = 0; 773 result = in.read(b, off, len); 774 if (result > 0) { 775 pos += result; 776 } 777 778 if (null != statistics && result > 0) { 779 statistics.incrementBytesRead(result); 780 } 781 782 // Return to the caller with the result. 783 return result; 784 } catch(IOException e) { 785 786 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 787 788 if (innerException instanceof StorageException) { 789 790 LOG.error("Encountered Storage Exception for read on Blob : {}" 791 + " Exception details: {} Error Code : {}", 792 key, e, ((StorageException) innerException).getErrorCode()); 793 794 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 795 throw new FileNotFoundException(String.format("%s is not found", key)); 796 } 797 } 798 799 throw e; 800 } 801 } 802 803 @Override 804 public synchronized void close() throws IOException { 805 if (!closed) { 806 closed = true; 807 IOUtils.closeStream(in); 808 in = null; 809 } 810 } 811 812 @Override 813 public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException { 814 try { 815 checkNotClosed(); 816 if (pos < 0) { 817 throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); 818 } 819 IOUtils.closeStream(in); 820 in = store.retrieve(key); 821 this.pos = in.skip(pos); 822 LOG.debug("Seek to position {}. Bytes skipped {}", pos, 823 this.pos); 824 } catch(IOException e) { 825 826 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 827 828 if (innerException instanceof StorageException 829 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 830 throw new FileNotFoundException(String.format("%s is not found", key)); 831 } 832 833 throw e; 834 } catch(IndexOutOfBoundsException e) { 835 throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); 836 } 837 } 838 839 @Override 840 public synchronized long getPos() throws IOException { 841 return pos; 842 } 843 844 @Override 845 public boolean seekToNewSource(long targetPos) throws IOException { 846 return false; 847 } 848 849 850 /* 851 * Helper method to check if a stream is closed. 852 */ 853 private void checkNotClosed() throws IOException { 854 if (closed) { 855 throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); 856 } 857 } 858 } 859 860 private class NativeAzureFsOutputStream extends OutputStream { 861 // We should not override flush() to actually close current block and flush 862 // to DFS, this will break applications that assume flush() is a no-op. 863 // Applications are advised to use Syncable.hflush() for that purpose. 864 // NativeAzureFsOutputStream needs to implement Syncable if needed. 865 private String key; 866 private String keyEncoded; 867 private OutputStream out; 868 869 public NativeAzureFsOutputStream(OutputStream out, String aKey, 870 String anEncodedKey) throws IOException { 871 // Check input arguments. The output stream should be non-null and the 872 // keys 873 // should be valid strings. 874 if (null == out) { 875 throw new IllegalArgumentException( 876 "Illegal argument: the output stream is null."); 877 } 878 879 if (null == aKey || 0 == aKey.length()) { 880 throw new IllegalArgumentException( 881 "Illegal argument the key string is null or empty"); 882 } 883 884 if (null == anEncodedKey || 0 == anEncodedKey.length()) { 885 throw new IllegalArgumentException( 886 "Illegal argument the encoded key string is null or empty"); 887 } 888 889 // Initialize the member variables with the incoming parameters. 890 this.out = out; 891 892 setKey(aKey); 893 setEncodedKey(anEncodedKey); 894 } 895 896 @Override 897 public synchronized void close() throws IOException { 898 if (out != null) { 899 // Close the output stream and decode the key for the output stream 900 // before returning to the caller. 901 // 902 out.close(); 903 restoreKey(); 904 out = null; 905 } 906 } 907 908 /** 909 * Writes the specified byte to this output stream. The general contract for 910 * write is that one byte is written to the output stream. The byte to be 911 * written is the eight low-order bits of the argument b. The 24 high-order 912 * bits of b are ignored. 913 * 914 * @param b 915 * 32-bit integer of block of 4 bytes 916 */ 917 @Override 918 public void write(int b) throws IOException { 919 try { 920 out.write(b); 921 } catch(IOException e) { 922 if (e.getCause() instanceof StorageException) { 923 StorageException storageExcp = (StorageException) e.getCause(); 924 LOG.error("Encountered Storage Exception for write on Blob : {}" 925 + " Exception details: {} Error Code : {}", 926 key, e.getMessage(), storageExcp.getErrorCode()); 927 } 928 throw e; 929 } 930 } 931 932 /** 933 * Writes b.length bytes from the specified byte array to this output 934 * stream. The general contract for write(b) is that it should have exactly 935 * the same effect as the call write(b, 0, b.length). 936 * 937 * @param b 938 * Block of bytes to be written to the output stream. 939 */ 940 @Override 941 public void write(byte[] b) throws IOException { 942 try { 943 out.write(b); 944 } catch(IOException e) { 945 if (e.getCause() instanceof StorageException) { 946 StorageException storageExcp = (StorageException) e.getCause(); 947 LOG.error("Encountered Storage Exception for write on Blob : {}" 948 + " Exception details: {} Error Code : {}", 949 key, e.getMessage(), storageExcp.getErrorCode()); 950 } 951 throw e; 952 } 953 } 954 955 /** 956 * Writes <code>len</code> from the specified byte array starting at offset 957 * <code>off</code> to the output stream. The general contract for write(b, 958 * off, len) is that some of the bytes in the array <code> 959 * b</code b> are written to the output stream in order; element 960 * <code>b[off]</code> is the first byte written and 961 * <code>b[off+len-1]</code> is the last byte written by this operation. 962 * 963 * @param b 964 * Byte array to be written. 965 * @param off 966 * Write this offset in stream. 967 * @param len 968 * Number of bytes to be written. 969 */ 970 @Override 971 public void write(byte[] b, int off, int len) throws IOException { 972 try { 973 out.write(b, off, len); 974 } catch(IOException e) { 975 if (e.getCause() instanceof StorageException) { 976 StorageException storageExcp = (StorageException) e.getCause(); 977 LOG.error("Encountered Storage Exception for write on Blob : {}" 978 + " Exception details: {} Error Code : {}", 979 key, e.getMessage(), storageExcp.getErrorCode()); 980 } 981 throw e; 982 } 983 } 984 985 /** 986 * Get the blob name. 987 * 988 * @return String Blob name. 989 */ 990 public String getKey() { 991 return key; 992 } 993 994 /** 995 * Set the blob name. 996 * 997 * @param key 998 * Blob name. 999 */ 1000 public void setKey(String key) { 1001 this.key = key; 1002 } 1003 1004 /** 1005 * Get the blob name. 1006 * 1007 * @return String Blob name. 1008 */ 1009 public String getEncodedKey() { 1010 return keyEncoded; 1011 } 1012 1013 /** 1014 * Set the blob name. 1015 * 1016 * @param anEncodedKey 1017 * Blob name. 1018 */ 1019 public void setEncodedKey(String anEncodedKey) { 1020 this.keyEncoded = anEncodedKey; 1021 } 1022 1023 /** 1024 * Restore the original key name from the m_key member variable. Note: The 1025 * output file stream is created with an encoded blob store key to guarantee 1026 * load balancing on the front end of the Azure storage partition servers. 1027 * The create also includes the name of the original key value which is 1028 * stored in the m_key member variable. This method should only be called 1029 * when the stream is closed. 1030 */ 1031 private void restoreKey() throws IOException { 1032 store.rename(getEncodedKey(), getKey()); 1033 } 1034 } 1035 1036 private URI uri; 1037 private NativeFileSystemStore store; 1038 private AzureNativeFileSystemStore actualStore; 1039 private Path workingDir; 1040 private long blockSize = MAX_AZURE_BLOCK_SIZE; 1041 private AzureFileSystemInstrumentation instrumentation; 1042 private String metricsSourceName; 1043 private boolean isClosed = false; 1044 private static boolean suppressRetryPolicy = false; 1045 // A counter to create unique (within-process) names for my metrics sources. 1046 private static AtomicInteger metricsSourceNameCounter = new AtomicInteger(); 1047 private boolean appendSupportEnabled = false; 1048 1049 public NativeAzureFileSystem() { 1050 // set store in initialize() 1051 } 1052 1053 public NativeAzureFileSystem(NativeFileSystemStore store) { 1054 this.store = store; 1055 } 1056 1057 /** 1058 * Suppress the default retry policy for the Storage, useful in unit tests to 1059 * test negative cases without waiting forever. 1060 */ 1061 @VisibleForTesting 1062 static void suppressRetryPolicy() { 1063 suppressRetryPolicy = true; 1064 } 1065 1066 /** 1067 * Undo the effect of suppressRetryPolicy. 1068 */ 1069 @VisibleForTesting 1070 static void resumeRetryPolicy() { 1071 suppressRetryPolicy = false; 1072 } 1073 1074 /** 1075 * Creates a new metrics source name that's unique within this process. 1076 */ 1077 @VisibleForTesting 1078 public static String newMetricsSourceName() { 1079 int number = metricsSourceNameCounter.incrementAndGet(); 1080 final String baseName = "AzureFileSystemMetrics"; 1081 if (number == 1) { // No need for a suffix for the first one 1082 return baseName; 1083 } else { 1084 return baseName + number; 1085 } 1086 } 1087 1088 /** 1089 * Checks if the given URI scheme is a scheme that's affiliated with the Azure 1090 * File System. 1091 * 1092 * @param scheme 1093 * The URI scheme. 1094 * @return true iff it's an Azure File System URI scheme. 1095 */ 1096 private static boolean isWasbScheme(String scheme) { 1097 // The valid schemes are: asv (old name), asvs (old name over HTTPS), 1098 // wasb (new name), wasbs (new name over HTTPS). 1099 return scheme != null 1100 && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs") 1101 || scheme.equalsIgnoreCase("wasb") || scheme 1102 .equalsIgnoreCase("wasbs")); 1103 } 1104 1105 /** 1106 * Puts in the authority of the default file system if it is a WASB file 1107 * system and the given URI's authority is null. 1108 * 1109 * @return The URI with reconstructed authority if necessary and possible. 1110 */ 1111 private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) { 1112 if (null == uri.getAuthority()) { 1113 // If WASB is the default file system, get the authority from there 1114 URI defaultUri = FileSystem.getDefaultUri(conf); 1115 if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) { 1116 try { 1117 // Reconstruct the URI with the authority from the default URI. 1118 return new URI(uri.getScheme(), defaultUri.getAuthority(), 1119 uri.getPath(), uri.getQuery(), uri.getFragment()); 1120 } catch (URISyntaxException e) { 1121 // This should never happen. 1122 throw new Error("Bad URI construction", e); 1123 } 1124 } 1125 } 1126 return uri; 1127 } 1128 1129 @Override 1130 protected void checkPath(Path path) { 1131 // Make sure to reconstruct the path's authority if needed 1132 super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(), 1133 getConf()))); 1134 } 1135 1136 @Override 1137 public void initialize(URI uri, Configuration conf) 1138 throws IOException, IllegalArgumentException { 1139 // Check authority for the URI to guarantee that it is non-null. 1140 uri = reconstructAuthorityIfNeeded(uri, conf); 1141 if (null == uri.getAuthority()) { 1142 final String errMsg = String 1143 .format("Cannot initialize WASB file system, URI authority not recognized."); 1144 throw new IllegalArgumentException(errMsg); 1145 } 1146 super.initialize(uri, conf); 1147 1148 if (store == null) { 1149 store = createDefaultStore(conf); 1150 } 1151 1152 instrumentation = new AzureFileSystemInstrumentation(conf); 1153 if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { 1154 // Make sure the metrics system is available before interacting with Azure 1155 AzureFileSystemMetricsSystem.fileSystemStarted(); 1156 metricsSourceName = newMetricsSourceName(); 1157 String sourceDesc = "Azure Storage Volume File System metrics"; 1158 AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, 1159 instrumentation); 1160 } 1161 1162 store.initialize(uri, conf, instrumentation); 1163 setConf(conf); 1164 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 1165 this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() 1166 .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); 1167 this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, 1168 MAX_AZURE_BLOCK_SIZE); 1169 1170 this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false); 1171 LOG.debug("NativeAzureFileSystem. Initializing."); 1172 LOG.debug(" blockSize = {}", 1173 conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); 1174 1175 } 1176 1177 private NativeFileSystemStore createDefaultStore(Configuration conf) { 1178 actualStore = new AzureNativeFileSystemStore(); 1179 1180 if (suppressRetryPolicy) { 1181 actualStore.suppressRetryPolicy(); 1182 } 1183 return actualStore; 1184 } 1185 1186 /** 1187 * Azure Storage doesn't allow the blob names to end in a period, 1188 * so encode this here to work around that limitation. 1189 */ 1190 private static String encodeTrailingPeriod(String toEncode) { 1191 Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode); 1192 return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER); 1193 } 1194 1195 /** 1196 * Reverse the encoding done by encodeTrailingPeriod(). 1197 */ 1198 private static String decodeTrailingPeriod(String toDecode) { 1199 Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode); 1200 return matcher.replaceAll("."); 1201 } 1202 1203 /** 1204 * Convert the path to a key. By convention, any leading or trailing slash is 1205 * removed, except for the special case of a single slash. 1206 */ 1207 @VisibleForTesting 1208 public String pathToKey(Path path) { 1209 // Convert the path to a URI to parse the scheme, the authority, and the 1210 // path from the path object. 1211 URI tmpUri = path.toUri(); 1212 String pathUri = tmpUri.getPath(); 1213 1214 // The scheme and authority is valid. If the path does not exist add a "/" 1215 // separator to list the root of the container. 1216 Path newPath = path; 1217 if ("".equals(pathUri)) { 1218 newPath = new Path(tmpUri.toString() + Path.SEPARATOR); 1219 } 1220 1221 // Verify path is absolute if the path refers to a windows drive scheme. 1222 if (!newPath.isAbsolute()) { 1223 throw new IllegalArgumentException("Path must be absolute: " + path); 1224 } 1225 1226 String key = null; 1227 key = newPath.toUri().getPath(); 1228 key = removeTrailingSlash(key); 1229 key = encodeTrailingPeriod(key); 1230 if (key.length() == 1) { 1231 return key; 1232 } else { 1233 return key.substring(1); // remove initial slash 1234 } 1235 } 1236 1237 // Remove any trailing slash except for the case of a single slash. 1238 private static String removeTrailingSlash(String key) { 1239 if (key.length() == 0 || key.length() == 1) { 1240 return key; 1241 } 1242 if (key.charAt(key.length() - 1) == '/') { 1243 return key.substring(0, key.length() - 1); 1244 } else { 1245 return key; 1246 } 1247 } 1248 1249 private static Path keyToPath(String key) { 1250 if (key.equals("/")) { 1251 return new Path("/"); // container 1252 } 1253 return new Path("/" + decodeTrailingPeriod(key)); 1254 } 1255 1256 /** 1257 * Get the absolute version of the path (fully qualified). 1258 * This is public for testing purposes. 1259 * 1260 * @param path 1261 * @return fully qualified path 1262 */ 1263 @VisibleForTesting 1264 public Path makeAbsolute(Path path) { 1265 if (path.isAbsolute()) { 1266 return path; 1267 } 1268 return new Path(workingDir, path); 1269 } 1270 1271 /** 1272 * For unit test purposes, retrieves the AzureNativeFileSystemStore store 1273 * backing this file system. 1274 * 1275 * @return The store object. 1276 */ 1277 @VisibleForTesting 1278 public AzureNativeFileSystemStore getStore() { 1279 return actualStore; 1280 } 1281 1282 NativeFileSystemStore getStoreInterface() { 1283 return store; 1284 } 1285 1286 /** 1287 * Gets the metrics source for this file system. 1288 * This is mainly here for unit testing purposes. 1289 * 1290 * @return the metrics source. 1291 */ 1292 public AzureFileSystemInstrumentation getInstrumentation() { 1293 return instrumentation; 1294 } 1295 1296 /** This optional operation is not yet supported. */ 1297 @Override 1298 public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) 1299 throws IOException { 1300 1301 if (!appendSupportEnabled) { 1302 throw new UnsupportedOperationException("Append Support not enabled"); 1303 } 1304 1305 LOG.debug("Opening file: {} for append", f); 1306 1307 Path absolutePath = makeAbsolute(f); 1308 String key = pathToKey(absolutePath); 1309 FileMetadata meta = null; 1310 try { 1311 meta = store.retrieveMetadata(key); 1312 } catch(Exception ex) { 1313 1314 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1315 1316 if (innerException instanceof StorageException 1317 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1318 1319 throw new FileNotFoundException(String.format("%s is not found", key)); 1320 } else { 1321 throw ex; 1322 } 1323 } 1324 1325 if (meta == null) { 1326 throw new FileNotFoundException(f.toString()); 1327 } 1328 1329 if (meta.isDir()) { 1330 throw new FileNotFoundException(f.toString() 1331 + " is a directory not a file."); 1332 } 1333 1334 if (store.isPageBlobKey(key)) { 1335 throw new IOException("Append not supported for Page Blobs"); 1336 } 1337 1338 DataOutputStream appendStream = null; 1339 1340 try { 1341 appendStream = store.retrieveAppendStream(key, bufferSize); 1342 } catch (Exception ex) { 1343 1344 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1345 1346 if (innerException instanceof StorageException 1347 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1348 throw new FileNotFoundException(String.format("%s is not found", key)); 1349 } else { 1350 throw ex; 1351 } 1352 } 1353 1354 return new FSDataOutputStream(appendStream, statistics); 1355 } 1356 1357 @Override 1358 public FSDataOutputStream create(Path f, FsPermission permission, 1359 boolean overwrite, int bufferSize, short replication, long blockSize, 1360 Progressable progress) throws IOException { 1361 return create(f, permission, overwrite, true, 1362 bufferSize, replication, blockSize, progress, 1363 (SelfRenewingLease) null); 1364 } 1365 1366 /** 1367 * Get a self-renewing lease on the specified file. 1368 */ 1369 public SelfRenewingLease acquireLease(Path path) throws AzureException { 1370 String fullKey = pathToKey(makeAbsolute(path)); 1371 return getStore().acquireLease(fullKey); 1372 } 1373 1374 @Override 1375 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1376 boolean overwrite, int bufferSize, short replication, long blockSize, 1377 Progressable progress) throws IOException { 1378 1379 Path parent = f.getParent(); 1380 1381 // Get exclusive access to folder if this is a directory designated 1382 // for atomic rename. The primary use case of for HBase write-ahead 1383 // log file management. 1384 SelfRenewingLease lease = null; 1385 if (store.isAtomicRenameKey(pathToKey(f))) { 1386 try { 1387 lease = acquireLease(parent); 1388 } catch (AzureException e) { 1389 1390 String errorCode = ""; 1391 try { 1392 StorageException e2 = (StorageException) e.getCause(); 1393 errorCode = e2.getErrorCode(); 1394 } catch (Exception e3) { 1395 // do nothing if cast fails 1396 } 1397 if (errorCode.equals("BlobNotFound")) { 1398 throw new FileNotFoundException("Cannot create file " + 1399 f.getName() + " because parent folder does not exist."); 1400 } 1401 1402 LOG.warn("Got unexpected exception trying to get lease on {} . {}", 1403 pathToKey(parent), e.getMessage()); 1404 throw e; 1405 } 1406 } 1407 1408 // See if the parent folder exists. If not, throw error. 1409 // The exists() check will push any pending rename operation forward, 1410 // if there is one, and return false. 1411 // 1412 // At this point, we have exclusive access to the source folder 1413 // via the lease, so we will not conflict with an active folder 1414 // rename operation. 1415 if (!exists(parent)) { 1416 try { 1417 1418 // This'll let the keep-alive thread exit as soon as it wakes up. 1419 lease.free(); 1420 } catch (Exception e) { 1421 LOG.warn("Unable to free lease because: {}", e.getMessage()); 1422 } 1423 throw new FileNotFoundException("Cannot create file " + 1424 f.getName() + " because parent folder does not exist."); 1425 } 1426 1427 // Create file inside folder. 1428 FSDataOutputStream out = null; 1429 try { 1430 out = create(f, permission, overwrite, false, 1431 bufferSize, replication, blockSize, progress, lease); 1432 } finally { 1433 // Release exclusive access to folder. 1434 try { 1435 if (lease != null) { 1436 lease.free(); 1437 } 1438 } catch (Exception e) { 1439 NativeAzureFileSystemHelper.cleanup(LOG, out); 1440 String msg = "Unable to free lease on " + parent.toUri(); 1441 LOG.error(msg); 1442 throw new IOException(msg, e); 1443 } 1444 } 1445 return out; 1446 } 1447 1448 @Override 1449 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1450 EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, 1451 Progressable progress) throws IOException { 1452 1453 // Check if file should be appended or overwritten. Assume that the file 1454 // is overwritten on if the CREATE and OVERWRITE create flags are set. Note 1455 // that any other combinations of create flags will result in an open new or 1456 // open with append. 1457 final EnumSet<CreateFlag> createflags = 1458 EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); 1459 boolean overwrite = flags.containsAll(createflags); 1460 1461 // Delegate the create non-recursive call. 1462 return this.createNonRecursive(f, permission, overwrite, 1463 bufferSize, replication, blockSize, progress); 1464 } 1465 1466 @Override 1467 public FSDataOutputStream createNonRecursive(Path f, 1468 boolean overwrite, int bufferSize, short replication, long blockSize, 1469 Progressable progress) throws IOException { 1470 return this.createNonRecursive(f, FsPermission.getFileDefault(), 1471 overwrite, bufferSize, replication, blockSize, progress); 1472 } 1473 1474 1475 /** 1476 * Create an Azure blob and return an output stream to use 1477 * to write data to it. 1478 * 1479 * @param f 1480 * @param permission 1481 * @param overwrite 1482 * @param createParent 1483 * @param bufferSize 1484 * @param replication 1485 * @param blockSize 1486 * @param progress 1487 * @param parentFolderLease Lease on parent folder (or null if 1488 * no lease). 1489 * @return 1490 * @throws IOException 1491 */ 1492 private FSDataOutputStream create(Path f, FsPermission permission, 1493 boolean overwrite, boolean createParent, int bufferSize, 1494 short replication, long blockSize, Progressable progress, 1495 SelfRenewingLease parentFolderLease) 1496 throws FileAlreadyExistsException, IOException { 1497 1498 LOG.debug("Creating file: {}", f.toString()); 1499 1500 if (containsColon(f)) { 1501 throw new IOException("Cannot create file " + f 1502 + " through WASB that has colons in the name"); 1503 } 1504 1505 Path absolutePath = makeAbsolute(f); 1506 String key = pathToKey(absolutePath); 1507 1508 FileMetadata existingMetadata = store.retrieveMetadata(key); 1509 if (existingMetadata != null) { 1510 if (existingMetadata.isDir()) { 1511 throw new FileAlreadyExistsException("Cannot create file " + f 1512 + "; already exists as a directory."); 1513 } 1514 if (!overwrite) { 1515 throw new FileAlreadyExistsException("File already exists:" + f); 1516 } 1517 } 1518 1519 Path parentFolder = absolutePath.getParent(); 1520 if (parentFolder != null && parentFolder.getParent() != null) { // skip root 1521 // Update the parent folder last modified time if the parent folder 1522 // already exists. 1523 String parentKey = pathToKey(parentFolder); 1524 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 1525 if (parentMetadata != null && parentMetadata.isDir() && 1526 parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { 1527 if (parentFolderLease != null) { 1528 store.updateFolderLastModifiedTime(parentKey, parentFolderLease); 1529 } else { 1530 updateParentFolderLastModifiedTime(key); 1531 } 1532 } else { 1533 // Make sure that the parent folder exists. 1534 // Create it using inherited permissions from the first existing directory going up the path 1535 Path firstExisting = parentFolder.getParent(); 1536 FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting)); 1537 while(metadata == null) { 1538 // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata 1539 firstExisting = firstExisting.getParent(); 1540 metadata = store.retrieveMetadata(pathToKey(firstExisting)); 1541 } 1542 mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true); 1543 } 1544 } 1545 1546 // Mask the permission first (with the default permission mask as well). 1547 FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile); 1548 PermissionStatus permissionStatus = createPermissionStatus(masked); 1549 1550 OutputStream bufOutStream; 1551 if (store.isPageBlobKey(key)) { 1552 // Store page blobs directly in-place without renames. 1553 bufOutStream = store.storefile(key, permissionStatus); 1554 } else { 1555 // This is a block blob, so open the output blob stream based on the 1556 // encoded key. 1557 // 1558 String keyEncoded = encodeKey(key); 1559 1560 1561 // First create a blob at the real key, pointing back to the temporary file 1562 // This accomplishes a few things: 1563 // 1. Makes sure we can create a file there. 1564 // 2. Makes it visible to other concurrent threads/processes/nodes what 1565 // we're 1566 // doing. 1567 // 3. Makes it easier to restore/cleanup data in the event of us crashing. 1568 store.storeEmptyLinkFile(key, keyEncoded, permissionStatus); 1569 1570 // The key is encoded to point to a common container at the storage server. 1571 // This reduces the number of splits on the server side when load balancing. 1572 // Ingress to Azure storage can take advantage of earlier splits. We remove 1573 // the root path to the key and prefix a random GUID to the tail (or leaf 1574 // filename) of the key. Keys are thus broadly and randomly distributed over 1575 // a single container to ease load balancing on the storage server. When the 1576 // blob is committed it is renamed to its earlier key. Uncommitted blocks 1577 // are not cleaned up and we leave it to Azure storage to garbage collect 1578 // these 1579 // blocks. 1580 bufOutStream = new NativeAzureFsOutputStream(store.storefile( 1581 keyEncoded, permissionStatus), key, keyEncoded); 1582 } 1583 // Construct the data output stream from the buffered output stream. 1584 FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); 1585 1586 1587 // Increment the counter 1588 instrumentation.fileCreated(); 1589 1590 // Return data output stream to caller. 1591 return fsOut; 1592 } 1593 1594 @Override 1595 @Deprecated 1596 public boolean delete(Path path) throws IOException { 1597 return delete(path, true); 1598 } 1599 1600 @Override 1601 public boolean delete(Path f, boolean recursive) throws IOException { 1602 return delete(f, recursive, false); 1603 } 1604 1605 /** 1606 * Delete the specified file or folder. The parameter 1607 * skipParentFolderLastModifidedTimeUpdate 1608 * is used in the case of atomic folder rename redo. In that case, there is 1609 * a lease on the parent folder, so (without reworking the code) modifying 1610 * the parent folder update time will fail because of a conflict with the 1611 * lease. Since we are going to delete the folder soon anyway so accurate 1612 * modified time is not necessary, it's easier to just skip 1613 * the modified time update. 1614 * 1615 * @param f 1616 * @param recursive 1617 * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last 1618 * modified time. 1619 * @return true if and only if the file is deleted 1620 * @throws IOException 1621 */ 1622 public boolean delete(Path f, boolean recursive, 1623 boolean skipParentFolderLastModifidedTimeUpdate) throws IOException { 1624 1625 LOG.debug("Deleting file: {}", f.toString()); 1626 1627 Path absolutePath = makeAbsolute(f); 1628 String key = pathToKey(absolutePath); 1629 1630 // Capture the metadata for the path. 1631 // 1632 FileMetadata metaFile = null; 1633 try { 1634 metaFile = store.retrieveMetadata(key); 1635 } catch (IOException e) { 1636 1637 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1638 1639 if (innerException instanceof StorageException 1640 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1641 1642 return false; 1643 } 1644 throw e; 1645 } 1646 1647 if (null == metaFile) { 1648 // The path to be deleted does not exist. 1649 return false; 1650 } 1651 1652 // The path exists, determine if it is a folder containing objects, 1653 // an empty folder, or a simple file and take the appropriate actions. 1654 if (!metaFile.isDir()) { 1655 // The path specifies a file. We need to check the parent path 1656 // to make sure it's a proper materialized directory before we 1657 // delete the file. Otherwise we may get into a situation where 1658 // the file we were deleting was the last one in an implicit directory 1659 // (e.g. the blob store only contains the blob a/b and there's no 1660 // corresponding directory blob a) and that would implicitly delete 1661 // the directory as well, which is not correct. 1662 Path parentPath = absolutePath.getParent(); 1663 if (parentPath.getParent() != null) {// Not root 1664 String parentKey = pathToKey(parentPath); 1665 1666 FileMetadata parentMetadata = null; 1667 try { 1668 parentMetadata = store.retrieveMetadata(parentKey); 1669 } catch (IOException e) { 1670 1671 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1672 1673 if (innerException instanceof StorageException) { 1674 // Invalid State. 1675 // A FileNotFoundException is not thrown here as the API returns false 1676 // if the file not present. But not retrieving metadata here is an 1677 // unrecoverable state and can only happen if there is a race condition 1678 // hence throwing a IOException 1679 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1680 throw new IOException("File " + f + " has a parent directory " 1681 + parentPath + " whose metadata cannot be retrieved. Can't resolve"); 1682 } 1683 } 1684 throw e; 1685 } 1686 1687 // Invalid State. 1688 // A FileNotFoundException is not thrown here as the API returns false 1689 // if the file not present. But not retrieving metadata here is an 1690 // unrecoverable state and can only happen if there is a race condition 1691 // hence throwing a IOException 1692 if (parentMetadata == null) { 1693 throw new IOException("File " + f + " has a parent directory " 1694 + parentPath + " whose metadata cannot be retrieved. Can't resolve"); 1695 } 1696 1697 if (!parentMetadata.isDir()) { 1698 // Invalid state: the parent path is actually a file. Throw. 1699 throw new AzureException("File " + f + " has a parent directory " 1700 + parentPath + " which is also a file. Can't resolve."); 1701 } 1702 1703 if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 1704 LOG.debug("Found an implicit parent directory while trying to" 1705 + " delete the file {}. Creating the directory blob for" 1706 + " it in {}.", f, parentKey); 1707 1708 store.storeEmptyFolder(parentKey, 1709 createPermissionStatus(FsPermission.getDefault())); 1710 } else { 1711 if (!skipParentFolderLastModifidedTimeUpdate) { 1712 updateParentFolderLastModifiedTime(key); 1713 } 1714 } 1715 } 1716 1717 try { 1718 store.delete(key); 1719 instrumentation.fileDeleted(); 1720 } catch(IOException e) { 1721 1722 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1723 1724 if (innerException instanceof StorageException 1725 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1726 return false; 1727 } 1728 1729 throw e; 1730 } 1731 } else { 1732 // The path specifies a folder. Recursively delete all entries under the 1733 // folder. 1734 LOG.debug("Directory Delete encountered: {}", f.toString()); 1735 Path parentPath = absolutePath.getParent(); 1736 if (parentPath.getParent() != null) { 1737 String parentKey = pathToKey(parentPath); 1738 FileMetadata parentMetadata = null; 1739 1740 try { 1741 parentMetadata = store.retrieveMetadata(parentKey); 1742 } catch (IOException e) { 1743 1744 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1745 1746 if (innerException instanceof StorageException) { 1747 // Invalid State. 1748 // A FileNotFoundException is not thrown here as the API returns false 1749 // if the file not present. But not retrieving metadata here is an 1750 // unrecoverable state and can only happen if there is a race condition 1751 // hence throwing a IOException 1752 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1753 throw new IOException("File " + f + " has a parent directory " 1754 + parentPath + " whose metadata cannot be retrieved. Can't resolve"); 1755 } 1756 } 1757 throw e; 1758 } 1759 1760 // Invalid State. 1761 // A FileNotFoundException is not thrown here as the API returns false 1762 // if the file not present. But not retrieving metadata here is an 1763 // unrecoverable state and can only happen if there is a race condition 1764 // hence throwing a IOException 1765 if (parentMetadata == null) { 1766 throw new IOException("File " + f + " has a parent directory " 1767 + parentPath + " whose metadata cannot be retrieved. Can't resolve"); 1768 } 1769 1770 if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 1771 LOG.debug("Found an implicit parent directory while trying to" 1772 + " delete the directory {}. Creating the directory blob for" 1773 + " it in {}. ", f, parentKey); 1774 1775 store.storeEmptyFolder(parentKey, 1776 createPermissionStatus(FsPermission.getDefault())); 1777 } 1778 } 1779 1780 // List all the blobs in the current folder. 1781 String priorLastKey = null; 1782 PartialListing listing = null; 1783 try { 1784 listing = store.listAll(key, AZURE_LIST_ALL, 1, 1785 priorLastKey); 1786 } catch(IOException e) { 1787 1788 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1789 1790 if (innerException instanceof StorageException 1791 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1792 return false; 1793 } 1794 1795 throw e; 1796 } 1797 1798 if (listing == null) { 1799 return false; 1800 } 1801 1802 FileMetadata[] contents = listing.getFiles(); 1803 if (!recursive && contents.length > 0) { 1804 // The folder is non-empty and recursive delete was not specified. 1805 // Throw an exception indicating that a non-recursive delete was 1806 // specified for a non-empty folder. 1807 throw new IOException("Non-recursive delete of non-empty directory " 1808 + f.toString()); 1809 } 1810 1811 // Delete all the files in the folder. 1812 for (FileMetadata p : contents) { 1813 // Tag on the directory name found as the suffix of the suffix of the 1814 // parent directory to get the new absolute path. 1815 String suffix = p.getKey().substring( 1816 p.getKey().lastIndexOf(PATH_DELIMITER)); 1817 if (!p.isDir()) { 1818 try { 1819 store.delete(key + suffix); 1820 instrumentation.fileDeleted(); 1821 } catch(IOException e) { 1822 1823 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1824 1825 if (innerException instanceof StorageException 1826 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1827 return false; 1828 } 1829 1830 throw e; 1831 } 1832 } else { 1833 // Recursively delete contents of the sub-folders. Notice this also 1834 // deletes the blob for the directory. 1835 if (!delete(new Path(f.toString() + suffix), true)) { 1836 return false; 1837 } 1838 } 1839 } 1840 1841 try { 1842 store.delete(key); 1843 } catch(IOException e) { 1844 1845 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1846 1847 if (innerException instanceof StorageException 1848 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1849 return false; 1850 } 1851 1852 throw e; 1853 } 1854 1855 // Update parent directory last modified time 1856 Path parent = absolutePath.getParent(); 1857 if (parent != null && parent.getParent() != null) { // not root 1858 if (!skipParentFolderLastModifidedTimeUpdate) { 1859 updateParentFolderLastModifiedTime(key); 1860 } 1861 } 1862 instrumentation.directoryDeleted(); 1863 } 1864 1865 // File or directory was successfully deleted. 1866 LOG.debug("Delete Successful for : {}", f.toString()); 1867 return true; 1868 } 1869 1870 @Override 1871 public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException { 1872 1873 LOG.debug("Getting the file status for {}", f.toString()); 1874 1875 // Capture the absolute path and the path to key. 1876 Path absolutePath = makeAbsolute(f); 1877 String key = pathToKey(absolutePath); 1878 if (key.length() == 0) { // root always exists 1879 return newDirectory(null, absolutePath); 1880 } 1881 1882 // The path is either a folder or a file. Retrieve metadata to 1883 // determine if it is a directory or file. 1884 FileMetadata meta = null; 1885 try { 1886 meta = store.retrieveMetadata(key); 1887 } catch(Exception ex) { 1888 1889 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1890 1891 if (innerException instanceof StorageException 1892 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1893 1894 throw new FileNotFoundException(String.format("%s is not found", key)); 1895 } 1896 1897 throw ex; 1898 } 1899 1900 if (meta != null) { 1901 if (meta.isDir()) { 1902 // The path is a folder with files in it. 1903 // 1904 1905 LOG.debug("Path {} is a folder.", f.toString()); 1906 1907 // If a rename operation for the folder was pending, redo it. 1908 // Then the file does not exist, so signal that. 1909 if (conditionalRedoFolderRename(f)) { 1910 throw new FileNotFoundException( 1911 absolutePath + ": No such file or directory."); 1912 } 1913 1914 // Return reference to the directory object. 1915 return newDirectory(meta, absolutePath); 1916 } 1917 1918 // The path is a file. 1919 LOG.debug("Found the path: {} as a file.", f.toString()); 1920 1921 // Return with reference to a file object. 1922 return newFile(meta, absolutePath); 1923 } 1924 1925 // File not found. Throw exception no such file or directory. 1926 // 1927 throw new FileNotFoundException( 1928 absolutePath + ": No such file or directory."); 1929 } 1930 1931 // Return true if there is a rename pending and we redo it, otherwise false. 1932 private boolean conditionalRedoFolderRename(Path f) throws IOException { 1933 1934 // Can't rename /, so return immediately in that case. 1935 if (f.getName().equals("")) { 1936 return false; 1937 } 1938 1939 // Check if there is a -RenamePending.json file for this folder, and if so, 1940 // redo the rename. 1941 Path absoluteRenamePendingFile = renamePendingFilePath(f); 1942 if (exists(absoluteRenamePendingFile)) { 1943 FolderRenamePending pending = 1944 new FolderRenamePending(absoluteRenamePendingFile, this); 1945 pending.redo(); 1946 return true; 1947 } else { 1948 return false; 1949 } 1950 } 1951 1952 // Return the path name that would be used for rename of folder with path f. 1953 private Path renamePendingFilePath(Path f) { 1954 Path absPath = makeAbsolute(f); 1955 String key = pathToKey(absPath); 1956 key += "-RenamePending.json"; 1957 return keyToPath(key); 1958 } 1959 1960 @Override 1961 public URI getUri() { 1962 return uri; 1963 } 1964 1965 /** 1966 * Retrieve the status of a given path if it is a file, or of all the 1967 * contained files if it is a directory. 1968 */ 1969 @Override 1970 public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { 1971 1972 LOG.debug("Listing status for {}", f.toString()); 1973 1974 Path absolutePath = makeAbsolute(f); 1975 String key = pathToKey(absolutePath); 1976 Set<FileStatus> status = new TreeSet<FileStatus>(); 1977 FileMetadata meta = null; 1978 try { 1979 meta = store.retrieveMetadata(key); 1980 } catch (IOException ex) { 1981 1982 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1983 1984 if (innerException instanceof StorageException 1985 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1986 1987 throw new FileNotFoundException(String.format("%s is not found", f)); 1988 } 1989 1990 throw ex; 1991 } 1992 1993 if (meta != null) { 1994 if (!meta.isDir()) { 1995 1996 LOG.debug("Found path as a file"); 1997 1998 return new FileStatus[] { newFile(meta, absolutePath) }; 1999 } 2000 2001 String partialKey = null; 2002 PartialListing listing = null; 2003 2004 try { 2005 listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 2006 } catch (IOException ex) { 2007 2008 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2009 2010 if (innerException instanceof StorageException 2011 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2012 2013 throw new FileNotFoundException(String.format("%s is not found", key)); 2014 } 2015 2016 throw ex; 2017 } 2018 // NOTE: We don't check for Null condition as the Store API should return 2019 // an empty list if there are not listing. 2020 2021 // For any -RenamePending.json files in the listing, 2022 // push the rename forward. 2023 boolean renamed = conditionalRedoFolderRenames(listing); 2024 2025 // If any renames were redone, get another listing, 2026 // since the current one may have changed due to the redo. 2027 if (renamed) { 2028 listing = null; 2029 try { 2030 listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 2031 } catch (IOException ex) { 2032 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2033 2034 if (innerException instanceof StorageException 2035 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2036 2037 throw new FileNotFoundException(String.format("%s is not found", key)); 2038 } 2039 2040 throw ex; 2041 } 2042 } 2043 2044 // NOTE: We don't check for Null condition as the Store API should return 2045 // and empty list if there are not listing. 2046 2047 for (FileMetadata fileMetadata : listing.getFiles()) { 2048 Path subpath = keyToPath(fileMetadata.getKey()); 2049 2050 // Test whether the metadata represents a file or directory and 2051 // add the appropriate metadata object. 2052 // 2053 // Note: There was a very old bug here where directories were added 2054 // to the status set as files flattening out recursive listings 2055 // using "-lsr" down the file system hierarchy. 2056 if (fileMetadata.isDir()) { 2057 // Make sure we hide the temp upload folder 2058 if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) { 2059 // Don't expose that. 2060 continue; 2061 } 2062 status.add(newDirectory(fileMetadata, subpath)); 2063 } else { 2064 status.add(newFile(fileMetadata, subpath)); 2065 } 2066 } 2067 2068 LOG.debug("Found path as a directory with {}" 2069 + " files in it.", status.size()); 2070 2071 } else { 2072 // There is no metadata found for the path. 2073 LOG.debug("Did not find any metadata for path: {}", key); 2074 2075 throw new FileNotFoundException("File" + f + " does not exist."); 2076 } 2077 2078 return status.toArray(new FileStatus[0]); 2079 } 2080 2081 // Redo any folder renames needed if there are rename pending files in the 2082 // directory listing. Return true if one or more redo operations were done. 2083 private boolean conditionalRedoFolderRenames(PartialListing listing) 2084 throws IllegalArgumentException, IOException { 2085 boolean renamed = false; 2086 for (FileMetadata fileMetadata : listing.getFiles()) { 2087 Path subpath = keyToPath(fileMetadata.getKey()); 2088 if (isRenamePendingFile(subpath)) { 2089 FolderRenamePending pending = 2090 new FolderRenamePending(subpath, this); 2091 pending.redo(); 2092 renamed = true; 2093 } 2094 } 2095 return renamed; 2096 } 2097 2098 // True if this is a folder rename pending file, else false. 2099 private boolean isRenamePendingFile(Path path) { 2100 return path.toString().endsWith(FolderRenamePending.SUFFIX); 2101 } 2102 2103 private FileStatus newFile(FileMetadata meta, Path path) { 2104 return new FileStatus ( 2105 meta.getLength(), 2106 false, 2107 1, 2108 blockSize, 2109 meta.getLastModified(), 2110 0, 2111 meta.getPermissionStatus().getPermission(), 2112 meta.getPermissionStatus().getUserName(), 2113 meta.getPermissionStatus().getGroupName(), 2114 path.makeQualified(getUri(), getWorkingDirectory())); 2115 } 2116 2117 private FileStatus newDirectory(FileMetadata meta, Path path) { 2118 return new FileStatus ( 2119 0, 2120 true, 2121 1, 2122 blockSize, 2123 meta == null ? 0 : meta.getLastModified(), 2124 0, 2125 meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(), 2126 meta == null ? "" : meta.getPermissionStatus().getUserName(), 2127 meta == null ? "" : meta.getPermissionStatus().getGroupName(), 2128 path.makeQualified(getUri(), getWorkingDirectory())); 2129 } 2130 2131 private static enum UMaskApplyMode { 2132 NewFile, 2133 NewDirectory, 2134 NewDirectoryNoUmask, 2135 ChangeExistingFile, 2136 ChangeExistingDirectory, 2137 } 2138 2139 /** 2140 * Applies the applicable UMASK's on the given permission. 2141 * 2142 * @param permission 2143 * The permission to mask. 2144 * @param applyMode 2145 * Whether to also apply the default umask. 2146 * @return The masked persmission. 2147 */ 2148 private FsPermission applyUMask(final FsPermission permission, 2149 final UMaskApplyMode applyMode) { 2150 FsPermission newPermission = new FsPermission(permission); 2151 // Apply the default umask - this applies for new files or directories. 2152 if (applyMode == UMaskApplyMode.NewFile 2153 || applyMode == UMaskApplyMode.NewDirectory) { 2154 newPermission = newPermission 2155 .applyUMask(FsPermission.getUMask(getConf())); 2156 } 2157 return newPermission; 2158 } 2159 2160 /** 2161 * Creates the PermissionStatus object to use for the given permission, based 2162 * on the current user in context. 2163 * 2164 * @param permission 2165 * The permission for the file. 2166 * @return The permission status object to use. 2167 * @throws IOException 2168 * If login fails in getCurrentUser 2169 */ 2170 @VisibleForTesting 2171 PermissionStatus createPermissionStatus(FsPermission permission) 2172 throws IOException { 2173 // Create the permission status for this file based on current user 2174 return new PermissionStatus( 2175 UserGroupInformation.getCurrentUser().getShortUserName(), 2176 getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME, 2177 AZURE_DEFAULT_GROUP_DEFAULT), 2178 permission); 2179 } 2180 2181 @Override 2182 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 2183 return mkdirs(f, permission, false); 2184 } 2185 2186 public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException { 2187 2188 2189 LOG.debug("Creating directory: {}", f.toString()); 2190 2191 if (containsColon(f)) { 2192 throw new IOException("Cannot create directory " + f 2193 + " through WASB that has colons in the name"); 2194 } 2195 2196 Path absolutePath = makeAbsolute(f); 2197 PermissionStatus permissionStatus = null; 2198 if(noUmask) { 2199 // ensure owner still has wx permissions at the minimum 2200 permissionStatus = createPermissionStatus( 2201 applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)), 2202 UMaskApplyMode.NewDirectoryNoUmask)); 2203 } else { 2204 permissionStatus = createPermissionStatus( 2205 applyUMask(permission, UMaskApplyMode.NewDirectory)); 2206 } 2207 2208 2209 ArrayList<String> keysToCreateAsFolder = new ArrayList<String>(); 2210 ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>(); 2211 boolean childCreated = false; 2212 // Check that there is no file in the parent chain of the given path. 2213 for (Path current = absolutePath, parent = current.getParent(); 2214 parent != null; // Stop when you get to the root 2215 current = parent, parent = current.getParent()) { 2216 String currentKey = pathToKey(current); 2217 FileMetadata currentMetadata = store.retrieveMetadata(currentKey); 2218 if (currentMetadata != null && !currentMetadata.isDir()) { 2219 throw new FileAlreadyExistsException("Cannot create directory " + f + " because " 2220 + current + " is an existing file."); 2221 } else if (currentMetadata == null) { 2222 keysToCreateAsFolder.add(currentKey); 2223 childCreated = true; 2224 } else { 2225 // The directory already exists. Its last modified time need to be 2226 // updated if there is a child directory created under it. 2227 if (childCreated) { 2228 keysToUpdateAsFolder.add(currentKey); 2229 } 2230 childCreated = false; 2231 } 2232 } 2233 2234 for (String currentKey : keysToCreateAsFolder) { 2235 store.storeEmptyFolder(currentKey, permissionStatus); 2236 } 2237 2238 instrumentation.directoryCreated(); 2239 2240 // otherwise throws exception 2241 return true; 2242 } 2243 2244 @Override 2245 public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException { 2246 2247 LOG.debug("Opening file: {}", f.toString()); 2248 2249 Path absolutePath = makeAbsolute(f); 2250 String key = pathToKey(absolutePath); 2251 FileMetadata meta = null; 2252 try { 2253 meta = store.retrieveMetadata(key); 2254 } catch(Exception ex) { 2255 2256 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2257 2258 if (innerException instanceof StorageException 2259 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2260 2261 throw new FileNotFoundException(String.format("%s is not found", key)); 2262 } 2263 2264 throw ex; 2265 } 2266 2267 if (meta == null) { 2268 throw new FileNotFoundException(f.toString()); 2269 } 2270 if (meta.isDir()) { 2271 throw new FileNotFoundException(f.toString() 2272 + " is a directory not a file."); 2273 } 2274 2275 DataInputStream inputStream = null; 2276 try { 2277 inputStream = store.retrieve(key); 2278 } catch(Exception ex) { 2279 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2280 2281 if (innerException instanceof StorageException 2282 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2283 2284 throw new FileNotFoundException(String.format("%s is not found", key)); 2285 } 2286 2287 throw ex; 2288 } 2289 2290 return new FSDataInputStream(new BufferedFSInputStream( 2291 new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize)); 2292 } 2293 2294 @Override 2295 public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException { 2296 2297 FolderRenamePending renamePending = null; 2298 2299 LOG.debug("Moving {} to {}", src, dst); 2300 2301 if (containsColon(dst)) { 2302 throw new IOException("Cannot rename to file " + dst 2303 + " through WASB that has colons in the name"); 2304 } 2305 2306 String srcKey = pathToKey(makeAbsolute(src)); 2307 2308 if (srcKey.length() == 0) { 2309 // Cannot rename root of file system 2310 return false; 2311 } 2312 2313 // Figure out the final destination 2314 Path absoluteDst = makeAbsolute(dst); 2315 String dstKey = pathToKey(absoluteDst); 2316 FileMetadata dstMetadata = null; 2317 try { 2318 dstMetadata = store.retrieveMetadata(dstKey); 2319 } catch (IOException ex) { 2320 2321 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2322 2323 // A BlobNotFound storage exception in only thrown from retrieveMetdata API when 2324 // there is a race condition. If there is another thread which deletes the destination 2325 // file or folder, then this thread calling rename should be able to continue with 2326 // rename gracefully. Hence the StorageException is swallowed here. 2327 if (innerException instanceof StorageException) { 2328 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2329 LOG.debug("BlobNotFound exception encountered for Destination key : {}. " 2330 + "Swallowin the exception to handle race condition gracefully", dstKey); 2331 } 2332 } else { 2333 throw ex; 2334 } 2335 } 2336 2337 if (dstMetadata != null && dstMetadata.isDir()) { 2338 // It's an existing directory. 2339 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); 2340 LOG.debug("Destination {} " 2341 + " is a directory, adjusted the destination to be {}", dst, dstKey); 2342 } else if (dstMetadata != null) { 2343 // Attempting to overwrite a file using rename() 2344 LOG.debug("Destination {}" 2345 + " is an already existing file, failing the rename.", dst); 2346 return false; 2347 } else { 2348 // Check that the parent directory exists. 2349 FileMetadata parentOfDestMetadata = null; 2350 try { 2351 parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent())); 2352 } catch (IOException ex) { 2353 2354 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2355 2356 if (innerException instanceof StorageException 2357 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2358 2359 LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst); 2360 return false; 2361 } 2362 2363 throw ex; 2364 } 2365 2366 if (parentOfDestMetadata == null) { 2367 LOG.debug("Parent of the destination {}" 2368 + " doesn't exist, failing the rename.", dst); 2369 return false; 2370 } else if (!parentOfDestMetadata.isDir()) { 2371 LOG.debug("Parent of the destination {}" 2372 + " is a file, failing the rename.", dst); 2373 return false; 2374 } 2375 } 2376 FileMetadata srcMetadata = null; 2377 try { 2378 srcMetadata = store.retrieveMetadata(srcKey); 2379 } catch (IOException ex) { 2380 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2381 2382 if (innerException instanceof StorageException 2383 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2384 2385 LOG.debug("Source {} doesn't exists. Failing rename", src); 2386 return false; 2387 } 2388 2389 throw ex; 2390 } 2391 2392 if (srcMetadata == null) { 2393 // Source doesn't exist 2394 LOG.debug("Source {} doesn't exist, failing the rename.", src); 2395 return false; 2396 } else if (!srcMetadata.isDir()) { 2397 LOG.debug("Source {} found as a file, renaming.", src); 2398 try { 2399 store.rename(srcKey, dstKey); 2400 } catch(IOException ex) { 2401 2402 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2403 2404 if (innerException instanceof StorageException 2405 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2406 2407 LOG.debug("BlobNotFoundException encountered. Failing rename", src); 2408 return false; 2409 } 2410 2411 throw ex; 2412 } 2413 } else { 2414 2415 // Prepare for, execute and clean up after of all files in folder, and 2416 // the root file, and update the last modified time of the source and 2417 // target parent folders. The operation can be redone if it fails part 2418 // way through, by applying the "Rename Pending" file. 2419 2420 // The following code (internally) only does atomic rename preparation 2421 // and lease management for page blob folders, limiting the scope of the 2422 // operation to HBase log file folders, where atomic rename is required. 2423 // In the future, we could generalize it easily to all folders. 2424 renamePending = prepareAtomicFolderRename(srcKey, dstKey); 2425 renamePending.execute(); 2426 2427 LOG.debug("Renamed {} to {} successfully.", src, dst); 2428 renamePending.cleanup(); 2429 return true; 2430 } 2431 2432 // Update the last-modified time of the parent folders of both source 2433 // and destination. 2434 updateParentFolderLastModifiedTime(srcKey); 2435 updateParentFolderLastModifiedTime(dstKey); 2436 2437 LOG.debug("Renamed {} to {} successfully.", src, dst); 2438 return true; 2439 } 2440 2441 /** 2442 * Update the last-modified time of the parent folder of the file 2443 * identified by key. 2444 * @param key 2445 * @throws IOException 2446 */ 2447 private void updateParentFolderLastModifiedTime(String key) 2448 throws IOException { 2449 Path parent = makeAbsolute(keyToPath(key)).getParent(); 2450 if (parent != null && parent.getParent() != null) { // not root 2451 String parentKey = pathToKey(parent); 2452 2453 // ensure the parent is a materialized folder 2454 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 2455 // The metadata could be null if the implicit folder only contains a 2456 // single file. In this case, the parent folder no longer exists if the 2457 // file is renamed; so we can safely ignore the null pointer case. 2458 if (parentMetadata != null) { 2459 if (parentMetadata.isDir() 2460 && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2461 store.storeEmptyFolder(parentKey, 2462 createPermissionStatus(FsPermission.getDefault())); 2463 } 2464 2465 if (store.isAtomicRenameKey(parentKey)) { 2466 SelfRenewingLease lease = null; 2467 try { 2468 lease = leaseSourceFolder(parentKey); 2469 store.updateFolderLastModifiedTime(parentKey, lease); 2470 } catch (AzureException e) { 2471 String errorCode = ""; 2472 try { 2473 StorageException e2 = (StorageException) e.getCause(); 2474 errorCode = e2.getErrorCode(); 2475 } catch (Exception e3) { 2476 // do nothing if cast fails 2477 } 2478 if (errorCode.equals("BlobNotFound")) { 2479 throw new FileNotFoundException("Folder does not exist: " + parentKey); 2480 } 2481 LOG.warn("Got unexpected exception trying to get lease on {}. {}", 2482 parentKey, e.getMessage()); 2483 throw e; 2484 } finally { 2485 try { 2486 if (lease != null) { 2487 lease.free(); 2488 } 2489 } catch (Exception e) { 2490 LOG.error("Unable to free lease on {}", parentKey, e); 2491 } 2492 } 2493 } else { 2494 store.updateFolderLastModifiedTime(parentKey, null); 2495 } 2496 } 2497 } 2498 } 2499 2500 /** 2501 * If the source is a page blob folder, 2502 * prepare to rename this folder atomically. This means to get exclusive 2503 * access to the source folder, and record the actions to be performed for 2504 * this rename in a "Rename Pending" file. This code was designed to 2505 * meet the needs of HBase, which requires atomic rename of write-ahead log 2506 * (WAL) folders for correctness. 2507 * 2508 * Before calling this method, the caller must ensure that the source is a 2509 * folder. 2510 * 2511 * For non-page-blob directories, prepare the in-memory information needed, 2512 * but don't take the lease or write the redo file. This is done to limit the 2513 * scope of atomic folder rename to HBase, at least at the time of writing 2514 * this code. 2515 * 2516 * @param srcKey Source folder name. 2517 * @param dstKey Destination folder name. 2518 * @throws IOException 2519 */ 2520 private FolderRenamePending prepareAtomicFolderRename( 2521 String srcKey, String dstKey) throws IOException { 2522 2523 if (store.isAtomicRenameKey(srcKey)) { 2524 2525 // Block unwanted concurrent access to source folder. 2526 SelfRenewingLease lease = leaseSourceFolder(srcKey); 2527 2528 // Prepare in-memory information needed to do or redo a folder rename. 2529 FolderRenamePending renamePending = 2530 new FolderRenamePending(srcKey, dstKey, lease, this); 2531 2532 // Save it to persistent storage to help recover if the operation fails. 2533 renamePending.writeFile(this); 2534 return renamePending; 2535 } else { 2536 FolderRenamePending renamePending = 2537 new FolderRenamePending(srcKey, dstKey, null, this); 2538 return renamePending; 2539 } 2540 } 2541 2542 /** 2543 * Get a self-renewing Azure blob lease on the source folder zero-byte file. 2544 */ 2545 private SelfRenewingLease leaseSourceFolder(String srcKey) 2546 throws AzureException { 2547 return store.acquireLease(srcKey); 2548 } 2549 2550 /** 2551 * Return an array containing hostnames, offset and size of 2552 * portions of the given file. For WASB we'll just lie and give 2553 * fake hosts to make sure we get many splits in MR jobs. 2554 */ 2555 @Override 2556 public BlockLocation[] getFileBlockLocations(FileStatus file, 2557 long start, long len) throws IOException { 2558 if (file == null) { 2559 return null; 2560 } 2561 2562 if ((start < 0) || (len < 0)) { 2563 throw new IllegalArgumentException("Invalid start or len parameter"); 2564 } 2565 2566 if (file.getLen() < start) { 2567 return new BlockLocation[0]; 2568 } 2569 final String blobLocationHost = getConf().get( 2570 AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, 2571 AZURE_BLOCK_LOCATION_HOST_DEFAULT); 2572 final String[] name = { blobLocationHost }; 2573 final String[] host = { blobLocationHost }; 2574 long blockSize = file.getBlockSize(); 2575 if (blockSize <= 0) { 2576 throw new IllegalArgumentException( 2577 "The block size for the given file is not a positive number: " 2578 + blockSize); 2579 } 2580 int numberOfLocations = (int) (len / blockSize) 2581 + ((len % blockSize == 0) ? 0 : 1); 2582 BlockLocation[] locations = new BlockLocation[numberOfLocations]; 2583 for (int i = 0; i < locations.length; i++) { 2584 long currentOffset = start + (i * blockSize); 2585 long currentLength = Math.min(blockSize, start + len - currentOffset); 2586 locations[i] = new BlockLocation(name, host, currentOffset, currentLength); 2587 } 2588 return locations; 2589 } 2590 2591 /** 2592 * Set the working directory to the given directory. 2593 */ 2594 @Override 2595 public void setWorkingDirectory(Path newDir) { 2596 workingDir = makeAbsolute(newDir); 2597 } 2598 2599 @Override 2600 public Path getWorkingDirectory() { 2601 return workingDir; 2602 } 2603 2604 @Override 2605 public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException { 2606 Path absolutePath = makeAbsolute(p); 2607 String key = pathToKey(absolutePath); 2608 FileMetadata metadata = null; 2609 try { 2610 metadata = store.retrieveMetadata(key); 2611 } catch (IOException ex) { 2612 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2613 2614 if (innerException instanceof StorageException 2615 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2616 2617 throw new FileNotFoundException(String.format("File %s doesn't exists.", p)); 2618 } 2619 2620 throw ex; 2621 } 2622 2623 if (metadata == null) { 2624 throw new FileNotFoundException("File doesn't exist: " + p); 2625 } 2626 permission = applyUMask(permission, 2627 metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory 2628 : UMaskApplyMode.ChangeExistingFile); 2629 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2630 // It's an implicit folder, need to materialize it. 2631 store.storeEmptyFolder(key, createPermissionStatus(permission)); 2632 } else if (!metadata.getPermissionStatus().getPermission(). 2633 equals(permission)) { 2634 store.changePermissionStatus(key, new PermissionStatus( 2635 metadata.getPermissionStatus().getUserName(), 2636 metadata.getPermissionStatus().getGroupName(), 2637 permission)); 2638 } 2639 } 2640 2641 @Override 2642 public void setOwner(Path p, String username, String groupname) 2643 throws IOException { 2644 Path absolutePath = makeAbsolute(p); 2645 String key = pathToKey(absolutePath); 2646 FileMetadata metadata = null; 2647 2648 try { 2649 metadata = store.retrieveMetadata(key); 2650 } catch (IOException ex) { 2651 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2652 2653 if (innerException instanceof StorageException 2654 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2655 2656 throw new FileNotFoundException(String.format("File %s doesn't exists.", p)); 2657 } 2658 2659 throw ex; 2660 } 2661 2662 if (metadata == null) { 2663 throw new FileNotFoundException("File doesn't exist: " + p); 2664 } 2665 2666 PermissionStatus newPermissionStatus = new PermissionStatus( 2667 username == null ? 2668 metadata.getPermissionStatus().getUserName() : username, 2669 groupname == null ? 2670 metadata.getPermissionStatus().getGroupName() : groupname, 2671 metadata.getPermissionStatus().getPermission()); 2672 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2673 // It's an implicit folder, need to materialize it. 2674 store.storeEmptyFolder(key, newPermissionStatus); 2675 } else { 2676 store.changePermissionStatus(key, newPermissionStatus); 2677 } 2678 } 2679 2680 @Override 2681 public synchronized void close() throws IOException { 2682 if (isClosed) { 2683 return; 2684 } 2685 2686 // Call the base close() to close any resources there. 2687 super.close(); 2688 // Close the store to close any resources there - e.g. the bandwidth 2689 // updater thread would be stopped at this time. 2690 store.close(); 2691 // Notify the metrics system that this file system is closed, which may 2692 // trigger one final metrics push to get the accurate final file system 2693 // metrics out. 2694 2695 long startTime = System.currentTimeMillis(); 2696 2697 if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { 2698 AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName); 2699 AzureFileSystemMetricsSystem.fileSystemClosed(); 2700 } 2701 2702 LOG.debug("Submitting metrics when file system closed took {} ms.", 2703 (System.currentTimeMillis() - startTime)); 2704 isClosed = true; 2705 } 2706 2707 /** 2708 * A handler that defines what to do with blobs whose upload was 2709 * interrupted. 2710 */ 2711 private abstract class DanglingFileHandler { 2712 abstract void handleFile(FileMetadata file, FileMetadata tempFile) 2713 throws IOException; 2714 } 2715 2716 /** 2717 * Handler implementation for just deleting dangling files and cleaning 2718 * them up. 2719 */ 2720 private class DanglingFileDeleter extends DanglingFileHandler { 2721 @Override 2722 void handleFile(FileMetadata file, FileMetadata tempFile) 2723 throws IOException { 2724 2725 LOG.debug("Deleting dangling file {}", file.getKey()); 2726 store.delete(file.getKey()); 2727 store.delete(tempFile.getKey()); 2728 } 2729 } 2730 2731 /** 2732 * Handler implementation for just moving dangling files to recovery 2733 * location (/lost+found). 2734 */ 2735 private class DanglingFileRecoverer extends DanglingFileHandler { 2736 private final Path destination; 2737 2738 DanglingFileRecoverer(Path destination) { 2739 this.destination = destination; 2740 } 2741 2742 @Override 2743 void handleFile(FileMetadata file, FileMetadata tempFile) 2744 throws IOException { 2745 2746 LOG.debug("Recovering {}", file.getKey()); 2747 // Move to the final destination 2748 String finalDestinationKey = 2749 pathToKey(new Path(destination, file.getKey())); 2750 store.rename(tempFile.getKey(), finalDestinationKey); 2751 if (!finalDestinationKey.equals(file.getKey())) { 2752 // Delete the empty link file now that we've restored it. 2753 store.delete(file.getKey()); 2754 } 2755 } 2756 } 2757 2758 /** 2759 * Check if a path has colons in its name 2760 */ 2761 private boolean containsColon(Path p) { 2762 return p.toUri().getPath().toString().contains(":"); 2763 } 2764 2765 /** 2766 * Implements recover and delete (-move and -delete) behaviors for handling 2767 * dangling files (blobs whose upload was interrupted). 2768 * 2769 * @param root 2770 * The root path to check from. 2771 * @param handler 2772 * The handler that deals with dangling files. 2773 */ 2774 private void handleFilesWithDanglingTempData(Path root, 2775 DanglingFileHandler handler) throws IOException { 2776 // Calculate the cut-off for when to consider a blob to be dangling. 2777 long cutoffForDangling = new Date().getTime() 2778 - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME, 2779 AZURE_TEMP_EXPIRY_DEFAULT) * 1000; 2780 // Go over all the blobs under the given root and look for blobs to 2781 // recover. 2782 String priorLastKey = null; 2783 do { 2784 PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL, 2785 AZURE_UNBOUNDED_DEPTH, priorLastKey); 2786 2787 for (FileMetadata file : listing.getFiles()) { 2788 if (!file.isDir()) { // We don't recover directory blobs 2789 // See if this blob has a link in it (meaning it's a place-holder 2790 // blob for when the upload to the temp blob is complete). 2791 String link = store.getLinkInFileMetadata(file.getKey()); 2792 if (link != null) { 2793 // It has a link, see if the temp blob it is pointing to is 2794 // existent and old enough to be considered dangling. 2795 FileMetadata linkMetadata = store.retrieveMetadata(link); 2796 if (linkMetadata != null 2797 && linkMetadata.getLastModified() >= cutoffForDangling) { 2798 // Found one! 2799 handler.handleFile(file, linkMetadata); 2800 } 2801 } 2802 } 2803 } 2804 priorLastKey = listing.getPriorLastKey(); 2805 } while (priorLastKey != null); 2806 } 2807 2808 /** 2809 * Looks under the given root path for any blob that are left "dangling", 2810 * meaning that they are place-holder blobs that we created while we upload 2811 * the data to a temporary blob, but for some reason we crashed in the middle 2812 * of the upload and left them there. If any are found, we move them to the 2813 * destination given. 2814 * 2815 * @param root 2816 * The root path to consider. 2817 * @param destination 2818 * The destination path to move any recovered files to. 2819 * @throws IOException 2820 */ 2821 public void recoverFilesWithDanglingTempData(Path root, Path destination) 2822 throws IOException { 2823 2824 LOG.debug("Recovering files with dangling temp data in {}", root); 2825 handleFilesWithDanglingTempData(root, 2826 new DanglingFileRecoverer(destination)); 2827 } 2828 2829 /** 2830 * Looks under the given root path for any blob that are left "dangling", 2831 * meaning that they are place-holder blobs that we created while we upload 2832 * the data to a temporary blob, but for some reason we crashed in the middle 2833 * of the upload and left them there. If any are found, we delete them. 2834 * 2835 * @param root 2836 * The root path to consider. 2837 * @throws IOException 2838 */ 2839 public void deleteFilesWithDanglingTempData(Path root) throws IOException { 2840 2841 LOG.debug("Deleting files with dangling temp data in {}", root); 2842 handleFilesWithDanglingTempData(root, new DanglingFileDeleter()); 2843 } 2844 2845 @Override 2846 protected void finalize() throws Throwable { 2847 LOG.debug("finalize() called."); 2848 close(); 2849 super.finalize(); 2850 } 2851 2852 /** 2853 * Encode the key with a random prefix for load balancing in Azure storage. 2854 * Upload data to a random temporary file then do storage side renaming to 2855 * recover the original key. 2856 * 2857 * @param aKey 2858 * @return Encoded version of the original key. 2859 */ 2860 private static String encodeKey(String aKey) { 2861 // Get the tail end of the key name. 2862 // 2863 String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1, 2864 aKey.length()); 2865 2866 // Construct the randomized prefix of the file name. The prefix ensures the 2867 // file always drops into the same folder but with a varying tail key name. 2868 String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR 2869 + UUID.randomUUID().toString(); 2870 2871 // Concatenate the randomized prefix with the tail of the key name. 2872 String randomizedKey = filePrefix + fileName; 2873 2874 // Return to the caller with the randomized key. 2875 return randomizedKey; 2876 } 2877}