001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 */ 019 020package org.apache.hadoop.fs.adl; 021 022import java.io.IOException; 023import java.net.URI; 024import java.util.ArrayList; 025import java.util.EnumSet; 026import java.util.List; 027 028import com.google.common.annotations.VisibleForTesting; 029import com.microsoft.azure.datalake.store.ADLStoreClient; 030import com.microsoft.azure.datalake.store.ADLStoreOptions; 031import com.microsoft.azure.datalake.store.DirectoryEntry; 032import com.microsoft.azure.datalake.store.DirectoryEntryType; 033import com.microsoft.azure.datalake.store.IfExists; 034import com.microsoft.azure.datalake.store.LatencyTracker; 035import com.microsoft.azure.datalake.store.UserGroupRepresentation; 036import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider; 037import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider; 038import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider; 039 040import org.apache.commons.lang.StringUtils; 041import org.apache.hadoop.classification.InterfaceAudience; 042import org.apache.hadoop.classification.InterfaceStability; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.BlockLocation; 045import org.apache.hadoop.fs.ContentSummary; 046import org.apache.hadoop.fs.ContentSummary.Builder; 047import org.apache.hadoop.fs.CreateFlag; 048import org.apache.hadoop.fs.FSDataInputStream; 049import org.apache.hadoop.fs.FSDataOutputStream; 050import org.apache.hadoop.fs.FileStatus; 051import org.apache.hadoop.fs.FileSystem; 052import org.apache.hadoop.fs.InvalidPathException; 053import org.apache.hadoop.fs.Options; 054import org.apache.hadoop.fs.Options.Rename; 055import org.apache.hadoop.fs.Path; 056import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider; 057import org.apache.hadoop.fs.permission.AclEntry; 058import org.apache.hadoop.fs.permission.AclStatus; 059import org.apache.hadoop.fs.permission.FsAction; 060import org.apache.hadoop.fs.permission.FsPermission; 061import org.apache.hadoop.security.AccessControlException; 062import org.apache.hadoop.security.ProviderUtils; 063import org.apache.hadoop.security.UserGroupInformation; 064import org.apache.hadoop.util.Progressable; 065import org.apache.hadoop.util.ReflectionUtils; 066import org.apache.hadoop.util.VersionInfo; 067 068import static org.apache.hadoop.fs.adl.AdlConfKeys.*; 069 070/** 071 * A FileSystem to access Azure Data Lake Store. 072 */ 073@InterfaceAudience.Public 074@InterfaceStability.Evolving 075public class AdlFileSystem extends FileSystem { 076 public static final String SCHEME = "adl"; 077 static final int DEFAULT_PORT = 443; 078 private URI uri; 079 private String userName; 080 private boolean overrideOwner; 081 private ADLStoreClient adlClient; 082 private Path workingDirectory; 083 private boolean aclBitStatus; 084 private UserGroupRepresentation oidOrUpn; 085 086 087 // retained for tests 088 private AccessTokenProvider tokenProvider; 089 private AzureADTokenProvider azureTokenProvider; 090 091 @Override 092 public String getScheme() { 093 return SCHEME; 094 } 095 096 public URI getUri() { 097 return uri; 098 } 099 100 @Override 101 public int getDefaultPort() { 102 return DEFAULT_PORT; 103 } 104 105 @Override 106 public boolean supportsSymlinks() { 107 return false; 108 } 109 110 /** 111 * Called after a new FileSystem instance is constructed. 112 * 113 * @param storeUri a uri whose authority section names the host, port, etc. 114 * for this FileSystem 115 * @param conf the configuration 116 */ 117 @Override 118 public void initialize(URI storeUri, Configuration conf) throws IOException { 119 super.initialize(storeUri, conf); 120 this.setConf(conf); 121 this.uri = URI 122 .create(storeUri.getScheme() + "://" + storeUri.getAuthority()); 123 124 try { 125 userName = UserGroupInformation.getCurrentUser().getShortUserName(); 126 } catch (IOException e) { 127 userName = "hadoop"; 128 } 129 130 this.setWorkingDirectory(getHomeDirectory()); 131 132 overrideOwner = getConf().getBoolean(ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER, 133 ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT); 134 135 aclBitStatus = conf.getBoolean(ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION, 136 ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT); 137 138 String accountFQDN = null; 139 String mountPoint = null; 140 String hostname = storeUri.getHost(); 141 if (!hostname.contains(".") && !hostname.equalsIgnoreCase( 142 "localhost")) { // this is a symbolic name. Resolve it. 143 String hostNameProperty = "dfs.adls." + hostname + ".hostname"; 144 String mountPointProperty = "dfs.adls." + hostname + ".mountpoint"; 145 accountFQDN = getNonEmptyVal(conf, hostNameProperty); 146 mountPoint = getNonEmptyVal(conf, mountPointProperty); 147 } else { 148 accountFQDN = hostname; 149 } 150 151 if (storeUri.getPort() > 0) { 152 accountFQDN = accountFQDN + ":" + storeUri.getPort(); 153 } 154 155 adlClient = ADLStoreClient 156 .createClient(accountFQDN, getAccessTokenProvider(conf)); 157 158 ADLStoreOptions options = new ADLStoreOptions(); 159 options.enableThrowingRemoteExceptions(); 160 161 if (getTransportScheme().equalsIgnoreCase(INSECURE_TRANSPORT_SCHEME)) { 162 options.setInsecureTransport(); 163 } 164 165 if (mountPoint != null) { 166 options.setFilePathPrefix(mountPoint); 167 } 168 169 String clusterName = conf.get(ADL_EVENTS_TRACKING_CLUSTERNAME, "UNKNOWN"); 170 String clusterType = conf.get(ADL_EVENTS_TRACKING_CLUSTERTYPE, "UNKNOWN"); 171 172 String clientVersion = ADL_HADOOP_CLIENT_NAME + (StringUtils 173 .isEmpty(VersionInfo.getVersion().trim()) ? 174 ADL_HADOOP_CLIENT_VERSION.trim() : 175 VersionInfo.getVersion().trim()); 176 options.setUserAgentSuffix(clientVersion + "/" + 177 VersionInfo.getVersion().trim() + "/" + clusterName + "/" 178 + clusterType); 179 180 adlClient.setOptions(options); 181 182 boolean trackLatency = conf 183 .getBoolean(LATENCY_TRACKER_KEY, LATENCY_TRACKER_DEFAULT); 184 if (!trackLatency) { 185 LatencyTracker.disable(); 186 } 187 188 boolean enableUPN = conf.getBoolean(ADL_ENABLEUPN_FOR_OWNERGROUP_KEY, 189 ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT); 190 oidOrUpn = enableUPN ? UserGroupRepresentation.UPN : 191 UserGroupRepresentation.OID; 192 } 193 194 /** 195 * This method is provided for convenience for derived classes to define 196 * custom {@link AzureADTokenProvider} instance. 197 * 198 * In order to ensure secure hadoop infrastructure and user context for which 199 * respective {@link AdlFileSystem} instance is initialized, 200 * Loading {@link AzureADTokenProvider} is not sufficient. 201 * 202 * The order of loading {@link AzureADTokenProvider} is to first invoke 203 * {@link #getCustomAccessTokenProvider(Configuration)}, If method return null 204 * which means no implementation provided by derived classes, then 205 * configuration object is loaded to retrieve token configuration as specified 206 * is documentation. 207 * 208 * Custom token management takes the higher precedence during initialization. 209 * 210 * @param conf Configuration object 211 * @return null if the no custom {@link AzureADTokenProvider} token management 212 * is specified. 213 * @throws IOException if failed to initialize token provider. 214 */ 215 protected synchronized AzureADTokenProvider getCustomAccessTokenProvider( 216 Configuration conf) throws IOException { 217 String className = getNonEmptyVal(conf, AZURE_AD_TOKEN_PROVIDER_CLASS_KEY); 218 219 Class<? extends AzureADTokenProvider> azureADTokenProviderClass = 220 conf.getClass(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY, null, 221 AzureADTokenProvider.class); 222 if (azureADTokenProviderClass == null) { 223 throw new IllegalArgumentException( 224 "Configuration " + className + " " + "not defined/accessible."); 225 } 226 227 azureTokenProvider = ReflectionUtils 228 .newInstance(azureADTokenProviderClass, conf); 229 if (azureTokenProvider == null) { 230 throw new IllegalArgumentException("Failed to initialize " + className); 231 } 232 233 azureTokenProvider.initialize(conf); 234 return azureTokenProvider; 235 } 236 237 private AccessTokenProvider getAccessTokenProvider(Configuration config) 238 throws IOException { 239 Configuration conf = ProviderUtils.excludeIncompatibleCredentialProviders( 240 config, AdlFileSystem.class); 241 TokenProviderType type = conf.getEnum( 242 AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_TYPE_KEY, TokenProviderType.Custom); 243 244 switch (type) { 245 case RefreshToken: 246 tokenProvider = getConfRefreshTokenBasedTokenProvider(conf); 247 break; 248 case ClientCredential: 249 tokenProvider = getConfCredentialBasedTokenProvider(conf); 250 break; 251 case Custom: 252 default: 253 AzureADTokenProvider azureADTokenProvider = getCustomAccessTokenProvider( 254 conf); 255 tokenProvider = new SdkTokenProviderAdapter(azureADTokenProvider); 256 break; 257 } 258 259 return tokenProvider; 260 } 261 262 private AccessTokenProvider getConfCredentialBasedTokenProvider( 263 Configuration conf) throws IOException { 264 String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY); 265 String refreshUrl = getPasswordString(conf, AZURE_AD_REFRESH_URL_KEY); 266 String clientSecret = getPasswordString(conf, AZURE_AD_CLIENT_SECRET_KEY); 267 return new ClientCredsTokenProvider(refreshUrl, clientId, clientSecret); 268 } 269 270 private AccessTokenProvider getConfRefreshTokenBasedTokenProvider( 271 Configuration conf) throws IOException { 272 String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY); 273 String refreshToken = getPasswordString(conf, AZURE_AD_REFRESH_TOKEN_KEY); 274 return new RefreshTokenBasedTokenProvider(clientId, refreshToken); 275 } 276 277 @VisibleForTesting 278 AccessTokenProvider getTokenProvider() { 279 return tokenProvider; 280 } 281 282 @VisibleForTesting 283 AzureADTokenProvider getAzureTokenProvider() { 284 return azureTokenProvider; 285 } 286 287 /** 288 * Constructing home directory locally is fine as long as Hadoop 289 * local user name and ADL user name relationship story is not fully baked 290 * yet. 291 * 292 * @return Hadoop local user home directory. 293 */ 294 @Override 295 public Path getHomeDirectory() { 296 return makeQualified(new Path("/user/" + userName)); 297 } 298 299 /** 300 * Create call semantic is handled differently in case of ADL. Create 301 * semantics is translated to Create/Append 302 * semantics. 303 * 1. No dedicated connection to server. 304 * 2. Buffering is locally done, Once buffer is full or flush is invoked on 305 * the by the caller. All the pending 306 * data is pushed to ADL as APPEND operation code. 307 * 3. On close - Additional call is send to server to close the stream, and 308 * release lock from the stream. 309 * 310 * Necessity of Create/Append semantics is 311 * 1. ADL backend server does not allow idle connection for longer duration 312 * . In case of slow writer scenario, 313 * observed connection timeout/Connection reset causing occasional job 314 * failures. 315 * 2. Performance boost to jobs which are slow writer, avoided network latency 316 * 3. ADL equally better performing with multiple of 4MB chunk as append 317 * calls. 318 * 319 * @param f File path 320 * @param permission Access permission for the newly created file 321 * @param overwrite Remove existing file and recreate new one if true 322 * otherwise throw error if file exist 323 * @param bufferSize Buffer size, ADL backend does not honour 324 * @param replication Replication count, ADL backend does not honour 325 * @param blockSize Block size, ADL backend does not honour 326 * @param progress Progress indicator 327 * @return FSDataOutputStream OutputStream on which application can push 328 * stream of bytes 329 * @throws IOException when system error, internal server error or user error 330 */ 331 @Override 332 public FSDataOutputStream create(Path f, FsPermission permission, 333 boolean overwrite, int bufferSize, short replication, long blockSize, 334 Progressable progress) throws IOException { 335 statistics.incrementWriteOps(1); 336 IfExists overwriteRule = overwrite ? IfExists.OVERWRITE : IfExists.FAIL; 337 return new FSDataOutputStream(new AdlFsOutputStream(adlClient 338 .createFile(toRelativeFilePath(f), overwriteRule, 339 Integer.toOctalString(applyUMask(permission).toShort()), true), 340 getConf()), this.statistics); 341 } 342 343 /** 344 * Opens an FSDataOutputStream at the indicated Path with write-progress 345 * reporting. Same as create(), except fails if parent directory doesn't 346 * already exist. 347 * 348 * @param f the file name to open 349 * @param permission Access permission for the newly created file 350 * @param flags {@link CreateFlag}s to use for this stream. 351 * @param bufferSize the size of the buffer to be used. ADL backend does 352 * not honour 353 * @param replication required block replication for the file. ADL backend 354 * does not honour 355 * @param blockSize Block size, ADL backend does not honour 356 * @param progress Progress indicator 357 * @throws IOException when system error, internal server error or user error 358 * @see #setPermission(Path, FsPermission) 359 * @deprecated API only for 0.20-append 360 */ 361 @Override 362 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 363 EnumSet<CreateFlag> flags, int bufferSize, short replication, 364 long blockSize, Progressable progress) throws IOException { 365 statistics.incrementWriteOps(1); 366 IfExists overwriteRule = IfExists.FAIL; 367 for (CreateFlag flag : flags) { 368 if (flag == CreateFlag.OVERWRITE) { 369 overwriteRule = IfExists.OVERWRITE; 370 break; 371 } 372 } 373 374 return new FSDataOutputStream(new AdlFsOutputStream(adlClient 375 .createFile(toRelativeFilePath(f), overwriteRule, 376 Integer.toOctalString(applyUMask(permission).toShort()), false), 377 getConf()), this.statistics); 378 } 379 380 /** 381 * Append to an existing file (optional operation). 382 * 383 * @param f the existing file to be appended. 384 * @param bufferSize the size of the buffer to be used. ADL backend does 385 * not honour 386 * @param progress Progress indicator 387 * @throws IOException when system error, internal server error or user error 388 */ 389 @Override 390 public FSDataOutputStream append(Path f, int bufferSize, 391 Progressable progress) throws IOException { 392 statistics.incrementWriteOps(1); 393 return new FSDataOutputStream( 394 new AdlFsOutputStream(adlClient.getAppendStream(toRelativeFilePath(f)), 395 getConf()), this.statistics); 396 } 397 398 /** 399 * Azure data lake does not support user configuration for data replication 400 * hence not leaving system to query on 401 * azure data lake. 402 * 403 * Stub implementation 404 * 405 * @param p Not honoured 406 * @param replication Not honoured 407 * @return True hard coded since ADL file system does not support 408 * replication configuration 409 * @throws IOException No exception would not thrown in this case however 410 * aligning with parent api definition. 411 */ 412 @Override 413 public boolean setReplication(final Path p, final short replication) 414 throws IOException { 415 statistics.incrementWriteOps(1); 416 return true; 417 } 418 419 /** 420 * Open call semantic is handled differently in case of ADL. Instead of 421 * network stream is returned to the user, 422 * Overridden FsInputStream is returned. 423 * 424 * @param f File path 425 * @param buffersize Buffer size, Not honoured 426 * @return FSDataInputStream InputStream on which application can read 427 * stream of bytes 428 * @throws IOException when system error, internal server error or user error 429 */ 430 @Override 431 public FSDataInputStream open(final Path f, final int buffersize) 432 throws IOException { 433 statistics.incrementReadOps(1); 434 return new FSDataInputStream( 435 new AdlFsInputStream(adlClient.getReadStream(toRelativeFilePath(f)), 436 statistics, getConf())); 437 } 438 439 /** 440 * Return a file status object that represents the path. 441 * 442 * @param f The path we want information from 443 * @return a FileStatus object 444 * @throws IOException when the path does not exist or any other error; 445 * IOException see specific implementation 446 */ 447 @Override 448 public FileStatus getFileStatus(final Path f) throws IOException { 449 statistics.incrementReadOps(1); 450 DirectoryEntry entry = 451 adlClient.getDirectoryEntry(toRelativeFilePath(f), oidOrUpn); 452 return toFileStatus(entry, f); 453 } 454 455 /** 456 * List the statuses of the files/directories in the given path if the path is 457 * a directory. 458 * 459 * @param f given path 460 * @return the statuses of the files/directories in the given patch 461 * @throws IOException when the path does not exist or any other error; 462 * IOException see specific implementation 463 */ 464 @Override 465 public FileStatus[] listStatus(final Path f) throws IOException { 466 statistics.incrementReadOps(1); 467 List<DirectoryEntry> entries = 468 adlClient.enumerateDirectory(toRelativeFilePath(f), oidOrUpn); 469 return toFileStatuses(entries, f); 470 } 471 472 /** 473 * Renames Path src to Path dst. Can take place on local fs 474 * or remote DFS. 475 * 476 * ADLS support POSIX standard for rename operation. 477 * 478 * @param src path to be renamed 479 * @param dst new path after rename 480 * @return true if rename is successful 481 * @throws IOException on failure 482 */ 483 @Override 484 public boolean rename(final Path src, final Path dst) throws IOException { 485 statistics.incrementWriteOps(1); 486 if (toRelativeFilePath(src).equals("/")) { 487 return false; 488 } 489 490 return adlClient.rename(toRelativeFilePath(src), toRelativeFilePath(dst)); 491 } 492 493 @Override 494 @Deprecated 495 public void rename(final Path src, final Path dst, 496 final Options.Rename... options) throws IOException { 497 statistics.incrementWriteOps(1); 498 boolean overwrite = false; 499 for (Rename renameOption : options) { 500 if (renameOption == Rename.OVERWRITE) { 501 overwrite = true; 502 break; 503 } 504 } 505 adlClient 506 .rename(toRelativeFilePath(src), toRelativeFilePath(dst), overwrite); 507 } 508 509 /** 510 * Concat existing files together. 511 * 512 * @param trg the path to the target destination. 513 * @param srcs the paths to the sources to use for the concatenation. 514 * @throws IOException when system error, internal server error or user error 515 */ 516 @Override 517 public void concat(final Path trg, final Path[] srcs) throws IOException { 518 statistics.incrementWriteOps(1); 519 List<String> sourcesList = new ArrayList<String>(); 520 for (Path entry : srcs) { 521 sourcesList.add(toRelativeFilePath(entry)); 522 } 523 adlClient.concatenateFiles(toRelativeFilePath(trg), sourcesList); 524 } 525 526 /** 527 * Delete a file. 528 * 529 * @param path the path to delete. 530 * @param recursive if path is a directory and set to 531 * true, the directory is deleted else throws an exception. 532 * In case of a file the recursive can be set to either 533 * true or false. 534 * @return true if delete is successful else false. 535 * @throws IOException when system error, internal server error or user error 536 */ 537 @Override 538 public boolean delete(final Path path, final boolean recursive) 539 throws IOException { 540 statistics.incrementWriteOps(1); 541 String relativePath = toRelativeFilePath(path); 542 // Delete on root directory not supported. 543 if (relativePath.equals("/")) { 544 // This is important check after recent commit 545 // HADOOP-12977 and HADOOP-13716 validates on root for 546 // 1. if root is empty and non recursive delete then return false. 547 // 2. if root is non empty and non recursive delete then throw exception. 548 if (!recursive 549 && adlClient.enumerateDirectory(toRelativeFilePath(path), 1).size() 550 > 0) { 551 throw new IOException("Delete on root is not supported."); 552 } 553 return false; 554 } 555 556 return recursive ? 557 adlClient.deleteRecursive(relativePath) : 558 adlClient.delete(relativePath); 559 } 560 561 /** 562 * Make the given file and all non-existent parents into 563 * directories. Has the semantics of Unix 'mkdir -p'. 564 * Existence of the directory hierarchy is not an error. 565 * 566 * @param path path to create 567 * @param permission to apply to path 568 */ 569 @Override 570 public boolean mkdirs(final Path path, final FsPermission permission) 571 throws IOException { 572 statistics.incrementWriteOps(1); 573 return adlClient.createDirectory(toRelativeFilePath(path), 574 Integer.toOctalString(applyUMask(permission).toShort())); 575 } 576 577 private FileStatus[] toFileStatuses(final List<DirectoryEntry> entries, 578 final Path parent) { 579 FileStatus[] fileStatuses = new FileStatus[entries.size()]; 580 int index = 0; 581 for (DirectoryEntry entry : entries) { 582 FileStatus status = toFileStatus(entry, parent); 583 if (!(entry.name == null || entry.name == "")) { 584 status.setPath( 585 new Path(parent.makeQualified(uri, workingDirectory), entry.name)); 586 } 587 588 fileStatuses[index++] = status; 589 } 590 591 return fileStatuses; 592 } 593 594 private FsPermission applyUMask(FsPermission permission) { 595 if (permission == null) { 596 permission = FsPermission.getDefault(); 597 } 598 return permission.applyUMask(FsPermission.getUMask(getConf())); 599 } 600 601 private FileStatus toFileStatus(final DirectoryEntry entry, final Path f) { 602 boolean isDirectory = entry.type == DirectoryEntryType.DIRECTORY; 603 long lastModificationData = entry.lastModifiedTime.getTime(); 604 long lastAccessTime = entry.lastAccessTime.getTime(); 605 // set aclBit from ADLS backend response if 606 // ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION is true. 607 final boolean aclBit = aclBitStatus ? entry.aclBit : false; 608 609 FsPermission permission = new AdlPermission(aclBit, 610 Short.valueOf(entry.permission, 8)); 611 String user = entry.user; 612 String group = entry.group; 613 614 FileStatus status; 615 if (overrideOwner) { 616 status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR, 617 ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission, 618 userName, "hdfs", this.makeQualified(f)); 619 } else { 620 status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR, 621 ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission, 622 user, group, this.makeQualified(f)); 623 } 624 625 return status; 626 } 627 628 /** 629 * Set owner of a path (i.e. a file or a directory). 630 * The parameters owner and group cannot both be null. 631 * 632 * @param path The path 633 * @param owner If it is null, the original username remains unchanged. 634 * @param group If it is null, the original groupname remains unchanged. 635 */ 636 @Override 637 public void setOwner(final Path path, final String owner, final String group) 638 throws IOException { 639 statistics.incrementWriteOps(1); 640 adlClient.setOwner(toRelativeFilePath(path), owner, group); 641 } 642 643 /** 644 * Set permission of a path. 645 * 646 * @param path The path 647 * @param permission Access permission 648 */ 649 @Override 650 public void setPermission(final Path path, final FsPermission permission) 651 throws IOException { 652 statistics.incrementWriteOps(1); 653 adlClient.setPermission(toRelativeFilePath(path), 654 Integer.toOctalString(permission.toShort())); 655 } 656 657 /** 658 * Modifies ACL entries of files and directories. This method can add new ACL 659 * entries or modify the permissions on existing ACL entries. All existing 660 * ACL entries that are not specified in this call are retained without 661 * changes. (Modifications are merged into the current ACL.) 662 * 663 * @param path Path to modify 664 * @param aclSpec List of AclEntry describing modifications 665 * @throws IOException if an ACL could not be modified 666 */ 667 @Override 668 public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) 669 throws IOException { 670 statistics.incrementWriteOps(1); 671 List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new 672 ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); 673 for (AclEntry aclEntry : aclSpec) { 674 msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry 675 .parseAclEntry(aclEntry.toString())); 676 } 677 adlClient.modifyAclEntries(toRelativeFilePath(path), msAclEntries); 678 } 679 680 /** 681 * Removes ACL entries from files and directories. Other ACL entries are 682 * retained. 683 * 684 * @param path Path to modify 685 * @param aclSpec List of AclEntry describing entries to remove 686 * @throws IOException if an ACL could not be modified 687 */ 688 @Override 689 public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) 690 throws IOException { 691 statistics.incrementWriteOps(1); 692 List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new 693 ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); 694 for (AclEntry aclEntry : aclSpec) { 695 msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry 696 .parseAclEntry(aclEntry.toString(), true)); 697 } 698 adlClient.removeAclEntries(toRelativeFilePath(path), msAclEntries); 699 } 700 701 /** 702 * Removes all default ACL entries from files and directories. 703 * 704 * @param path Path to modify 705 * @throws IOException if an ACL could not be modified 706 */ 707 @Override 708 public void removeDefaultAcl(final Path path) throws IOException { 709 statistics.incrementWriteOps(1); 710 adlClient.removeDefaultAcls(toRelativeFilePath(path)); 711 } 712 713 /** 714 * Removes all but the base ACL entries of files and directories. The entries 715 * for user, group, and others are retained for compatibility with permission 716 * bits. 717 * 718 * @param path Path to modify 719 * @throws IOException if an ACL could not be removed 720 */ 721 @Override 722 public void removeAcl(final Path path) throws IOException { 723 statistics.incrementWriteOps(1); 724 adlClient.removeAllAcls(toRelativeFilePath(path)); 725 } 726 727 /** 728 * Fully replaces ACL of files and directories, discarding all existing 729 * entries. 730 * 731 * @param path Path to modify 732 * @param aclSpec List of AclEntry describing modifications, must include 733 * entries for user, group, and others for compatibility with 734 * permission bits. 735 * @throws IOException if an ACL could not be modified 736 */ 737 @Override 738 public void setAcl(final Path path, final List<AclEntry> aclSpec) 739 throws IOException { 740 statistics.incrementWriteOps(1); 741 List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new 742 ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); 743 for (AclEntry aclEntry : aclSpec) { 744 msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry 745 .parseAclEntry(aclEntry.toString())); 746 } 747 748 adlClient.setAcl(toRelativeFilePath(path), msAclEntries); 749 } 750 751 /** 752 * Gets the ACL of a file or directory. 753 * 754 * @param path Path to get 755 * @return AclStatus describing the ACL of the file or directory 756 * @throws IOException if an ACL could not be read 757 */ 758 @Override 759 public AclStatus getAclStatus(final Path path) throws IOException { 760 statistics.incrementReadOps(1); 761 com.microsoft.azure.datalake.store.acl.AclStatus adlStatus = 762 adlClient.getAclStatus(toRelativeFilePath(path), oidOrUpn); 763 AclStatus.Builder aclStatusBuilder = new AclStatus.Builder(); 764 aclStatusBuilder.owner(adlStatus.owner); 765 aclStatusBuilder.group(adlStatus.group); 766 aclStatusBuilder.setPermission( 767 new FsPermission(Short.valueOf(adlStatus.octalPermissions, 8))); 768 aclStatusBuilder.stickyBit(adlStatus.stickyBit); 769 String aclListString = com.microsoft.azure.datalake.store.acl.AclEntry 770 .aclListToString(adlStatus.aclSpec); 771 List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclListString, true); 772 aclStatusBuilder.addEntries(aclEntries); 773 return aclStatusBuilder.build(); 774 } 775 776 /** 777 * Checks if the user can access a path. The mode specifies which access 778 * checks to perform. If the requested permissions are granted, then the 779 * method returns normally. If access is denied, then the method throws an 780 * {@link AccessControlException}. 781 * 782 * @param path Path to check 783 * @param mode type of access to check 784 * @throws AccessControlException if access is denied 785 * @throws java.io.FileNotFoundException if the path does not exist 786 * @throws IOException see specific implementation 787 */ 788 @Override 789 public void access(final Path path, FsAction mode) throws IOException { 790 statistics.incrementReadOps(1); 791 if (!adlClient.checkAccess(toRelativeFilePath(path), mode.SYMBOL)) { 792 throw new AccessControlException("Access Denied : " + path.toString()); 793 } 794 } 795 796 /** 797 * Return the {@link ContentSummary} of a given {@link Path}. 798 * 799 * @param f path to use 800 */ 801 @Override 802 public ContentSummary getContentSummary(Path f) throws IOException { 803 statistics.incrementReadOps(1); 804 com.microsoft.azure.datalake.store.ContentSummary msSummary = adlClient 805 .getContentSummary(toRelativeFilePath(f)); 806 return new Builder().length(msSummary.length) 807 .directoryCount(msSummary.directoryCount).fileCount(msSummary.fileCount) 808 .spaceConsumed(msSummary.spaceConsumed).build(); 809 } 810 811 @VisibleForTesting 812 protected String getTransportScheme() { 813 return SECURE_TRANSPORT_SCHEME; 814 } 815 816 @VisibleForTesting 817 String toRelativeFilePath(Path path) { 818 return path.makeQualified(uri, workingDirectory).toUri().getPath(); 819 } 820 821 /** 822 * Get the current working directory for the given file system. 823 * 824 * @return the directory pathname 825 */ 826 @Override 827 public Path getWorkingDirectory() { 828 return workingDirectory; 829 } 830 831 /** 832 * Set the current working directory for the given file system. All relative 833 * paths will be resolved relative to it. 834 * 835 * @param dir Working directory path. 836 */ 837 @Override 838 public void setWorkingDirectory(final Path dir) { 839 if (dir == null) { 840 throw new InvalidPathException("Working directory cannot be set to NULL"); 841 } 842 843 /** 844 * Do not validate the scheme and URI of the passsed parameter. When Adls 845 * runs as additional file system, working directory set has the default 846 * file system scheme and uri. 847 * 848 * Found a problem during PIG execution in 849 * https://github.com/apache/pig/blob/branch-0 850 * .15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer 851 * /PigInputFormat.java#L235 852 * However similar problem would be present in other application so 853 * defaulting to build working directory using relative path only. 854 */ 855 this.workingDirectory = this.makeAbsolute(dir); 856 } 857 858 /** 859 * Return the number of bytes that large input files should be optimally 860 * be split into to minimize i/o time. 861 * 862 * @deprecated use {@link #getDefaultBlockSize(Path)} instead 863 */ 864 @Deprecated 865 public long getDefaultBlockSize() { 866 return ADL_BLOCK_SIZE; 867 } 868 869 /** 870 * Return the number of bytes that large input files should be optimally 871 * be split into to minimize i/o time. The given path will be used to 872 * locate the actual filesystem. The full path does not have to exist. 873 * 874 * @param f path of file 875 * @return the default block size for the path's filesystem 876 */ 877 public long getDefaultBlockSize(Path f) { 878 return getDefaultBlockSize(); 879 } 880 881 /** 882 * Get the block size. 883 * @param f the filename 884 * @return the number of bytes in a block 885 */ 886 /** 887 * @deprecated Use getFileStatus() instead 888 */ 889 @Deprecated 890 public long getBlockSize(Path f) throws IOException { 891 return ADL_BLOCK_SIZE; 892 } 893 894 @Override 895 public BlockLocation[] getFileBlockLocations(final FileStatus status, 896 final long offset, final long length) throws IOException { 897 if (status == null) { 898 return null; 899 } 900 901 if ((offset < 0) || (length < 0)) { 902 throw new IllegalArgumentException("Invalid start or len parameter"); 903 } 904 905 if (status.getLen() < offset) { 906 return new BlockLocation[0]; 907 } 908 909 final String[] name = {"localhost"}; 910 final String[] host = {"localhost"}; 911 long blockSize = ADL_BLOCK_SIZE; 912 int numberOfLocations = 913 (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1); 914 BlockLocation[] locations = new BlockLocation[numberOfLocations]; 915 for (int i = 0; i < locations.length; i++) { 916 long currentOffset = offset + (i * blockSize); 917 long currentLength = Math.min(blockSize, offset + length - currentOffset); 918 locations[i] = new BlockLocation(name, host, currentOffset, 919 currentLength); 920 } 921 922 return locations; 923 } 924 925 @Override 926 public BlockLocation[] getFileBlockLocations(final Path p, final long offset, 927 final long length) throws IOException { 928 // read ops incremented in getFileStatus 929 FileStatus fileStatus = getFileStatus(p); 930 return getFileBlockLocations(fileStatus, offset, length); 931 } 932 933 /** 934 * Get replication. 935 * 936 * @param src file name 937 * @return file replication 938 * @deprecated Use getFileStatus() instead 939 */ 940 @Deprecated 941 public short getReplication(Path src) { 942 return ADL_REPLICATION_FACTOR; 943 } 944 945 private Path makeAbsolute(Path path) { 946 return path.isAbsolute() ? path : new Path(this.workingDirectory, path); 947 } 948 949 private static String getNonEmptyVal(Configuration conf, String key) { 950 String value = conf.get(key); 951 if (StringUtils.isEmpty(value)) { 952 throw new IllegalArgumentException( 953 "No value for " + key + " found in conf file."); 954 } 955 return value; 956 } 957 958 /** 959 * A wrapper of {@link Configuration#getPassword(String)}. It returns 960 * <code>String</code> instead of <code>char[]</code>. 961 * 962 * @param conf the configuration 963 * @param key the property key 964 * @return the password string 965 * @throws IOException if the password was not found 966 */ 967 private static String getPasswordString(Configuration conf, String key) 968 throws IOException { 969 char[] passchars = conf.getPassword(key); 970 if (passchars == null) { 971 throw new IOException("Password " + key + " not found"); 972 } 973 return new String(passchars); 974 } 975 976 @VisibleForTesting 977 public void setUserGroupRepresentationAsUPN(boolean enableUPN) { 978 oidOrUpn = enableUPN ? UserGroupRepresentation.UPN : 979 UserGroupRepresentation.OID; 980 } 981}