001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.azure;
020
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.EOFException;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.charset.Charset;
031import java.text.SimpleDateFormat;
032import java.util.ArrayList;
033import java.util.Date;
034import java.util.EnumSet;
035import java.util.Set;
036import java.util.TimeZone;
037import java.util.TreeSet;
038import java.util.UUID;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.regex.Matcher;
041import java.util.regex.Pattern;
042
043import org.apache.commons.lang.StringUtils;
044import org.apache.hadoop.classification.InterfaceAudience;
045import org.apache.hadoop.classification.InterfaceStability;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.BlockLocation;
048import org.apache.hadoop.fs.BufferedFSInputStream;
049import org.apache.hadoop.fs.CreateFlag;
050import org.apache.hadoop.fs.FSDataInputStream;
051import org.apache.hadoop.fs.FSDataOutputStream;
052import org.apache.hadoop.fs.FSExceptionMessages;
053import org.apache.hadoop.fs.FSInputStream;
054import org.apache.hadoop.fs.FileAlreadyExistsException;
055import org.apache.hadoop.fs.FileStatus;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
059import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
060import org.apache.hadoop.fs.permission.FsPermission;
061import org.apache.hadoop.fs.permission.PermissionStatus;
062import org.apache.hadoop.fs.azure.AzureException;
063import org.apache.hadoop.security.UserGroupInformation;
064import org.apache.hadoop.util.Progressable;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067import org.codehaus.jackson.JsonNode;
068import org.codehaus.jackson.JsonParseException;
069import org.codehaus.jackson.JsonParser;
070import org.codehaus.jackson.map.JsonMappingException;
071import org.codehaus.jackson.map.ObjectMapper;
072
073import com.google.common.annotations.VisibleForTesting;
074import com.microsoft.azure.storage.StorageException;
075
076
077import org.apache.hadoop.io.IOUtils;
078
079/**
080 * A {@link FileSystem} for reading and writing files stored on <a
081 * href="http://store.azure.com/">Windows Azure</a>. This implementation is
082 * blob-based and stores files on Azure in their native form so they can be read
083 * by other Azure tools.
084 */
085@InterfaceAudience.Public
086@InterfaceStability.Stable
087public class NativeAzureFileSystem extends FileSystem {
  // Octal permission mask 0300: the write (0200) and execute (0100) bits of
  // the owning user.
  private static final int USER_WX_PERMISION = 0300;
089  /**
090   * A description of a folder rename operation, including the source and
091   * destination keys, and descriptions of the files in the source folder.
092   */
093  public static class FolderRenamePending {
    // Lease on the source folder; only assigned by the constructor that
    // takes a lease argument.
    private SelfRenewingLease folderLease;
    // Key of the folder being renamed.
    private String srcKey;
    // Key the folder is being renamed to.
    private String dstKey;
    private FileMetadata[] fileMetadata = null;    // descriptions of source files
    // File names taken from the "FileList" entry of a -RenamePending.json
    // file; only assigned by the redo-file constructor.
    private ArrayList<String> fileStrings = null;
    // File system the rename is carried out against.
    private NativeAzureFileSystem fs;
    // Size of the buffer used to read a -RenamePending.json file; a file that
    // fills the whole buffer is rejected as too large.
    private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000;
    // Headroom subtracted from the maximum size while the file list is built.
    private static final int FORMATTING_BUFFER = 10000;
    // False when the pending-rename file could not be parsed or lacked a
    // required entry; redo() then does nothing.
    private boolean committed;
    // Suffix appended to the source key to name the pending-rename file.
    public static final String SUFFIX = "-RenamePending.json";
104
105    // Prepare in-memory information needed to do or redo a folder rename.
106    public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease,
107        NativeAzureFileSystem fs) throws IOException {
108      this.srcKey = srcKey;
109      this.dstKey = dstKey;
110      this.folderLease = lease;
111      this.fs = fs;
112      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
113
114      // List all the files in the folder.
115      String priorLastKey = null;
116      do {
117        PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
118          AZURE_UNBOUNDED_DEPTH, priorLastKey);
119        for(FileMetadata file : listing.getFiles()) {
120          fileMetadataList.add(file);
121        }
122        priorLastKey = listing.getPriorLastKey();
123      } while (priorLastKey != null);
124      fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
125      this.committed = true;
126    }
127
128    // Prepare in-memory information needed to do or redo folder rename from
129    // a -RenamePending.json file read from storage. This constructor is to use during
130    // redo processing.
131    public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs)
132        throws IllegalArgumentException, IOException {
133
134      this.fs = fs;
135
136      // open redo file
137      Path f = redoFile;
138      FSDataInputStream input = fs.open(f);
139      byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE];
140      int l = input.read(bytes);
141      if (l <= 0) {
142        // Jira HADOOP-12678 -Handle empty rename pending metadata file during
143        // atomic rename in redo path. If during renamepending file is created
144        // but not written yet, then this means that rename operation
145        // has not started yet. So we should delete rename pending metadata file.
146        LOG.error("Deleting empty rename pending file "
147            + redoFile + " -- no data available");
148        deleteRenamePendingFile(fs, redoFile);
149        return;
150      }
151      if (l == MAX_RENAME_PENDING_FILE_SIZE) {
152        throw new IOException(
153            "Error reading pending rename file contents -- "
154                + "maximum file size exceeded");
155      }
156      String contents = new String(bytes, 0, l, Charset.forName("UTF-8"));
157
158      // parse the JSON
159      ObjectMapper objMapper = new ObjectMapper();
160      objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
161      JsonNode json = null;
162      try {
163        json = objMapper.readValue(contents, JsonNode.class);
164        this.committed = true;
165      } catch (JsonMappingException e) {
166
167        // The -RedoPending.json file is corrupted, so we assume it was
168        // not completely written
169        // and the redo operation did not commit.
170        this.committed = false;
171      } catch (JsonParseException e) {
172        this.committed = false;
173      } catch (IOException e) {
174        this.committed = false;
175      }
176      
177      if (!this.committed) {
178        LOG.error("Deleting corruped rename pending file {} \n {}",
179            redoFile, contents);
180
181        // delete the -RenamePending.json file
182        deleteRenamePendingFile(fs, redoFile);
183        return;
184      }
185
186      // initialize this object's fields
187      ArrayList<String> fileStrList = new ArrayList<String>();
188      JsonNode oldFolderName = json.get("OldFolderName");
189      JsonNode newFolderName = json.get("NewFolderName");
190      if (oldFolderName == null || newFolderName == null) {
191          this.committed = false;
192      } else {
193        this.srcKey = oldFolderName.getTextValue();
194        this.dstKey = newFolderName.getTextValue();
195        if (this.srcKey == null || this.dstKey == null) {
196          this.committed = false;
197        } else {
198          JsonNode fileList = json.get("FileList");
199          if (fileList == null) {
200            this.committed = false;
201          } else {
202            for (int i = 0; i < fileList.size(); i++) {
203              fileStrList.add(fileList.get(i).getTextValue());
204            }
205          }
206        }
207      }
208      this.fileStrings = fileStrList;
209    }
210
211    public FileMetadata[] getFiles() {
212      return fileMetadata;
213    }
214
215    public SelfRenewingLease getFolderLease() {
216      return folderLease;
217    }
218
219    /**
220     * Deletes rename pending metadata file
221     * @param fs -- the file system
222     * @param redoFile - rename pending metadata file path
223     * @throws IOException - If deletion fails
224     */
225    @VisibleForTesting
226    void deleteRenamePendingFile(FileSystem fs, Path redoFile)
227        throws IOException {
228      try {
229        fs.delete(redoFile, false);
230      } catch (IOException e) {
231        // If the rename metadata was not found then somebody probably
232        // raced with us and finished the delete first
233        Throwable t = e.getCause();
234        if (t != null && t instanceof StorageException
235            && "BlobNotFound".equals(((StorageException) t).getErrorCode())) {
236          LOG.warn("rename pending file " + redoFile + " is already deleted");
237        } else {
238          throw e;
239        }
240      }
241    }
242
243    /**
244     * Write to disk the information needed to redo folder rename,
245     * in JSON format. The file name will be
246     * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json}
247     * The file format will be:
248     * <pre>{@code
249     * {
250     *   FormatVersion: "1.0",
251     *   OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>",
252     *   OldFolderName: "<key>",
253     *   NewFolderName: "<key>",
254     *   FileList: [ <string> , <string> , ... ]
255     * }
256     *
257     * Here's a sample:
258     * {
259     *  FormatVersion: "1.0",
260     *  OperationUTCTime: "2014-07-01 23:50:35.572",
261     *  OldFolderName: "user/ehans/folderToRename",
262     *  NewFolderName: "user/ehans/renamedFolder",
263     *  FileList: [
264     *    "innerFile",
265     *    "innerFile2"
266     *  ]
267     * } }</pre>
268     * @throws IOException
269     */
270    public void writeFile(FileSystem fs) throws IOException {
271      Path path = getRenamePendingFilePath();
272      LOG.debug("Preparing to write atomic rename state to {}", path.toString());
273      OutputStream output = null;
274
275      String contents = makeRenamePendingFileContents();
276
277      // Write file.
278      try {
279        output = fs.create(path);
280        output.write(contents.getBytes(Charset.forName("UTF-8")));
281      } catch (IOException e) {
282        throw new IOException("Unable to write RenamePending file for folder rename from "
283            + srcKey + " to " + dstKey, e);
284      } finally {
285        NativeAzureFileSystemHelper.cleanup(LOG, output);
286      }
287    }
288
289    /**
290     * Return the contents of the JSON file to represent the operations
291     * to be performed for a folder rename.
292     */
293    public String makeRenamePendingFileContents() {
294      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
295      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
296      String time = sdf.format(new Date());
297
298      // Make file list string
299      StringBuilder builder = new StringBuilder();
300      builder.append("[\n");
301      for (int i = 0; i != fileMetadata.length; i++) {
302        if (i > 0) {
303          builder.append(",\n");
304        }
305        builder.append("    ");
306        String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/");
307
308        // Quote string file names, escaping any possible " characters or other
309        // necessary characters in the name.
310        builder.append(quote(noPrefix));
311        if (builder.length() >=
312            MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) {
313
314          // Give up now to avoid using too much memory.
315          LOG.error("Internal error: Exceeded maximum rename pending file size of {} bytes.",
316              MAX_RENAME_PENDING_FILE_SIZE);
317
318          // return some bad JSON with an error message to make it human readable
319          return "exceeded maximum rename pending file size";
320        }
321      }
322      builder.append("\n  ]");
323      String fileList = builder.toString();
324
325      // Make file contents as a string. Again, quote file names, escaping
326      // characters as appropriate.
327      String contents = "{\n"
328          + "  FormatVersion: \"1.0\",\n"
329          + "  OperationUTCTime: \"" + time + "\",\n"
330          + "  OldFolderName: " + quote(srcKey) + ",\n"
331          + "  NewFolderName: " + quote(dstKey) + ",\n"
332          + "  FileList: " + fileList + "\n"
333          + "}\n";
334
335      return contents;
336    }
337    
338    /**
339     * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 
340     * method.
341     * 
342     * Produce a string in double quotes with backslash sequences in all the
343     * right places. A backslash will be inserted within </, allowing JSON
344     * text to be delivered in HTML. In JSON text, a string cannot contain a
345     * control character or an unescaped quote or backslash.
346     * @param string A String
347     * @return  A String correctly formatted for insertion in a JSON text.
348     */
349    private String quote(String string) {
350        if (string == null || string.length() == 0) {
351            return "\"\"";
352        }
353
354        char c = 0;
355        int  i;
356        int  len = string.length();
357        StringBuilder sb = new StringBuilder(len + 4);
358        String t;
359
360        sb.append('"');
361        for (i = 0; i < len; i += 1) {
362            c = string.charAt(i);
363            switch (c) {
364            case '\\':
365            case '"':
366                sb.append('\\');
367                sb.append(c);
368                break;
369            case '/':
370                sb.append('\\');
371                sb.append(c);
372                break;
373            case '\b':
374                sb.append("\\b");
375                break;
376            case '\t':
377                sb.append("\\t");
378                break;
379            case '\n':
380                sb.append("\\n");
381                break;
382            case '\f':
383                sb.append("\\f");
384                break;
385            case '\r':
386                sb.append("\\r");
387                break;
388            default:
389                if (c < ' ') {
390                    t = "000" + Integer.toHexString(c);
391                    sb.append("\\u" + t.substring(t.length() - 4));
392                } else {
393                    sb.append(c);
394                }
395            }
396        }
397        sb.append('"');
398        return sb.toString();
399    }
400
401    public String getSrcKey() {
402      return srcKey;
403    }
404
405    public String getDstKey() {
406      return dstKey;
407    }
408
409    public FileMetadata getSourceMetadata() throws IOException {
410      return fs.getStoreInterface().retrieveMetadata(srcKey);
411    }
412
413    /**
414     * Execute a folder rename. This is the execution path followed
415     * when everything is working normally. See redo() for the alternate
416     * execution path for the case where we're recovering from a folder rename
417     * failure.
418     * @throws IOException
419     */
420    public void execute() throws IOException {
421
422      for (FileMetadata file : this.getFiles()) {
423
424        // Rename all materialized entries under the folder to point to the
425        // final destination.
426        if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
427          String srcName = file.getKey();
428          String suffix  = srcName.substring((this.getSrcKey()).length());
429          String dstName = this.getDstKey() + suffix;
430
431          // Rename gets exclusive access (via a lease) for files
432          // designated for atomic rename.
433          // The main use case is for HBase write-ahead log (WAL) and data
434          // folder processing correctness.  See the rename code for details.
435          boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName);
436          fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
437        }
438      }
439
440      // Rename the source folder 0-byte root file itself.
441      FileMetadata srcMetadata2 = this.getSourceMetadata();
442      if (srcMetadata2.getBlobMaterialization() ==
443          BlobMaterialization.Explicit) {
444
445        // It already has a lease on it from the "prepare" phase so there's no
446        // need to get one now. Pass in existing lease to allow file delete.
447        fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(),
448            false, folderLease);
449      }
450
451      // Update the last-modified time of the parent folders of both source and
452      // destination.
453      fs.updateParentFolderLastModifiedTime(srcKey);
454      fs.updateParentFolderLastModifiedTime(dstKey);
455    }
456
457    /** Clean up after execution of rename.
458     * @throws IOException */
459    public void cleanup() throws IOException {
460
461      if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) {
462
463        // Remove RenamePending file
464        fs.delete(getRenamePendingFilePath(), false);
465
466        // Freeing source folder lease is not necessary since the source
467        // folder file was deleted.
468      }
469    }
470
471    private Path getRenamePendingFilePath() {
472      String fileName = srcKey + SUFFIX;
473      Path fileNamePath = keyToPath(fileName);
474      Path path = fs.makeAbsolute(fileNamePath);
475      return path;
476    }
477
478    /**
479     * Recover from a folder rename failure by redoing the intended work,
480     * as recorded in the -RenamePending.json file.
481     * 
482     * @throws IOException
483     */
484    public void redo() throws IOException {
485
486      if (!committed) {
487
488        // Nothing to do. The -RedoPending.json file should have already been
489        // deleted.
490        return;
491      }
492
493      // Try to get a lease on source folder to block concurrent access to it.
494      // It may fail if the folder is already gone. We don't check if the
495      // source exists explicitly because that could recursively trigger redo
496      // and give an infinite recursion.
497      SelfRenewingLease lease = null;
498      boolean sourceFolderGone = false;
499      try {
500        lease = fs.leaseSourceFolder(srcKey);
501      } catch (AzureException e) {
502
503        // If the source folder was not found then somebody probably
504        // raced with us and finished the rename first, or the
505        // first rename failed right before deleting the rename pending
506        // file.
507        String errorCode = "";
508        try {
509          StorageException se = (StorageException) e.getCause();
510          errorCode = se.getErrorCode();
511        } catch (Exception e2) {
512          ; // do nothing -- could not get errorCode
513        }
514        if (errorCode.equals("BlobNotFound")) {
515          sourceFolderGone = true;
516        } else {
517          throw new IOException(
518              "Unexpected error when trying to lease source folder name during "
519              + "folder rename redo",
520              e);
521        }
522      }
523
524      if (!sourceFolderGone) {
525        // Make sure the target folder exists.
526        Path dst = fullPath(dstKey);
527        if (!fs.exists(dst)) {
528          fs.mkdirs(dst);
529        }
530
531        // For each file inside the folder to be renamed,
532        // make sure it has been renamed.
533        for(String fileName : fileStrings) {
534          finishSingleFileRename(fileName);
535        }
536
537        // Remove the source folder. Don't check explicitly if it exists,
538        // to avoid triggering redo recursively.
539        try {
540          fs.getStoreInterface().delete(srcKey, lease);
541        } catch (Exception e) {
542          LOG.info("Unable to delete source folder during folder rename redo. "
543              + "If the source folder is already gone, this is not an error "
544              + "condition. Continuing with redo.", e);
545        }
546
547        // Update the last-modified time of the parent folders of both source
548        // and destination.
549        fs.updateParentFolderLastModifiedTime(srcKey);
550        fs.updateParentFolderLastModifiedTime(dstKey);
551      }
552
553      // Remove the -RenamePending.json file.
554      fs.delete(getRenamePendingFilePath(), false);
555    }
556
557    // See if the source file is still there, and if it is, rename it.
558    private void finishSingleFileRename(String fileName)
559        throws IOException {
560      Path srcFile = fullPath(srcKey, fileName);
561      Path dstFile = fullPath(dstKey, fileName);
562      String srcName = fs.pathToKey(srcFile);
563      String dstName = fs.pathToKey(dstFile);
564      boolean srcExists = fs.getStoreInterface().explicitFileExists(srcName);
565      boolean dstExists = fs.getStoreInterface().explicitFileExists(dstName);
566      if(srcExists) {
567        // Rename gets exclusive access (via a lease) for HBase write-ahead log
568        // (WAL) file processing correctness.  See the rename code for details.
569        fs.getStoreInterface().rename(srcName, dstName, true, null);
570      } else if (!srcExists && dstExists) {
571        // The rename already finished, so do nothing.
572        ;
573      } else {
574        throw new IOException(
575            "Attempting to complete rename of file " + srcKey + "/" + fileName
576            + " during folder rename redo, and file was not found in source "
577            + "or destination.");
578      }
579    }
580
581    // Return an absolute path for the specific fileName within the folder
582    // specified by folderKey.
583    private Path fullPath(String folderKey, String fileName) {
584      return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName);
585    }
586
587    private Path fullPath(String fileKey) {
588      return new Path(new Path(fs.getUri()), "/" + fileKey);
589    }
590  }
591
  // Literal text "[[.]]". NOTE(review): presumably substituted for a trailing
  // period in a path component -- the code that uses it is not in this
  // section; confirm.
  private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]";
  // Matches the placeholder text only where it is followed by the end of the
  // input or by a '/'.
  private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN =
      Pattern.compile("\\[\\[\\.\\]\\](?=$|/)");
  // Matches a '.' that is followed by the end of the input or by a '/'.
  private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)");
596
597  @Override
598  public String getScheme() {
599    return "wasb";
600  }
601
602  
603  /**
604   * <p>
605   * A {@link FileSystem} for reading and writing files stored on <a
606   * href="http://store.azure.com/">Windows Azure</a>. This implementation is
607   * blob-based and stores files on Azure in their native form so they can be read
608   * by other Azure tools. This implementation uses HTTPS for secure network communication.
609   * </p>
610   */
611  public static class Secure extends NativeAzureFileSystem {
612    @Override
613    public String getScheme() {
614      return "wasbs";
615    }
616  }
617
  // Logger for this class; the nested classes in this file log through it too.
  public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);

  // Configuration key "fs.azure.block.size".
  static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
  /**
   * The time span in seconds before which we consider a temp blob to be
   * dangling (not being actively uploaded to) and up for reclamation.
   * 
   * So e.g. if this is 60, then any temporary blobs more than a minute old
   * would be considered dangling.
   */
  static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
  // NOTE(review): presumably the default, in seconds (one hour), for the
  // temp-expiry property above -- confirm against the code that reads it.
  private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600;
  // Path separator, taken from Path.SEPARATOR.
  static final String PATH_DELIMITER = Path.SEPARATOR;
  // Folder name "_$azuretmpfolder$". NOTE(review): presumably where temporary
  // blobs are kept -- confirm.
  static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";

  // Passed to the store's listAll() by FolderRenamePending as the second and
  // third arguments. NOTE(review): -1 presumably means "no limit" -- confirm.
  private static final int AZURE_LIST_ALL = -1;
  private static final int AZURE_UNBOUNDED_DEPTH = -1;

  // 512 * 1024 * 1024 bytes (512 MiB).
  private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;

  /**
   * The configuration property that determines which group owns files created
   * in WASB.
   */
  private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
  /**
   * The default value for fs.azure.permissions.supergroup. Chosen as the same
   * default as DFS.
   */
  static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";

  // Configuration key "fs.azure.block.location.impersonatedhost" and,
  // presumably, its default value ("localhost") -- confirm against usage.
  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
      "fs.azure.block.location.impersonatedhost";
  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
      "localhost";
  // Configuration key "fs.azure.ring.buffer.capacity".
  static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
      "fs.azure.ring.buffer.capacity";
  // Configuration key "fs.azure.output.stream.buffer.size".
  static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
      "fs.azure.output.stream.buffer.size";

  // Configuration key "fs.azure.skip.metrics".
  public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";

  /*
   * Property to enable Append API.
   */
  public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
664
665  private class NativeAzureFsInputStream extends FSInputStream {
    // Underlying stream; replaced by seek() and set to null by close().
    private InputStream in;
    // Blob key this stream reads; used to re-open the blob on seek().
    private final String key;
    // Bytes consumed so far; advanced by the read methods, reset by seek(),
    // reported by getPos().
    private long pos = 0;
    // Set once close() has run.
    private boolean closed = false;
    // Result of store.isPageBlobKey(key), captured at construction time.
    private boolean isPageBlob;

    // File length, valid only for streams over block blobs.
    private long fileLength;
674
675    public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
676      this.in = in;
677      this.key = key;
678      this.isPageBlob = store.isPageBlobKey(key);
679      this.fileLength = fileLength;
680    }
681
682    /**
683     * Return the size of the remaining available bytes
684     * if the size is less than or equal to {@link Integer#MAX_VALUE},
685     * otherwise, return {@link Integer#MAX_VALUE}.
686     *
687     * This is to match the behavior of DFSInputStream.available(),
688     * which some clients may rely on (HBase write-ahead log reading in
689     * particular).
690     */
691    @Override
692    public synchronized int available() throws IOException {
693      if (isPageBlob) {
694        return in.available();
695      } else {
696        if (closed) {
697          throw new IOException("Stream closed");
698        }
699        final long remaining = this.fileLength - pos;
700        return remaining <= Integer.MAX_VALUE ?
701            (int) remaining : Integer.MAX_VALUE;
702      }
703    }
704
705    /*
706     * Reads the next byte of data from the input stream. The value byte is
707     * returned as an integer in the range 0 to 255. If no byte is available
708     * because the end of the stream has been reached, the value -1 is returned.
709     * This method blocks until input data is available, the end of the stream
710     * is detected, or an exception is thrown.
711     *
712     * @returns int An integer corresponding to the byte read.
713     */
714    @Override
715    public synchronized int read() throws FileNotFoundException, IOException {
716      try {
717        int result = 0;
718        result = in.read();
719        if (result != -1) {
720          pos++;
721          if (statistics != null) {
722            statistics.incrementBytesRead(1);
723          }
724        }
725      // Return to the caller with the result.
726      //
727        return result;
728      } catch(EOFException e) {
729        return -1;
730      } catch(IOException e) {
731
732        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
733
734        if (innerException instanceof StorageException) {
735
736          LOG.error("Encountered Storage Exception for read on Blob : {}"
737              + " Exception details: {} Error Code : {}",
738              key, e, ((StorageException) innerException).getErrorCode());
739
740          if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
741            throw new FileNotFoundException(String.format("%s is not found", key));
742          }
743        }
744
745       throw e;
746      }
747    }
748
749    /*
750     * Reads up to len bytes of data from the input stream into an array of
751     * bytes. An attempt is made to read as many as len bytes, but a smaller
752     * number may be read. The number of bytes actually read is returned as an
753     * integer. This method blocks until input data is available, end of file is
754     * detected, or an exception is thrown. If len is zero, then no bytes are
755     * read and 0 is returned; otherwise, there is an attempt to read at least
756     * one byte. If no byte is available because the stream is at end of file,
757     * the value -1 is returned; otherwise, at least one byte is read and stored
758     * into b.
759     *
760     * @param b -- the buffer into which data is read
761     *
762     * @param off -- the start offset in the array b at which data is written
763     *
764     * @param len -- the maximum number of bytes read
765     *
766     * @ returns int The total number of byes read into the buffer, or -1 if
767     * there is no more data because the end of stream is reached.
768     */
769    @Override
770    public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException {
771      try {
772        int result = 0;
773        result = in.read(b, off, len);
774        if (result > 0) {
775          pos += result;
776        }
777
778        if (null != statistics && result > 0) {
779          statistics.incrementBytesRead(result);
780        }
781
782        // Return to the caller with the result.
783        return result;
784      } catch(IOException e) {
785
786        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
787
788        if (innerException instanceof StorageException) {
789
790          LOG.error("Encountered Storage Exception for read on Blob : {}"
791              + " Exception details: {} Error Code : {}",
792              key, e, ((StorageException) innerException).getErrorCode());
793
794          if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
795            throw new FileNotFoundException(String.format("%s is not found", key));
796          }
797        }
798
799       throw e;
800      }
801    }
802
803    @Override
804    public synchronized void close() throws IOException {
805      if (!closed) {
806        closed = true;
807        IOUtils.closeStream(in);
808        in = null;
809      }
810    }
811
812    @Override
813    public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException {
814      try {
815        checkNotClosed();
816        if (pos < 0) {
817          throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
818        }
819        IOUtils.closeStream(in);
820        in = store.retrieve(key);
821        this.pos = in.skip(pos);
822        LOG.debug("Seek to position {}. Bytes skipped {}", pos,
823          this.pos);
824      } catch(IOException e) {
825
826        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
827
828        if (innerException instanceof StorageException
829            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
830          throw new FileNotFoundException(String.format("%s is not found", key));
831        }
832
833        throw e;
834      } catch(IndexOutOfBoundsException e) {
835        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
836      }
837    }
838
839    @Override
840    public synchronized long getPos() throws IOException {
841      return pos;
842    }
843
844    @Override
845    public boolean seekToNewSource(long targetPos) throws IOException {
846      return false;
847    }
848
849
850    /*
851     * Helper method to check if a stream is closed.
852     */
853    private void checkNotClosed() throws IOException {
854      if (closed) {
855        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
856      }
857    }
858  }
859
860  private class NativeAzureFsOutputStream extends OutputStream {
861    // We should not override flush() to actually close current block and flush
862    // to DFS, this will break applications that assume flush() is a no-op.
863    // Applications are advised to use Syncable.hflush() for that purpose.
864    // NativeAzureFsOutputStream needs to implement Syncable if needed.
865    private String key;
866    private String keyEncoded;
867    private OutputStream out;
868
869    public NativeAzureFsOutputStream(OutputStream out, String aKey,
870        String anEncodedKey) throws IOException {
871      // Check input arguments. The output stream should be non-null and the
872      // keys
873      // should be valid strings.
874      if (null == out) {
875        throw new IllegalArgumentException(
876            "Illegal argument: the output stream is null.");
877      }
878
879      if (null == aKey || 0 == aKey.length()) {
880        throw new IllegalArgumentException(
881            "Illegal argument the key string is null or empty");
882      }
883
884      if (null == anEncodedKey || 0 == anEncodedKey.length()) {
885        throw new IllegalArgumentException(
886            "Illegal argument the encoded key string is null or empty");
887      }
888
889      // Initialize the member variables with the incoming parameters.
890      this.out = out;
891
892      setKey(aKey);
893      setEncodedKey(anEncodedKey);
894    }
895
896    @Override
897    public synchronized void close() throws IOException {
898      if (out != null) {
899        // Close the output stream and decode the key for the output stream
900        // before returning to the caller.
901        //
902        out.close();
903        restoreKey();
904        out = null;
905      }
906    }
907
908    /**
909     * Writes the specified byte to this output stream. The general contract for
910     * write is that one byte is written to the output stream. The byte to be
911     * written is the eight low-order bits of the argument b. The 24 high-order
912     * bits of b are ignored.
913     * 
914     * @param b
915     *          32-bit integer of block of 4 bytes
916     */
917    @Override
918    public void write(int b) throws IOException {
919      try {
920        out.write(b);
921      } catch(IOException e) {
922        if (e.getCause() instanceof StorageException) {
923          StorageException storageExcp  = (StorageException) e.getCause();
924          LOG.error("Encountered Storage Exception for write on Blob : {}"
925              + " Exception details: {} Error Code : {}",
926              key, e.getMessage(), storageExcp.getErrorCode());
927        }
928        throw e;
929      }
930    }
931
932    /**
933     * Writes b.length bytes from the specified byte array to this output
934     * stream. The general contract for write(b) is that it should have exactly
935     * the same effect as the call write(b, 0, b.length).
936     * 
937     * @param b
938     *          Block of bytes to be written to the output stream.
939     */
940    @Override
941    public void write(byte[] b) throws IOException {
942      try {
943        out.write(b);
944      } catch(IOException e) {
945        if (e.getCause() instanceof StorageException) {
946          StorageException storageExcp  = (StorageException) e.getCause();
947          LOG.error("Encountered Storage Exception for write on Blob : {}"
948              + " Exception details: {} Error Code : {}",
949              key, e.getMessage(), storageExcp.getErrorCode());
950        }
951        throw e;
952      }
953    }
954
955    /**
956     * Writes <code>len</code> from the specified byte array starting at offset
957     * <code>off</code> to the output stream. The general contract for write(b,
958     * off, len) is that some of the bytes in the array <code>
959     * b</code b> are written to the output stream in order; element
960     * <code>b[off]</code> is the first byte written and
961     * <code>b[off+len-1]</code> is the last byte written by this operation.
962     * 
963     * @param b
964     *          Byte array to be written.
965     * @param off
966     *          Write this offset in stream.
967     * @param len
968     *          Number of bytes to be written.
969     */
970    @Override
971    public void write(byte[] b, int off, int len) throws IOException {
972      try {
973        out.write(b, off, len);
974      } catch(IOException e) {
975        if (e.getCause() instanceof StorageException) {
976          StorageException storageExcp  = (StorageException) e.getCause();
977          LOG.error("Encountered Storage Exception for write on Blob : {}"
978              + " Exception details: {} Error Code : {}",
979              key, e.getMessage(), storageExcp.getErrorCode());
980        }
981        throw e;
982      }
983    }
984
985    /**
986     * Get the blob name.
987     * 
988     * @return String Blob name.
989     */
990    public String getKey() {
991      return key;
992    }
993
994    /**
995     * Set the blob name.
996     * 
997     * @param key
998     *          Blob name.
999     */
1000    public void setKey(String key) {
1001      this.key = key;
1002    }
1003
1004    /**
1005     * Get the blob name.
1006     * 
1007     * @return String Blob name.
1008     */
1009    public String getEncodedKey() {
1010      return keyEncoded;
1011    }
1012
1013    /**
1014     * Set the blob name.
1015     * 
1016     * @param anEncodedKey
1017     *          Blob name.
1018     */
1019    public void setEncodedKey(String anEncodedKey) {
1020      this.keyEncoded = anEncodedKey;
1021    }
1022
1023    /**
1024     * Restore the original key name from the m_key member variable. Note: The
1025     * output file stream is created with an encoded blob store key to guarantee
1026     * load balancing on the front end of the Azure storage partition servers.
1027     * The create also includes the name of the original key value which is
1028     * stored in the m_key member variable. This method should only be called
1029     * when the stream is closed.
1030     */
1031    private void restoreKey() throws IOException {
1032      store.rename(getEncodedKey(), getKey());
1033    }
1034  }
1035
  // Scheme and authority of this file system; assigned in initialize().
  private URI uri;
  // Store used for blob operations; either injected through the constructor
  // or created in initialize().
  private NativeFileSystemStore store;
  // Concrete store instance assigned by createDefaultStore().
  private AzureNativeFileSystemStore actualStore;
  // Working directory; initialize() sets it to /user/<short user name>.
  private Path workingDir;
  // Block size; initialize() overrides the default from the configuration.
  private long blockSize = MAX_AZURE_BLOCK_SIZE;
  // Metrics instrumentation created in initialize() and passed to the store.
  private AzureFileSystemInstrumentation instrumentation;
  // Name under which the metrics source is registered; only assigned in
  // initialize() when metrics are not skipped.
  private String metricsSourceName;
  // NOTE(review): presumably tracks whether this file system was closed; it is
  // not read or written in this part of the file -- confirm.
  private boolean isClosed = false;
  // Toggled by suppressRetryPolicy()/resumeRetryPolicy(); consulted when the
  // default store is created.
  private static boolean suppressRetryPolicy = false;
  // A counter to create unique (within-process) names for my metrics sources.
  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
  // Read from the configuration in initialize(); append() refuses to run
  // while this is false.
  private boolean appendSupportEnabled = false;
1048  
1049  public NativeAzureFileSystem() {
1050    // set store in initialize()
1051  }
1052
1053  public NativeAzureFileSystem(NativeFileSystemStore store) {
1054    this.store = store;
1055  }
1056
1057  /**
1058   * Suppress the default retry policy for the Storage, useful in unit tests to
1059   * test negative cases without waiting forever.
1060   */
1061  @VisibleForTesting
1062  static void suppressRetryPolicy() {
1063    suppressRetryPolicy = true;
1064  }
1065
1066  /**
1067   * Undo the effect of suppressRetryPolicy.
1068   */
1069  @VisibleForTesting
1070  static void resumeRetryPolicy() {
1071    suppressRetryPolicy = false;
1072  }
1073
1074  /**
1075   * Creates a new metrics source name that's unique within this process.
1076   */
1077  @VisibleForTesting
1078  public static String newMetricsSourceName() {
1079    int number = metricsSourceNameCounter.incrementAndGet();
1080    final String baseName = "AzureFileSystemMetrics";
1081    if (number == 1) { // No need for a suffix for the first one
1082      return baseName;
1083    } else {
1084      return baseName + number;
1085    }
1086  }
1087  
1088  /**
1089   * Checks if the given URI scheme is a scheme that's affiliated with the Azure
1090   * File System.
1091   * 
1092   * @param scheme
1093   *          The URI scheme.
1094   * @return true iff it's an Azure File System URI scheme.
1095   */
1096  private static boolean isWasbScheme(String scheme) {
1097    // The valid schemes are: asv (old name), asvs (old name over HTTPS),
1098    // wasb (new name), wasbs (new name over HTTPS).
1099    return scheme != null
1100        && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs")
1101            || scheme.equalsIgnoreCase("wasb") || scheme
1102              .equalsIgnoreCase("wasbs"));
1103  }
1104
1105  /**
1106   * Puts in the authority of the default file system if it is a WASB file
1107   * system and the given URI's authority is null.
1108   * 
1109   * @return The URI with reconstructed authority if necessary and possible.
1110   */
1111  private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
1112    if (null == uri.getAuthority()) {
1113      // If WASB is the default file system, get the authority from there
1114      URI defaultUri = FileSystem.getDefaultUri(conf);
1115      if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) {
1116        try {
1117          // Reconstruct the URI with the authority from the default URI.
1118          return new URI(uri.getScheme(), defaultUri.getAuthority(),
1119              uri.getPath(), uri.getQuery(), uri.getFragment());
1120        } catch (URISyntaxException e) {
1121          // This should never happen.
1122          throw new Error("Bad URI construction", e);
1123        }
1124      }
1125    }
1126    return uri;
1127  }
1128
1129  @Override
1130  protected void checkPath(Path path) {
1131    // Make sure to reconstruct the path's authority if needed
1132    super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(),
1133        getConf())));
1134  }
1135
1136  @Override
1137  public void initialize(URI uri, Configuration conf)
1138      throws IOException, IllegalArgumentException {
1139    // Check authority for the URI to guarantee that it is non-null.
1140    uri = reconstructAuthorityIfNeeded(uri, conf);
1141    if (null == uri.getAuthority()) {
1142      final String errMsg = String
1143          .format("Cannot initialize WASB file system, URI authority not recognized.");
1144      throw new IllegalArgumentException(errMsg);
1145    }
1146    super.initialize(uri, conf);
1147
1148    if (store == null) {
1149      store = createDefaultStore(conf);
1150    }
1151
1152    instrumentation = new AzureFileSystemInstrumentation(conf);
1153    if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
1154      // Make sure the metrics system is available before interacting with Azure
1155      AzureFileSystemMetricsSystem.fileSystemStarted();
1156      metricsSourceName = newMetricsSourceName();
1157      String sourceDesc = "Azure Storage Volume File System metrics";
1158      AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc,
1159        instrumentation);
1160    }
1161
1162    store.initialize(uri, conf, instrumentation);
1163    setConf(conf);
1164    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
1165    this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
1166        .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
1167    this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
1168        MAX_AZURE_BLOCK_SIZE);
1169
1170    this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
1171    LOG.debug("NativeAzureFileSystem. Initializing.");
1172    LOG.debug("  blockSize  = {}",
1173        conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
1174
1175  }
1176
1177  private NativeFileSystemStore createDefaultStore(Configuration conf) {
1178    actualStore = new AzureNativeFileSystemStore();
1179
1180    if (suppressRetryPolicy) {
1181      actualStore.suppressRetryPolicy();
1182    }
1183    return actualStore;
1184  }
1185
1186  /**
1187   * Azure Storage doesn't allow the blob names to end in a period,
1188   * so encode this here to work around that limitation.
1189   */
1190  private static String encodeTrailingPeriod(String toEncode) {
1191    Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode);
1192    return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER);
1193  }
1194
1195  /**
1196   * Reverse the encoding done by encodeTrailingPeriod().
1197   */
1198  private static String decodeTrailingPeriod(String toDecode) {
1199    Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode);
1200    return matcher.replaceAll(".");
1201  }
1202
1203  /**
1204   * Convert the path to a key. By convention, any leading or trailing slash is
1205   * removed, except for the special case of a single slash.
1206   */
1207  @VisibleForTesting
1208  public String pathToKey(Path path) {
1209    // Convert the path to a URI to parse the scheme, the authority, and the
1210    // path from the path object.
1211    URI tmpUri = path.toUri();
1212    String pathUri = tmpUri.getPath();
1213
1214    // The scheme and authority is valid. If the path does not exist add a "/"
1215    // separator to list the root of the container.
1216    Path newPath = path;
1217    if ("".equals(pathUri)) {
1218      newPath = new Path(tmpUri.toString() + Path.SEPARATOR);
1219    }
1220
1221    // Verify path is absolute if the path refers to a windows drive scheme.
1222    if (!newPath.isAbsolute()) {
1223      throw new IllegalArgumentException("Path must be absolute: " + path);
1224    }
1225
1226    String key = null;
1227    key = newPath.toUri().getPath();
1228    key = removeTrailingSlash(key);
1229    key = encodeTrailingPeriod(key);
1230    if (key.length() == 1) {
1231      return key;
1232    } else {
1233      return key.substring(1); // remove initial slash
1234    }
1235  }
1236
1237  // Remove any trailing slash except for the case of a single slash.
1238  private static String removeTrailingSlash(String key) {
1239    if (key.length() == 0 || key.length() == 1) {
1240      return key;
1241    }
1242    if (key.charAt(key.length() - 1) == '/') {
1243      return key.substring(0, key.length() - 1);
1244    } else {
1245      return key;
1246    }
1247  }
1248
1249  private static Path keyToPath(String key) {
1250    if (key.equals("/")) {
1251      return new Path("/"); // container
1252    }
1253    return new Path("/" + decodeTrailingPeriod(key));
1254  }
1255
1256  /**
1257   * Get the absolute version of the path (fully qualified).
1258   * This is public for testing purposes.
1259   *
1260   * @param path
1261   * @return fully qualified path
1262   */
1263  @VisibleForTesting
1264  public Path makeAbsolute(Path path) {
1265    if (path.isAbsolute()) {
1266      return path;
1267    }
1268    return new Path(workingDir, path);
1269  }
1270
1271  /**
1272   * For unit test purposes, retrieves the AzureNativeFileSystemStore store
1273   * backing this file system.
1274   * 
1275   * @return The store object.
1276   */
1277  @VisibleForTesting
1278  public AzureNativeFileSystemStore getStore() {
1279    return actualStore;
1280  }
1281  
1282  NativeFileSystemStore getStoreInterface() {
1283    return store;
1284  }
1285
1286  /**
1287   * Gets the metrics source for this file system.
1288   * This is mainly here for unit testing purposes.
1289   *
1290   * @return the metrics source.
1291   */
1292  public AzureFileSystemInstrumentation getInstrumentation() {
1293    return instrumentation;
1294  }
1295
1296  /** This optional operation is not yet supported. */
1297  @Override
1298  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
1299      throws IOException {
1300
1301    if (!appendSupportEnabled) {
1302      throw new UnsupportedOperationException("Append Support not enabled");
1303    }
1304
1305    LOG.debug("Opening file: {} for append", f);
1306
1307    Path absolutePath = makeAbsolute(f);
1308    String key = pathToKey(absolutePath);
1309    FileMetadata meta = null;
1310    try {
1311      meta = store.retrieveMetadata(key);
1312    } catch(Exception ex) {
1313
1314      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
1315
1316      if (innerException instanceof StorageException
1317          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1318
1319        throw new FileNotFoundException(String.format("%s is not found", key));
1320      } else {
1321        throw ex;
1322      }
1323    }
1324
1325    if (meta == null) {
1326      throw new FileNotFoundException(f.toString());
1327    }
1328
1329    if (meta.isDir()) {
1330      throw new FileNotFoundException(f.toString()
1331          + " is a directory not a file.");
1332    }
1333
1334    if (store.isPageBlobKey(key)) {
1335      throw new IOException("Append not supported for Page Blobs");
1336    }
1337
1338    DataOutputStream appendStream = null;
1339
1340    try {
1341      appendStream = store.retrieveAppendStream(key, bufferSize);
1342    } catch (Exception ex) {
1343
1344      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
1345
1346      if (innerException instanceof StorageException
1347          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1348        throw new FileNotFoundException(String.format("%s is not found", key));
1349      } else {
1350        throw ex;
1351      }
1352    }
1353
1354    return new FSDataOutputStream(appendStream, statistics);
1355  }
1356
1357  @Override
1358  public FSDataOutputStream create(Path f, FsPermission permission,
1359      boolean overwrite, int bufferSize, short replication, long blockSize,
1360      Progressable progress) throws IOException {
1361    return create(f, permission, overwrite, true,
1362        bufferSize, replication, blockSize, progress,
1363        (SelfRenewingLease) null);
1364  }
1365
1366  /**
1367   * Get a self-renewing lease on the specified file.
1368   */
1369  public SelfRenewingLease acquireLease(Path path) throws AzureException {
1370    String fullKey = pathToKey(makeAbsolute(path));
1371    return getStore().acquireLease(fullKey);
1372  }
1373
1374  @Override
1375  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1376      boolean overwrite, int bufferSize, short replication, long blockSize,
1377      Progressable progress) throws IOException {
1378
1379    Path parent = f.getParent();
1380
1381    // Get exclusive access to folder if this is a directory designated
1382    // for atomic rename. The primary use case of for HBase write-ahead
1383    // log file management.
1384    SelfRenewingLease lease = null;
1385    if (store.isAtomicRenameKey(pathToKey(f))) {
1386      try {
1387        lease = acquireLease(parent);
1388      } catch (AzureException e) {
1389
1390        String errorCode = "";
1391        try {
1392          StorageException e2 = (StorageException) e.getCause();
1393          errorCode = e2.getErrorCode();
1394        } catch (Exception e3) {
1395          // do nothing if cast fails
1396        }
1397        if (errorCode.equals("BlobNotFound")) {
1398          throw new FileNotFoundException("Cannot create file " +
1399              f.getName() + " because parent folder does not exist.");
1400        }
1401
1402        LOG.warn("Got unexpected exception trying to get lease on {} . {}",
1403          pathToKey(parent), e.getMessage());
1404        throw e;
1405      }
1406    }
1407
1408    // See if the parent folder exists. If not, throw error.
1409    // The exists() check will push any pending rename operation forward,
1410    // if there is one, and return false.
1411    //
1412    // At this point, we have exclusive access to the source folder
1413    // via the lease, so we will not conflict with an active folder
1414    // rename operation.
1415    if (!exists(parent)) {
1416      try {
1417
1418        // This'll let the keep-alive thread exit as soon as it wakes up.
1419        lease.free();
1420      } catch (Exception e) {
1421        LOG.warn("Unable to free lease because: {}", e.getMessage());
1422      }
1423      throw new FileNotFoundException("Cannot create file " +
1424          f.getName() + " because parent folder does not exist.");
1425    }
1426
1427    // Create file inside folder.
1428    FSDataOutputStream out = null;
1429    try {
1430      out = create(f, permission, overwrite, false,
1431          bufferSize, replication, blockSize, progress, lease);
1432    } finally {
1433      // Release exclusive access to folder.
1434      try {
1435        if (lease != null) {
1436          lease.free();
1437        }
1438      } catch (Exception e) {
1439        NativeAzureFileSystemHelper.cleanup(LOG, out);
1440        String msg = "Unable to free lease on " + parent.toUri();
1441        LOG.error(msg);
1442        throw new IOException(msg, e);
1443      }
1444    }
1445    return out;
1446  }
1447
1448  @Override
1449  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1450      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1451      Progressable progress) throws IOException {
1452
1453    // Check if file should be appended or overwritten. Assume that the file
1454    // is overwritten on if the CREATE and OVERWRITE create flags are set. Note
1455    // that any other combinations of create flags will result in an open new or
1456    // open with append.
1457    final EnumSet<CreateFlag> createflags =
1458        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
1459    boolean overwrite = flags.containsAll(createflags);
1460
1461    // Delegate the create non-recursive call.
1462    return this.createNonRecursive(f, permission, overwrite,
1463        bufferSize, replication, blockSize, progress);
1464  }
1465
1466  @Override
1467  public FSDataOutputStream createNonRecursive(Path f,
1468      boolean overwrite, int bufferSize, short replication, long blockSize,
1469      Progressable progress) throws IOException {
1470    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1471        overwrite, bufferSize, replication, blockSize, progress);
1472  }
1473
1474
1475  /**
1476   * Create an Azure blob and return an output stream to use
1477   * to write data to it.
1478   *
1479   * @param f
1480   * @param permission
1481   * @param overwrite
1482   * @param createParent
1483   * @param bufferSize
1484   * @param replication
1485   * @param blockSize
1486   * @param progress
1487   * @param parentFolderLease Lease on parent folder (or null if
1488   * no lease).
1489   * @return
1490   * @throws IOException
1491   */
1492  private FSDataOutputStream create(Path f, FsPermission permission,
1493      boolean overwrite, boolean createParent, int bufferSize,
1494      short replication, long blockSize, Progressable progress,
1495      SelfRenewingLease parentFolderLease)
1496          throws FileAlreadyExistsException, IOException {
1497
1498    LOG.debug("Creating file: {}", f.toString());
1499
1500    if (containsColon(f)) {
1501      throw new IOException("Cannot create file " + f
1502          + " through WASB that has colons in the name");
1503    }
1504
1505    Path absolutePath = makeAbsolute(f);
1506    String key = pathToKey(absolutePath);
1507
1508    FileMetadata existingMetadata = store.retrieveMetadata(key);
1509    if (existingMetadata != null) {
1510      if (existingMetadata.isDir()) {
1511        throw new FileAlreadyExistsException("Cannot create file " + f
1512            + "; already exists as a directory.");
1513      }
1514      if (!overwrite) {
1515        throw new FileAlreadyExistsException("File already exists:" + f);
1516      }
1517    }
1518
1519    Path parentFolder = absolutePath.getParent();
1520    if (parentFolder != null && parentFolder.getParent() != null) { // skip root
1521      // Update the parent folder last modified time if the parent folder
1522      // already exists.
1523      String parentKey = pathToKey(parentFolder);
1524      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1525      if (parentMetadata != null && parentMetadata.isDir() &&
1526        parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
1527        if (parentFolderLease != null) {
1528          store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
1529        } else {
1530          updateParentFolderLastModifiedTime(key);
1531        }
1532      } else {
1533        // Make sure that the parent folder exists.
1534        // Create it using inherited permissions from the first existing directory going up the path
1535        Path firstExisting = parentFolder.getParent();
1536        FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting));
1537        while(metadata == null) {
1538          // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata
1539          firstExisting = firstExisting.getParent();
1540          metadata = store.retrieveMetadata(pathToKey(firstExisting));
1541        }
1542        mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
1543      }
1544    }
1545
1546    // Mask the permission first (with the default permission mask as well).
1547    FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
1548    PermissionStatus permissionStatus = createPermissionStatus(masked);
1549
1550    OutputStream bufOutStream;
1551    if (store.isPageBlobKey(key)) {
1552      // Store page blobs directly in-place without renames.
1553      bufOutStream = store.storefile(key, permissionStatus);
1554    } else {
1555      // This is a block blob, so open the output blob stream based on the
1556      // encoded key.
1557      //
1558      String keyEncoded = encodeKey(key);
1559
1560
1561      // First create a blob at the real key, pointing back to the temporary file
1562      // This accomplishes a few things:
1563      // 1. Makes sure we can create a file there.
1564      // 2. Makes it visible to other concurrent threads/processes/nodes what
1565      // we're
1566      // doing.
1567      // 3. Makes it easier to restore/cleanup data in the event of us crashing.
1568      store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
1569
1570      // The key is encoded to point to a common container at the storage server.
1571      // This reduces the number of splits on the server side when load balancing.
1572      // Ingress to Azure storage can take advantage of earlier splits. We remove
1573      // the root path to the key and prefix a random GUID to the tail (or leaf
1574      // filename) of the key. Keys are thus broadly and randomly distributed over
1575      // a single container to ease load balancing on the storage server. When the
1576      // blob is committed it is renamed to its earlier key. Uncommitted blocks
1577      // are not cleaned up and we leave it to Azure storage to garbage collect
1578      // these
1579      // blocks.
1580      bufOutStream = new NativeAzureFsOutputStream(store.storefile(
1581          keyEncoded, permissionStatus), key, keyEncoded);
1582    }
1583    // Construct the data output stream from the buffered output stream.
1584    FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
1585
1586    
1587    // Increment the counter
1588    instrumentation.fileCreated();
1589    
1590    // Return data output stream to caller.
1591    return fsOut;
1592  }
1593
1594  @Override
1595  @Deprecated
1596  public boolean delete(Path path) throws IOException {
1597    return delete(path, true);
1598  }
1599
1600  @Override
1601  public boolean delete(Path f, boolean recursive) throws IOException {
1602    return delete(f, recursive, false);
1603  }
1604
1605  /**
1606   * Delete the specified file or folder. The parameter
1607   * skipParentFolderLastModifidedTimeUpdate
1608   * is used in the case of atomic folder rename redo. In that case, there is
1609   * a lease on the parent folder, so (without reworking the code) modifying
1610   * the parent folder update time will fail because of a conflict with the
1611   * lease. Since we are going to delete the folder soon anyway so accurate
1612   * modified time is not necessary, it's easier to just skip
1613   * the modified time update.
1614   *
1615   * @param f
1616   * @param recursive
1617   * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last
1618   * modified time.
1619   * @return true if and only if the file is deleted
1620   * @throws IOException
1621   */
1622  public boolean delete(Path f, boolean recursive,
1623      boolean skipParentFolderLastModifidedTimeUpdate) throws IOException {
1624
1625    LOG.debug("Deleting file: {}", f.toString());
1626
1627    Path absolutePath = makeAbsolute(f);
1628    String key = pathToKey(absolutePath);
1629
1630    // Capture the metadata for the path.
1631    //
1632    FileMetadata metaFile = null;
1633    try {
1634      metaFile = store.retrieveMetadata(key);
1635    } catch (IOException e) {
1636
1637      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1638
1639      if (innerException instanceof StorageException
1640          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1641
1642        return false;
1643      }
1644      throw e;
1645    }
1646
1647    if (null == metaFile) {
1648      // The path to be deleted does not exist.
1649      return false;
1650    }
1651
1652    // The path exists, determine if it is a folder containing objects,
1653    // an empty folder, or a simple file and take the appropriate actions.
1654    if (!metaFile.isDir()) {
1655      // The path specifies a file. We need to check the parent path
1656      // to make sure it's a proper materialized directory before we
1657      // delete the file. Otherwise we may get into a situation where
1658      // the file we were deleting was the last one in an implicit directory
1659      // (e.g. the blob store only contains the blob a/b and there's no
1660      // corresponding directory blob a) and that would implicitly delete
1661      // the directory as well, which is not correct.
1662      Path parentPath = absolutePath.getParent();
1663      if (parentPath.getParent() != null) {// Not root
1664        String parentKey = pathToKey(parentPath);
1665
1666        FileMetadata parentMetadata = null;
1667        try {
1668          parentMetadata = store.retrieveMetadata(parentKey);
1669        } catch (IOException e) {
1670
1671          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1672
1673          if (innerException instanceof StorageException) {
1674            // Invalid State.
1675            // A FileNotFoundException is not thrown here as the API returns false
1676            // if the file not present. But not retrieving metadata here is an
1677            // unrecoverable state and can only happen if there is a race condition
1678            // hence throwing a IOException
1679            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1680              throw new IOException("File " + f + " has a parent directory "
1681                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
1682            }
1683          }
1684          throw e;
1685        }
1686
1687        // Invalid State.
1688        // A FileNotFoundException is not thrown here as the API returns false
1689        // if the file not present. But not retrieving metadata here is an
1690        // unrecoverable state and can only happen if there is a race condition
1691        // hence throwing a IOException
1692        if (parentMetadata == null) {
1693          throw new IOException("File " + f + " has a parent directory "
1694              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
1695        }
1696
1697        if (!parentMetadata.isDir()) {
1698          // Invalid state: the parent path is actually a file. Throw.
1699          throw new AzureException("File " + f + " has a parent directory "
1700              + parentPath + " which is also a file. Can't resolve.");
1701        }
1702
1703        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
1704          LOG.debug("Found an implicit parent directory while trying to"
1705              + " delete the file {}. Creating the directory blob for"
1706              + " it in {}.", f, parentKey);
1707
1708          store.storeEmptyFolder(parentKey,
1709              createPermissionStatus(FsPermission.getDefault()));
1710        } else {
1711          if (!skipParentFolderLastModifidedTimeUpdate) {
1712            updateParentFolderLastModifiedTime(key);
1713          }
1714        }
1715      }
1716
1717      try {
1718        store.delete(key);
1719        instrumentation.fileDeleted();
1720      } catch(IOException e) {
1721
1722        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1723
1724        if (innerException instanceof StorageException
1725            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1726          return false;
1727        }
1728
1729       throw e;
1730      }
1731    } else {
1732      // The path specifies a folder. Recursively delete all entries under the
1733      // folder.
1734      LOG.debug("Directory Delete encountered: {}", f.toString());
1735      Path parentPath = absolutePath.getParent();
1736      if (parentPath.getParent() != null) {
1737        String parentKey = pathToKey(parentPath);
1738        FileMetadata parentMetadata = null;
1739
1740        try {
1741          parentMetadata = store.retrieveMetadata(parentKey);
1742        } catch (IOException e) {
1743
1744          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1745
1746          if (innerException instanceof StorageException) {
1747            // Invalid State.
1748            // A FileNotFoundException is not thrown here as the API returns false
1749            // if the file not present. But not retrieving metadata here is an
1750            // unrecoverable state and can only happen if there is a race condition
1751            // hence throwing a IOException
1752            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1753              throw new IOException("File " + f + " has a parent directory "
1754                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
1755            }
1756          }
1757          throw e;
1758        }
1759
1760        // Invalid State.
1761        // A FileNotFoundException is not thrown here as the API returns false
1762        // if the file not present. But not retrieving metadata here is an
1763        // unrecoverable state and can only happen if there is a race condition
1764        // hence throwing a IOException
1765        if (parentMetadata == null) {
1766          throw new IOException("File " + f + " has a parent directory "
1767              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
1768        }
1769
1770        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
1771          LOG.debug("Found an implicit parent directory while trying to"
1772              + " delete the directory {}. Creating the directory blob for"
1773              + " it in {}. ", f, parentKey);
1774
1775          store.storeEmptyFolder(parentKey,
1776              createPermissionStatus(FsPermission.getDefault()));
1777        }
1778      }
1779
1780      // List all the blobs in the current folder.
1781      String priorLastKey = null;
1782      PartialListing listing = null;
1783      try {
1784        listing = store.listAll(key, AZURE_LIST_ALL, 1,
1785            priorLastKey);
1786      } catch(IOException e) {
1787
1788        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1789
1790        if (innerException instanceof StorageException
1791            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1792          return false;
1793        }
1794
1795        throw e;
1796      }
1797
1798      if (listing == null) {
1799        return false;
1800      }
1801
1802      FileMetadata[] contents = listing.getFiles();
1803      if (!recursive && contents.length > 0) {
1804        // The folder is non-empty and recursive delete was not specified.
1805        // Throw an exception indicating that a non-recursive delete was
1806        // specified for a non-empty folder.
1807        throw new IOException("Non-recursive delete of non-empty directory "
1808            + f.toString());
1809      }
1810
1811      // Delete all the files in the folder.
1812      for (FileMetadata p : contents) {
1813        // Tag on the directory name found as the suffix of the suffix of the
1814        // parent directory to get the new absolute path.
1815        String suffix = p.getKey().substring(
1816            p.getKey().lastIndexOf(PATH_DELIMITER));
1817        if (!p.isDir()) {
1818          try {
1819            store.delete(key + suffix);
1820            instrumentation.fileDeleted();
1821          } catch(IOException e) {
1822
1823            Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1824
1825            if (innerException instanceof StorageException
1826                && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1827              return false;
1828            }
1829
1830            throw e;
1831          }
1832        } else {
1833          // Recursively delete contents of the sub-folders. Notice this also
1834          // deletes the blob for the directory.
1835          if (!delete(new Path(f.toString() + suffix), true)) {
1836            return false;
1837          }
1838        }
1839      }
1840
1841      try {
1842        store.delete(key);
1843      } catch(IOException e) {
1844
1845        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1846
1847        if (innerException instanceof StorageException
1848            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1849          return false;
1850        }
1851
1852        throw e;
1853      }
1854
1855      // Update parent directory last modified time
1856      Path parent = absolutePath.getParent();
1857      if (parent != null && parent.getParent() != null) { // not root
1858        if (!skipParentFolderLastModifidedTimeUpdate) {
1859          updateParentFolderLastModifiedTime(key);
1860        }
1861      }
1862      instrumentation.directoryDeleted();
1863    }
1864
1865    // File or directory was successfully deleted.
1866    LOG.debug("Delete Successful for : {}", f.toString());
1867    return true;
1868  }
1869
1870  @Override
1871  public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException {
1872
1873    LOG.debug("Getting the file status for {}", f.toString());
1874
1875    // Capture the absolute path and the path to key.
1876    Path absolutePath = makeAbsolute(f);
1877    String key = pathToKey(absolutePath);
1878    if (key.length() == 0) { // root always exists
1879      return newDirectory(null, absolutePath);
1880    }
1881
1882    // The path is either a folder or a file. Retrieve metadata to
1883    // determine if it is a directory or file.
1884    FileMetadata meta = null;
1885    try {
1886      meta = store.retrieveMetadata(key);
1887    } catch(Exception ex) {
1888
1889      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
1890
1891      if (innerException instanceof StorageException
1892          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1893
1894          throw new FileNotFoundException(String.format("%s is not found", key));
1895       }
1896
1897      throw ex;
1898    }
1899
1900    if (meta != null) {
1901      if (meta.isDir()) {
1902        // The path is a folder with files in it.
1903        //
1904
1905        LOG.debug("Path {} is a folder.", f.toString());
1906
1907        // If a rename operation for the folder was pending, redo it.
1908        // Then the file does not exist, so signal that.
1909        if (conditionalRedoFolderRename(f)) {
1910          throw new FileNotFoundException(
1911              absolutePath + ": No such file or directory.");
1912        }
1913
1914        // Return reference to the directory object.
1915        return newDirectory(meta, absolutePath);
1916      }
1917
1918      // The path is a file.
1919      LOG.debug("Found the path: {} as a file.", f.toString());
1920
1921      // Return with reference to a file object.
1922      return newFile(meta, absolutePath);
1923    }
1924
1925    // File not found. Throw exception no such file or directory.
1926    //
1927    throw new FileNotFoundException(
1928        absolutePath + ": No such file or directory.");
1929  }
1930
1931  // Return true if there is a rename pending and we redo it, otherwise false.
1932  private boolean conditionalRedoFolderRename(Path f) throws IOException {
1933
1934    // Can't rename /, so return immediately in that case.
1935    if (f.getName().equals("")) {
1936      return false;
1937    }
1938
1939    // Check if there is a -RenamePending.json file for this folder, and if so,
1940    // redo the rename.
1941    Path absoluteRenamePendingFile = renamePendingFilePath(f);
1942    if (exists(absoluteRenamePendingFile)) {
1943      FolderRenamePending pending =
1944          new FolderRenamePending(absoluteRenamePendingFile, this);
1945      pending.redo();
1946      return true;
1947    } else {
1948      return false;
1949    }
1950  }
1951
1952  // Return the path name that would be used for rename of folder with path f.
1953  private Path renamePendingFilePath(Path f) {
1954    Path absPath = makeAbsolute(f);
1955    String key = pathToKey(absPath);
1956    key += "-RenamePending.json";
1957    return keyToPath(key);
1958  }
1959
1960  @Override
1961  public URI getUri() {
1962    return uri;
1963  }
1964
1965  /**
1966   * Retrieve the status of a given path if it is a file, or of all the
1967   * contained files if it is a directory.
1968   */
1969  @Override
1970  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
1971
1972    LOG.debug("Listing status for {}", f.toString());
1973
1974    Path absolutePath = makeAbsolute(f);
1975    String key = pathToKey(absolutePath);
1976    Set<FileStatus> status = new TreeSet<FileStatus>();
1977    FileMetadata meta = null;
1978    try {
1979      meta = store.retrieveMetadata(key);
1980    } catch (IOException ex) {
1981
1982      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
1983
1984      if (innerException instanceof StorageException
1985          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1986
1987        throw new FileNotFoundException(String.format("%s is not found", f));
1988      }
1989
1990      throw ex;
1991    }
1992
1993    if (meta != null) {
1994      if (!meta.isDir()) {
1995
1996        LOG.debug("Found path as a file");
1997
1998        return new FileStatus[] { newFile(meta, absolutePath) };
1999      }
2000
2001      String partialKey = null;
2002      PartialListing listing = null;
2003
2004      try {
2005        listing  = store.list(key, AZURE_LIST_ALL, 1, partialKey);
2006      } catch (IOException ex) {
2007
2008        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2009
2010        if (innerException instanceof StorageException
2011            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2012
2013            throw new FileNotFoundException(String.format("%s is not found", key));
2014        }
2015
2016        throw ex;
2017      }
2018      // NOTE: We don't check for Null condition as the Store API should return
2019      // an empty list if there are not listing.
2020
2021      // For any -RenamePending.json files in the listing,
2022      // push the rename forward.
2023      boolean renamed = conditionalRedoFolderRenames(listing);
2024
2025      // If any renames were redone, get another listing,
2026      // since the current one may have changed due to the redo.
2027      if (renamed) {
2028       listing = null;
2029       try {
2030         listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
2031       } catch (IOException ex) {
2032         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2033
2034         if (innerException instanceof StorageException
2035             && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2036
2037           throw new FileNotFoundException(String.format("%s is not found", key));
2038         }
2039
2040         throw ex;
2041       }
2042      }
2043
2044      // NOTE: We don't check for Null condition as the Store API should return
2045      // and empty list if there are not listing.
2046
2047      for (FileMetadata fileMetadata : listing.getFiles()) {
2048        Path subpath = keyToPath(fileMetadata.getKey());
2049
2050        // Test whether the metadata represents a file or directory and
2051        // add the appropriate metadata object.
2052        //
2053        // Note: There was a very old bug here where directories were added
2054        // to the status set as files flattening out recursive listings
2055        // using "-lsr" down the file system hierarchy.
2056        if (fileMetadata.isDir()) {
2057          // Make sure we hide the temp upload folder
2058          if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
2059            // Don't expose that.
2060            continue;
2061          }
2062          status.add(newDirectory(fileMetadata, subpath));
2063        } else {
2064          status.add(newFile(fileMetadata, subpath));
2065        }
2066      }
2067
2068      LOG.debug("Found path as a directory with {}"
2069          + " files in it.", status.size());
2070
2071    } else {
2072      // There is no metadata found for the path.
2073      LOG.debug("Did not find any metadata for path: {}", key);
2074
2075      throw new FileNotFoundException("File" + f + " does not exist.");
2076    }
2077
2078    return status.toArray(new FileStatus[0]);
2079  }
2080
2081  // Redo any folder renames needed if there are rename pending files in the
2082  // directory listing. Return true if one or more redo operations were done.
2083  private boolean conditionalRedoFolderRenames(PartialListing listing)
2084      throws IllegalArgumentException, IOException {
2085    boolean renamed = false;
2086    for (FileMetadata fileMetadata : listing.getFiles()) {
2087      Path subpath = keyToPath(fileMetadata.getKey());
2088      if (isRenamePendingFile(subpath)) {
2089        FolderRenamePending pending =
2090            new FolderRenamePending(subpath, this);
2091        pending.redo();
2092        renamed = true;
2093      }
2094    }
2095    return renamed;
2096  }
2097
2098  // True if this is a folder rename pending file, else false.
2099  private boolean isRenamePendingFile(Path path) {
2100    return path.toString().endsWith(FolderRenamePending.SUFFIX);
2101  }
2102
2103  private FileStatus newFile(FileMetadata meta, Path path) {
2104    return new FileStatus (
2105        meta.getLength(),
2106        false,
2107        1,
2108        blockSize,
2109        meta.getLastModified(),
2110        0,
2111        meta.getPermissionStatus().getPermission(),
2112        meta.getPermissionStatus().getUserName(),
2113        meta.getPermissionStatus().getGroupName(),
2114        path.makeQualified(getUri(), getWorkingDirectory()));
2115  }
2116
2117  private FileStatus newDirectory(FileMetadata meta, Path path) {
2118    return new FileStatus (
2119        0,
2120        true,
2121        1,
2122        blockSize,
2123        meta == null ? 0 : meta.getLastModified(),
2124        0,
2125        meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
2126        meta == null ? "" : meta.getPermissionStatus().getUserName(),
2127        meta == null ? "" : meta.getPermissionStatus().getGroupName(),
2128        path.makeQualified(getUri(), getWorkingDirectory()));
2129  }
2130
2131  private static enum UMaskApplyMode {
2132    NewFile,
2133    NewDirectory,
2134    NewDirectoryNoUmask,
2135    ChangeExistingFile,
2136    ChangeExistingDirectory,
2137  }
2138
2139  /**
2140   * Applies the applicable UMASK's on the given permission.
2141   * 
2142   * @param permission
2143   *          The permission to mask.
2144   * @param applyMode
2145   *          Whether to also apply the default umask.
2146   * @return The masked persmission.
2147   */
2148  private FsPermission applyUMask(final FsPermission permission,
2149      final UMaskApplyMode applyMode) {
2150    FsPermission newPermission = new FsPermission(permission);
2151    // Apply the default umask - this applies for new files or directories.
2152    if (applyMode == UMaskApplyMode.NewFile
2153        || applyMode == UMaskApplyMode.NewDirectory) {
2154      newPermission = newPermission
2155          .applyUMask(FsPermission.getUMask(getConf()));
2156    }
2157    return newPermission;
2158  }
2159
2160  /**
2161   * Creates the PermissionStatus object to use for the given permission, based
2162   * on the current user in context.
2163   * 
2164   * @param permission
2165   *          The permission for the file.
2166   * @return The permission status object to use.
2167   * @throws IOException
2168   *           If login fails in getCurrentUser
2169   */
2170  @VisibleForTesting
2171  PermissionStatus createPermissionStatus(FsPermission permission)
2172    throws IOException {
2173    // Create the permission status for this file based on current user
2174    return new PermissionStatus(
2175        UserGroupInformation.getCurrentUser().getShortUserName(),
2176        getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
2177            AZURE_DEFAULT_GROUP_DEFAULT),
2178        permission);
2179  }
2180
2181  @Override
2182  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
2183      return mkdirs(f, permission, false);
2184  }
2185
2186  public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
2187
2188
2189    LOG.debug("Creating directory: {}", f.toString());
2190
2191    if (containsColon(f)) {
2192      throw new IOException("Cannot create directory " + f
2193          + " through WASB that has colons in the name");
2194    }
2195
2196    Path absolutePath = makeAbsolute(f);
2197    PermissionStatus permissionStatus = null;
2198    if(noUmask) {
2199      // ensure owner still has wx permissions at the minimum
2200      permissionStatus = createPermissionStatus(
2201          applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)),
2202              UMaskApplyMode.NewDirectoryNoUmask));
2203    } else {
2204      permissionStatus = createPermissionStatus(
2205          applyUMask(permission, UMaskApplyMode.NewDirectory));
2206    }
2207
2208
2209    ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
2210    ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
2211    boolean childCreated = false;
2212    // Check that there is no file in the parent chain of the given path.
2213    for (Path current = absolutePath, parent = current.getParent();
2214        parent != null; // Stop when you get to the root
2215        current = parent, parent = current.getParent()) {
2216      String currentKey = pathToKey(current);
2217      FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
2218      if (currentMetadata != null && !currentMetadata.isDir()) {
2219        throw new FileAlreadyExistsException("Cannot create directory " + f + " because "
2220            + current + " is an existing file.");
2221      } else if (currentMetadata == null) {
2222        keysToCreateAsFolder.add(currentKey);
2223        childCreated = true;
2224      } else {
2225        // The directory already exists. Its last modified time need to be
2226        // updated if there is a child directory created under it.
2227        if (childCreated) {
2228          keysToUpdateAsFolder.add(currentKey);
2229        }
2230        childCreated = false;
2231      }
2232    }
2233
2234    for (String currentKey : keysToCreateAsFolder) {
2235      store.storeEmptyFolder(currentKey, permissionStatus);
2236    }
2237
2238    instrumentation.directoryCreated();
2239
2240    // otherwise throws exception
2241    return true;
2242  }
2243
2244  @Override
2245  public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
2246
2247    LOG.debug("Opening file: {}", f.toString());
2248
2249    Path absolutePath = makeAbsolute(f);
2250    String key = pathToKey(absolutePath);
2251    FileMetadata meta = null;
2252    try {
2253      meta = store.retrieveMetadata(key);
2254    } catch(Exception ex) {
2255
2256      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2257
2258      if (innerException instanceof StorageException
2259          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2260
2261        throw new FileNotFoundException(String.format("%s is not found", key));
2262      }
2263
2264      throw ex;
2265    }
2266
2267    if (meta == null) {
2268      throw new FileNotFoundException(f.toString());
2269    }
2270    if (meta.isDir()) {
2271      throw new FileNotFoundException(f.toString()
2272          + " is a directory not a file.");
2273    }
2274
2275    DataInputStream inputStream = null;
2276    try {
2277      inputStream = store.retrieve(key);
2278    } catch(Exception ex) {
2279      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2280
2281      if (innerException instanceof StorageException
2282          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2283
2284        throw new FileNotFoundException(String.format("%s is not found", key));
2285      }
2286
2287      throw ex;
2288    }
2289
2290    return new FSDataInputStream(new BufferedFSInputStream(
2291        new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize));
2292  }
2293
2294  @Override
2295  public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
2296
2297    FolderRenamePending renamePending = null;
2298
2299    LOG.debug("Moving {} to {}", src, dst);
2300
2301    if (containsColon(dst)) {
2302      throw new IOException("Cannot rename to file " + dst
2303          + " through WASB that has colons in the name");
2304    }
2305
2306    String srcKey = pathToKey(makeAbsolute(src));
2307
2308    if (srcKey.length() == 0) {
2309      // Cannot rename root of file system
2310      return false;
2311    }
2312
2313    // Figure out the final destination
2314    Path absoluteDst = makeAbsolute(dst);
2315    String dstKey = pathToKey(absoluteDst);
2316    FileMetadata dstMetadata = null;
2317    try {
2318      dstMetadata = store.retrieveMetadata(dstKey);
2319    } catch (IOException ex) {
2320
2321      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2322
2323      // A BlobNotFound storage exception in only thrown from retrieveMetdata API when
2324      // there is a race condition. If there is another thread which deletes the destination
2325      // file or folder, then this thread calling rename should be able to continue with
2326      // rename gracefully. Hence the StorageException is swallowed here.
2327      if (innerException instanceof StorageException) {
2328        if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2329          LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
2330              + "Swallowin the exception to handle race condition gracefully", dstKey);
2331        }
2332      } else {
2333        throw ex;
2334      }
2335    }
2336
2337    if (dstMetadata != null && dstMetadata.isDir()) {
2338      // It's an existing directory.
2339      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
2340      LOG.debug("Destination {} "
2341          + " is a directory, adjusted the destination to be {}", dst, dstKey);
2342    } else if (dstMetadata != null) {
2343      // Attempting to overwrite a file using rename()
2344      LOG.debug("Destination {}"
2345          + " is an already existing file, failing the rename.", dst);
2346      return false;
2347    } else {
2348      // Check that the parent directory exists.
2349      FileMetadata parentOfDestMetadata = null;
2350      try {
2351        parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
2352      } catch (IOException ex) {
2353
2354        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2355
2356        if (innerException instanceof StorageException
2357            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2358
2359          LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
2360          return false;
2361        }
2362
2363        throw ex;
2364      }
2365
2366      if (parentOfDestMetadata == null) {
2367        LOG.debug("Parent of the destination {}"
2368            + " doesn't exist, failing the rename.", dst);
2369        return false;
2370      } else if (!parentOfDestMetadata.isDir()) {
2371        LOG.debug("Parent of the destination {}"
2372            + " is a file, failing the rename.", dst);
2373        return false;
2374      }
2375    }
2376    FileMetadata srcMetadata = null;
2377    try {
2378      srcMetadata = store.retrieveMetadata(srcKey);
2379    } catch (IOException ex) {
2380      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2381
2382      if (innerException instanceof StorageException
2383          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2384
2385        LOG.debug("Source {} doesn't exists. Failing rename", src);
2386        return false;
2387      }
2388
2389      throw ex;
2390    }
2391
2392    if (srcMetadata == null) {
2393      // Source doesn't exist
2394      LOG.debug("Source {} doesn't exist, failing the rename.", src);
2395      return false;
2396    } else if (!srcMetadata.isDir()) {
2397      LOG.debug("Source {} found as a file, renaming.", src);
2398      try {
2399        store.rename(srcKey, dstKey);
2400      } catch(IOException ex) {
2401
2402        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2403
2404        if (innerException instanceof StorageException
2405            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2406
2407          LOG.debug("BlobNotFoundException encountered. Failing rename", src);
2408          return false;
2409        }
2410
2411        throw ex;
2412      }
2413    } else {
2414
2415      // Prepare for, execute and clean up after of all files in folder, and
2416      // the root file, and update the last modified time of the source and
2417      // target parent folders. The operation can be redone if it fails part
2418      // way through, by applying the "Rename Pending" file.
2419
2420      // The following code (internally) only does atomic rename preparation
2421      // and lease management for page blob folders, limiting the scope of the
2422      // operation to HBase log file folders, where atomic rename is required.
2423      // In the future, we could generalize it easily to all folders.
2424      renamePending = prepareAtomicFolderRename(srcKey, dstKey);
2425      renamePending.execute();
2426
2427      LOG.debug("Renamed {} to {} successfully.", src, dst);
2428      renamePending.cleanup();
2429      return true;
2430    }
2431
2432    // Update the last-modified time of the parent folders of both source
2433    // and destination.
2434    updateParentFolderLastModifiedTime(srcKey);
2435    updateParentFolderLastModifiedTime(dstKey);
2436
2437    LOG.debug("Renamed {} to {} successfully.", src, dst);
2438    return true;
2439  }
2440
2441  /**
2442   * Update the last-modified time of the parent folder of the file
2443   * identified by key.
2444   * @param key
2445   * @throws IOException
2446   */
2447  private void updateParentFolderLastModifiedTime(String key)
2448      throws IOException {
2449    Path parent = makeAbsolute(keyToPath(key)).getParent();
2450    if (parent != null && parent.getParent() != null) { // not root
2451      String parentKey = pathToKey(parent);
2452
2453      // ensure the parent is a materialized folder
2454      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
2455      // The metadata could be null if the implicit folder only contains a
2456      // single file. In this case, the parent folder no longer exists if the
2457      // file is renamed; so we can safely ignore the null pointer case.
2458      if (parentMetadata != null) {
2459        if (parentMetadata.isDir()
2460            && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2461          store.storeEmptyFolder(parentKey,
2462              createPermissionStatus(FsPermission.getDefault()));
2463        }
2464
2465        if (store.isAtomicRenameKey(parentKey)) {
2466          SelfRenewingLease lease = null;
2467          try {
2468            lease = leaseSourceFolder(parentKey);
2469            store.updateFolderLastModifiedTime(parentKey, lease);
2470          } catch (AzureException e) {
2471            String errorCode = "";
2472            try {
2473              StorageException e2 = (StorageException) e.getCause();
2474              errorCode = e2.getErrorCode();
2475            } catch (Exception e3) {
2476              // do nothing if cast fails
2477            }
2478            if (errorCode.equals("BlobNotFound")) {
2479              throw new FileNotFoundException("Folder does not exist: " + parentKey);
2480            }
2481            LOG.warn("Got unexpected exception trying to get lease on {}. {}",
2482                parentKey, e.getMessage());
2483            throw e;
2484          } finally {
2485            try {
2486              if (lease != null) {
2487                lease.free();
2488              }
2489            } catch (Exception e) {
2490              LOG.error("Unable to free lease on {}", parentKey, e);
2491            }
2492          }
2493        } else {
2494          store.updateFolderLastModifiedTime(parentKey, null);
2495        }
2496      }
2497    }
2498  }
2499
2500  /**
2501   * If the source is a page blob folder,
2502   * prepare to rename this folder atomically. This means to get exclusive
2503   * access to the source folder, and record the actions to be performed for
2504   * this rename in a "Rename Pending" file. This code was designed to
2505   * meet the needs of HBase, which requires atomic rename of write-ahead log
2506   * (WAL) folders for correctness.
2507   *
2508   * Before calling this method, the caller must ensure that the source is a
2509   * folder.
2510   *
2511   * For non-page-blob directories, prepare the in-memory information needed,
2512   * but don't take the lease or write the redo file. This is done to limit the
2513   * scope of atomic folder rename to HBase, at least at the time of writing
2514   * this code.
2515   *
2516   * @param srcKey Source folder name.
2517   * @param dstKey Destination folder name.
2518   * @throws IOException
2519   */
2520  private FolderRenamePending prepareAtomicFolderRename(
2521      String srcKey, String dstKey) throws IOException {
2522
2523    if (store.isAtomicRenameKey(srcKey)) {
2524
2525      // Block unwanted concurrent access to source folder.
2526      SelfRenewingLease lease = leaseSourceFolder(srcKey);
2527
2528      // Prepare in-memory information needed to do or redo a folder rename.
2529      FolderRenamePending renamePending =
2530          new FolderRenamePending(srcKey, dstKey, lease, this);
2531
2532      // Save it to persistent storage to help recover if the operation fails.
2533      renamePending.writeFile(this);
2534      return renamePending;
2535    } else {
2536      FolderRenamePending renamePending =
2537          new FolderRenamePending(srcKey, dstKey, null, this);
2538      return renamePending;
2539    }
2540  }
2541
2542  /**
2543   * Get a self-renewing Azure blob lease on the source folder zero-byte file.
2544   */
2545  private SelfRenewingLease leaseSourceFolder(String srcKey)
2546      throws AzureException {
2547    return store.acquireLease(srcKey);
2548  }
2549
2550  /**
2551   * Return an array containing hostnames, offset and size of
2552   * portions of the given file. For WASB we'll just lie and give
2553   * fake hosts to make sure we get many splits in MR jobs.
2554   */
2555  @Override
2556  public BlockLocation[] getFileBlockLocations(FileStatus file,
2557      long start, long len) throws IOException {
2558    if (file == null) {
2559      return null;
2560    }
2561
2562    if ((start < 0) || (len < 0)) {
2563      throw new IllegalArgumentException("Invalid start or len parameter");
2564    }
2565
2566    if (file.getLen() < start) {
2567      return new BlockLocation[0];
2568    }
2569    final String blobLocationHost = getConf().get(
2570        AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
2571        AZURE_BLOCK_LOCATION_HOST_DEFAULT);
2572    final String[] name = { blobLocationHost };
2573    final String[] host = { blobLocationHost };
2574    long blockSize = file.getBlockSize();
2575    if (blockSize <= 0) {
2576      throw new IllegalArgumentException(
2577          "The block size for the given file is not a positive number: "
2578              + blockSize);
2579    }
2580    int numberOfLocations = (int) (len / blockSize)
2581        + ((len % blockSize == 0) ? 0 : 1);
2582    BlockLocation[] locations = new BlockLocation[numberOfLocations];
2583    for (int i = 0; i < locations.length; i++) {
2584      long currentOffset = start + (i * blockSize);
2585      long currentLength = Math.min(blockSize, start + len - currentOffset);
2586      locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
2587    }
2588    return locations;
2589  }
2590
2591  /**
2592   * Set the working directory to the given directory.
2593   */
2594  @Override
2595  public void setWorkingDirectory(Path newDir) {
2596    workingDir = makeAbsolute(newDir);
2597  }
2598
2599  @Override
2600  public Path getWorkingDirectory() {
2601    return workingDir;
2602  }
2603
2604  @Override
2605  public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
2606    Path absolutePath = makeAbsolute(p);
2607    String key = pathToKey(absolutePath);
2608    FileMetadata metadata = null;
2609    try {
2610      metadata = store.retrieveMetadata(key);
2611    } catch (IOException ex) {
2612      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2613
2614      if (innerException instanceof StorageException
2615          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2616
2617        throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
2618      }
2619
2620      throw ex;
2621    }
2622
2623    if (metadata == null) {
2624      throw new FileNotFoundException("File doesn't exist: " + p);
2625    }
2626    permission = applyUMask(permission,
2627        metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
2628            : UMaskApplyMode.ChangeExistingFile);
2629    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2630      // It's an implicit folder, need to materialize it.
2631      store.storeEmptyFolder(key, createPermissionStatus(permission));
2632    } else if (!metadata.getPermissionStatus().getPermission().
2633        equals(permission)) {
2634      store.changePermissionStatus(key, new PermissionStatus(
2635          metadata.getPermissionStatus().getUserName(),
2636          metadata.getPermissionStatus().getGroupName(),
2637          permission));
2638    }
2639  }
2640
2641  @Override
2642  public void setOwner(Path p, String username, String groupname)
2643      throws IOException {
2644    Path absolutePath = makeAbsolute(p);
2645    String key = pathToKey(absolutePath);
2646    FileMetadata metadata = null;
2647
2648    try {
2649      metadata = store.retrieveMetadata(key);
2650    } catch (IOException ex) {
2651      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2652
2653      if (innerException instanceof StorageException
2654          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2655
2656        throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
2657      }
2658
2659      throw ex;
2660    }
2661
2662    if (metadata == null) {
2663      throw new FileNotFoundException("File doesn't exist: " + p);
2664    }
2665
2666    PermissionStatus newPermissionStatus = new PermissionStatus(
2667        username == null ?
2668            metadata.getPermissionStatus().getUserName() : username,
2669        groupname == null ?
2670            metadata.getPermissionStatus().getGroupName() : groupname,
2671        metadata.getPermissionStatus().getPermission());
2672    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2673      // It's an implicit folder, need to materialize it.
2674      store.storeEmptyFolder(key, newPermissionStatus);
2675    } else {
2676      store.changePermissionStatus(key, newPermissionStatus);
2677    }
2678  }
2679
2680  @Override
2681  public synchronized void close() throws IOException {
2682    if (isClosed) {
2683      return;
2684    }
2685
2686    // Call the base close() to close any resources there.
2687    super.close();
2688    // Close the store to close any resources there - e.g. the bandwidth
2689    // updater thread would be stopped at this time.
2690    store.close();
2691    // Notify the metrics system that this file system is closed, which may
2692    // trigger one final metrics push to get the accurate final file system
2693    // metrics out.
2694
2695    long startTime = System.currentTimeMillis();
2696
2697    if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
2698      AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName);
2699      AzureFileSystemMetricsSystem.fileSystemClosed();
2700    }
2701
2702    LOG.debug("Submitting metrics when file system closed took {} ms.",
2703        (System.currentTimeMillis() - startTime));
2704    isClosed = true;
2705  }
2706
2707  /**
2708   * A handler that defines what to do with blobs whose upload was
2709   * interrupted.
2710   */
2711  private abstract class DanglingFileHandler {
2712    abstract void handleFile(FileMetadata file, FileMetadata tempFile)
2713      throws IOException;
2714  }
2715
2716  /**
2717   * Handler implementation for just deleting dangling files and cleaning
2718   * them up.
2719   */
2720  private class DanglingFileDeleter extends DanglingFileHandler {
2721    @Override
2722    void handleFile(FileMetadata file, FileMetadata tempFile)
2723        throws IOException {
2724
2725      LOG.debug("Deleting dangling file {}", file.getKey());
2726      store.delete(file.getKey());
2727      store.delete(tempFile.getKey());
2728    }
2729  }
2730
2731  /**
2732   * Handler implementation for just moving dangling files to recovery
2733   * location (/lost+found).
2734   */
2735  private class DanglingFileRecoverer extends DanglingFileHandler {
2736    private final Path destination;
2737
2738    DanglingFileRecoverer(Path destination) {
2739      this.destination = destination;
2740    }
2741
2742    @Override
2743    void handleFile(FileMetadata file, FileMetadata tempFile)
2744        throws IOException {
2745
2746      LOG.debug("Recovering {}", file.getKey());
2747      // Move to the final destination
2748      String finalDestinationKey =
2749          pathToKey(new Path(destination, file.getKey()));
2750      store.rename(tempFile.getKey(), finalDestinationKey);
2751      if (!finalDestinationKey.equals(file.getKey())) {
2752        // Delete the empty link file now that we've restored it.
2753        store.delete(file.getKey());
2754      }
2755    }
2756  }
2757
2758  /**
2759   * Check if a path has colons in its name
2760   */
2761  private boolean containsColon(Path p) {
2762    return p.toUri().getPath().toString().contains(":");
2763  }
2764
2765  /**
2766   * Implements recover and delete (-move and -delete) behaviors for handling
2767   * dangling files (blobs whose upload was interrupted).
2768   * 
2769   * @param root
2770   *          The root path to check from.
2771   * @param handler
2772   *          The handler that deals with dangling files.
2773   */
2774  private void handleFilesWithDanglingTempData(Path root,
2775      DanglingFileHandler handler) throws IOException {
2776    // Calculate the cut-off for when to consider a blob to be dangling.
2777    long cutoffForDangling = new Date().getTime()
2778        - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME,
2779            AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
2780    // Go over all the blobs under the given root and look for blobs to
2781    // recover.
2782    String priorLastKey = null;
2783    do {
2784      PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
2785          AZURE_UNBOUNDED_DEPTH, priorLastKey);
2786
2787      for (FileMetadata file : listing.getFiles()) {
2788        if (!file.isDir()) { // We don't recover directory blobs
2789          // See if this blob has a link in it (meaning it's a place-holder
2790          // blob for when the upload to the temp blob is complete).
2791          String link = store.getLinkInFileMetadata(file.getKey());
2792          if (link != null) {
2793            // It has a link, see if the temp blob it is pointing to is
2794            // existent and old enough to be considered dangling.
2795            FileMetadata linkMetadata = store.retrieveMetadata(link);
2796            if (linkMetadata != null
2797                && linkMetadata.getLastModified() >= cutoffForDangling) {
2798              // Found one!
2799              handler.handleFile(file, linkMetadata);
2800            }
2801          }
2802        }
2803      }
2804      priorLastKey = listing.getPriorLastKey();
2805    } while (priorLastKey != null);
2806  }
2807
2808  /**
2809   * Looks under the given root path for any blob that are left "dangling",
2810   * meaning that they are place-holder blobs that we created while we upload
2811   * the data to a temporary blob, but for some reason we crashed in the middle
2812   * of the upload and left them there. If any are found, we move them to the
2813   * destination given.
2814   * 
2815   * @param root
2816   *          The root path to consider.
2817   * @param destination
2818   *          The destination path to move any recovered files to.
2819   * @throws IOException
2820   */
2821  public void recoverFilesWithDanglingTempData(Path root, Path destination)
2822      throws IOException {
2823
2824    LOG.debug("Recovering files with dangling temp data in {}", root);
2825    handleFilesWithDanglingTempData(root,
2826        new DanglingFileRecoverer(destination));
2827  }
2828
2829  /**
2830   * Looks under the given root path for any blob that are left "dangling",
2831   * meaning that they are place-holder blobs that we created while we upload
2832   * the data to a temporary blob, but for some reason we crashed in the middle
2833   * of the upload and left them there. If any are found, we delete them.
2834   * 
2835   * @param root
2836   *          The root path to consider.
2837   * @throws IOException
2838   */
2839  public void deleteFilesWithDanglingTempData(Path root) throws IOException {
2840
2841    LOG.debug("Deleting files with dangling temp data in {}", root);
2842    handleFilesWithDanglingTempData(root, new DanglingFileDeleter());
2843  }
2844
2845  @Override
2846  protected void finalize() throws Throwable {
2847    LOG.debug("finalize() called.");
2848    close();
2849    super.finalize();
2850  }
2851
2852  /**
2853   * Encode the key with a random prefix for load balancing in Azure storage.
2854   * Upload data to a random temporary file then do storage side renaming to
2855   * recover the original key.
2856   * 
2857   * @param aKey
2858   * @return Encoded version of the original key.
2859   */
2860  private static String encodeKey(String aKey) {
2861    // Get the tail end of the key name.
2862    //
2863    String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1,
2864        aKey.length());
2865
2866    // Construct the randomized prefix of the file name. The prefix ensures the
2867    // file always drops into the same folder but with a varying tail key name.
2868    String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR
2869        + UUID.randomUUID().toString();
2870
2871    // Concatenate the randomized prefix with the tail of the key name.
2872    String randomizedKey = filePrefix + fileName;
2873
2874    // Return to the caller with the randomized key.
2875    return randomizedKey;
2876  }
2877}