001/** 002res * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hdfs.web; 020 021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_DEFAULT; 022import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_KEY; 023import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT; 024import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_KEY; 025import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_DEFAULT; 026import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_KEY; 027 028import java.io.BufferedInputStream; 029import java.io.BufferedOutputStream; 030import java.io.EOFException; 031import java.io.FileNotFoundException; 032import java.io.IOException; 033import java.io.InputStream; 034import java.lang.reflect.InvocationTargetException; 035import java.net.HttpURLConnection; 036import java.net.InetSocketAddress; 037import java.net.MalformedURLException; 038import java.net.URI; 039import 
java.net.URL; 040import java.security.PrivilegedExceptionAction; 041import java.util.ArrayList; 042import java.util.Collection; 043import java.util.EnumSet; 044import java.util.HashSet; 045import java.util.List; 046import java.util.Map; 047import java.util.Set; 048import java.util.StringTokenizer; 049 050import javax.ws.rs.core.HttpHeaders; 051import javax.ws.rs.core.MediaType; 052 053import org.apache.commons.io.IOUtils; 054import org.apache.commons.io.input.BoundedInputStream; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.BlockLocation; 057import org.apache.hadoop.fs.CommonConfigurationKeys; 058import org.apache.hadoop.fs.ContentSummary; 059import org.apache.hadoop.fs.CreateFlag; 060import org.apache.hadoop.fs.DelegationTokenRenewer; 061import org.apache.hadoop.fs.FSDataInputStream; 062import org.apache.hadoop.fs.FSDataOutputStream; 063import org.apache.hadoop.fs.FSInputStream; 064import org.apache.hadoop.fs.FileStatus; 065import org.apache.hadoop.fs.FileSystem; 066import org.apache.hadoop.fs.GlobalStorageStatistics; 067import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; 068import org.apache.hadoop.fs.StorageStatistics; 069import org.apache.hadoop.hdfs.DFSOpsCountStatistics; 070import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; 071import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; 072import org.apache.hadoop.fs.Options; 073import org.apache.hadoop.fs.Path; 074import org.apache.hadoop.fs.XAttrCodec; 075import org.apache.hadoop.fs.XAttrSetFlag; 076import org.apache.hadoop.fs.permission.AclEntry; 077import org.apache.hadoop.fs.permission.AclStatus; 078import org.apache.hadoop.fs.permission.FsAction; 079import org.apache.hadoop.fs.permission.FsPermission; 080import org.apache.hadoop.hdfs.DFSUtilClient; 081import org.apache.hadoop.hdfs.HAUtilClient; 082import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; 083import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 084import 
org.apache.hadoop.hdfs.protocol.HdfsConstants; 085import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 086import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; 087import org.apache.hadoop.hdfs.web.resources.*; 088import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op; 089import org.apache.hadoop.io.Text; 090import org.apache.hadoop.io.retry.RetryPolicies; 091import org.apache.hadoop.io.retry.RetryPolicy; 092import org.apache.hadoop.io.retry.RetryUtils; 093import org.apache.hadoop.ipc.RemoteException; 094import org.apache.hadoop.ipc.StandbyException; 095import org.apache.hadoop.net.NetUtils; 096import org.apache.hadoop.security.AccessControlException; 097import org.apache.hadoop.security.SecurityUtil; 098import org.apache.hadoop.security.UserGroupInformation; 099import org.apache.hadoop.security.token.SecretManager.InvalidToken; 100import org.apache.hadoop.security.token.Token; 101import org.apache.hadoop.security.token.TokenIdentifier; 102import org.apache.hadoop.security.token.TokenSelector; 103import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; 104import org.apache.hadoop.util.Progressable; 105import org.apache.hadoop.util.StringUtils; 106import org.codehaus.jackson.map.ObjectMapper; 107import org.codehaus.jackson.map.ObjectReader; 108import org.slf4j.Logger; 109import org.slf4j.LoggerFactory; 110 111import com.google.common.annotations.VisibleForTesting; 112import com.google.common.base.Preconditions; 113import com.google.common.collect.Lists; 114 115/** A FileSystem for HDFS over the web. */ 116public class WebHdfsFileSystem extends FileSystem 117 implements DelegationTokenRenewer.Renewable, 118 TokenAspect.TokenManagementDelegator { 119 public static final Logger LOG = LoggerFactory 120 .getLogger(WebHdfsFileSystem.class); 121 /** WebHdfs version. 
*/ 122 public static final int VERSION = 1; 123 /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */ 124 public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME 125 + "/v" + VERSION; 126 127 /** 128 * Default connection factory may be overridden in tests to use smaller 129 * timeout values 130 */ 131 protected URLConnectionFactory connectionFactory; 132 133 @VisibleForTesting 134 public static final String CANT_FALLBACK_TO_INSECURE_MSG = 135 "The client is configured to only allow connecting to secure cluster"; 136 137 private boolean canRefreshDelegationToken; 138 139 private UserGroupInformation ugi; 140 private URI uri; 141 private Token<?> delegationToken; 142 protected Text tokenServiceName; 143 private RetryPolicy retryPolicy = null; 144 private Path workingDir; 145 private Path cachedHomeDirectory; 146 private InetSocketAddress nnAddrs[]; 147 private int currentNNAddrIndex; 148 private boolean disallowFallbackToInsecureCluster; 149 private String restCsrfCustomHeader; 150 private Set<String> restCsrfMethodsToIgnore; 151 private static final ObjectReader READER = 152 new ObjectMapper().reader(Map.class); 153 154 private DFSOpsCountStatistics storageStatistics; 155 156 /** 157 * Return the protocol scheme for the FileSystem. 158 * <p/> 159 * 160 * @return <code>webhdfs</code> 161 */ 162 @Override 163 public String getScheme() { 164 return WebHdfsConstants.WEBHDFS_SCHEME; 165 } 166 167 /** 168 * return the underlying transport protocol (http / https). 
169 */ 170 protected String getTransportScheme() { 171 return "http"; 172 } 173 174 protected Text getTokenKind() { 175 return WebHdfsConstants.WEBHDFS_TOKEN_KIND; 176 } 177 178 @Override 179 public synchronized void initialize(URI uri, Configuration conf 180 ) throws IOException { 181 super.initialize(uri, conf); 182 setConf(conf); 183 /** set user pattern based on configuration file */ 184 UserParam.setUserPattern(conf.get( 185 HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY, 186 HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT)); 187 188 boolean isOAuth = conf.getBoolean( 189 HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY, 190 HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT); 191 192 if(isOAuth) { 193 LOG.debug("Enabling OAuth2 in WebHDFS"); 194 connectionFactory = URLConnectionFactory 195 .newOAuth2URLConnectionFactory(conf); 196 } else { 197 LOG.debug("Not enabling OAuth2 in WebHDFS"); 198 connectionFactory = URLConnectionFactory 199 .newDefaultURLConnectionFactory(conf); 200 } 201 202 ugi = UserGroupInformation.getCurrentUser(); 203 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 204 this.nnAddrs = resolveNNAddr(); 205 206 boolean isHA = HAUtilClient.isClientFailoverConfigured(conf, this.uri); 207 boolean isLogicalUri = isHA && HAUtilClient.isLogicalUri(conf, this.uri); 208 // In non-HA or non-logical URI case, the code needs to call 209 // getCanonicalUri() in order to handle the case where no port is 210 // specified in the URI 211 this.tokenServiceName = isLogicalUri ? 
212 HAUtilClient.buildTokenServiceForLogicalUri(uri, getScheme()) 213 : SecurityUtil.buildTokenService(getCanonicalUri()); 214 215 if (!isHA) { 216 this.retryPolicy = 217 RetryUtils.getDefaultRetryPolicy( 218 conf, 219 HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_KEY, 220 HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_DEFAULT, 221 HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_KEY, 222 HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_DEFAULT, 223 HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME); 224 } else { 225 226 int maxFailoverAttempts = conf.getInt( 227 HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_KEY, 228 HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_DEFAULT); 229 int maxRetryAttempts = conf.getInt( 230 HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_KEY, 231 HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_DEFAULT); 232 int failoverSleepBaseMillis = conf.getInt( 233 HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_KEY, 234 HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_DEFAULT); 235 int failoverSleepMaxMillis = conf.getInt( 236 HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_KEY, 237 HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_DEFAULT); 238 239 this.retryPolicy = RetryPolicies 240 .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, 241 maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis, 242 failoverSleepMaxMillis); 243 } 244 245 this.workingDir = makeQualified(new Path(getHomeDirectoryString(ugi))); 246 this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled(); 247 this.disallowFallbackToInsecureCluster = !conf.getBoolean( 248 CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, 249 CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); 250 this.initializeRestCsrf(conf); 251 this.delegationToken = null; 252 253 storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE 254 
.put(DFSOpsCountStatistics.NAME, 255 new StorageStatisticsProvider() { 256 @Override 257 public StorageStatistics provide() { 258 return new DFSOpsCountStatistics(); 259 } 260 }); 261 } 262 263 /** 264 * Initializes client-side handling of cross-site request forgery (CSRF) 265 * protection by figuring out the custom HTTP headers that need to be sent in 266 * requests and which HTTP methods are ignored because they do not require 267 * CSRF protection. 268 * 269 * @param conf configuration to read 270 */ 271 private void initializeRestCsrf(Configuration conf) { 272 if (conf.getBoolean(DFS_WEBHDFS_REST_CSRF_ENABLED_KEY, 273 DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT)) { 274 this.restCsrfCustomHeader = conf.getTrimmed( 275 DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_KEY, 276 DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_DEFAULT); 277 this.restCsrfMethodsToIgnore = new HashSet<>(); 278 this.restCsrfMethodsToIgnore.addAll(getTrimmedStringList(conf, 279 DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_KEY, 280 DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_DEFAULT)); 281 } else { 282 this.restCsrfCustomHeader = null; 283 this.restCsrfMethodsToIgnore = null; 284 } 285 } 286 287 /** 288 * Returns a list of strings from a comma-delimited configuration value. 289 * 290 * @param conf configuration to check 291 * @param name configuration property name 292 * @param defaultValue default value if no value found for name 293 * @return list of strings from comma-delimited configuration value, or an 294 * empty list if not found 295 */ 296 private static List<String> getTrimmedStringList(Configuration conf, 297 String name, String defaultValue) { 298 String valueString = conf.get(name, defaultValue); 299 if (valueString == null) { 300 return new ArrayList<>(); 301 } 302 return new ArrayList<>(StringUtils.getTrimmedStringCollection(valueString)); 303 } 304 305 @Override 306 public URI getCanonicalUri() { 307 return super.getCanonicalUri(); 308 } 309 310 /** Is WebHDFS enabled in conf? 
*/ 311 public static boolean isEnabled(final Configuration conf) { 312 final boolean b = conf.getBoolean( 313 HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY, 314 HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT); 315 return b; 316 } 317 318 TokenSelector<DelegationTokenIdentifier> tokenSelector = 319 new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){}; 320 321 // the first getAuthParams() for a non-token op will either get the 322 // internal token from the ugi or lazy fetch one 323 protected synchronized Token<?> getDelegationToken() throws IOException { 324 if (canRefreshDelegationToken && delegationToken == null) { 325 Token<?> token = tokenSelector.selectToken( 326 new Text(getCanonicalServiceName()), ugi.getTokens()); 327 // ugi tokens are usually indicative of a task which can't 328 // refetch tokens. even if ugi has credentials, don't attempt 329 // to get another token to match hdfs/rpc behavior 330 if (token != null) { 331 LOG.debug("Using UGI token: {}", token); 332 canRefreshDelegationToken = false; 333 } else { 334 token = getDelegationToken(null); 335 if (token != null) { 336 LOG.debug("Fetched new token: {}", token); 337 } else { // security is disabled 338 canRefreshDelegationToken = false; 339 } 340 } 341 setDelegationToken(token); 342 } 343 return delegationToken; 344 } 345 346 @VisibleForTesting 347 synchronized boolean replaceExpiredDelegationToken() throws IOException { 348 boolean replaced = false; 349 if (canRefreshDelegationToken) { 350 Token<?> token = getDelegationToken(null); 351 LOG.debug("Replaced expired token: {}", token); 352 setDelegationToken(token); 353 replaced = (token != null); 354 } 355 return replaced; 356 } 357 358 @Override 359 @VisibleForTesting 360 public int getDefaultPort() { 361 return getConf().getInt(HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, 362 HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT); 363 } 364 365 @Override 366 public URI getUri() { 367 return this.uri; 368 } 369 370 
@Override 371 protected URI canonicalizeUri(URI uri) { 372 return NetUtils.getCanonicalUri(uri, getDefaultPort()); 373 } 374 375 /** @return the home directory */ 376 @Deprecated 377 public static String getHomeDirectoryString(final UserGroupInformation ugi) { 378 return "/user/" + ugi.getShortUserName(); 379 } 380 381 @Override 382 public Path getHomeDirectory() { 383 if (cachedHomeDirectory == null) { 384 final HttpOpParam.Op op = GetOpParam.Op.GETHOMEDIRECTORY; 385 try { 386 String pathFromDelegatedFS = new FsPathResponseRunner<String>(op, null, 387 new UserParam(ugi)) { 388 @Override 389 String decodeResponse(Map<?, ?> json) throws IOException { 390 return JsonUtilClient.getPath(json); 391 } 392 } .run(); 393 394 cachedHomeDirectory = new Path(pathFromDelegatedFS).makeQualified( 395 this.getUri(), null); 396 397 } catch (IOException e) { 398 LOG.error("Unable to get HomeDirectory from original File System", e); 399 cachedHomeDirectory = new Path("/user/" + ugi.getShortUserName()) 400 .makeQualified(this.getUri(), null); 401 } 402 } 403 return cachedHomeDirectory; 404 } 405 406 @Override 407 public synchronized Path getWorkingDirectory() { 408 return workingDir; 409 } 410 411 @Override 412 public synchronized void setWorkingDirectory(final Path dir) { 413 Path absolutePath = makeAbsolute(dir); 414 String result = absolutePath.toUri().getPath(); 415 if (!DFSUtilClient.isValidName(result)) { 416 throw new IllegalArgumentException("Invalid DFS directory name " + 417 result); 418 } 419 workingDir = absolutePath; 420 } 421 422 private Path makeAbsolute(Path f) { 423 return f.isAbsolute()? f: new Path(workingDir, f); 424 } 425 426 static Map<?, ?> jsonParse(final HttpURLConnection c, 427 final boolean useErrorStream) throws IOException { 428 if (c.getContentLength() == 0) { 429 return null; 430 } 431 final InputStream in = useErrorStream ? 432 c.getErrorStream() : c.getInputStream(); 433 if (in == null) { 434 throw new IOException("The " + (useErrorStream? 
"error": "input") + 435 " stream is null."); 436 } 437 try { 438 final String contentType = c.getContentType(); 439 if (contentType != null) { 440 final MediaType parsed = MediaType.valueOf(contentType); 441 if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) { 442 throw new IOException("Content-Type \"" + contentType 443 + "\" is incompatible with \"" + MediaType.APPLICATION_JSON 444 + "\" (parsed=\"" + parsed + "\")"); 445 } 446 } 447 return READER.readValue(in); 448 } finally { 449 in.close(); 450 } 451 } 452 453 private static Map<?, ?> validateResponse(final HttpOpParam.Op op, 454 final HttpURLConnection conn, boolean unwrapException) 455 throws IOException { 456 final int code = conn.getResponseCode(); 457 // server is demanding an authentication we don't support 458 if (code == HttpURLConnection.HTTP_UNAUTHORIZED) { 459 // match hdfs/rpc exception 460 throw new AccessControlException(conn.getResponseMessage()); 461 } 462 if (code != op.getExpectedHttpResponseCode()) { 463 final Map<?, ?> m; 464 try { 465 m = jsonParse(conn, true); 466 } catch(Exception e) { 467 throw new IOException("Unexpected HTTP response: code=" + code + " != " 468 + op.getExpectedHttpResponseCode() + ", " + op.toQueryString() 469 + ", message=" + conn.getResponseMessage(), e); 470 } 471 472 if (m == null) { 473 throw new IOException("Unexpected HTTP response: code=" + code + " != " 474 + op.getExpectedHttpResponseCode() + ", " + op.toQueryString() 475 + ", message=" + conn.getResponseMessage()); 476 } else if (m.get(RemoteException.class.getSimpleName()) == null) { 477 return m; 478 } 479 480 IOException re = JsonUtilClient.toRemoteException(m); 481 482 //check if exception is due to communication with a Standby name node 483 if (re.getMessage() != null && re.getMessage().endsWith( 484 StandbyException.class.getSimpleName())) { 485 LOG.trace("Detected StandbyException", re); 486 throw new IOException(re); 487 } 488 // extract UGI-related exceptions and unwrap InvalidToken 489 // 
the NN mangles these exceptions but the DN does not and may need 490 // to re-fetch a token if either report the token is expired 491 if (re.getMessage() != null && re.getMessage().startsWith( 492 SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER)) { 493 String[] parts = re.getMessage().split(":\\s+", 3); 494 re = new RemoteException(parts[1], parts[2]); 495 re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class); 496 } 497 throw unwrapException? toIOException(re): re; 498 } 499 return null; 500 } 501 502 /** 503 * Covert an exception to an IOException. 504 * 505 * For a non-IOException, wrap it with IOException. 506 * For a RemoteException, unwrap it. 507 * For an IOException which is not a RemoteException, return it. 508 */ 509 private static IOException toIOException(Exception e) { 510 if (!(e instanceof IOException)) { 511 return new IOException(e); 512 } 513 514 final IOException ioe = (IOException)e; 515 if (!(ioe instanceof RemoteException)) { 516 return ioe; 517 } 518 519 return ((RemoteException)ioe).unwrapRemoteException(); 520 } 521 522 private synchronized InetSocketAddress getCurrentNNAddr() { 523 return nnAddrs[currentNNAddrIndex]; 524 } 525 526 /** 527 * Reset the appropriate state to gracefully fail over to another name node 528 */ 529 private synchronized void resetStateToFailOver() { 530 currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length; 531 } 532 533 /** 534 * Return a URL pointing to given path on the namenode. 535 * 536 * @param path to obtain the URL for 537 * @param query string to append to the path 538 * @return namenode URL referring to the given path 539 * @throws IOException on error constructing the URL 540 */ 541 private URL getNamenodeURL(String path, String query) throws IOException { 542 InetSocketAddress nnAddr = getCurrentNNAddr(); 543 final URL url = new URL(getTransportScheme(), nnAddr.getHostName(), 544 nnAddr.getPort(), path + '?' 
+ query); 545 LOG.trace("url={}", url); 546 return url; 547 } 548 549 Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException { 550 List<Param<?,?>> authParams = Lists.newArrayList(); 551 // Skip adding delegation token for token operations because these 552 // operations require authentication. 553 Token<?> token = null; 554 if (!op.getRequireAuth()) { 555 token = getDelegationToken(); 556 } 557 if (token != null) { 558 authParams.add(new DelegationParam(token.encodeToUrlString())); 559 } else { 560 UserGroupInformation userUgi = ugi; 561 UserGroupInformation realUgi = userUgi.getRealUser(); 562 if (realUgi != null) { // proxy user 563 authParams.add(new DoAsParam(userUgi.getShortUserName())); 564 userUgi = realUgi; 565 } 566 authParams.add(new UserParam(userUgi.getShortUserName())); 567 } 568 return authParams.toArray(new Param<?,?>[0]); 569 } 570 571 URL toUrl(final HttpOpParam.Op op, final Path fspath, 572 final Param<?,?>... parameters) throws IOException { 573 //initialize URI path and query 574 final String path = PATH_PREFIX 575 + (fspath == null? "/": makeQualified(fspath).toUri().getRawPath()); 576 final String query = op.toQueryString() 577 + Param.toSortedString("&", getAuthParameters(op)) 578 + Param.toSortedString("&", parameters); 579 final URL url = getNamenodeURL(path, query); 580 LOG.trace("url={}", url); 581 return url; 582 } 583 584 /** 585 * This class is for initialing a HTTP connection, connecting to server, 586 * obtaining a response, and also handling retry on failures. 
587 */ 588 abstract class AbstractRunner<T> { 589 abstract protected URL getUrl() throws IOException; 590 591 protected final HttpOpParam.Op op; 592 private final boolean redirected; 593 protected ExcludeDatanodesParam excludeDatanodes = 594 new ExcludeDatanodesParam(""); 595 596 private boolean checkRetry; 597 private String redirectHost; 598 599 protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) { 600 this.op = op; 601 this.redirected = redirected; 602 } 603 604 T run() throws IOException { 605 UserGroupInformation connectUgi = ugi.getRealUser(); 606 if (connectUgi == null) { 607 connectUgi = ugi; 608 } 609 if (op.getRequireAuth()) { 610 connectUgi.checkTGTAndReloginFromKeytab(); 611 } 612 try { 613 // the entire lifecycle of the connection must be run inside the 614 // doAs to ensure authentication is performed correctly 615 return connectUgi.doAs( 616 new PrivilegedExceptionAction<T>() { 617 @Override 618 public T run() throws IOException { 619 return runWithRetry(); 620 } 621 }); 622 } catch (InterruptedException e) { 623 throw new IOException(e); 624 } 625 } 626 627 /** 628 * Two-step requests redirected to a DN 629 * 630 * Create/Append: 631 * Step 1) Submit a Http request with neither auto-redirect nor data. 632 * Step 2) Submit another Http request with the URL from the Location header 633 * with data. 634 * 635 * The reason of having two-step create/append is for preventing clients to 636 * send out the data before the redirect. This issue is addressed by the 637 * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3. 638 * Unfortunately, there are software library bugs (e.g. Jetty 6 http server 639 * and Java 6 http client), which do not correctly implement "Expect: 640 * 100-continue". The two-step create/append is a temporary workaround for 641 * the software library bugs. 
642 * 643 * Open/Checksum 644 * Also implements two-step connects for other operations redirected to 645 * a DN such as open and checksum 646 */ 647 protected HttpURLConnection connect(URL url) throws IOException { 648 //redirect hostname and port 649 redirectHost = null; 650 651 652 // resolve redirects for a DN operation unless already resolved 653 if (op.getRedirect() && !redirected) { 654 final HttpOpParam.Op redirectOp = 655 HttpOpParam.TemporaryRedirectOp.valueOf(op); 656 final HttpURLConnection conn = connect(redirectOp, url); 657 // application level proxy like httpfs might not issue a redirect 658 if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) { 659 return conn; 660 } 661 try { 662 validateResponse(redirectOp, conn, false); 663 url = new URL(conn.getHeaderField("Location")); 664 redirectHost = url.getHost() + ":" + url.getPort(); 665 } finally { 666 // TODO: consider not calling conn.disconnect() to allow connection reuse 667 // See http://tinyurl.com/java7-http-keepalive 668 conn.disconnect(); 669 } 670 } 671 try { 672 return connect(op, url); 673 } catch (IOException ioe) { 674 if (redirectHost != null) { 675 if (excludeDatanodes.getValue() != null) { 676 excludeDatanodes = new ExcludeDatanodesParam(redirectHost + "," 677 + excludeDatanodes.getValue()); 678 } else { 679 excludeDatanodes = new ExcludeDatanodesParam(redirectHost); 680 } 681 } 682 throw ioe; 683 } 684 } 685 686 private HttpURLConnection connect(final HttpOpParam.Op op, final URL url) 687 throws IOException { 688 final HttpURLConnection conn = 689 (HttpURLConnection)connectionFactory.openConnection(url); 690 final boolean doOutput = op.getDoOutput(); 691 conn.setRequestMethod(op.getType().toString()); 692 conn.setInstanceFollowRedirects(false); 693 if (restCsrfCustomHeader != null && 694 !restCsrfMethodsToIgnore.contains(op.getType().name())) { 695 // The value of the header is unimportant. Only its presence matters. 
696 conn.setRequestProperty(restCsrfCustomHeader, "\"\""); 697 } 698 switch (op.getType()) { 699 // if not sending a message body for a POST or PUT operation, need 700 // to ensure the server/proxy knows this 701 case POST: 702 case PUT: { 703 conn.setDoOutput(true); 704 if (!doOutput) { 705 // explicitly setting content-length to 0 won't do spnego!! 706 // opening and closing the stream will send "Content-Length: 0" 707 conn.getOutputStream().close(); 708 } else { 709 conn.setRequestProperty("Content-Type", 710 MediaType.APPLICATION_OCTET_STREAM); 711 conn.setChunkedStreamingMode(32 << 10); //32kB-chunk 712 } 713 break; 714 } 715 default: 716 conn.setDoOutput(doOutput); 717 break; 718 } 719 conn.connect(); 720 return conn; 721 } 722 723 private T runWithRetry() throws IOException { 724 /** 725 * Do the real work. 726 * 727 * There are three cases that the code inside the loop can throw an 728 * IOException: 729 * 730 * <ul> 731 * <li>The connection has failed (e.g., ConnectException, 732 * @see FailoverOnNetworkExceptionRetry for more details)</li> 733 * <li>The namenode enters the standby state (i.e., StandbyException).</li> 734 * <li>The server returns errors for the command (i.e., RemoteException)</li> 735 * </ul> 736 * 737 * The call to shouldRetry() will conduct the retry policy. The policy 738 * examines the exception and swallows it if it decides to rerun the work. 739 */ 740 for(int retry = 0; ; retry++) { 741 checkRetry = !redirected; 742 final URL url = getUrl(); 743 try { 744 final HttpURLConnection conn = connect(url); 745 // output streams will validate on close 746 if (!op.getDoOutput()) { 747 validateResponse(op, conn, false); 748 } 749 return getResponse(conn); 750 } catch (AccessControlException ace) { 751 // no retries for auth failures 752 throw ace; 753 } catch (InvalidToken it) { 754 // try to replace the expired token with a new one. 
the attempt 755 // to acquire a new token must be outside this operation's retry 756 // so if it fails after its own retries, this operation fails too. 757 if (op.getRequireAuth() || !replaceExpiredDelegationToken()) { 758 throw it; 759 } 760 } catch (IOException ioe) { 761 // Attempt to include the redirected node in the exception. If the 762 // attempt to recreate the exception fails, just use the original. 763 String node = redirectHost; 764 if (node == null) { 765 node = url.getAuthority(); 766 } 767 try { 768 IOException newIoe = ioe.getClass().getConstructor(String.class) 769 .newInstance(node + ": " + ioe.getMessage()); 770 newIoe.setStackTrace(ioe.getStackTrace()); 771 ioe = newIoe; 772 } catch (NoSuchMethodException | SecurityException 773 | InstantiationException | IllegalAccessException 774 | IllegalArgumentException | InvocationTargetException e) { 775 } 776 shouldRetry(ioe, retry); 777 } 778 } 779 } 780 781 private void shouldRetry(final IOException ioe, final int retry 782 ) throws IOException { 783 InetSocketAddress nnAddr = getCurrentNNAddr(); 784 if (checkRetry) { 785 try { 786 final RetryPolicy.RetryAction a = retryPolicy.shouldRetry( 787 ioe, retry, 0, true); 788 789 boolean isRetry = 790 a.action == RetryPolicy.RetryAction.RetryDecision.RETRY; 791 boolean isFailoverAndRetry = 792 a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY; 793 794 if (isRetry || isFailoverAndRetry) { 795 LOG.info("Retrying connect to namenode: {}. 
Already tried {}" 796 + " time(s); retry policy is {}, delay {}ms.", 797 nnAddr, retry, retryPolicy, a.delayMillis); 798 799 if (isFailoverAndRetry) { 800 resetStateToFailOver(); 801 } 802 803 Thread.sleep(a.delayMillis); 804 return; 805 } 806 } catch(Exception e) { 807 LOG.warn("Original exception is ", ioe); 808 throw toIOException(e); 809 } 810 } 811 throw toIOException(ioe); 812 } 813 814 abstract T getResponse(HttpURLConnection conn) throws IOException; 815 } 816 817 /** 818 * Abstract base class to handle path-based operations with params 819 */ 820 abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> { 821 private final Path fspath; 822 private Param<?,?>[] parameters; 823 824 AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath, 825 Param<?,?>... parameters) { 826 super(op, false); 827 this.fspath = fspath; 828 this.parameters = parameters; 829 } 830 831 AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters, 832 final Path fspath) { 833 super(op, false); 834 this.fspath = fspath; 835 this.parameters = parameters; 836 } 837 838 protected void updateURLParameters(Param<?, ?>... p) { 839 this.parameters = p; 840 } 841 842 @Override 843 protected URL getUrl() throws IOException { 844 if (excludeDatanodes.getValue() != null) { 845 Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1]; 846 System.arraycopy(parameters, 0, tmpParam, 0, parameters.length); 847 tmpParam[parameters.length] = excludeDatanodes; 848 return toUrl(op, fspath, tmpParam); 849 } else { 850 return toUrl(op, fspath, parameters); 851 } 852 } 853 } 854 855 /** 856 * Default path-based implementation expects no json response 857 */ 858 class FsPathRunner extends AbstractFsPathRunner<Void> { 859 FsPathRunner(Op op, Path fspath, Param<?,?>... 
parameters) { 860 super(op, fspath, parameters); 861 } 862 863 @Override 864 Void getResponse(HttpURLConnection conn) throws IOException { 865 return null; 866 } 867 } 868 869 /** 870 * Handle path-based operations with a json response 871 */ 872 abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> { 873 FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath, 874 Param<?,?>... parameters) { 875 super(op, fspath, parameters); 876 } 877 878 FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters, 879 final Path fspath) { 880 super(op, parameters, fspath); 881 } 882 883 @Override 884 final T getResponse(HttpURLConnection conn) throws IOException { 885 try { 886 final Map<?,?> json = jsonParse(conn, false); 887 if (json == null) { 888 // match exception class thrown by parser 889 throw new IllegalStateException("Missing response"); 890 } 891 return decodeResponse(json); 892 } catch (IOException ioe) { 893 throw ioe; 894 } catch (Exception e) { // catch json parser errors 895 final IOException ioe = 896 new IOException("Response decoding failure: "+e.toString(), e); 897 LOG.debug("Response decoding failure.", e); 898 throw ioe; 899 } finally { 900 // Don't call conn.disconnect() to allow connection reuse 901 // See http://tinyurl.com/java7-http-keepalive 902 conn.getInputStream().close(); 903 } 904 } 905 906 abstract T decodeResponse(Map<?,?> json) throws IOException; 907 } 908 909 /** 910 * Handle path-based operations with json boolean response 911 */ 912 class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> { 913 FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... 
parameters) {
      super(op, fspath, parameters);
    }

    /** Returns the value stored under the "boolean" key of the response. */
    @Override
    Boolean decodeResponse(Map<?,?> json) throws IOException {
      return (Boolean)json.get("boolean");
    }
  }

  /**
   * Handle create/append output streams
   */
  class FsPathOutputStreamRunner
      extends AbstractFsPathRunner<FSDataOutputStream> {
    // Size handed to the BufferedOutputStream wrapping the connection stream.
    private final int bufferSize;

    FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
        Param<?,?>... parameters) {
      super(op, fspath, parameters);
      this.bufferSize = bufferSize;
    }

    /**
     * Wraps the connection's output stream in a buffered
     * FSDataOutputStream. Closing the returned stream closes the wrapped
     * stream, then validates the response, then disconnects the connection;
     * the later steps run even if an earlier one throws.
     */
    @Override
    FSDataOutputStream getResponse(final HttpURLConnection conn)
        throws IOException {
      return new FSDataOutputStream(new BufferedOutputStream(
          conn.getOutputStream(), bufferSize), statistics) {
        @Override
        public void close() throws IOException {
          try {
            super.close();
          } finally {
            // Validate the response even when super.close() failed.
            try {
              validateResponse(op, conn, true);
            } finally {
              // This is a connection to DataNode.  Let's disconnect since
              // there is little chance that the connection will be reused
              // any time soon.
              conn.disconnect();
            }
          }
        }
      };
    }
  }

  class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
    FsPathConnectionRunner(Op op, Path fspath, Param<?,?>...
parameters) { 962 super(op, fspath, parameters); 963 } 964 @Override 965 HttpURLConnection getResponse(final HttpURLConnection conn) 966 throws IOException { 967 return conn; 968 } 969 } 970 971 /** 972 * Used by open() which tracks the resolved url itself 973 */ 974 final class URLRunner extends AbstractRunner<HttpURLConnection> { 975 private final URL url; 976 @Override 977 protected URL getUrl() { 978 return url; 979 } 980 981 protected URLRunner(final HttpOpParam.Op op, final URL url, 982 boolean redirected) { 983 super(op, redirected); 984 this.url = url; 985 } 986 987 @Override 988 HttpURLConnection getResponse(HttpURLConnection conn) throws IOException { 989 return conn; 990 } 991 } 992 993 private FsPermission applyUMask(FsPermission permission) { 994 if (permission == null) { 995 permission = FsPermission.getDefault(); 996 } 997 return permission.applyUMask(FsPermission.getUMask(getConf())); 998 } 999 1000 private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException { 1001 final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS; 1002 HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) { 1003 @Override 1004 HdfsFileStatus decodeResponse(Map<?,?> json) { 1005 return JsonUtilClient.toFileStatus(json, true); 1006 } 1007 }.run(); 1008 if (status == null) { 1009 throw new FileNotFoundException("File does not exist: " + f); 1010 } 1011 return status; 1012 } 1013 1014 @Override 1015 public FileStatus getFileStatus(Path f) throws IOException { 1016 statistics.incrementReadOps(1); 1017 storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS); 1018 return makeQualified(getHdfsFileStatus(f), f); 1019 } 1020 1021 private FileStatus makeQualified(HdfsFileStatus f, Path parent) { 1022 return new FileStatus(f.getLen(), f.isDir(), f.getReplication(), 1023 f.getBlockSize(), f.getModificationTime(), f.getAccessTime(), 1024 f.getPermission(), f.getOwner(), f.getGroup(), 1025 f.isSymlink() ? 
new Path(f.getSymlink()) : null, 1026 f.getFullPath(parent).makeQualified(getUri(), getWorkingDirectory())); 1027 } 1028 1029 @Override 1030 public AclStatus getAclStatus(Path f) throws IOException { 1031 final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS; 1032 AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) { 1033 @Override 1034 AclStatus decodeResponse(Map<?,?> json) { 1035 return JsonUtilClient.toAclStatus(json); 1036 } 1037 }.run(); 1038 if (status == null) { 1039 throw new FileNotFoundException("File does not exist: " + f); 1040 } 1041 return status; 1042 } 1043 1044 @Override 1045 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 1046 statistics.incrementWriteOps(1); 1047 storageStatistics.incrementOpCounter(OpType.MKDIRS); 1048 final HttpOpParam.Op op = PutOpParam.Op.MKDIRS; 1049 return new FsPathBooleanRunner(op, f, 1050 new PermissionParam(applyUMask(permission)) 1051 ).run(); 1052 } 1053 1054 /** 1055 * Create a symlink pointing to the destination path. 1056 */ 1057 public void createSymlink(Path destination, Path f, boolean createParent 1058 ) throws IOException { 1059 statistics.incrementWriteOps(1); 1060 storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK); 1061 final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK; 1062 new FsPathRunner(op, f, 1063 new DestinationParam(makeQualified(destination).toUri().getPath()), 1064 new CreateParentParam(createParent) 1065 ).run(); 1066 } 1067 1068 @Override 1069 public boolean rename(final Path src, final Path dst) throws IOException { 1070 statistics.incrementWriteOps(1); 1071 storageStatistics.incrementOpCounter(OpType.RENAME); 1072 final HttpOpParam.Op op = PutOpParam.Op.RENAME; 1073 return new FsPathBooleanRunner(op, src, 1074 new DestinationParam(makeQualified(dst).toUri().getPath()) 1075 ).run(); 1076 } 1077 1078 @SuppressWarnings("deprecation") 1079 @Override 1080 public void rename(final Path src, final Path dst, 1081 final Options.Rename... 
options) throws IOException { 1082 statistics.incrementWriteOps(1); 1083 storageStatistics.incrementOpCounter(OpType.RENAME); 1084 final HttpOpParam.Op op = PutOpParam.Op.RENAME; 1085 new FsPathRunner(op, src, 1086 new DestinationParam(makeQualified(dst).toUri().getPath()), 1087 new RenameOptionSetParam(options) 1088 ).run(); 1089 } 1090 1091 @Override 1092 public void setXAttr(Path p, String name, byte[] value, 1093 EnumSet<XAttrSetFlag> flag) throws IOException { 1094 statistics.incrementWriteOps(1); 1095 storageStatistics.incrementOpCounter(OpType.SET_XATTR); 1096 final HttpOpParam.Op op = PutOpParam.Op.SETXATTR; 1097 if (value != null) { 1098 new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam( 1099 XAttrCodec.encodeValue(value, XAttrCodec.HEX)), 1100 new XAttrSetFlagParam(flag)).run(); 1101 } else { 1102 new FsPathRunner(op, p, new XAttrNameParam(name), 1103 new XAttrSetFlagParam(flag)).run(); 1104 } 1105 } 1106 1107 @Override 1108 public byte[] getXAttr(Path p, final String name) throws IOException { 1109 statistics.incrementReadOps(1); 1110 storageStatistics.incrementOpCounter(OpType.GET_XATTR); 1111 final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; 1112 return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name), 1113 new XAttrEncodingParam(XAttrCodec.HEX)) { 1114 @Override 1115 byte[] decodeResponse(Map<?, ?> json) throws IOException { 1116 return JsonUtilClient.getXAttr(json); 1117 } 1118 }.run(); 1119 } 1120 1121 @Override 1122 public Map<String, byte[]> getXAttrs(Path p) throws IOException { 1123 final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; 1124 return new FsPathResponseRunner<Map<String, byte[]>>(op, p, 1125 new XAttrEncodingParam(XAttrCodec.HEX)) { 1126 @Override 1127 Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException { 1128 return JsonUtilClient.toXAttrs(json); 1129 } 1130 }.run(); 1131 } 1132 1133 @Override 1134 public Map<String, byte[]> getXAttrs(Path p, final List<String> names) 1135 throws 
IOException { 1136 Preconditions.checkArgument(names != null && !names.isEmpty(), 1137 "XAttr names cannot be null or empty."); 1138 Param<?,?>[] parameters = new Param<?,?>[names.size() + 1]; 1139 for (int i = 0; i < parameters.length - 1; i++) { 1140 parameters[i] = new XAttrNameParam(names.get(i)); 1141 } 1142 parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX); 1143 1144 final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; 1145 return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) { 1146 @Override 1147 Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException { 1148 return JsonUtilClient.toXAttrs(json); 1149 } 1150 }.run(); 1151 } 1152 1153 @Override 1154 public List<String> listXAttrs(Path p) throws IOException { 1155 final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS; 1156 return new FsPathResponseRunner<List<String>>(op, p) { 1157 @Override 1158 List<String> decodeResponse(Map<?, ?> json) throws IOException { 1159 return JsonUtilClient.toXAttrNames(json); 1160 } 1161 }.run(); 1162 } 1163 1164 @Override 1165 public void removeXAttr(Path p, String name) throws IOException { 1166 statistics.incrementWriteOps(1); 1167 storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR); 1168 final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR; 1169 new FsPathRunner(op, p, new XAttrNameParam(name)).run(); 1170 } 1171 1172 @Override 1173 public void setOwner(final Path p, final String owner, final String group 1174 ) throws IOException { 1175 if (owner == null && group == null) { 1176 throw new IOException("owner == null && group == null"); 1177 } 1178 1179 statistics.incrementWriteOps(1); 1180 storageStatistics.incrementOpCounter(OpType.SET_OWNER); 1181 final HttpOpParam.Op op = PutOpParam.Op.SETOWNER; 1182 new FsPathRunner(op, p, 1183 new OwnerParam(owner), new GroupParam(group) 1184 ).run(); 1185 } 1186 1187 @Override 1188 public void setPermission(final Path p, final FsPermission permission 1189 ) throws IOException { 
1190 statistics.incrementWriteOps(1); 1191 storageStatistics.incrementOpCounter(OpType.SET_PERMISSION); 1192 final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION; 1193 new FsPathRunner(op, p,new PermissionParam(permission)).run(); 1194 } 1195 1196 @Override 1197 public void modifyAclEntries(Path path, List<AclEntry> aclSpec) 1198 throws IOException { 1199 statistics.incrementWriteOps(1); 1200 storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES); 1201 final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES; 1202 new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); 1203 } 1204 1205 @Override 1206 public void removeAclEntries(Path path, List<AclEntry> aclSpec) 1207 throws IOException { 1208 statistics.incrementWriteOps(1); 1209 storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES); 1210 final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES; 1211 new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); 1212 } 1213 1214 @Override 1215 public void removeDefaultAcl(Path path) throws IOException { 1216 statistics.incrementWriteOps(1); 1217 storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL); 1218 final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL; 1219 new FsPathRunner(op, path).run(); 1220 } 1221 1222 @Override 1223 public void removeAcl(Path path) throws IOException { 1224 statistics.incrementWriteOps(1); 1225 storageStatistics.incrementOpCounter(OpType.REMOVE_ACL); 1226 final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL; 1227 new FsPathRunner(op, path).run(); 1228 } 1229 1230 @Override 1231 public void setAcl(final Path p, final List<AclEntry> aclSpec) 1232 throws IOException { 1233 statistics.incrementWriteOps(1); 1234 storageStatistics.incrementOpCounter(OpType.SET_ACL); 1235 final HttpOpParam.Op op = PutOpParam.Op.SETACL; 1236 new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run(); 1237 } 1238 1239 public void allowSnapshot(final Path p) throws IOException { 1240 
statistics.incrementWriteOps(1); 1241 final HttpOpParam.Op op = PutOpParam.Op.ALLOWSNAPSHOT; 1242 new FsPathRunner(op, p).run(); 1243 } 1244 1245 @Override 1246 public Path createSnapshot(final Path path, final String snapshotName) 1247 throws IOException { 1248 statistics.incrementWriteOps(1); 1249 storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT); 1250 final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT; 1251 return new FsPathResponseRunner<Path>(op, path, 1252 new SnapshotNameParam(snapshotName)) { 1253 @Override 1254 Path decodeResponse(Map<?,?> json) { 1255 return new Path((String) json.get(Path.class.getSimpleName())); 1256 } 1257 }.run(); 1258 } 1259 1260 public void disallowSnapshot(final Path p) throws IOException { 1261 statistics.incrementWriteOps(1); 1262 final HttpOpParam.Op op = PutOpParam.Op.DISALLOWSNAPSHOT; 1263 new FsPathRunner(op, p).run(); 1264 } 1265 1266 @Override 1267 public void deleteSnapshot(final Path path, final String snapshotName) 1268 throws IOException { 1269 statistics.incrementWriteOps(1); 1270 storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT); 1271 final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT; 1272 new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run(); 1273 } 1274 1275 @Override 1276 public void renameSnapshot(final Path path, final String snapshotOldName, 1277 final String snapshotNewName) throws IOException { 1278 statistics.incrementWriteOps(1); 1279 storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT); 1280 final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT; 1281 new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName), 1282 new SnapshotNameParam(snapshotNewName)).run(); 1283 } 1284 1285 @Override 1286 public boolean setReplication(final Path p, final short replication 1287 ) throws IOException { 1288 statistics.incrementWriteOps(1); 1289 storageStatistics.incrementOpCounter(OpType.SET_REPLICATION); 1290 final HttpOpParam.Op op = 
PutOpParam.Op.SETREPLICATION; 1291 return new FsPathBooleanRunner(op, p, 1292 new ReplicationParam(replication) 1293 ).run(); 1294 } 1295 1296 @Override 1297 public void setTimes(final Path p, final long mtime, final long atime 1298 ) throws IOException { 1299 statistics.incrementWriteOps(1); 1300 storageStatistics.incrementOpCounter(OpType.SET_TIMES); 1301 final HttpOpParam.Op op = PutOpParam.Op.SETTIMES; 1302 new FsPathRunner(op, p, 1303 new ModificationTimeParam(mtime), 1304 new AccessTimeParam(atime) 1305 ).run(); 1306 } 1307 1308 @Override 1309 public long getDefaultBlockSize() { 1310 return getConf().getLongBytes(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1311 HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT); 1312 } 1313 1314 @Override 1315 public short getDefaultReplication() { 1316 return (short)getConf().getInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1317 HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT); 1318 } 1319 1320 @Override 1321 public void concat(final Path trg, final Path [] srcs) throws IOException { 1322 statistics.incrementWriteOps(1); 1323 storageStatistics.incrementOpCounter(OpType.CONCAT); 1324 final HttpOpParam.Op op = PostOpParam.Op.CONCAT; 1325 new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run(); 1326 } 1327 1328 @Override 1329 public FSDataOutputStream create(final Path f, final FsPermission permission, 1330 final boolean overwrite, final int bufferSize, final short replication, 1331 final long blockSize, final Progressable progress) throws IOException { 1332 statistics.incrementWriteOps(1); 1333 storageStatistics.incrementOpCounter(OpType.CREATE); 1334 1335 final HttpOpParam.Op op = PutOpParam.Op.CREATE; 1336 return new FsPathOutputStreamRunner(op, f, bufferSize, 1337 new PermissionParam(applyUMask(permission)), 1338 new OverwriteParam(overwrite), 1339 new BufferSizeParam(bufferSize), 1340 new ReplicationParam(replication), 1341 new BlockSizeParam(blockSize) 1342 ).run(); 1343 } 1344 1345 @Override 1346 public FSDataOutputStream 
createNonRecursive(final Path f, 1347 final FsPermission permission, final EnumSet<CreateFlag> flag, 1348 final int bufferSize, final short replication, final long blockSize, 1349 final Progressable progress) throws IOException { 1350 statistics.incrementWriteOps(1); 1351 storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE); 1352 1353 final HttpOpParam.Op op = PutOpParam.Op.CREATE; 1354 return new FsPathOutputStreamRunner(op, f, bufferSize, 1355 new PermissionParam(applyUMask(permission)), 1356 new CreateFlagParam(flag), 1357 new CreateParentParam(false), 1358 new BufferSizeParam(bufferSize), 1359 new ReplicationParam(replication), 1360 new BlockSizeParam(blockSize) 1361 ).run(); 1362 } 1363 1364 @Override 1365 public FSDataOutputStream append(final Path f, final int bufferSize, 1366 final Progressable progress) throws IOException { 1367 statistics.incrementWriteOps(1); 1368 storageStatistics.incrementOpCounter(OpType.APPEND); 1369 1370 final HttpOpParam.Op op = PostOpParam.Op.APPEND; 1371 return new FsPathOutputStreamRunner(op, f, bufferSize, 1372 new BufferSizeParam(bufferSize) 1373 ).run(); 1374 } 1375 1376 @Override 1377 public boolean truncate(Path f, long newLength) throws IOException { 1378 statistics.incrementWriteOps(1); 1379 storageStatistics.incrementOpCounter(OpType.TRUNCATE); 1380 1381 final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE; 1382 return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run(); 1383 } 1384 1385 @Override 1386 public boolean delete(Path f, boolean recursive) throws IOException { 1387 statistics.incrementWriteOps(1); 1388 storageStatistics.incrementOpCounter(OpType.DELETE); 1389 final HttpOpParam.Op op = DeleteOpParam.Op.DELETE; 1390 return new FsPathBooleanRunner(op, f, 1391 new RecursiveParam(recursive) 1392 ).run(); 1393 } 1394 1395 @Override 1396 public FSDataInputStream open(final Path f, final int bufferSize 1397 ) throws IOException { 1398 statistics.incrementReadOps(1); 1399 
storageStatistics.incrementOpCounter(OpType.OPEN); 1400 return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize)); 1401 } 1402 1403 @Override 1404 public synchronized void close() throws IOException { 1405 try { 1406 if (canRefreshDelegationToken && delegationToken != null) { 1407 cancelDelegationToken(delegationToken); 1408 } 1409 } catch (IOException ioe) { 1410 LOG.debug("Token cancel failed: ", ioe); 1411 } finally { 1412 super.close(); 1413 } 1414 } 1415 1416 // use FsPathConnectionRunner to ensure retries for InvalidTokens 1417 class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener { 1418 private final FsPathConnectionRunner runner; 1419 UnresolvedUrlOpener(FsPathConnectionRunner runner) { 1420 super(null); 1421 this.runner = runner; 1422 } 1423 1424 @Override 1425 protected HttpURLConnection connect(long offset, boolean resolved) 1426 throws IOException { 1427 assert offset == 0; 1428 HttpURLConnection conn = runner.run(); 1429 setURL(conn.getURL()); 1430 return conn; 1431 } 1432 } 1433 1434 class OffsetUrlOpener extends ByteRangeInputStream.URLOpener { 1435 OffsetUrlOpener(final URL url) { 1436 super(url); 1437 } 1438 1439 /** Setup offset url and connect. */ 1440 @Override 1441 protected HttpURLConnection connect(final long offset, 1442 final boolean resolved) throws IOException { 1443 final URL offsetUrl = offset == 0L? 
url 1444 : new URL(url + "&" + new OffsetParam(offset)); 1445 return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run(); 1446 } 1447 } 1448 1449 private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "="; 1450 1451 /** Remove offset parameter, if there is any, from the url */ 1452 static URL removeOffsetParam(final URL url) throws MalformedURLException { 1453 String query = url.getQuery(); 1454 if (query == null) { 1455 return url; 1456 } 1457 final String lower = StringUtils.toLowerCase(query); 1458 if (!lower.startsWith(OFFSET_PARAM_PREFIX) 1459 && !lower.contains("&" + OFFSET_PARAM_PREFIX)) { 1460 return url; 1461 } 1462 1463 //rebuild query 1464 StringBuilder b = null; 1465 for(final StringTokenizer st = new StringTokenizer(query, "&"); 1466 st.hasMoreTokens();) { 1467 final String token = st.nextToken(); 1468 if (!StringUtils.toLowerCase(token).startsWith(OFFSET_PARAM_PREFIX)) { 1469 if (b == null) { 1470 b = new StringBuilder("?").append(token); 1471 } else { 1472 b.append('&').append(token); 1473 } 1474 } 1475 } 1476 query = b == null? "": b.toString(); 1477 1478 final String urlStr = url.toString(); 1479 return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query); 1480 } 1481 1482 static class OffsetUrlInputStream extends ByteRangeInputStream { 1483 OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r) 1484 throws IOException { 1485 super(o, r); 1486 } 1487 1488 /** Remove offset parameter before returning the resolved url. 
*/ 1489 @Override 1490 protected URL getResolvedUrl(final HttpURLConnection connection 1491 ) throws MalformedURLException { 1492 return removeOffsetParam(connection.getURL()); 1493 } 1494 } 1495 1496 @Override 1497 public FileStatus[] listStatus(final Path f) throws IOException { 1498 statistics.incrementReadOps(1); 1499 storageStatistics.incrementOpCounter(OpType.LIST_STATUS); 1500 1501 final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS; 1502 return new FsPathResponseRunner<FileStatus[]>(op, f) { 1503 @Override 1504 FileStatus[] decodeResponse(Map<?,?> json) { 1505 final Map<?, ?> rootmap = 1506 (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es"); 1507 final List<?> array = JsonUtilClient.getList(rootmap, 1508 FileStatus.class.getSimpleName()); 1509 1510 //convert FileStatus 1511 assert array != null; 1512 final FileStatus[] statuses = new FileStatus[array.size()]; 1513 int i = 0; 1514 for (Object object : array) { 1515 final Map<?, ?> m = (Map<?, ?>) object; 1516 statuses[i++] = makeQualified(JsonUtilClient.toFileStatus(m, false), 1517 f); 1518 } 1519 return statuses; 1520 } 1521 }.run(); 1522 } 1523 1524 @Override 1525 public Token<DelegationTokenIdentifier> getDelegationToken( 1526 final String renewer) throws IOException { 1527 final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN; 1528 Token<DelegationTokenIdentifier> token = 1529 new FsPathResponseRunner<Token<DelegationTokenIdentifier>>( 1530 op, null, new RenewerParam(renewer)) { 1531 @Override 1532 Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json) 1533 throws IOException { 1534 return JsonUtilClient.toDelegationToken(json); 1535 } 1536 }.run(); 1537 if (token != null) { 1538 token.setService(tokenServiceName); 1539 } else { 1540 if (disallowFallbackToInsecureCluster) { 1541 throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG); 1542 } 1543 } 1544 return token; 1545 } 1546 1547 @Override 1548 public synchronized Token<?> getRenewToken() { 1549 return delegationToken; 
1550 } 1551 1552 @Override 1553 public <T extends TokenIdentifier> void setDelegationToken( 1554 final Token<T> token) { 1555 synchronized (this) { 1556 delegationToken = token; 1557 } 1558 } 1559 1560 @Override 1561 public synchronized long renewDelegationToken(final Token<?> token 1562 ) throws IOException { 1563 final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN; 1564 return new FsPathResponseRunner<Long>(op, null, 1565 new TokenArgumentParam(token.encodeToUrlString())) { 1566 @Override 1567 Long decodeResponse(Map<?,?> json) throws IOException { 1568 return ((Number) json.get("long")).longValue(); 1569 } 1570 }.run(); 1571 } 1572 1573 @Override 1574 public synchronized void cancelDelegationToken(final Token<?> token 1575 ) throws IOException { 1576 final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN; 1577 new FsPathRunner(op, null, 1578 new TokenArgumentParam(token.encodeToUrlString()) 1579 ).run(); 1580 } 1581 1582 @Override 1583 public BlockLocation[] getFileBlockLocations(final FileStatus status, 1584 final long offset, final long length) throws IOException { 1585 if (status == null) { 1586 return null; 1587 } 1588 return getFileBlockLocations(status.getPath(), offset, length); 1589 } 1590 1591 @Override 1592 public BlockLocation[] getFileBlockLocations(final Path p, 1593 final long offset, final long length) throws IOException { 1594 statistics.incrementReadOps(1); 1595 storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS); 1596 1597 final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS; 1598 return new FsPathResponseRunner<BlockLocation[]>(op, p, 1599 new OffsetParam(offset), new LengthParam(length)) { 1600 @Override 1601 BlockLocation[] decodeResponse(Map<?,?> json) throws IOException { 1602 return DFSUtilClient.locatedBlocks2Locations( 1603 JsonUtilClient.toLocatedBlocks(json)); 1604 } 1605 }.run(); 1606 } 1607 1608 @Override 1609 public void access(final Path path, final FsAction mode) throws IOException { 1610 
final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS; 1611 new FsPathRunner(op, path, new FsActionParam(mode)).run(); 1612 } 1613 1614 @Override 1615 public ContentSummary getContentSummary(final Path p) throws IOException { 1616 statistics.incrementReadOps(1); 1617 storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY); 1618 1619 final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY; 1620 return new FsPathResponseRunner<ContentSummary>(op, p) { 1621 @Override 1622 ContentSummary decodeResponse(Map<?,?> json) { 1623 return JsonUtilClient.toContentSummary(json); 1624 } 1625 }.run(); 1626 } 1627 1628 @Override 1629 public MD5MD5CRC32FileChecksum getFileChecksum(final Path p 1630 ) throws IOException { 1631 statistics.incrementReadOps(1); 1632 storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM); 1633 1634 final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM; 1635 return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) { 1636 @Override 1637 MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException { 1638 return JsonUtilClient.toMD5MD5CRC32FileChecksum(json); 1639 } 1640 }.run(); 1641 } 1642 1643 /** 1644 * Resolve an HDFS URL into real INetSocketAddress. It works like a DNS 1645 * resolver when the URL points to an non-HA cluster. When the URL points to 1646 * an HA cluster with its logical name, the resolver further resolves the 1647 * logical name(i.e., the authority in the URL) into real namenode addresses. 
1648 */ 1649 private InetSocketAddress[] resolveNNAddr() { 1650 Configuration conf = getConf(); 1651 final String scheme = uri.getScheme(); 1652 1653 ArrayList<InetSocketAddress> ret = new ArrayList<>(); 1654 1655 if (!HAUtilClient.isLogicalUri(conf, uri)) { 1656 InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(), 1657 getDefaultPort()); 1658 ret.add(addr); 1659 1660 } else { 1661 Map<String, Map<String, InetSocketAddress>> addresses = DFSUtilClient 1662 .getHaNnWebHdfsAddresses(conf, scheme); 1663 1664 // Extract the entry corresponding to the logical name. 1665 Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost()); 1666 for (InetSocketAddress addr : addrs.values()) { 1667 ret.add(addr); 1668 } 1669 } 1670 1671 InetSocketAddress[] r = new InetSocketAddress[ret.size()]; 1672 return ret.toArray(r); 1673 } 1674 1675 @Override 1676 public String getCanonicalServiceName() { 1677 return tokenServiceName == null ? super.getCanonicalServiceName() 1678 : tokenServiceName.toString(); 1679 } 1680 1681 @Override 1682 public void setStoragePolicy(Path p, String policyName) throws IOException { 1683 if (policyName == null) { 1684 throw new IOException("policyName == null"); 1685 } 1686 statistics.incrementWriteOps(1); 1687 storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY); 1688 final HttpOpParam.Op op = PutOpParam.Op.SETSTORAGEPOLICY; 1689 new FsPathRunner(op, p, new StoragePolicyParam(policyName)).run(); 1690 } 1691 1692 @Override 1693 public Collection<BlockStoragePolicy> getAllStoragePolicies() 1694 throws IOException { 1695 final HttpOpParam.Op op = GetOpParam.Op.GETALLSTORAGEPOLICY; 1696 return new FsPathResponseRunner<Collection<BlockStoragePolicy>>(op, null) { 1697 @Override 1698 Collection<BlockStoragePolicy> decodeResponse(Map<?, ?> json) 1699 throws IOException { 1700 return JsonUtilClient.getStoragePolicies(json); 1701 } 1702 }.run(); 1703 } 1704 1705 @Override 1706 public BlockStoragePolicy getStoragePolicy(Path src) 
throws IOException { 1707 final HttpOpParam.Op op = GetOpParam.Op.GETSTORAGEPOLICY; 1708 return new FsPathResponseRunner<BlockStoragePolicy>(op, src) { 1709 @Override 1710 BlockStoragePolicy decodeResponse(Map<?, ?> json) throws IOException { 1711 return JsonUtilClient.toBlockStoragePolicy((Map<?, ?>) json 1712 .get(BlockStoragePolicy.class.getSimpleName())); 1713 } 1714 }.run(); 1715 } 1716 1717 @Override 1718 public void unsetStoragePolicy(Path src) throws IOException { 1719 statistics.incrementWriteOps(1); 1720 storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY); 1721 final HttpOpParam.Op op = PostOpParam.Op.UNSETSTORAGEPOLICY; 1722 new FsPathRunner(op, src).run(); 1723 } 1724 1725 @VisibleForTesting 1726 InetSocketAddress[] getResolvedNNAddr() { 1727 return nnAddrs; 1728 } 1729 1730 @VisibleForTesting 1731 public void setRetryPolicy(RetryPolicy rp) { 1732 this.retryPolicy = rp; 1733 } 1734 1735 /** 1736 * This class is used for opening, reading, and seeking files while using the 1737 * WebHdfsFileSystem. This class will invoke the retry policy when performing 1738 * any of these actions. 1739 */ 1740 @VisibleForTesting 1741 public class WebHdfsInputStream extends FSInputStream { 1742 private ReadRunner readRunner = null; 1743 1744 WebHdfsInputStream(Path path, int buffersize) throws IOException { 1745 // Only create the ReadRunner once. Each read's byte array and position 1746 // will be updated within the ReadRunner object before every read. 1747 readRunner = new ReadRunner(path, buffersize); 1748 } 1749 1750 @Override 1751 public int read() throws IOException { 1752 final byte[] b = new byte[1]; 1753 return (read(b, 0, 1) == -1) ? 
-1 : (b[0] & 0xff); 1754 } 1755 1756 @Override 1757 public int read(byte b[], int off, int len) throws IOException { 1758 return readRunner.read(b, off, len); 1759 } 1760 1761 @Override 1762 public void seek(long newPos) throws IOException { 1763 readRunner.seek(newPos); 1764 } 1765 1766 @Override 1767 public long getPos() throws IOException { 1768 return readRunner.getPos(); 1769 } 1770 1771 protected int getBufferSize() throws IOException { 1772 return readRunner.getBufferSize(); 1773 } 1774 1775 protected Path getPath() throws IOException { 1776 return readRunner.getPath(); 1777 } 1778 1779 @Override 1780 public boolean seekToNewSource(long targetPos) throws IOException { 1781 return false; 1782 } 1783 1784 @Override 1785 public void close() throws IOException { 1786 readRunner.close(); 1787 } 1788 1789 public void setFileLength(long len) { 1790 readRunner.setFileLength(len); 1791 } 1792 1793 public long getFileLength() { 1794 return readRunner.getFileLength(); 1795 } 1796 1797 @VisibleForTesting 1798 ReadRunner getReadRunner() { 1799 return readRunner; 1800 } 1801 1802 @VisibleForTesting 1803 void setReadRunner(ReadRunner rr) { 1804 this.readRunner = rr; 1805 } 1806 } 1807 1808 enum RunnerState { 1809 DISCONNECTED, // Connection is closed programmatically by ReadRunner 1810 OPEN, // Connection has been established by ReadRunner 1811 SEEK, // Calling code has explicitly called seek() 1812 CLOSED // Calling code has explicitly called close() 1813 } 1814 1815 /** 1816 * This class will allow retries to occur for both open and read operations. 1817 * The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object, 1818 * which creates a new ReadRunner object that will be used to open a 1819 * connection and read or seek into the input stream. 
*
   * ReadRunner is a subclass of the AbstractRunner class, which will run the
   * ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
   * methods within a retry loop, based on the configured retry policy.
   * ReadRunner#connect will create a connection if one has not already been
   * created. Otherwise, it will return the previously created connection
   * object. This is necessary because a new connection should not be created
   * for every read.
   * Likewise, ReadRunner#getUrl will construct a new URL object only if the
   * connection has not previously been established. Otherwise, it will return
   * the previously created URL object.
   * ReadRunner#getResponse will initialize the input stream if it has not
   * already been initialized and read the requested data from the specified
   * input stream.
   */
  @VisibleForTesting
  protected class ReadRunner extends AbstractFsPathRunner<Integer> {
    private InputStream in = null;
    // Connection reused across reads; getUrl() only rebuilds the URL while
    // this is null.
    private HttpURLConnection cachedConnection = null;
    // Destination buffer, offset and length of the read currently being run;
    // assigned in read() right before run() is invoked.
    private byte[] readBuffer;
    private int readOffset;
    private int readLength;
    private RunnerState runnerState = RunnerState.DISCONNECTED;
    private URL originalUrl = null;
    private URL resolvedUrl = null;

    private final Path path;
    private final int bufferSize;
    // Current position; advanced by the byte count of each successful read
    // and overwritten by seek().
    private long pos = 0;
    private long fileLength = 0;

    /* The following methods are WebHdfsInputStream helpers. */

    ReadRunner(Path p, int bs) throws IOException {
      super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
      this.path = p;
      this.bufferSize = bs;
    }

    /**
     * Reads up to len bytes into b starting at off by running this runner.
     *
     * @return the count produced by run(); 0 when len is 0; -1 when a buffer
     *         has been set by an earlier read and pos has reached fileLength
     * @throws IOException if the runner state is CLOSED ("Stream closed")
     * @throws EOFException if run() yields a negative count while pos is
     *         still below fileLength
     */
    int read(byte[] b, int off, int len) throws IOException {
      if (runnerState == RunnerState.CLOSED) {
        throw new IOException("Stream closed");
      }
      if (len == 0) {
        return 0;
      }

      // Before the first read, pos and fileLength will be 0 and readBuffer
      // will be null. They will be initialized once the first connection
      // is made. Only after that it makes sense to compare pos and fileLength.
      if (pos >= fileLength && readBuffer != null) {
        return -1;
      }

      // If a seek is occurring, the input stream will have been closed, so it
      // needs to be reopened. Use the URLRunner to call AbstractRunner#connect
      // with the previously-cached resolved URL and with the 'redirected' flag
      // set to 'true'. The resolved URL contains the URL of the previously
      // opened DN as opposed to the NN. It is preferable to use the resolved
      // URL when creating a connection because it does not hit the NN on every
      // seek, nor does it open a connection to a new DN after every seek.
      // The redirect flag is needed so that AbstractRunner#connect knows the
      // URL is already resolved.
      // Note that when the redirected flag is set, retries are not attempted.
      // So, if the connection fails using URLRunner, clear out the connection
      // and fall through to establish the connection using ReadRunner.
      if (runnerState == RunnerState.SEEK) {
        try {
          final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
          cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
        } catch (IOException ioe) {
          // Intentionally not rethrown: the run() call below gets to
          // establish the connection instead.
          closeInputStream(RunnerState.DISCONNECTED);
        }
      }

      readBuffer = b;
      readOffset = off;
      readLength = len;

      int count = -1;
      count = this.run();
      if (count >= 0) {
        statistics.incrementBytesRead(count);
        pos += count;
      } else if (pos < fileLength) {
        // A negative count before fileLength is reached is reported as a
        // premature end of file.
        throw new EOFException(
            "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
      }
      return count;
    }

    /**
     * Moves the position to newPos. A seek to the current position is a
     * no-op; otherwise the input stream is closed with state SEEK.
     */
    void seek(long newPos) throws IOException {
      if (pos != newPos) {
        pos = newPos;
        closeInputStream(RunnerState.SEEK);
      }
    }

    /** Closes the input stream with state CLOSED. */
    public void close() throws IOException {
      closeInputStream(RunnerState.CLOSED);
    }

    /* The following methods are overriding AbstractRunner methods,
     * to be called within the retry policy context by runWithRetry.
     */

    @Override
    protected URL getUrl() throws IOException {
      // This method is called every time a read is executed.
      // The check for cachedConnection == null is to ensure that a new URL is
      // only created upon a new connection and not for every read.
      if (cachedConnection == null) {
        // Update URL with current offset. BufferSize doesn't change, but it
        // still must be included when creating the new URL.
        updateURLParameters(new BufferSizeParam(bufferSize),
            new OffsetParam(pos));
        originalUrl = super.getUrl();
      }
      return originalUrl;
    }

    /* Only make the connection if it is not already open. Don't cache the
     * connection here. After this method is called, runWithRetry will call
     * validateResponse, and then call the below ReadRunner#getResponse. If
     * the code path makes it that far, then we can cache the connection.
1945 */ 1946 @Override 1947 protected HttpURLConnection connect(URL url) throws IOException { 1948 HttpURLConnection conn = cachedConnection; 1949 if (conn == null) { 1950 try { 1951 conn = super.connect(url); 1952 } catch (IOException e) { 1953 closeInputStream(RunnerState.DISCONNECTED); 1954 throw e; 1955 } 1956 } 1957 return conn; 1958 } 1959 1960 /* 1961 * This method is used to perform reads within the retry policy context. 1962 * This code is relying on runWithRetry to always call the above connect 1963 * method and the verifyResponse method prior to calling getResponse. 1964 */ 1965 @Override 1966 Integer getResponse(final HttpURLConnection conn) 1967 throws IOException { 1968 try { 1969 // In the "open-then-read" use case, runWithRetry will have executed 1970 // ReadRunner#connect to make the connection and then executed 1971 // validateResponse to validate the response code. Only then do we want 1972 // to cache the connection. 1973 // In the "read-after-seek" use case, the connection is made and the 1974 // response is validated by the URLRunner. ReadRunner#read then caches 1975 // the connection and the ReadRunner#connect will pass on the cached 1976 // connection 1977 // In either case, stream initialization is done here if necessary. 
1978 cachedConnection = conn; 1979 if (in == null) { 1980 in = initializeInputStream(conn); 1981 } 1982 1983 int count = in.read(readBuffer, readOffset, readLength); 1984 if (count < 0 && pos < fileLength) { 1985 throw new EOFException( 1986 "Premature EOF: pos=" + pos + " < filelength=" + fileLength); 1987 } 1988 return Integer.valueOf(count); 1989 } catch (IOException e) { 1990 String redirectHost = resolvedUrl.getAuthority(); 1991 if (excludeDatanodes.getValue() != null) { 1992 excludeDatanodes = new ExcludeDatanodesParam(redirectHost + "," 1993 + excludeDatanodes.getValue()); 1994 } else { 1995 excludeDatanodes = new ExcludeDatanodesParam(redirectHost); 1996 } 1997 1998 // If an exception occurs, close the input stream and null it out so 1999 // that if the abstract runner decides to retry, it will reconnect. 2000 closeInputStream(RunnerState.DISCONNECTED); 2001 throw e; 2002 } 2003 } 2004 2005 @VisibleForTesting 2006 InputStream initializeInputStream(HttpURLConnection conn) 2007 throws IOException { 2008 // Cache the resolved URL so that it can be used in the event of 2009 // a future seek operation. 2010 resolvedUrl = removeOffsetParam(conn.getURL()); 2011 final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH); 2012 InputStream inStream = conn.getInputStream(); 2013 if (LOG.isDebugEnabled()) { 2014 LOG.debug("open file: " + conn.getURL()); 2015 } 2016 if (cl != null) { 2017 long streamLength = Long.parseLong(cl); 2018 fileLength = pos + streamLength; 2019 // Java has a bug with >2GB request streams. It won't bounds check 2020 // the reads so the transfer blocks until the server times out 2021 inStream = new BoundedInputStream(inStream, streamLength); 2022 } else { 2023 fileLength = getHdfsFileStatus(path).getLen(); 2024 } 2025 // Wrapping in BufferedInputStream because it is more performant than 2026 // BoundedInputStream by itself. 
2027 runnerState = RunnerState.OPEN; 2028 return new BufferedInputStream(inStream, bufferSize); 2029 } 2030 2031 // Close both the InputStream and the connection. 2032 @VisibleForTesting 2033 void closeInputStream(RunnerState rs) throws IOException { 2034 if (in != null) { 2035 IOUtils.close(cachedConnection); 2036 in = null; 2037 } 2038 cachedConnection = null; 2039 runnerState = rs; 2040 } 2041 2042 /* Getters and Setters */ 2043 2044 @VisibleForTesting 2045 protected InputStream getInputStream() { 2046 return in; 2047 } 2048 2049 @VisibleForTesting 2050 protected void setInputStream(InputStream inStream) { 2051 in = inStream; 2052 } 2053 2054 Path getPath() { 2055 return path; 2056 } 2057 2058 int getBufferSize() { 2059 return bufferSize; 2060 } 2061 2062 long getFileLength() { 2063 return fileLength; 2064 } 2065 2066 void setFileLength(long len) { 2067 fileLength = len; 2068 } 2069 2070 long getPos() { 2071 return pos; 2072 } 2073 } 2074}