001/**
 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hdfs.web;
020
021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_DEFAULT;
022import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_KEY;
023import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT;
024import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_KEY;
025import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_DEFAULT;
026import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_KEY;
027
028import java.io.BufferedInputStream;
029import java.io.BufferedOutputStream;
030import java.io.EOFException;
031import java.io.FileNotFoundException;
032import java.io.IOException;
033import java.io.InputStream;
034import java.lang.reflect.InvocationTargetException;
035import java.net.HttpURLConnection;
036import java.net.InetSocketAddress;
037import java.net.MalformedURLException;
038import java.net.URI;
039import java.net.URL;
040import java.security.PrivilegedExceptionAction;
041import java.util.ArrayList;
042import java.util.Collection;
043import java.util.EnumSet;
044import java.util.HashSet;
045import java.util.List;
046import java.util.Map;
047import java.util.Set;
048import java.util.StringTokenizer;
049
050import javax.ws.rs.core.HttpHeaders;
051import javax.ws.rs.core.MediaType;
052
053import org.apache.commons.io.IOUtils;
054import org.apache.commons.io.input.BoundedInputStream;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.BlockLocation;
057import org.apache.hadoop.fs.CommonConfigurationKeys;
058import org.apache.hadoop.fs.ContentSummary;
059import org.apache.hadoop.fs.CreateFlag;
060import org.apache.hadoop.fs.DelegationTokenRenewer;
061import org.apache.hadoop.fs.FSDataInputStream;
062import org.apache.hadoop.fs.FSDataOutputStream;
063import org.apache.hadoop.fs.FSInputStream;
064import org.apache.hadoop.fs.FileStatus;
065import org.apache.hadoop.fs.FileSystem;
066import org.apache.hadoop.fs.GlobalStorageStatistics;
067import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
068import org.apache.hadoop.fs.StorageStatistics;
069import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
070import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
071import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
072import org.apache.hadoop.fs.Options;
073import org.apache.hadoop.fs.Path;
074import org.apache.hadoop.fs.XAttrCodec;
075import org.apache.hadoop.fs.XAttrSetFlag;
076import org.apache.hadoop.fs.permission.AclEntry;
077import org.apache.hadoop.fs.permission.AclStatus;
078import org.apache.hadoop.fs.permission.FsAction;
079import org.apache.hadoop.fs.permission.FsPermission;
080import org.apache.hadoop.hdfs.DFSUtilClient;
081import org.apache.hadoop.hdfs.HAUtilClient;
082import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
083import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
084import org.apache.hadoop.hdfs.protocol.HdfsConstants;
085import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
086import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
087import org.apache.hadoop.hdfs.web.resources.*;
088import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
089import org.apache.hadoop.io.Text;
090import org.apache.hadoop.io.retry.RetryPolicies;
091import org.apache.hadoop.io.retry.RetryPolicy;
092import org.apache.hadoop.io.retry.RetryUtils;
093import org.apache.hadoop.ipc.RemoteException;
094import org.apache.hadoop.ipc.StandbyException;
095import org.apache.hadoop.net.NetUtils;
096import org.apache.hadoop.security.AccessControlException;
097import org.apache.hadoop.security.SecurityUtil;
098import org.apache.hadoop.security.UserGroupInformation;
099import org.apache.hadoop.security.token.SecretManager.InvalidToken;
100import org.apache.hadoop.security.token.Token;
101import org.apache.hadoop.security.token.TokenIdentifier;
102import org.apache.hadoop.security.token.TokenSelector;
103import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
104import org.apache.hadoop.util.Progressable;
105import org.apache.hadoop.util.StringUtils;
106import org.codehaus.jackson.map.ObjectMapper;
107import org.codehaus.jackson.map.ObjectReader;
108import org.slf4j.Logger;
109import org.slf4j.LoggerFactory;
110
111import com.google.common.annotations.VisibleForTesting;
112import com.google.common.base.Preconditions;
113import com.google.common.collect.Lists;
114
115/** A FileSystem for HDFS over the web. */
116public class WebHdfsFileSystem extends FileSystem
117    implements DelegationTokenRenewer.Renewable,
118    TokenAspect.TokenManagementDelegator {
119  public static final Logger LOG = LoggerFactory
120      .getLogger(WebHdfsFileSystem.class);
121  /** WebHdfs version. */
122  public static final int VERSION = 1;
123  /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
124  public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME
125      + "/v" + VERSION;
126
127  /**
128   * Default connection factory may be overridden in tests to use smaller
129   * timeout values
130   */
131  protected URLConnectionFactory connectionFactory;
132
133  @VisibleForTesting
134  public static final String CANT_FALLBACK_TO_INSECURE_MSG =
135      "The client is configured to only allow connecting to secure cluster";
136
137  private boolean canRefreshDelegationToken;
138
139  private UserGroupInformation ugi;
140  private URI uri;
141  private Token<?> delegationToken;
142  protected Text tokenServiceName;
143  private RetryPolicy retryPolicy = null;
144  private Path workingDir;
145  private Path cachedHomeDirectory;
146  private InetSocketAddress nnAddrs[];
147  private int currentNNAddrIndex;
148  private boolean disallowFallbackToInsecureCluster;
149  private String restCsrfCustomHeader;
150  private Set<String> restCsrfMethodsToIgnore;
151  private static final ObjectReader READER =
152      new ObjectMapper().reader(Map.class);
153
154  private DFSOpsCountStatistics storageStatistics;
155
156  /**
157   * Return the protocol scheme for the FileSystem.
158   * <p/>
159   *
160   * @return <code>webhdfs</code>
161   */
162  @Override
163  public String getScheme() {
164    return WebHdfsConstants.WEBHDFS_SCHEME;
165  }
166
167  /**
168   * return the underlying transport protocol (http / https).
169   */
170  protected String getTransportScheme() {
171    return "http";
172  }
173
174  protected Text getTokenKind() {
175    return WebHdfsConstants.WEBHDFS_TOKEN_KIND;
176  }
177
178  @Override
179  public synchronized void initialize(URI uri, Configuration conf
180  ) throws IOException {
181    super.initialize(uri, conf);
182    setConf(conf);
183    /** set user pattern based on configuration file */
184    UserParam.setUserPattern(conf.get(
185        HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
186        HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
187
188    boolean isOAuth = conf.getBoolean(
189        HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY,
190        HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT);
191
192    if(isOAuth) {
193      LOG.debug("Enabling OAuth2 in WebHDFS");
194      connectionFactory = URLConnectionFactory
195          .newOAuth2URLConnectionFactory(conf);
196    } else {
197      LOG.debug("Not enabling OAuth2 in WebHDFS");
198      connectionFactory = URLConnectionFactory
199          .newDefaultURLConnectionFactory(conf);
200    }
201
202    ugi = UserGroupInformation.getCurrentUser();
203    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
204    this.nnAddrs = resolveNNAddr();
205
206    boolean isHA = HAUtilClient.isClientFailoverConfigured(conf, this.uri);
207    boolean isLogicalUri = isHA && HAUtilClient.isLogicalUri(conf, this.uri);
208    // In non-HA or non-logical URI case, the code needs to call
209    // getCanonicalUri() in order to handle the case where no port is
210    // specified in the URI
211    this.tokenServiceName = isLogicalUri ?
212        HAUtilClient.buildTokenServiceForLogicalUri(uri, getScheme())
213        : SecurityUtil.buildTokenService(getCanonicalUri());
214
215    if (!isHA) {
216      this.retryPolicy =
217          RetryUtils.getDefaultRetryPolicy(
218              conf,
219              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_KEY,
220              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_DEFAULT,
221              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_KEY,
222              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_DEFAULT,
223              HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
224    } else {
225
226      int maxFailoverAttempts = conf.getInt(
227          HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_KEY,
228          HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_DEFAULT);
229      int maxRetryAttempts = conf.getInt(
230          HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_KEY,
231          HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_DEFAULT);
232      int failoverSleepBaseMillis = conf.getInt(
233          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_KEY,
234          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_DEFAULT);
235      int failoverSleepMaxMillis = conf.getInt(
236          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_KEY,
237          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_DEFAULT);
238
239      this.retryPolicy = RetryPolicies
240          .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
241              maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,
242              failoverSleepMaxMillis);
243    }
244
245    this.workingDir = makeQualified(new Path(getHomeDirectoryString(ugi)));
246    this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
247    this.disallowFallbackToInsecureCluster = !conf.getBoolean(
248        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
249        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
250    this.initializeRestCsrf(conf);
251    this.delegationToken = null;
252
253    storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
254        .put(DFSOpsCountStatistics.NAME,
255            new StorageStatisticsProvider() {
256              @Override
257              public StorageStatistics provide() {
258                return new DFSOpsCountStatistics();
259              }
260            });
261  }
262
263  /**
264   * Initializes client-side handling of cross-site request forgery (CSRF)
265   * protection by figuring out the custom HTTP headers that need to be sent in
266   * requests and which HTTP methods are ignored because they do not require
267   * CSRF protection.
268   *
269   * @param conf configuration to read
270   */
271  private void initializeRestCsrf(Configuration conf) {
272    if (conf.getBoolean(DFS_WEBHDFS_REST_CSRF_ENABLED_KEY,
273        DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT)) {
274      this.restCsrfCustomHeader = conf.getTrimmed(
275          DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_KEY,
276          DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_DEFAULT);
277      this.restCsrfMethodsToIgnore = new HashSet<>();
278      this.restCsrfMethodsToIgnore.addAll(getTrimmedStringList(conf,
279          DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_KEY,
280          DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_DEFAULT));
281    } else {
282      this.restCsrfCustomHeader = null;
283      this.restCsrfMethodsToIgnore = null;
284    }
285  }
286
287  /**
288   * Returns a list of strings from a comma-delimited configuration value.
289   *
290   * @param conf configuration to check
291   * @param name configuration property name
292   * @param defaultValue default value if no value found for name
293   * @return list of strings from comma-delimited configuration value, or an
294   *     empty list if not found
295   */
296  private static List<String> getTrimmedStringList(Configuration conf,
297      String name, String defaultValue) {
298    String valueString = conf.get(name, defaultValue);
299    if (valueString == null) {
300      return new ArrayList<>();
301    }
302    return new ArrayList<>(StringUtils.getTrimmedStringCollection(valueString));
303  }
304
305  @Override
306  public URI getCanonicalUri() {
307    return super.getCanonicalUri();
308  }
309
310  /** Is WebHDFS enabled in conf? */
311  public static boolean isEnabled(final Configuration conf) {
312    final boolean b = conf.getBoolean(
313        HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
314        HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
315    return b;
316  }
317
318  TokenSelector<DelegationTokenIdentifier> tokenSelector =
319      new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
320
321  // the first getAuthParams() for a non-token op will either get the
322  // internal token from the ugi or lazy fetch one
323  protected synchronized Token<?> getDelegationToken() throws IOException {
324    if (canRefreshDelegationToken && delegationToken == null) {
325      Token<?> token = tokenSelector.selectToken(
326          new Text(getCanonicalServiceName()), ugi.getTokens());
327      // ugi tokens are usually indicative of a task which can't
328      // refetch tokens.  even if ugi has credentials, don't attempt
329      // to get another token to match hdfs/rpc behavior
330      if (token != null) {
331        LOG.debug("Using UGI token: {}", token);
332        canRefreshDelegationToken = false;
333      } else {
334        token = getDelegationToken(null);
335        if (token != null) {
336          LOG.debug("Fetched new token: {}", token);
337        } else { // security is disabled
338          canRefreshDelegationToken = false;
339        }
340      }
341      setDelegationToken(token);
342    }
343    return delegationToken;
344  }
345
346  @VisibleForTesting
347  synchronized boolean replaceExpiredDelegationToken() throws IOException {
348    boolean replaced = false;
349    if (canRefreshDelegationToken) {
350      Token<?> token = getDelegationToken(null);
351      LOG.debug("Replaced expired token: {}", token);
352      setDelegationToken(token);
353      replaced = (token != null);
354    }
355    return replaced;
356  }
357
358  @Override
359  @VisibleForTesting
360  public int getDefaultPort() {
361    return getConf().getInt(HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
362        HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
363  }
364
365  @Override
366  public URI getUri() {
367    return this.uri;
368  }
369
370  @Override
371  protected URI canonicalizeUri(URI uri) {
372    return NetUtils.getCanonicalUri(uri, getDefaultPort());
373  }
374
375  /** @return the home directory */
376  @Deprecated
377  public static String getHomeDirectoryString(final UserGroupInformation ugi) {
378    return "/user/" + ugi.getShortUserName();
379  }
380
381  @Override
382  public Path getHomeDirectory() {
383    if (cachedHomeDirectory == null) {
384      final HttpOpParam.Op op = GetOpParam.Op.GETHOMEDIRECTORY;
385      try {
386        String pathFromDelegatedFS = new FsPathResponseRunner<String>(op, null,
387            new UserParam(ugi)) {
388          @Override
389          String decodeResponse(Map<?, ?> json) throws IOException {
390            return JsonUtilClient.getPath(json);
391          }
392        }   .run();
393
394        cachedHomeDirectory = new Path(pathFromDelegatedFS).makeQualified(
395            this.getUri(), null);
396
397      } catch (IOException e) {
398        LOG.error("Unable to get HomeDirectory from original File System", e);
399        cachedHomeDirectory = new Path("/user/" + ugi.getShortUserName())
400            .makeQualified(this.getUri(), null);
401      }
402    }
403    return cachedHomeDirectory;
404  }
405
406  @Override
407  public synchronized Path getWorkingDirectory() {
408    return workingDir;
409  }
410
411  @Override
412  public synchronized void setWorkingDirectory(final Path dir) {
413    Path absolutePath = makeAbsolute(dir);
414    String result = absolutePath.toUri().getPath();
415    if (!DFSUtilClient.isValidName(result)) {
416      throw new IllegalArgumentException("Invalid DFS directory name " +
417          result);
418    }
419    workingDir = absolutePath;
420  }
421
422  private Path makeAbsolute(Path f) {
423    return f.isAbsolute()? f: new Path(workingDir, f);
424  }
425
426  static Map<?, ?> jsonParse(final HttpURLConnection c,
427      final boolean useErrorStream) throws IOException {
428    if (c.getContentLength() == 0) {
429      return null;
430    }
431    final InputStream in = useErrorStream ?
432        c.getErrorStream() : c.getInputStream();
433    if (in == null) {
434      throw new IOException("The " + (useErrorStream? "error": "input") +
435          " stream is null.");
436    }
437    try {
438      final String contentType = c.getContentType();
439      if (contentType != null) {
440        final MediaType parsed = MediaType.valueOf(contentType);
441        if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) {
442          throw new IOException("Content-Type \"" + contentType
443              + "\" is incompatible with \"" + MediaType.APPLICATION_JSON
444              + "\" (parsed=\"" + parsed + "\")");
445        }
446      }
447      return READER.readValue(in);
448    } finally {
449      in.close();
450    }
451  }
452
453  private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
454      final HttpURLConnection conn, boolean unwrapException)
455      throws IOException {
456    final int code = conn.getResponseCode();
457    // server is demanding an authentication we don't support
458    if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
459      // match hdfs/rpc exception
460      throw new AccessControlException(conn.getResponseMessage());
461    }
462    if (code != op.getExpectedHttpResponseCode()) {
463      final Map<?, ?> m;
464      try {
465        m = jsonParse(conn, true);
466      } catch(Exception e) {
467        throw new IOException("Unexpected HTTP response: code=" + code + " != "
468            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
469            + ", message=" + conn.getResponseMessage(), e);
470      }
471
472      if (m == null) {
473        throw new IOException("Unexpected HTTP response: code=" + code + " != "
474            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
475            + ", message=" + conn.getResponseMessage());
476      } else if (m.get(RemoteException.class.getSimpleName()) == null) {
477        return m;
478      }
479
480      IOException re = JsonUtilClient.toRemoteException(m);
481
482      //check if exception is due to communication with a Standby name node
483      if (re.getMessage() != null && re.getMessage().endsWith(
484          StandbyException.class.getSimpleName())) {
485        LOG.trace("Detected StandbyException", re);
486        throw new IOException(re);
487      }
488      // extract UGI-related exceptions and unwrap InvalidToken
489      // the NN mangles these exceptions but the DN does not and may need
490      // to re-fetch a token if either report the token is expired
491      if (re.getMessage() != null && re.getMessage().startsWith(
492          SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER)) {
493        String[] parts = re.getMessage().split(":\\s+", 3);
494        re = new RemoteException(parts[1], parts[2]);
495        re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
496      }
497      throw unwrapException? toIOException(re): re;
498    }
499    return null;
500  }
501
502  /**
503   * Covert an exception to an IOException.
504   *
505   * For a non-IOException, wrap it with IOException.
506   * For a RemoteException, unwrap it.
507   * For an IOException which is not a RemoteException, return it.
508   */
509  private static IOException toIOException(Exception e) {
510    if (!(e instanceof IOException)) {
511      return new IOException(e);
512    }
513
514    final IOException ioe = (IOException)e;
515    if (!(ioe instanceof RemoteException)) {
516      return ioe;
517    }
518
519    return ((RemoteException)ioe).unwrapRemoteException();
520  }
521
522  private synchronized InetSocketAddress getCurrentNNAddr() {
523    return nnAddrs[currentNNAddrIndex];
524  }
525
526  /**
527   * Reset the appropriate state to gracefully fail over to another name node
528   */
529  private synchronized void resetStateToFailOver() {
530    currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
531  }
532
533  /**
534   * Return a URL pointing to given path on the namenode.
535   *
536   * @param path to obtain the URL for
537   * @param query string to append to the path
538   * @return namenode URL referring to the given path
539   * @throws IOException on error constructing the URL
540   */
541  private URL getNamenodeURL(String path, String query) throws IOException {
542    InetSocketAddress nnAddr = getCurrentNNAddr();
543    final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
544        nnAddr.getPort(), path + '?' + query);
545    LOG.trace("url={}", url);
546    return url;
547  }
548
549  Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
550    List<Param<?,?>> authParams = Lists.newArrayList();
551    // Skip adding delegation token for token operations because these
552    // operations require authentication.
553    Token<?> token = null;
554    if (!op.getRequireAuth()) {
555      token = getDelegationToken();
556    }
557    if (token != null) {
558      authParams.add(new DelegationParam(token.encodeToUrlString()));
559    } else {
560      UserGroupInformation userUgi = ugi;
561      UserGroupInformation realUgi = userUgi.getRealUser();
562      if (realUgi != null) { // proxy user
563        authParams.add(new DoAsParam(userUgi.getShortUserName()));
564        userUgi = realUgi;
565      }
566      authParams.add(new UserParam(userUgi.getShortUserName()));
567    }
568    return authParams.toArray(new Param<?,?>[0]);
569  }
570
571  URL toUrl(final HttpOpParam.Op op, final Path fspath,
572      final Param<?,?>... parameters) throws IOException {
573    //initialize URI path and query
574    final String path = PATH_PREFIX
575        + (fspath == null? "/": makeQualified(fspath).toUri().getRawPath());
576    final String query = op.toQueryString()
577        + Param.toSortedString("&", getAuthParameters(op))
578        + Param.toSortedString("&", parameters);
579    final URL url = getNamenodeURL(path, query);
580    LOG.trace("url={}", url);
581    return url;
582  }
583
584  /**
585   * This class is for initialing a HTTP connection, connecting to server,
586   * obtaining a response, and also handling retry on failures.
587   */
588  abstract class AbstractRunner<T> {
589    abstract protected URL getUrl() throws IOException;
590
591    protected final HttpOpParam.Op op;
592    private final boolean redirected;
593    protected ExcludeDatanodesParam excludeDatanodes =
594        new ExcludeDatanodesParam("");
595
596    private boolean checkRetry;
597    private String redirectHost;
598
599    protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
600      this.op = op;
601      this.redirected = redirected;
602    }
603
604    T run() throws IOException {
605      UserGroupInformation connectUgi = ugi.getRealUser();
606      if (connectUgi == null) {
607        connectUgi = ugi;
608      }
609      if (op.getRequireAuth()) {
610        connectUgi.checkTGTAndReloginFromKeytab();
611      }
612      try {
613        // the entire lifecycle of the connection must be run inside the
614        // doAs to ensure authentication is performed correctly
615        return connectUgi.doAs(
616            new PrivilegedExceptionAction<T>() {
617              @Override
618              public T run() throws IOException {
619                return runWithRetry();
620              }
621            });
622      } catch (InterruptedException e) {
623        throw new IOException(e);
624      }
625    }
626
627    /**
628     * Two-step requests redirected to a DN
629     *
630     * Create/Append:
631     * Step 1) Submit a Http request with neither auto-redirect nor data.
632     * Step 2) Submit another Http request with the URL from the Location header
633     * with data.
634     *
635     * The reason of having two-step create/append is for preventing clients to
636     * send out the data before the redirect. This issue is addressed by the
637     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
638     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
639     * and Java 6 http client), which do not correctly implement "Expect:
640     * 100-continue". The two-step create/append is a temporary workaround for
641     * the software library bugs.
642     *
643     * Open/Checksum
644     * Also implements two-step connects for other operations redirected to
645     * a DN such as open and checksum
646     */
647    protected HttpURLConnection connect(URL url) throws IOException {
648      //redirect hostname and port
649      redirectHost = null;
650
651
652      // resolve redirects for a DN operation unless already resolved
653      if (op.getRedirect() && !redirected) {
654        final HttpOpParam.Op redirectOp =
655            HttpOpParam.TemporaryRedirectOp.valueOf(op);
656        final HttpURLConnection conn = connect(redirectOp, url);
657        // application level proxy like httpfs might not issue a redirect
658        if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
659          return conn;
660        }
661        try {
662          validateResponse(redirectOp, conn, false);
663          url = new URL(conn.getHeaderField("Location"));
664          redirectHost = url.getHost() + ":" + url.getPort();
665        } finally {
666          // TODO: consider not calling conn.disconnect() to allow connection reuse
667          // See http://tinyurl.com/java7-http-keepalive
668          conn.disconnect();
669        }
670      }
671      try {
672        return connect(op, url);
673      } catch (IOException ioe) {
674        if (redirectHost != null) {
675          if (excludeDatanodes.getValue() != null) {
676            excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
677                + excludeDatanodes.getValue());
678          } else {
679            excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
680          }
681        }
682        throw ioe;
683      }
684    }
685
686    private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
687        throws IOException {
688      final HttpURLConnection conn =
689          (HttpURLConnection)connectionFactory.openConnection(url);
690      final boolean doOutput = op.getDoOutput();
691      conn.setRequestMethod(op.getType().toString());
692      conn.setInstanceFollowRedirects(false);
693      if (restCsrfCustomHeader != null &&
694          !restCsrfMethodsToIgnore.contains(op.getType().name())) {
695        // The value of the header is unimportant.  Only its presence matters.
696        conn.setRequestProperty(restCsrfCustomHeader, "\"\"");
697      }
698      switch (op.getType()) {
699      // if not sending a message body for a POST or PUT operation, need
700      // to ensure the server/proxy knows this
701      case POST:
702      case PUT: {
703        conn.setDoOutput(true);
704        if (!doOutput) {
705          // explicitly setting content-length to 0 won't do spnego!!
706          // opening and closing the stream will send "Content-Length: 0"
707          conn.getOutputStream().close();
708        } else {
709          conn.setRequestProperty("Content-Type",
710              MediaType.APPLICATION_OCTET_STREAM);
711          conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
712        }
713        break;
714      }
715      default:
716        conn.setDoOutput(doOutput);
717        break;
718      }
719      conn.connect();
720      return conn;
721    }
722
723    private T runWithRetry() throws IOException {
724      /**
725       * Do the real work.
726       *
727       * There are three cases that the code inside the loop can throw an
728       * IOException:
729       *
730       * <ul>
731       * <li>The connection has failed (e.g., ConnectException,
732       * @see FailoverOnNetworkExceptionRetry for more details)</li>
733       * <li>The namenode enters the standby state (i.e., StandbyException).</li>
734       * <li>The server returns errors for the command (i.e., RemoteException)</li>
735       * </ul>
736       *
737       * The call to shouldRetry() will conduct the retry policy. The policy
738       * examines the exception and swallows it if it decides to rerun the work.
739       */
740      for(int retry = 0; ; retry++) {
741        checkRetry = !redirected;
742        final URL url = getUrl();
743        try {
744          final HttpURLConnection conn = connect(url);
745          // output streams will validate on close
746          if (!op.getDoOutput()) {
747            validateResponse(op, conn, false);
748          }
749          return getResponse(conn);
750        } catch (AccessControlException ace) {
751          // no retries for auth failures
752          throw ace;
753        } catch (InvalidToken it) {
754          // try to replace the expired token with a new one.  the attempt
755          // to acquire a new token must be outside this operation's retry
756          // so if it fails after its own retries, this operation fails too.
757          if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
758            throw it;
759          }
760        } catch (IOException ioe) {
761          // Attempt to include the redirected node in the exception. If the
762          // attempt to recreate the exception fails, just use the original.
763          String node = redirectHost;
764          if (node == null) {
765            node = url.getAuthority();
766          }
767          try {
768            IOException newIoe = ioe.getClass().getConstructor(String.class)
769                .newInstance(node + ": " + ioe.getMessage());
770            newIoe.setStackTrace(ioe.getStackTrace());
771            ioe = newIoe;
772          } catch (NoSuchMethodException | SecurityException 
773                   | InstantiationException | IllegalAccessException
774                   | IllegalArgumentException | InvocationTargetException e) {
775          }
776          shouldRetry(ioe, retry);
777        }
778      }
779    }
780
781    private void shouldRetry(final IOException ioe, final int retry
782    ) throws IOException {
783      InetSocketAddress nnAddr = getCurrentNNAddr();
784      if (checkRetry) {
785        try {
786          final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
787              ioe, retry, 0, true);
788
789          boolean isRetry =
790              a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
791          boolean isFailoverAndRetry =
792              a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
793
794          if (isRetry || isFailoverAndRetry) {
795            LOG.info("Retrying connect to namenode: {}. Already tried {}"
796                    + " time(s); retry policy is {}, delay {}ms.",
797                nnAddr, retry, retryPolicy, a.delayMillis);
798
799            if (isFailoverAndRetry) {
800              resetStateToFailOver();
801            }
802
803            Thread.sleep(a.delayMillis);
804            return;
805          }
806        } catch(Exception e) {
807          LOG.warn("Original exception is ", ioe);
808          throw toIOException(e);
809        }
810      }
811      throw toIOException(ioe);
812    }
813
    /**
     * Turns an opened connection into this runner's result.  The retry loop
     * returns whatever this method yields for the connection it opened.
     *
     * @param conn the connection opened for this operation
     * @return the result of the operation
     * @throws IOException if the result cannot be produced
     */
    abstract T getResponse(HttpURLConnection conn) throws IOException;
815  }
816
817  /**
818   * Abstract base class to handle path-based operations with params
819   */
820  abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
821    private final Path fspath;
822    private Param<?,?>[] parameters;
823
824    AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
825        Param<?,?>... parameters) {
826      super(op, false);
827      this.fspath = fspath;
828      this.parameters = parameters;
829    }
830
831    AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
832        final Path fspath) {
833      super(op, false);
834      this.fspath = fspath;
835      this.parameters = parameters;
836    }
837
838    protected void updateURLParameters(Param<?, ?>... p) {
839      this.parameters = p;
840    }
841
842    @Override
843    protected URL getUrl() throws IOException {
844      if (excludeDatanodes.getValue() != null) {
845        Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
846        System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
847        tmpParam[parameters.length] = excludeDatanodes;
848        return toUrl(op, fspath, tmpParam);
849      } else {
850        return toUrl(op, fspath, parameters);
851      }
852    }
853  }
854
855  /**
856   * Default path-based implementation expects no json response
857   */
858  class FsPathRunner extends AbstractFsPathRunner<Void> {
859    FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
860      super(op, fspath, parameters);
861    }
862
863    @Override
864    Void getResponse(HttpURLConnection conn) throws IOException {
865      return null;
866    }
867  }
868
869  /**
870   * Handle path-based operations with a json response
871   */
872  abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
873    FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
874        Param<?,?>... parameters) {
875      super(op, fspath, parameters);
876    }
877
878    FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
879        final Path fspath) {
880      super(op, parameters, fspath);
881    }
882
883    @Override
884    final T getResponse(HttpURLConnection conn) throws IOException {
885      try {
886        final Map<?,?> json = jsonParse(conn, false);
887        if (json == null) {
888          // match exception class thrown by parser
889          throw new IllegalStateException("Missing response");
890        }
891        return decodeResponse(json);
892      } catch (IOException ioe) {
893        throw ioe;
894      } catch (Exception e) { // catch json parser errors
895        final IOException ioe =
896            new IOException("Response decoding failure: "+e.toString(), e);
897        LOG.debug("Response decoding failure.", e);
898        throw ioe;
899      } finally {
900        // Don't call conn.disconnect() to allow connection reuse
901        // See http://tinyurl.com/java7-http-keepalive
902        conn.getInputStream().close();
903      }
904    }
905
906    abstract T decodeResponse(Map<?,?> json) throws IOException;
907  }
908
909  /**
910   * Handle path-based operations with json boolean response
911   */
912  class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
913    FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
914      super(op, fspath, parameters);
915    }
916
917    @Override
918    Boolean decodeResponse(Map<?,?> json) throws IOException {
919      return (Boolean)json.get("boolean");
920    }
921  }
922
923  /**
924   * Handle create/append output streams
925   */
926  class FsPathOutputStreamRunner
927      extends AbstractFsPathRunner<FSDataOutputStream> {
928    private final int bufferSize;
929
930    FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
931        Param<?,?>... parameters) {
932      super(op, fspath, parameters);
933      this.bufferSize = bufferSize;
934    }
935
936    @Override
937    FSDataOutputStream getResponse(final HttpURLConnection conn)
938        throws IOException {
939      return new FSDataOutputStream(new BufferedOutputStream(
940          conn.getOutputStream(), bufferSize), statistics) {
941        @Override
942        public void close() throws IOException {
943          try {
944            super.close();
945          } finally {
946            try {
947              validateResponse(op, conn, true);
948            } finally {
949              // This is a connection to DataNode.  Let's disconnect since
950              // there is little chance that the connection will be reused
951              // any time soonl
952              conn.disconnect();
953            }
954          }
955        }
956      };
957    }
958  }
959
960  class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
961    FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
962      super(op, fspath, parameters);
963    }
964    @Override
965    HttpURLConnection getResponse(final HttpURLConnection conn)
966        throws IOException {
967      return conn;
968    }
969  }
970
971  /**
972   * Used by open() which tracks the resolved url itself
973   */
974  final class URLRunner extends AbstractRunner<HttpURLConnection> {
975    private final URL url;
976    @Override
977    protected URL getUrl() {
978      return url;
979    }
980
981    protected URLRunner(final HttpOpParam.Op op, final URL url,
982        boolean redirected) {
983      super(op, redirected);
984      this.url = url;
985    }
986
987    @Override
988    HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
989      return conn;
990    }
991  }
992
993  private FsPermission applyUMask(FsPermission permission) {
994    if (permission == null) {
995      permission = FsPermission.getDefault();
996    }
997    return permission.applyUMask(FsPermission.getUMask(getConf()));
998  }
999
1000  private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
1001    final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
1002    HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
1003      @Override
1004      HdfsFileStatus decodeResponse(Map<?,?> json) {
1005        return JsonUtilClient.toFileStatus(json, true);
1006      }
1007    }.run();
1008    if (status == null) {
1009      throw new FileNotFoundException("File does not exist: " + f);
1010    }
1011    return status;
1012  }
1013
1014  @Override
1015  public FileStatus getFileStatus(Path f) throws IOException {
1016    statistics.incrementReadOps(1);
1017    storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
1018    return makeQualified(getHdfsFileStatus(f), f);
1019  }
1020
1021  private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
1022    return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
1023        f.getBlockSize(), f.getModificationTime(), f.getAccessTime(),
1024        f.getPermission(), f.getOwner(), f.getGroup(),
1025        f.isSymlink() ? new Path(f.getSymlink()) : null,
1026        f.getFullPath(parent).makeQualified(getUri(), getWorkingDirectory()));
1027  }
1028
1029  @Override
1030  public AclStatus getAclStatus(Path f) throws IOException {
1031    final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
1032    AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
1033      @Override
1034      AclStatus decodeResponse(Map<?,?> json) {
1035        return JsonUtilClient.toAclStatus(json);
1036      }
1037    }.run();
1038    if (status == null) {
1039      throw new FileNotFoundException("File does not exist: " + f);
1040    }
1041    return status;
1042  }
1043
1044  @Override
1045  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
1046    statistics.incrementWriteOps(1);
1047    storageStatistics.incrementOpCounter(OpType.MKDIRS);
1048    final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
1049    return new FsPathBooleanRunner(op, f,
1050        new PermissionParam(applyUMask(permission))
1051    ).run();
1052  }
1053
1054  /**
1055   * Create a symlink pointing to the destination path.
1056   */
1057  public void createSymlink(Path destination, Path f, boolean createParent
1058  ) throws IOException {
1059    statistics.incrementWriteOps(1);
1060    storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
1061    final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
1062    new FsPathRunner(op, f,
1063        new DestinationParam(makeQualified(destination).toUri().getPath()),
1064        new CreateParentParam(createParent)
1065    ).run();
1066  }
1067
1068  @Override
1069  public boolean rename(final Path src, final Path dst) throws IOException {
1070    statistics.incrementWriteOps(1);
1071    storageStatistics.incrementOpCounter(OpType.RENAME);
1072    final HttpOpParam.Op op = PutOpParam.Op.RENAME;
1073    return new FsPathBooleanRunner(op, src,
1074        new DestinationParam(makeQualified(dst).toUri().getPath())
1075    ).run();
1076  }
1077
1078  @SuppressWarnings("deprecation")
1079  @Override
1080  public void rename(final Path src, final Path dst,
1081      final Options.Rename... options) throws IOException {
1082    statistics.incrementWriteOps(1);
1083    storageStatistics.incrementOpCounter(OpType.RENAME);
1084    final HttpOpParam.Op op = PutOpParam.Op.RENAME;
1085    new FsPathRunner(op, src,
1086        new DestinationParam(makeQualified(dst).toUri().getPath()),
1087        new RenameOptionSetParam(options)
1088    ).run();
1089  }
1090
1091  @Override
1092  public void setXAttr(Path p, String name, byte[] value,
1093      EnumSet<XAttrSetFlag> flag) throws IOException {
1094    statistics.incrementWriteOps(1);
1095    storageStatistics.incrementOpCounter(OpType.SET_XATTR);
1096    final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
1097    if (value != null) {
1098      new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
1099          XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
1100          new XAttrSetFlagParam(flag)).run();
1101    } else {
1102      new FsPathRunner(op, p, new XAttrNameParam(name),
1103          new XAttrSetFlagParam(flag)).run();
1104    }
1105  }
1106
1107  @Override
1108  public byte[] getXAttr(Path p, final String name) throws IOException {
1109    statistics.incrementReadOps(1);
1110    storageStatistics.incrementOpCounter(OpType.GET_XATTR);
1111    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
1112    return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
1113        new XAttrEncodingParam(XAttrCodec.HEX)) {
1114      @Override
1115      byte[] decodeResponse(Map<?, ?> json) throws IOException {
1116        return JsonUtilClient.getXAttr(json);
1117      }
1118    }.run();
1119  }
1120
1121  @Override
1122  public Map<String, byte[]> getXAttrs(Path p) throws IOException {
1123    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
1124    return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
1125        new XAttrEncodingParam(XAttrCodec.HEX)) {
1126      @Override
1127      Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
1128        return JsonUtilClient.toXAttrs(json);
1129      }
1130    }.run();
1131  }
1132
1133  @Override
1134  public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
1135      throws IOException {
1136    Preconditions.checkArgument(names != null && !names.isEmpty(),
1137        "XAttr names cannot be null or empty.");
1138    Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
1139    for (int i = 0; i < parameters.length - 1; i++) {
1140      parameters[i] = new XAttrNameParam(names.get(i));
1141    }
1142    parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);
1143
1144    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
1145    return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
1146      @Override
1147      Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
1148        return JsonUtilClient.toXAttrs(json);
1149      }
1150    }.run();
1151  }
1152
1153  @Override
1154  public List<String> listXAttrs(Path p) throws IOException {
1155    final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
1156    return new FsPathResponseRunner<List<String>>(op, p) {
1157      @Override
1158      List<String> decodeResponse(Map<?, ?> json) throws IOException {
1159        return JsonUtilClient.toXAttrNames(json);
1160      }
1161    }.run();
1162  }
1163
1164  @Override
1165  public void removeXAttr(Path p, String name) throws IOException {
1166    statistics.incrementWriteOps(1);
1167    storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
1168    final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
1169    new FsPathRunner(op, p, new XAttrNameParam(name)).run();
1170  }
1171
1172  @Override
1173  public void setOwner(final Path p, final String owner, final String group
1174  ) throws IOException {
1175    if (owner == null && group == null) {
1176      throw new IOException("owner == null && group == null");
1177    }
1178
1179    statistics.incrementWriteOps(1);
1180    storageStatistics.incrementOpCounter(OpType.SET_OWNER);
1181    final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
1182    new FsPathRunner(op, p,
1183        new OwnerParam(owner), new GroupParam(group)
1184    ).run();
1185  }
1186
1187  @Override
1188  public void setPermission(final Path p, final FsPermission permission
1189  ) throws IOException {
1190    statistics.incrementWriteOps(1);
1191    storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
1192    final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
1193    new FsPathRunner(op, p,new PermissionParam(permission)).run();
1194  }
1195
1196  @Override
1197  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
1198      throws IOException {
1199    statistics.incrementWriteOps(1);
1200    storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
1201    final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
1202    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
1203  }
1204
1205  @Override
1206  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
1207      throws IOException {
1208    statistics.incrementWriteOps(1);
1209    storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
1210    final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
1211    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
1212  }
1213
1214  @Override
1215  public void removeDefaultAcl(Path path) throws IOException {
1216    statistics.incrementWriteOps(1);
1217    storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
1218    final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
1219    new FsPathRunner(op, path).run();
1220  }
1221
1222  @Override
1223  public void removeAcl(Path path) throws IOException {
1224    statistics.incrementWriteOps(1);
1225    storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
1226    final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
1227    new FsPathRunner(op, path).run();
1228  }
1229
1230  @Override
1231  public void setAcl(final Path p, final List<AclEntry> aclSpec)
1232      throws IOException {
1233    statistics.incrementWriteOps(1);
1234    storageStatistics.incrementOpCounter(OpType.SET_ACL);
1235    final HttpOpParam.Op op = PutOpParam.Op.SETACL;
1236    new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
1237  }
1238
1239  public void allowSnapshot(final Path p) throws IOException {
1240    statistics.incrementWriteOps(1);
1241    final HttpOpParam.Op op = PutOpParam.Op.ALLOWSNAPSHOT;
1242    new FsPathRunner(op, p).run();
1243  }
1244
1245  @Override
1246  public Path createSnapshot(final Path path, final String snapshotName)
1247      throws IOException {
1248    statistics.incrementWriteOps(1);
1249    storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
1250    final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
1251    return new FsPathResponseRunner<Path>(op, path,
1252        new SnapshotNameParam(snapshotName)) {
1253      @Override
1254      Path decodeResponse(Map<?,?> json) {
1255        return new Path((String) json.get(Path.class.getSimpleName()));
1256      }
1257    }.run();
1258  }
1259
1260  public void disallowSnapshot(final Path p) throws IOException {
1261    statistics.incrementWriteOps(1);
1262    final HttpOpParam.Op op = PutOpParam.Op.DISALLOWSNAPSHOT;
1263    new FsPathRunner(op, p).run();
1264  }
1265
1266  @Override
1267  public void deleteSnapshot(final Path path, final String snapshotName)
1268      throws IOException {
1269    statistics.incrementWriteOps(1);
1270    storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
1271    final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
1272    new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
1273  }
1274
1275  @Override
1276  public void renameSnapshot(final Path path, final String snapshotOldName,
1277      final String snapshotNewName) throws IOException {
1278    statistics.incrementWriteOps(1);
1279    storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
1280    final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
1281    new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
1282        new SnapshotNameParam(snapshotNewName)).run();
1283  }
1284
1285  @Override
1286  public boolean setReplication(final Path p, final short replication
1287  ) throws IOException {
1288    statistics.incrementWriteOps(1);
1289    storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
1290    final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
1291    return new FsPathBooleanRunner(op, p,
1292        new ReplicationParam(replication)
1293    ).run();
1294  }
1295
1296  @Override
1297  public void setTimes(final Path p, final long mtime, final long atime
1298  ) throws IOException {
1299    statistics.incrementWriteOps(1);
1300    storageStatistics.incrementOpCounter(OpType.SET_TIMES);
1301    final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
1302    new FsPathRunner(op, p,
1303        new ModificationTimeParam(mtime),
1304        new AccessTimeParam(atime)
1305    ).run();
1306  }
1307
1308  @Override
1309  public long getDefaultBlockSize() {
1310    return getConf().getLongBytes(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY,
1311        HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
1312  }
1313
1314  @Override
1315  public short getDefaultReplication() {
1316    return (short)getConf().getInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY,
1317        HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT);
1318  }
1319
1320  @Override
1321  public void concat(final Path trg, final Path [] srcs) throws IOException {
1322    statistics.incrementWriteOps(1);
1323    storageStatistics.incrementOpCounter(OpType.CONCAT);
1324    final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
1325    new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
1326  }
1327
1328  @Override
1329  public FSDataOutputStream create(final Path f, final FsPermission permission,
1330      final boolean overwrite, final int bufferSize, final short replication,
1331      final long blockSize, final Progressable progress) throws IOException {
1332    statistics.incrementWriteOps(1);
1333    storageStatistics.incrementOpCounter(OpType.CREATE);
1334
1335    final HttpOpParam.Op op = PutOpParam.Op.CREATE;
1336    return new FsPathOutputStreamRunner(op, f, bufferSize,
1337        new PermissionParam(applyUMask(permission)),
1338        new OverwriteParam(overwrite),
1339        new BufferSizeParam(bufferSize),
1340        new ReplicationParam(replication),
1341        new BlockSizeParam(blockSize)
1342    ).run();
1343  }
1344
1345  @Override
1346  public FSDataOutputStream createNonRecursive(final Path f,
1347      final FsPermission permission, final EnumSet<CreateFlag> flag,
1348      final int bufferSize, final short replication, final long blockSize,
1349      final Progressable progress) throws IOException {
1350    statistics.incrementWriteOps(1);
1351    storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
1352
1353    final HttpOpParam.Op op = PutOpParam.Op.CREATE;
1354    return new FsPathOutputStreamRunner(op, f, bufferSize,
1355        new PermissionParam(applyUMask(permission)),
1356        new CreateFlagParam(flag),
1357        new CreateParentParam(false),
1358        new BufferSizeParam(bufferSize),
1359        new ReplicationParam(replication),
1360        new BlockSizeParam(blockSize)
1361    ).run();
1362  }
1363
1364  @Override
1365  public FSDataOutputStream append(final Path f, final int bufferSize,
1366      final Progressable progress) throws IOException {
1367    statistics.incrementWriteOps(1);
1368    storageStatistics.incrementOpCounter(OpType.APPEND);
1369
1370    final HttpOpParam.Op op = PostOpParam.Op.APPEND;
1371    return new FsPathOutputStreamRunner(op, f, bufferSize,
1372        new BufferSizeParam(bufferSize)
1373    ).run();
1374  }
1375
1376  @Override
1377  public boolean truncate(Path f, long newLength) throws IOException {
1378    statistics.incrementWriteOps(1);
1379    storageStatistics.incrementOpCounter(OpType.TRUNCATE);
1380
1381    final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
1382    return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();
1383  }
1384
1385  @Override
1386  public boolean delete(Path f, boolean recursive) throws IOException {
1387    statistics.incrementWriteOps(1);
1388    storageStatistics.incrementOpCounter(OpType.DELETE);
1389    final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
1390    return new FsPathBooleanRunner(op, f,
1391        new RecursiveParam(recursive)
1392    ).run();
1393  }
1394
1395  @Override
1396  public FSDataInputStream open(final Path f, final int bufferSize
1397  ) throws IOException {
1398    statistics.incrementReadOps(1);
1399    storageStatistics.incrementOpCounter(OpType.OPEN);
1400    return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
1401  }
1402
1403  @Override
1404  public synchronized void close() throws IOException {
1405    try {
1406      if (canRefreshDelegationToken && delegationToken != null) {
1407        cancelDelegationToken(delegationToken);
1408      }
1409    } catch (IOException ioe) {
1410      LOG.debug("Token cancel failed: ", ioe);
1411    } finally {
1412      super.close();
1413    }
1414  }
1415
1416  // use FsPathConnectionRunner to ensure retries for InvalidTokens
1417  class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
1418    private final FsPathConnectionRunner runner;
1419    UnresolvedUrlOpener(FsPathConnectionRunner runner) {
1420      super(null);
1421      this.runner = runner;
1422    }
1423
1424    @Override
1425    protected HttpURLConnection connect(long offset, boolean resolved)
1426        throws IOException {
1427      assert offset == 0;
1428      HttpURLConnection conn = runner.run();
1429      setURL(conn.getURL());
1430      return conn;
1431    }
1432  }
1433
1434  class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
1435    OffsetUrlOpener(final URL url) {
1436      super(url);
1437    }
1438
1439    /** Setup offset url and connect. */
1440    @Override
1441    protected HttpURLConnection connect(final long offset,
1442        final boolean resolved) throws IOException {
1443      final URL offsetUrl = offset == 0L? url
1444          : new URL(url + "&" + new OffsetParam(offset));
1445      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
1446    }
1447  }
1448
1449  private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
1450
1451  /** Remove offset parameter, if there is any, from the url */
1452  static URL removeOffsetParam(final URL url) throws MalformedURLException {
1453    String query = url.getQuery();
1454    if (query == null) {
1455      return url;
1456    }
1457    final String lower = StringUtils.toLowerCase(query);
1458    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
1459        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
1460      return url;
1461    }
1462
1463    //rebuild query
1464    StringBuilder b = null;
1465    for(final StringTokenizer st = new StringTokenizer(query, "&");
1466        st.hasMoreTokens();) {
1467      final String token = st.nextToken();
1468      if (!StringUtils.toLowerCase(token).startsWith(OFFSET_PARAM_PREFIX)) {
1469        if (b == null) {
1470          b = new StringBuilder("?").append(token);
1471        } else {
1472          b.append('&').append(token);
1473        }
1474      }
1475    }
1476    query = b == null? "": b.toString();
1477
1478    final String urlStr = url.toString();
1479    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
1480  }
1481
1482  static class OffsetUrlInputStream extends ByteRangeInputStream {
1483    OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
1484        throws IOException {
1485      super(o, r);
1486    }
1487
1488    /** Remove offset parameter before returning the resolved url. */
1489    @Override
1490    protected URL getResolvedUrl(final HttpURLConnection connection
1491    ) throws MalformedURLException {
1492      return removeOffsetParam(connection.getURL());
1493    }
1494  }
1495
1496  @Override
1497  public FileStatus[] listStatus(final Path f) throws IOException {
1498    statistics.incrementReadOps(1);
1499    storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
1500
1501    final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
1502    return new FsPathResponseRunner<FileStatus[]>(op, f) {
1503      @Override
1504      FileStatus[] decodeResponse(Map<?,?> json) {
1505        final Map<?, ?> rootmap =
1506            (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
1507        final List<?> array = JsonUtilClient.getList(rootmap,
1508            FileStatus.class.getSimpleName());
1509
1510        //convert FileStatus
1511        assert array != null;
1512        final FileStatus[] statuses = new FileStatus[array.size()];
1513        int i = 0;
1514        for (Object object : array) {
1515          final Map<?, ?> m = (Map<?, ?>) object;
1516          statuses[i++] = makeQualified(JsonUtilClient.toFileStatus(m, false),
1517              f);
1518        }
1519        return statuses;
1520      }
1521    }.run();
1522  }
1523
1524  @Override
1525  public Token<DelegationTokenIdentifier> getDelegationToken(
1526      final String renewer) throws IOException {
1527    final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
1528    Token<DelegationTokenIdentifier> token =
1529        new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
1530            op, null, new RenewerParam(renewer)) {
1531          @Override
1532          Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
1533              throws IOException {
1534            return JsonUtilClient.toDelegationToken(json);
1535          }
1536        }.run();
1537    if (token != null) {
1538      token.setService(tokenServiceName);
1539    } else {
1540      if (disallowFallbackToInsecureCluster) {
1541        throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
1542      }
1543    }
1544    return token;
1545  }
1546
1547  @Override
1548  public synchronized Token<?> getRenewToken() {
1549    return delegationToken;
1550  }
1551
1552  @Override
1553  public <T extends TokenIdentifier> void setDelegationToken(
1554      final Token<T> token) {
1555    synchronized (this) {
1556      delegationToken = token;
1557    }
1558  }
1559
1560  @Override
1561  public synchronized long renewDelegationToken(final Token<?> token
1562  ) throws IOException {
1563    final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
1564    return new FsPathResponseRunner<Long>(op, null,
1565        new TokenArgumentParam(token.encodeToUrlString())) {
1566      @Override
1567      Long decodeResponse(Map<?,?> json) throws IOException {
1568        return ((Number) json.get("long")).longValue();
1569      }
1570    }.run();
1571  }
1572
1573  @Override
1574  public synchronized void cancelDelegationToken(final Token<?> token
1575  ) throws IOException {
1576    final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
1577    new FsPathRunner(op, null,
1578        new TokenArgumentParam(token.encodeToUrlString())
1579    ).run();
1580  }
1581
1582  @Override
1583  public BlockLocation[] getFileBlockLocations(final FileStatus status,
1584      final long offset, final long length) throws IOException {
1585    if (status == null) {
1586      return null;
1587    }
1588    return getFileBlockLocations(status.getPath(), offset, length);
1589  }
1590
1591  @Override
1592  public BlockLocation[] getFileBlockLocations(final Path p,
1593      final long offset, final long length) throws IOException {
1594    statistics.incrementReadOps(1);
1595    storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
1596
1597    final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
1598    return new FsPathResponseRunner<BlockLocation[]>(op, p,
1599        new OffsetParam(offset), new LengthParam(length)) {
1600      @Override
1601      BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
1602        return DFSUtilClient.locatedBlocks2Locations(
1603            JsonUtilClient.toLocatedBlocks(json));
1604      }
1605    }.run();
1606  }
1607
1608  @Override
1609  public void access(final Path path, final FsAction mode) throws IOException {
1610    final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS;
1611    new FsPathRunner(op, path, new FsActionParam(mode)).run();
1612  }
1613
1614  @Override
1615  public ContentSummary getContentSummary(final Path p) throws IOException {
1616    statistics.incrementReadOps(1);
1617    storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
1618
1619    final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
1620    return new FsPathResponseRunner<ContentSummary>(op, p) {
1621      @Override
1622      ContentSummary decodeResponse(Map<?,?> json) {
1623        return JsonUtilClient.toContentSummary(json);
1624      }
1625    }.run();
1626  }
1627
1628  @Override
1629  public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
1630  ) throws IOException {
1631    statistics.incrementReadOps(1);
1632    storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
1633
1634    final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
1635    return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
1636      @Override
1637      MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
1638        return JsonUtilClient.toMD5MD5CRC32FileChecksum(json);
1639      }
1640    }.run();
1641  }
1642
1643  /**
1644   * Resolve an HDFS URL into real INetSocketAddress. It works like a DNS
1645   * resolver when the URL points to an non-HA cluster. When the URL points to
1646   * an HA cluster with its logical name, the resolver further resolves the
1647   * logical name(i.e., the authority in the URL) into real namenode addresses.
1648   */
1649  private InetSocketAddress[] resolveNNAddr() {
1650    Configuration conf = getConf();
1651    final String scheme = uri.getScheme();
1652
1653    ArrayList<InetSocketAddress> ret = new ArrayList<>();
1654
1655    if (!HAUtilClient.isLogicalUri(conf, uri)) {
1656      InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
1657          getDefaultPort());
1658      ret.add(addr);
1659
1660    } else {
1661      Map<String, Map<String, InetSocketAddress>> addresses = DFSUtilClient
1662          .getHaNnWebHdfsAddresses(conf, scheme);
1663
1664      // Extract the entry corresponding to the logical name.
1665      Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost());
1666      for (InetSocketAddress addr : addrs.values()) {
1667        ret.add(addr);
1668      }
1669    }
1670
1671    InetSocketAddress[] r = new InetSocketAddress[ret.size()];
1672    return ret.toArray(r);
1673  }
1674
1675  @Override
1676  public String getCanonicalServiceName() {
1677    return tokenServiceName == null ? super.getCanonicalServiceName()
1678        : tokenServiceName.toString();
1679  }
1680
1681  @Override
1682  public void setStoragePolicy(Path p, String policyName) throws IOException {
1683    if (policyName == null) {
1684      throw new IOException("policyName == null");
1685    }
1686    statistics.incrementWriteOps(1);
1687    storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY);
1688    final HttpOpParam.Op op = PutOpParam.Op.SETSTORAGEPOLICY;
1689    new FsPathRunner(op, p, new StoragePolicyParam(policyName)).run();
1690  }
1691
1692  @Override
1693  public Collection<BlockStoragePolicy> getAllStoragePolicies()
1694      throws IOException {
1695    final HttpOpParam.Op op = GetOpParam.Op.GETALLSTORAGEPOLICY;
1696    return new FsPathResponseRunner<Collection<BlockStoragePolicy>>(op, null) {
1697      @Override
1698      Collection<BlockStoragePolicy> decodeResponse(Map<?, ?> json)
1699          throws IOException {
1700        return JsonUtilClient.getStoragePolicies(json);
1701      }
1702    }.run();
1703  }
1704
1705  @Override
1706  public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
1707    final HttpOpParam.Op op = GetOpParam.Op.GETSTORAGEPOLICY;
1708    return new FsPathResponseRunner<BlockStoragePolicy>(op, src) {
1709      @Override
1710      BlockStoragePolicy decodeResponse(Map<?, ?> json) throws IOException {
1711        return JsonUtilClient.toBlockStoragePolicy((Map<?, ?>) json
1712            .get(BlockStoragePolicy.class.getSimpleName()));
1713      }
1714    }.run();
1715  }
1716
1717  @Override
1718  public void unsetStoragePolicy(Path src) throws IOException {
1719    statistics.incrementWriteOps(1);
1720    storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY);
1721    final HttpOpParam.Op op = PostOpParam.Op.UNSETSTORAGEPOLICY;
1722    new FsPathRunner(op, src).run();
1723  }
1724
1725  @VisibleForTesting
1726  InetSocketAddress[] getResolvedNNAddr() {
1727    return nnAddrs;
1728  }
1729
1730  @VisibleForTesting
1731  public void setRetryPolicy(RetryPolicy rp) {
1732    this.retryPolicy = rp;
1733  }
1734
1735  /**
1736   * This class is used for opening, reading, and seeking files while using the
1737   * WebHdfsFileSystem. This class will invoke the retry policy when performing
1738   * any of these actions.
1739   */
1740  @VisibleForTesting
1741  public class WebHdfsInputStream extends FSInputStream {
1742    private ReadRunner readRunner = null;
1743
1744    WebHdfsInputStream(Path path, int buffersize) throws IOException {
1745      // Only create the ReadRunner once. Each read's byte array and position
1746      // will be updated within the ReadRunner object before every read.
1747      readRunner = new ReadRunner(path, buffersize);
1748    }
1749
1750    @Override
1751    public int read() throws IOException {
1752      final byte[] b = new byte[1];
1753      return (read(b, 0, 1) == -1) ? -1 : (b[0] & 0xff);
1754    }
1755
1756    @Override
1757    public int read(byte b[], int off, int len) throws IOException {
1758      return readRunner.read(b, off, len);
1759    }
1760
1761    @Override
1762    public void seek(long newPos) throws IOException {
1763      readRunner.seek(newPos);
1764    }
1765
1766    @Override
1767    public long getPos() throws IOException {
1768      return readRunner.getPos();
1769    }
1770
1771    protected int getBufferSize() throws IOException {
1772      return readRunner.getBufferSize();
1773    }
1774
1775    protected Path getPath() throws IOException {
1776      return readRunner.getPath();
1777    }
1778
1779    @Override
1780    public boolean seekToNewSource(long targetPos) throws IOException {
1781      return false;
1782    }
1783
1784    @Override
1785    public void close() throws IOException {
1786      readRunner.close();
1787    }
1788
1789    public void setFileLength(long len) {
1790      readRunner.setFileLength(len);
1791    }
1792
1793    public long getFileLength() {
1794      return readRunner.getFileLength();
1795    }
1796
1797    @VisibleForTesting
1798    ReadRunner getReadRunner() {
1799      return readRunner;
1800    }
1801
1802    @VisibleForTesting
1803    void setReadRunner(ReadRunner rr) {
1804      this.readRunner = rr;
1805    }
1806  }
1807
1808  enum RunnerState {
1809    DISCONNECTED, // Connection is closed programmatically by ReadRunner
1810    OPEN,         // Connection has been established by ReadRunner
1811    SEEK,         // Calling code has explicitly called seek()
1812    CLOSED        // Calling code has explicitly called close()
1813    }
1814
1815  /**
1816   * This class will allow retries to occur for both open and read operations.
1817   * The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object,
1818   * which creates a new ReadRunner object that will be used to open a
1819   * connection and read or seek into the input stream.
1820   *
1821   * ReadRunner is a subclass of the AbstractRunner class, which will run the
1822   * ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
1823   * methods within a retry loop, based on the configured retry policy.
1824   * ReadRunner#connect will create a connection if one has not already been
1825   * created. Otherwise, it will return the previously created connection
1826   * object. This is necessary because a new connection should not be created
1827   * for every read.
1828   * Likewise, ReadRunner#getUrl will construct a new URL object only if the
1829   * connection has not previously been established. Otherwise, it will return
1830   * the previously created URL object.
1831   * ReadRunner#getResponse will initialize the input stream if it has not
1832   * already been initialized and read the requested data from the specified
1833   * input stream.
1834   */
1835  @VisibleForTesting
1836  protected class ReadRunner extends AbstractFsPathRunner<Integer> {
1837    private InputStream in = null;
1838    private HttpURLConnection cachedConnection = null;
1839    private byte[] readBuffer;
1840    private int readOffset;
1841    private int readLength;
1842    private RunnerState runnerState = RunnerState.DISCONNECTED;
1843    private URL originalUrl = null;
1844    private URL resolvedUrl = null;
1845
1846    private final Path path;
1847    private final int bufferSize;
1848    private long pos = 0;
1849    private long fileLength = 0;
1850
1851    /* The following methods are WebHdfsInputStream helpers. */
1852
1853    ReadRunner(Path p, int bs) throws IOException {
1854      super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
1855      this.path = p;
1856      this.bufferSize = bs;
1857    }
1858
1859    int read(byte[] b, int off, int len) throws IOException {
1860      if (runnerState == RunnerState.CLOSED) {
1861        throw new IOException("Stream closed");
1862      }
1863      if (len == 0) {
1864        return 0;
1865      }
1866
1867      // Before the first read, pos and fileLength will be 0 and readBuffer
1868      // will all be null. They will be initialized once the first connection
1869      // is made. Only after that it makes sense to compare pos and fileLength.
1870      if (pos >= fileLength && readBuffer != null) {
1871        return -1;
1872      }
1873
1874      // If a seek is occurring, the input stream will have been closed, so it
1875      // needs to be reopened. Use the URLRunner to call AbstractRunner#connect
1876      // with the previously-cached resolved URL and with the 'redirected' flag
1877      // set to 'true'. The resolved URL contains the URL of the previously
1878      // opened DN as opposed to the NN. It is preferable to use the resolved
1879      // URL when creating a connection because it does not hit the NN or every
1880      // seek, nor does it open a connection to a new DN after every seek.
1881      // The redirect flag is needed so that AbstractRunner#connect knows the
1882      // URL is already resolved.
1883      // Note that when the redirected flag is set, retries are not attempted.
1884      // So, if the connection fails using URLRunner, clear out the connection
1885      // and fall through to establish the connection using ReadRunner.
1886      if (runnerState == RunnerState.SEEK) {
1887        try {
1888          final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
1889          cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
1890        } catch (IOException ioe) {
1891          closeInputStream(RunnerState.DISCONNECTED);
1892        }
1893      }
1894
1895      readBuffer = b;
1896      readOffset = off;
1897      readLength = len;
1898
1899      int count = -1;
1900      count = this.run();
1901      if (count >= 0) {
1902        statistics.incrementBytesRead(count);
1903        pos += count;
1904      } else if (pos < fileLength) {
1905        throw new EOFException(
1906                  "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
1907      }
1908      return count;
1909    }
1910
1911    void seek(long newPos) throws IOException {
1912      if (pos != newPos) {
1913        pos = newPos;
1914        closeInputStream(RunnerState.SEEK);
1915      }
1916    }
1917
1918    public void close() throws IOException {
1919      closeInputStream(RunnerState.CLOSED);
1920    }
1921
1922    /* The following methods are overriding AbstractRunner methods,
1923     * to be called within the retry policy context by runWithRetry.
1924     */
1925
1926    @Override
1927    protected URL getUrl() throws IOException {
1928      // This method is called every time either a read is executed.
1929      // The check for connection == null is to ensure that a new URL is only
1930      // created upon a new connection and not for every read.
1931      if (cachedConnection == null) {
1932        // Update URL with current offset. BufferSize doesn't change, but it
1933        // still must be included when creating the new URL.
1934        updateURLParameters(new BufferSizeParam(bufferSize),
1935            new OffsetParam(pos));
1936        originalUrl = super.getUrl();
1937      }
1938      return originalUrl;
1939    }
1940
1941    /* Only make the connection if it is not already open. Don't cache the
1942     * connection here. After this method is called, runWithRetry will call
1943     * validateResponse, and then call the below ReadRunner#getResponse. If
1944     * the code path makes it that far, then we can cache the connection.
1945     */
1946    @Override
1947    protected HttpURLConnection connect(URL url) throws IOException {
1948      HttpURLConnection conn = cachedConnection;
1949      if (conn == null) {
1950        try {
1951          conn = super.connect(url);
1952        } catch (IOException e) {
1953          closeInputStream(RunnerState.DISCONNECTED);
1954          throw e;
1955        }
1956      }
1957      return conn;
1958    }
1959
1960    /*
1961     * This method is used to perform reads within the retry policy context.
1962     * This code is relying on runWithRetry to always call the above connect
1963     * method and the verifyResponse method prior to calling getResponse.
1964     */
1965    @Override
1966    Integer getResponse(final HttpURLConnection conn)
1967        throws IOException {
1968      try {
1969        // In the "open-then-read" use case, runWithRetry will have executed
1970        // ReadRunner#connect to make the connection and then executed
1971        // validateResponse to validate the response code. Only then do we want
1972        // to cache the connection.
1973        // In the "read-after-seek" use case, the connection is made and the
1974        // response is validated by the URLRunner. ReadRunner#read then caches
1975        // the connection and the ReadRunner#connect will pass on the cached
1976        // connection
1977        // In either case, stream initialization is done here if necessary.
1978        cachedConnection = conn;
1979        if (in == null) {
1980          in = initializeInputStream(conn);
1981        }
1982
1983        int count = in.read(readBuffer, readOffset, readLength);
1984        if (count < 0 && pos < fileLength) {
1985          throw new EOFException(
1986                  "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
1987        }
1988        return Integer.valueOf(count);
1989      } catch (IOException e) {
1990        String redirectHost = resolvedUrl.getAuthority();
1991        if (excludeDatanodes.getValue() != null) {
1992          excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
1993              + excludeDatanodes.getValue());
1994        } else {
1995          excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
1996        }
1997
1998        // If an exception occurs, close the input stream and null it out so
1999        // that if the abstract runner decides to retry, it will reconnect.
2000        closeInputStream(RunnerState.DISCONNECTED);
2001        throw e;
2002      }
2003    }
2004
2005    @VisibleForTesting
2006    InputStream initializeInputStream(HttpURLConnection conn)
2007        throws IOException {
2008      // Cache the resolved URL so that it can be used in the event of
2009      // a future seek operation.
2010      resolvedUrl = removeOffsetParam(conn.getURL());
2011      final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH);
2012      InputStream inStream = conn.getInputStream();
2013      if (LOG.isDebugEnabled()) {
2014        LOG.debug("open file: " + conn.getURL());
2015      }
2016      if (cl != null) {
2017        long streamLength = Long.parseLong(cl);
2018        fileLength = pos + streamLength;
2019        // Java has a bug with >2GB request streams.  It won't bounds check
2020        // the reads so the transfer blocks until the server times out
2021        inStream = new BoundedInputStream(inStream, streamLength);
2022      } else {
2023        fileLength = getHdfsFileStatus(path).getLen();
2024      }
2025      // Wrapping in BufferedInputStream because it is more performant than
2026      // BoundedInputStream by itself.
2027      runnerState = RunnerState.OPEN;
2028      return new BufferedInputStream(inStream, bufferSize);
2029    }
2030
2031    // Close both the InputStream and the connection.
2032    @VisibleForTesting
2033    void closeInputStream(RunnerState rs) throws IOException {
2034      if (in != null) {
2035        IOUtils.close(cachedConnection);
2036        in = null;
2037      }
2038      cachedConnection = null;
2039      runnerState = rs;
2040    }
2041
2042    /* Getters and Setters */
2043
2044    @VisibleForTesting
2045    protected InputStream getInputStream() {
2046      return in;
2047    }
2048
2049    @VisibleForTesting
2050    protected void setInputStream(InputStream inStream) {
2051      in = inStream;
2052    }
2053
2054    Path getPath() {
2055      return path;
2056    }
2057
2058    int getBufferSize() {
2059      return bufferSize;
2060    }
2061
2062    long getFileLength() {
2063      return fileLength;
2064    }
2065
2066    void setFileLength(long len) {
2067      fileLength = len;
2068    }
2069
2070    long getPos() {
2071      return pos;
2072    }
2073  }
2074}