/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.hadoop.fs.adl;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.ADLStoreOptions;
import com.microsoft.azure.datalake.store.DirectoryEntry;
import com.microsoft.azure.datalake.store.DirectoryEntryType;
import com.microsoft.azure.datalake.store.IfExists;
import com.microsoft.azure.datalake.store.LatencyTracker;
import com.microsoft.azure.datalake.store.UserGroupRepresentation;
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.ContentSummary.Builder;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.VersionInfo;

import static org.apache.hadoop.fs.adl.AdlConfKeys.*;

/**
 * A FileSystem to access Azure Data Lake Store.
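 *
 * <p>A minimal usage sketch, assuming the OAuth2 credential settings from
 * {@link AdlConfKeys} are already present in the configuration; the account
 * URI and file path below are placeholders:
 * <pre>{@code
 * Configuration conf = new Configuration();
 * FileSystem fs = FileSystem.get(
 *     URI.create("adl://myaccount.azuredatalakestore.net"), conf);
 * try (FSDataOutputStream out = fs.create(new Path("/tmp/example.txt"))) {
 *   out.writeUTF("hello");
 * }
 * }</pre>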
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AdlFileSystem extends FileSystem {
  public static final String SCHEME = "adl";
  static final int DEFAULT_PORT = 443;
  private URI uri;
  private String userName;
  private boolean overrideOwner;
  private ADLStoreClient adlClient;
  private Path workingDirectory;
  private boolean aclBitStatus;
  private UserGroupRepresentation oidOrUpn;


  // retained for tests
  private AccessTokenProvider tokenProvider;
  private AzureADTokenProvider azureTokenProvider;

  @Override
  public String getScheme() {
    return SCHEME;
  }

  public URI getUri() {
    return uri;
  }

  @Override
  public int getDefaultPort() {
    return DEFAULT_PORT;
  }

  @Override
  public boolean supportsSymlinks() {
    return false;
  }

  /**
   * Called after a new FileSystem instance is constructed.
   *
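   * <p>If the authority of {@code storeUri} is a symbolic (dot-free) name
   * other than "localhost", it is resolved through the
   * {@code dfs.adls.<name>.hostname} and {@code dfs.adls.<name>.mountpoint}
   * configuration properties constructed below. A minimal sketch (the
   * "store1" alias and the account host name are placeholders):
   * <pre>{@code
   * conf.set("dfs.adls.store1.hostname", "myaccount.azuredatalakestore.net");
   * conf.set("dfs.adls.store1.mountpoint", "/data");
   * FileSystem fs = FileSystem.get(URI.create("adl://store1/"), conf);
   * }</pre>
   *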
   * @param storeUri a uri whose authority section names the host, port, etc.
   *                 for this FileSystem
   * @param conf     the configuration
   */
  @Override
  public void initialize(URI storeUri, Configuration conf) throws IOException {
    super.initialize(storeUri, conf);
    this.setConf(conf);
    this.uri = URI
        .create(storeUri.getScheme() + "://" + storeUri.getAuthority());

    try {
      userName = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException e) {
      userName = "hadoop";
    }

    this.setWorkingDirectory(getHomeDirectory());

    overrideOwner = getConf().getBoolean(ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
        ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);

    aclBitStatus = conf.getBoolean(ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION,
        ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT);

    String accountFQDN = null;
    String mountPoint = null;
    String hostname = storeUri.getHost();
    if (!hostname.contains(".") && !hostname.equalsIgnoreCase(
        "localhost")) {  // this is a symbolic name. Resolve it.
      String hostNameProperty = "dfs.adls." + hostname + ".hostname";
      String mountPointProperty = "dfs.adls." + hostname + ".mountpoint";
      accountFQDN = getNonEmptyVal(conf, hostNameProperty);
      mountPoint = getNonEmptyVal(conf, mountPointProperty);
    } else {
      accountFQDN = hostname;
    }

    if (storeUri.getPort() > 0) {
      accountFQDN = accountFQDN + ":" + storeUri.getPort();
    }

    adlClient = ADLStoreClient
        .createClient(accountFQDN, getAccessTokenProvider(conf));

    ADLStoreOptions options = new ADLStoreOptions();
    options.enableThrowingRemoteExceptions();

    if (getTransportScheme().equalsIgnoreCase(INSECURE_TRANSPORT_SCHEME)) {
      options.setInsecureTransport();
    }

    if (mountPoint != null) {
      options.setFilePathPrefix(mountPoint);
    }

    String clusterName = conf.get(ADL_EVENTS_TRACKING_CLUSTERNAME, "UNKNOWN");
    String clusterType = conf.get(ADL_EVENTS_TRACKING_CLUSTERTYPE, "UNKNOWN");

    String clientVersion = ADL_HADOOP_CLIENT_NAME + (StringUtils
        .isEmpty(VersionInfo.getVersion().trim()) ?
        ADL_HADOOP_CLIENT_VERSION.trim() :
        VersionInfo.getVersion().trim());
    options.setUserAgentSuffix(clientVersion + "/" +
        VersionInfo.getVersion().trim() + "/" + clusterName + "/"
        + clusterType);

    adlClient.setOptions(options);

    boolean trackLatency = conf
        .getBoolean(LATENCY_TRACKER_KEY, LATENCY_TRACKER_DEFAULT);
    if (!trackLatency) {
      LatencyTracker.disable();
    }

    boolean enableUPN = conf.getBoolean(ADL_ENABLEUPN_FOR_OWNERGROUP_KEY,
        ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT);
    oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
        UserGroupRepresentation.OID;
  }

  /**
   * This method is provided as a convenience for derived classes to supply a
   * custom {@link AzureADTokenProvider} instance.
   *
   * Loading an {@link AzureADTokenProvider} on its own is not sufficient to
   * ensure a secure Hadoop infrastructure and the user context for which the
   * respective {@link AdlFileSystem} instance is initialized.
   *
   * The loading order is to first invoke
   * {@link #getCustomAccessTokenProvider(Configuration)}. If that method
   * returns null, meaning no implementation is provided by a derived class,
   * then the configuration object is consulted to retrieve the token
   * configuration as documented.
   *
   * Custom token management therefore takes higher precedence during
   * initialization.
   *
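   * <p>A sketch of wiring a custom provider through configuration (the
   * {@code MyTokenProvider} class name is a placeholder):
   * <pre>{@code
   * conf.setClass(AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_CLASS_KEY,
   *     MyTokenProvider.class, AzureADTokenProvider.class);
   * }</pre>
   *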
   * @param conf Configuration object
   * @return null if no custom {@link AzureADTokenProvider} token management
   * is specified.
   * @throws IOException if the token provider fails to initialize.
   */
  protected synchronized AzureADTokenProvider getCustomAccessTokenProvider(
      Configuration conf) throws IOException {
    String className = getNonEmptyVal(conf, AZURE_AD_TOKEN_PROVIDER_CLASS_KEY);

    Class<? extends AzureADTokenProvider> azureADTokenProviderClass =
        conf.getClass(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY, null,
            AzureADTokenProvider.class);
    if (azureADTokenProviderClass == null) {
      throw new IllegalArgumentException(
          "Configuration " + className + " not defined/accessible.");
    }

    azureTokenProvider = ReflectionUtils
        .newInstance(azureADTokenProviderClass, conf);
    if (azureTokenProvider == null) {
      throw new IllegalArgumentException("Failed to initialize " + className);
    }

    azureTokenProvider.initialize(conf);
    return azureTokenProvider;
  }

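  /**
   * Select the {@link AccessTokenProvider} based on
   * {@link AdlConfKeys#AZURE_AD_TOKEN_PROVIDER_TYPE_KEY}: the RefreshToken
   * and ClientCredential types are built from configuration, while Custom
   * (the default) delegates to
   * {@link #getCustomAccessTokenProvider(Configuration)}.
   */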
  private AccessTokenProvider getAccessTokenProvider(Configuration config)
      throws IOException {
    Configuration conf = ProviderUtils.excludeIncompatibleCredentialProviders(
        config, AdlFileSystem.class);
    TokenProviderType type = conf.getEnum(
        AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_TYPE_KEY, TokenProviderType.Custom);

    switch (type) {
    case RefreshToken:
      tokenProvider = getConfRefreshTokenBasedTokenProvider(conf);
      break;
    case ClientCredential:
      tokenProvider = getConfCredentialBasedTokenProvider(conf);
      break;
    case Custom:
    default:
      AzureADTokenProvider azureADTokenProvider = getCustomAccessTokenProvider(
          conf);
      tokenProvider = new SdkTokenProviderAdapter(azureADTokenProvider);
      break;
    }

    return tokenProvider;
  }

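  /**
   * Build a {@link ClientCredsTokenProvider} from the client id, refresh URL
   * and client secret configured under the corresponding {@link AdlConfKeys}
   * keys, each read through {@link Configuration#getPassword(String)} so that
   * credential providers are honoured.
   */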
  private AccessTokenProvider getConfCredentialBasedTokenProvider(
      Configuration conf) throws IOException {
    String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY);
    String refreshUrl = getPasswordString(conf, AZURE_AD_REFRESH_URL_KEY);
    String clientSecret = getPasswordString(conf, AZURE_AD_CLIENT_SECRET_KEY);
    return new ClientCredsTokenProvider(refreshUrl, clientId, clientSecret);
  }

  private AccessTokenProvider getConfRefreshTokenBasedTokenProvider(
      Configuration conf) throws IOException {
    String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY);
    String refreshToken = getPasswordString(conf, AZURE_AD_REFRESH_TOKEN_KEY);
    return new RefreshTokenBasedTokenProvider(clientId, refreshToken);
  }

  @VisibleForTesting
  AccessTokenProvider getTokenProvider() {
    return tokenProvider;
  }

  @VisibleForTesting
  AzureADTokenProvider getAzureTokenProvider() {
    return azureTokenProvider;
  }

  /**
   * Constructing the home directory locally is fine as long as the mapping
   * between the Hadoop local user name and the ADL user name is not fully
   * defined.
   *
   * @return Hadoop local user home directory.
   */
  @Override
  public Path getHomeDirectory() {
    return makeQualified(new Path("/user/" + userName));
  }

  /**
   * Create call semantics are handled differently for ADL: a create is
   * translated into create/append semantics.
   * 1. No dedicated connection to the server.
   * 2. Data is buffered locally; once the buffer is full, or flush is invoked
   * by the caller, all pending data is pushed to ADL as an APPEND operation.
   * 3. On close, an additional call is sent to the server to close the stream
   * and release the lock on the stream.
   *
   * Create/append semantics are necessary because:
   * 1. The ADL backend does not allow connections to stay idle for long. With
   * slow writers, connection timeouts and connection resets were observed,
   * causing occasional job failures.
   * 2. Slow-writer jobs get a performance boost by avoiding network latency.
   * 3. ADL performs equally well when data is appended in multiples of 4 MB.
   *
   * @param f           File path
   * @param permission  Access permission for the newly created file
   * @param overwrite   Remove the existing file and recreate it if true,
   *                    otherwise throw an error if the file exists
   * @param bufferSize  Buffer size, not honoured by the ADL backend
   * @param replication Replication count, not honoured by the ADL backend
   * @param blockSize   Block size, not honoured by the ADL backend
   * @param progress    Progress indicator
   * @return FSDataOutputStream OutputStream on which the application can push
   * a stream of bytes
   * @throws IOException when system error, internal server error or user error
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    statistics.incrementWriteOps(1);
    IfExists overwriteRule = overwrite ? IfExists.OVERWRITE : IfExists.FAIL;
    return new FSDataOutputStream(new AdlFsOutputStream(adlClient
        .createFile(toRelativeFilePath(f), overwriteRule,
            Integer.toOctalString(applyUMask(permission).toShort()), true),
        getConf()), this.statistics);
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path with write-progress
   * reporting. Same as create(), except it fails if the parent directory does
   * not already exist.
   *
   * @param f           the file name to open
   * @param permission  Access permission for the newly created file
   * @param flags       {@link CreateFlag}s to use for this stream.
   * @param bufferSize  the size of the buffer to be used, not honoured by the
   *                    ADL backend
   * @param replication required block replication for the file, not honoured
   *                    by the ADL backend
   * @param blockSize   Block size, not honoured by the ADL backend
   * @param progress    Progress indicator
   * @throws IOException when system error, internal server error or user error
   * @see #setPermission(Path, FsPermission)
   * @deprecated API only for 0.20-append
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      EnumSet<CreateFlag> flags, int bufferSize, short replication,
      long blockSize, Progressable progress) throws IOException {
    statistics.incrementWriteOps(1);
    IfExists overwriteRule = IfExists.FAIL;
    for (CreateFlag flag : flags) {
      if (flag == CreateFlag.OVERWRITE) {
        overwriteRule = IfExists.OVERWRITE;
        break;
      }
    }

    return new FSDataOutputStream(new AdlFsOutputStream(adlClient
        .createFile(toRelativeFilePath(f), overwriteRule,
            Integer.toOctalString(applyUMask(permission).toShort()), false),
        getConf()), this.statistics);
  }

  /**
   * Append to an existing file (optional operation).
   *
   * @param f          the existing file to be appended.
   * @param bufferSize the size of the buffer to be used, not honoured by the
   *                   ADL backend
   * @param progress   Progress indicator
   * @throws IOException when system error, internal server error or user error
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    statistics.incrementWriteOps(1);
    return new FSDataOutputStream(
        new AdlFsOutputStream(adlClient.getAppendStream(toRelativeFilePath(f)),
            getConf()), this.statistics);
  }

  /**
   * Azure Data Lake does not support user-configurable data replication,
   * so the request is not forwarded to the Azure Data Lake service.
   *
   * Stub implementation.
   *
   * @param p           Not honoured
   * @param replication Not honoured
   * @return true, hard coded, since the ADL file system does not support
   * replication configuration
   * @throws IOException never thrown in this case; declared to align with the
   *                     parent API definition.
   */
  @Override
  public boolean setReplication(final Path p, final short replication)
      throws IOException {
    statistics.incrementWriteOps(1);
    return true;
  }

  /**
   * Open call semantics are handled differently for ADL: instead of a raw
   * network stream, an overridden FsInputStream is returned to the user.
   *
   * @param f          File path
   * @param buffersize Buffer size, not honoured
   * @return FSDataInputStream InputStream on which the application can read a
   * stream of bytes
   * @throws IOException when system error, internal server error or user error
   */
  @Override
  public FSDataInputStream open(final Path f, final int buffersize)
      throws IOException {
    statistics.incrementReadOps(1);
    return new FSDataInputStream(
        new AdlFsInputStream(adlClient.getReadStream(toRelativeFilePath(f)),
            statistics, getConf()));
  }

  /**
   * Return a file status object that represents the path.
   *
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws IOException when the path does not exist or on any other error;
   *                     see the specific implementation
   */
  @Override
  public FileStatus getFileStatus(final Path f) throws IOException {
    statistics.incrementReadOps(1);
    DirectoryEntry entry =
        adlClient.getDirectoryEntry(toRelativeFilePath(f), oidOrUpn);
    return toFileStatus(entry, f);
  }

  /**
   * List the statuses of the files/directories in the given path if the path
   * is a directory.
   *
   * @param f given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException when the path does not exist or on any other error;
   *                     see the specific implementation
   */
  @Override
  public FileStatus[] listStatus(final Path f) throws IOException {
    statistics.incrementReadOps(1);
    List<DirectoryEntry> entries =
        adlClient.enumerateDirectory(toRelativeFilePath(f), oidOrUpn);
    return toFileStatuses(entries, f);
  }

  /**
   * Renames Path src to Path dst.  Can take place on local fs
   * or remote DFS.
   *
   * ADLS supports the POSIX standard for the rename operation.
   *
   * @param src path to be renamed
   * @param dst new path after rename
   * @return true if rename is successful
   * @throws IOException on failure
   */
  @Override
  public boolean rename(final Path src, final Path dst) throws IOException {
    statistics.incrementWriteOps(1);
    if (toRelativeFilePath(src).equals("/")) {
      return false;
    }

    return adlClient.rename(toRelativeFilePath(src), toRelativeFilePath(dst));
  }

  @Override
  @Deprecated
  public void rename(final Path src, final Path dst,
      final Options.Rename... options) throws IOException {
    statistics.incrementWriteOps(1);
    boolean overwrite = false;
    for (Rename renameOption : options) {
      if (renameOption == Rename.OVERWRITE) {
        overwrite = true;
        break;
      }
    }
    adlClient
        .rename(toRelativeFilePath(src), toRelativeFilePath(dst), overwrite);
  }

  /**
   * Concat existing files together.
   *
   * @param trg  the path to the target destination.
   * @param srcs the paths to the sources to use for the concatenation.
   * @throws IOException when system error, internal server error or user error
   */
  @Override
  public void concat(final Path trg, final Path[] srcs) throws IOException {
    statistics.incrementWriteOps(1);
    List<String> sourcesList = new ArrayList<String>();
    for (Path entry : srcs) {
      sourcesList.add(toRelativeFilePath(entry));
    }
    adlClient.concatenateFiles(toRelativeFilePath(trg), sourcesList);
  }

  /**
   * Delete a file.
   *
   * @param path      the path to delete.
   * @param recursive if path is a directory and set to
   *                  true, the directory is deleted, else an exception is
   *                  thrown. In the case of a file, recursive can be set to
   *                  either true or false.
   * @return true if delete is successful else false.
   * @throws IOException when system error, internal server error or user error
   */
  @Override
  public boolean delete(final Path path, final boolean recursive)
      throws IOException {
    statistics.incrementWriteOps(1);
    String relativePath = toRelativeFilePath(path);
    // Delete on the root directory is not supported.
    if (relativePath.equals("/")) {
      // This check matters after HADOOP-12977 and HADOOP-13716, which
      // require delete on root to behave as follows:
      // 1. if root is empty and the delete is non-recursive, return false.
      // 2. if root is non-empty and the delete is non-recursive, throw an
      //    exception.
      if (!recursive
          && adlClient.enumerateDirectory(toRelativeFilePath(path), 1).size()
          > 0) {
        throw new IOException("Delete on root is not supported.");
      }
      return false;
    }

    return recursive ?
        adlClient.deleteRecursive(relativePath) :
        adlClient.delete(relativePath);
  }

  /**
   * Make the given file and all non-existent parents into
   * directories. Has the semantics of Unix 'mkdir -p'.
   * Existence of the directory hierarchy is not an error.
   *
   * @param path       path to create
   * @param permission to apply to path
   */
  @Override
  public boolean mkdirs(final Path path, final FsPermission permission)
      throws IOException {
    statistics.incrementWriteOps(1);
    return adlClient.createDirectory(toRelativeFilePath(path),
        Integer.toOctalString(applyUMask(permission).toShort()));
  }

  private FileStatus[] toFileStatuses(final List<DirectoryEntry> entries,
      final Path parent) {
    FileStatus[] fileStatuses = new FileStatus[entries.size()];
    int index = 0;
    for (DirectoryEntry entry : entries) {
      FileStatus status = toFileStatus(entry, parent);
      if (!(entry.name == null || entry.name.isEmpty())) {
        status.setPath(
            new Path(parent.makeQualified(uri, workingDirectory), entry.name));
      }

      fileStatuses[index++] = status;
    }

    return fileStatuses;
  }

  private FsPermission applyUMask(FsPermission permission) {
    if (permission == null) {
      permission = FsPermission.getDefault();
    }
    return permission.applyUMask(FsPermission.getUMask(getConf()));
  }

  private FileStatus toFileStatus(final DirectoryEntry entry, final Path f) {
    boolean isDirectory = entry.type == DirectoryEntryType.DIRECTORY;
    long lastModificationTime = entry.lastModifiedTime.getTime();
    long lastAccessTime = entry.lastAccessTime.getTime();
    // set aclBit from the ADLS backend response only if
    // ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION is true.
    final boolean aclBit = aclBitStatus ? entry.aclBit : false;

    FsPermission permission = new AdlPermission(aclBit,
        Short.valueOf(entry.permission, 8));
    String user = entry.user;
    String group = entry.group;

    FileStatus status;
    if (overrideOwner) {
      status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR,
          ADL_BLOCK_SIZE, lastModificationTime, lastAccessTime, permission,
          userName, "hdfs", this.makeQualified(f));
    } else {
      status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR,
          ADL_BLOCK_SIZE, lastModificationTime, lastAccessTime, permission,
          user, group, this.makeQualified(f));
    }

    return status;
  }

  /**
   * Set owner of a path (i.e. a file or a directory).
   * The parameters owner and group cannot both be null.
   *
   * @param path  The path
   * @param owner If it is null, the original username remains unchanged.
   * @param group If it is null, the original groupname remains unchanged.
   */
  @Override
  public void setOwner(final Path path, final String owner, final String group)
      throws IOException {
    statistics.incrementWriteOps(1);
    adlClient.setOwner(toRelativeFilePath(path), owner, group);
  }

  /**
   * Set permission of a path.
   *
   * @param path       The path
   * @param permission Access permission
   */
  @Override
  public void setPermission(final Path path, final FsPermission permission)
      throws IOException {
    statistics.incrementWriteOps(1);
    adlClient.setPermission(toRelativeFilePath(path),
        Integer.toOctalString(permission.toShort()));
  }

  /**
   * Modifies ACL entries of files and directories.  This method can add new ACL
   * entries or modify the permissions on existing ACL entries.  All existing
   * ACL entries that are not specified in this call are retained without
   * changes.  (Modifications are merged into the current ACL.)
   *
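   * <p>For example, a sketch granting an additional user read/execute access
   * (the user name is a placeholder):
   * <pre>{@code
   * fs.modifyAclEntries(path, AclEntry.parseAclSpec("user:alice:r-x", true));
   * }</pre>
   *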
   * @param path    Path to modify
   * @param aclSpec List of AclEntry describing modifications
   * @throws IOException if an ACL could not be modified
   */
  @Override
  public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
      throws IOException {
    statistics.incrementWriteOps(1);
    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
    for (AclEntry aclEntry : aclSpec) {
      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
          .parseAclEntry(aclEntry.toString()));
    }
    adlClient.modifyAclEntries(toRelativeFilePath(path), msAclEntries);
  }

  /**
   * Removes ACL entries from files and directories.  Other ACL entries are
   * retained.
   *
   * @param path    Path to modify
   * @param aclSpec List of AclEntry describing entries to remove
   * @throws IOException if an ACL could not be modified
   */
  @Override
  public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
      throws IOException {
    statistics.incrementWriteOps(1);
    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
    for (AclEntry aclEntry : aclSpec) {
      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
          .parseAclEntry(aclEntry.toString(), true));
    }
    adlClient.removeAclEntries(toRelativeFilePath(path), msAclEntries);
  }

  /**
   * Removes all default ACL entries from files and directories.
   *
   * @param path Path to modify
   * @throws IOException if an ACL could not be modified
   */
  @Override
  public void removeDefaultAcl(final Path path) throws IOException {
    statistics.incrementWriteOps(1);
    adlClient.removeDefaultAcls(toRelativeFilePath(path));
  }

  /**
   * Removes all but the base ACL entries of files and directories.  The entries
   * for user, group, and others are retained for compatibility with permission
   * bits.
   *
   * @param path Path to modify
   * @throws IOException if an ACL could not be removed
   */
  @Override
  public void removeAcl(final Path path) throws IOException {
    statistics.incrementWriteOps(1);
    adlClient.removeAllAcls(toRelativeFilePath(path));
  }

  /**
   * Fully replaces ACL of files and directories, discarding all existing
   * entries.
   *
   * @param path    Path to modify
   * @param aclSpec List of AclEntry describing modifications, must include
   *                entries for user, group, and others for compatibility with
   *                permission bits.
   * @throws IOException if an ACL could not be modified
   */
  @Override
  public void setAcl(final Path path, final List<AclEntry> aclSpec)
      throws IOException {
    statistics.incrementWriteOps(1);
    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
    for (AclEntry aclEntry : aclSpec) {
      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
          .parseAclEntry(aclEntry.toString()));
    }

    adlClient.setAcl(toRelativeFilePath(path), msAclEntries);
  }

  /**
   * Gets the ACL of a file or directory.
   *
   * @param path Path to get
   * @return AclStatus describing the ACL of the file or directory
   * @throws IOException if an ACL could not be read
   */
  @Override
  public AclStatus getAclStatus(final Path path) throws IOException {
    statistics.incrementReadOps(1);
    com.microsoft.azure.datalake.store.acl.AclStatus adlStatus =
        adlClient.getAclStatus(toRelativeFilePath(path), oidOrUpn);
    AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
    aclStatusBuilder.owner(adlStatus.owner);
    aclStatusBuilder.group(adlStatus.group);
    aclStatusBuilder.setPermission(
        new FsPermission(Short.valueOf(adlStatus.octalPermissions, 8)));
    aclStatusBuilder.stickyBit(adlStatus.stickyBit);
    String aclListString = com.microsoft.azure.datalake.store.acl.AclEntry
        .aclListToString(adlStatus.aclSpec);
    List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclListString, true);
    aclStatusBuilder.addEntries(aclEntries);
    return aclStatusBuilder.build();
  }

  /**
   * Checks if the user can access a path.  The mode specifies which access
   * checks to perform.  If the requested permissions are granted, then the
   * method returns normally.  If access is denied, then the method throws an
   * {@link AccessControlException}.
   *
   * @param path Path to check
   * @param mode type of access to check
   * @throws AccessControlException        if access is denied
   * @throws java.io.FileNotFoundException if the path does not exist
   * @throws IOException                   see specific implementation
   */
  @Override
  public void access(final Path path, FsAction mode) throws IOException {
    statistics.incrementReadOps(1);
    if (!adlClient.checkAccess(toRelativeFilePath(path), mode.SYMBOL)) {
      throw new AccessControlException("Access Denied : " + path.toString());
    }
  }

  /**
   * Return the {@link ContentSummary} of a given {@link Path}.
   *
   * @param f path to use
   */
  @Override
  public ContentSummary getContentSummary(Path f) throws IOException {
    statistics.incrementReadOps(1);
    com.microsoft.azure.datalake.store.ContentSummary msSummary = adlClient
        .getContentSummary(toRelativeFilePath(f));
    return new Builder().length(msSummary.length)
        .directoryCount(msSummary.directoryCount).fileCount(msSummary.fileCount)
        .spaceConsumed(msSummary.spaceConsumed).build();
  }

  @VisibleForTesting
  protected String getTransportScheme() {
    return SECURE_TRANSPORT_SCHEME;
  }

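  /**
   * Convert a {@link Path} into the slash-separated path, relative to the
   * store root, that the ADL SDK expects, resolving it against this file
   * system's URI and working directory.
   */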
  @VisibleForTesting
  String toRelativeFilePath(Path path) {
    return path.makeQualified(uri, workingDirectory).toUri().getPath();
  }

  /**
   * Get the current working directory for the given file system.
   *
   * @return the directory pathname
   */
  @Override
  public Path getWorkingDirectory() {
    return workingDirectory;
  }

  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   *
   * @param dir Working directory path.
   */
  @Override
  public void setWorkingDirectory(final Path dir) {
    if (dir == null) {
      throw new InvalidPathException("Working directory cannot be set to NULL");
    }

    /*
     * Do not validate the scheme and URI of the passed parameter. When ADLS
     * runs as an additional file system, the working directory that is set
     * carries the default file system's scheme and URI.
     *
     * A problem was found during PIG execution in
     * https://github.com/apache/pig/blob/branch-0
     * .15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer
     * /PigInputFormat.java#L235
     * A similar problem could be present in other applications, so default to
     * building the working directory using the relative path only.
     */
    this.workingDirectory = this.makeAbsolute(dir);
  }

  /**
   * Return the number of bytes that large input files should optimally
   * be split into to minimize I/O time.
   *
   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
   */
  @Deprecated
  public long getDefaultBlockSize() {
    return ADL_BLOCK_SIZE;
  }

  /**
   * Return the number of bytes that large input files should optimally
   * be split into to minimize I/O time.  The given path will be used to
   * locate the actual filesystem.  The full path does not have to exist.
   *
   * @param f path of file
   * @return the default block size for the path's filesystem
   */
  public long getDefaultBlockSize(Path f) {
    return getDefaultBlockSize();
  }

  /**
   * Get the block size.
   *
   * @param f the filename
   * @return the number of bytes in a block
   * @deprecated Use getFileStatus() instead
   */
  @Deprecated
  public long getBlockSize(Path f) throws IOException {
    return ADL_BLOCK_SIZE;
  }

  @Override
  public BlockLocation[] getFileBlockLocations(final FileStatus status,
      final long offset, final long length) throws IOException {
    if (status == null) {
      return null;
    }

    if ((offset < 0) || (length < 0)) {
      throw new IllegalArgumentException("Invalid start or len parameter");
    }

    if (status.getLen() < offset) {
      return new BlockLocation[0];
    }

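    // ADL does not expose physical block locations, so synthesize fixed-size
    // "blocks" hosted on localhost; this gives split computation in callers
    // something reasonable to work with.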
    final String[] name = {"localhost"};
    final String[] host = {"localhost"};
    long blockSize = ADL_BLOCK_SIZE;
    int numberOfLocations =
        (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
    BlockLocation[] locations = new BlockLocation[numberOfLocations];
    for (int i = 0; i < locations.length; i++) {
      long currentOffset = offset + (i * blockSize);
      long currentLength = Math.min(blockSize, offset + length - currentOffset);
      locations[i] = new BlockLocation(name, host, currentOffset,
          currentLength);
    }

    return locations;
  }

  @Override
  public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
      final long length) throws IOException {
    // read ops incremented in getFileStatus
    FileStatus fileStatus = getFileStatus(p);
    return getFileBlockLocations(fileStatus, offset, length);
  }

  /**
   * Get replication.
   *
   * @param src file name
   * @return file replication
   * @deprecated Use getFileStatus() instead
   */
  @Deprecated
  public short getReplication(Path src) {
    return ADL_REPLICATION_FACTOR;
  }

  private Path makeAbsolute(Path path) {
    return path.isAbsolute() ? path : new Path(this.workingDirectory, path);
  }

  private static String getNonEmptyVal(Configuration conf, String key) {
    String value = conf.get(key);
    if (StringUtils.isEmpty(value)) {
      throw new IllegalArgumentException(
          "No value for " + key + " found in conf file.");
    }
    return value;
  }

  /**
   * A wrapper of {@link Configuration#getPassword(String)}. It returns
   * <code>String</code> instead of <code>char[]</code>.
   *
   * @param conf the configuration
   * @param key the property key
   * @return the password string
   * @throws IOException if the password was not found
   */
  private static String getPasswordString(Configuration conf, String key)
      throws IOException {
    char[] passchars = conf.getPassword(key);
    if (passchars == null) {
      throw new IOException("Password " + key + " not found");
    }
    return new String(passchars);
  }

  @VisibleForTesting
  public void setUserGroupRepresentationAsUPN(boolean enableUPN) {
    oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
        UserGroupRepresentation.OID;
  }
}