001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hdfs.web;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024import java.net.HttpURLConnection;
025import java.net.URL;
026import java.util.List;
027import java.util.Map;
028import java.util.StringTokenizer;
029
030import org.apache.commons.io.input.BoundedInputStream;
031import org.apache.hadoop.fs.FSExceptionMessages;
032import org.apache.hadoop.fs.FSInputStream;
033import org.apache.http.HttpStatus;
034
035import com.google.common.annotations.VisibleForTesting;
036import com.google.common.net.HttpHeaders;
037
038import javax.annotation.Nonnull;
039
040/**
041 * To support HTTP byte streams, a new connection to an HTTP server needs to be
042 * created each time. This class hides the complexity of those multiple
043 * connections from the client. Whenever seek() is called, a new connection
044 * is made on the successive read(). The normal input stream functions are
045 * connected to the currently active input stream.
046 */
047public abstract class ByteRangeInputStream extends FSInputStream {
048
049  /**
050   * This class wraps a URL and provides method to open connection.
051   * It can be overridden to change how a connection is opened.
052   */
053  public static abstract class URLOpener {
054    protected URL url;
055
056    public URLOpener(URL u) {
057      url = u;
058    }
059
060    public void setURL(URL u) {
061      url = u;
062    }
063
064    public URL getURL() {
065      return url;
066    }
067
068    /** Connect to server with a data offset. */
069    protected abstract HttpURLConnection connect(final long offset,
070        final boolean resolved) throws IOException;
071  }
072
073  static class InputStreamAndFileLength {
074    final Long length;
075    final InputStream in;
076
077    InputStreamAndFileLength(Long length, InputStream in) {
078      this.length = length;
079      this.in = in;
080    }
081  }
082
083  enum StreamStatus {
084    NORMAL, SEEK, CLOSED
085  }
086  protected InputStream in;
087  protected final URLOpener originalURL;
088  protected final URLOpener resolvedURL;
089  protected long startPos = 0;
090  protected long currentPos = 0;
091  protected Long fileLength = null;
092
093  StreamStatus status = StreamStatus.SEEK;
094
095  /**
096   * Create with the specified URLOpeners. Original url is used to open the
097   * stream for the first time. Resolved url is used in subsequent requests.
098   * @param o Original url
099   * @param r Resolved url
100   */
101  public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException {
102    this.originalURL = o;
103    this.resolvedURL = r;
104    getInputStream();
105  }
106
107  protected abstract URL getResolvedUrl(final HttpURLConnection connection
108  ) throws IOException;
109
110  @VisibleForTesting
111  protected InputStream getInputStream() throws IOException {
112    switch (status) {
113    case NORMAL:
114      break;
115    case SEEK:
116      if (in != null) {
117        in.close();
118      }
119      InputStreamAndFileLength fin = openInputStream(startPos);
120      in = fin.in;
121      fileLength = fin.length;
122      status = StreamStatus.NORMAL;
123      break;
124    case CLOSED:
125      throw new IOException("Stream closed");
126    }
127    return in;
128  }
129
130  @VisibleForTesting
131  protected InputStreamAndFileLength openInputStream(long startOffset)
132      throws IOException {
133    if (startOffset < 0) {
134      throw new EOFException("Negative Position");
135    }
136    // Use the original url if no resolved url exists, eg. if
137    // it's the first time a request is made.
138    final boolean resolved = resolvedURL.getURL() != null;
139    final URLOpener opener = resolved? resolvedURL: originalURL;
140
141    final HttpURLConnection connection = opener.connect(startOffset, resolved);
142    resolvedURL.setURL(getResolvedUrl(connection));
143
144    InputStream in = connection.getInputStream();
145    final Long length;
146    final Map<String, List<String>> headers = connection.getHeaderFields();
147    if (isChunkedTransferEncoding(headers)) {
148      // file length is not known
149      length = null;
150    } else {
151      // for non-chunked transfer-encoding, get content-length
152      long streamlength = getStreamLength(connection, headers);
153      length = startOffset + streamlength;
154
155      // Java has a bug with >2GB request streams.  It won't bounds check
156      // the reads so the transfer blocks until the server times out
157      in = new BoundedInputStream(in, streamlength);
158    }
159
160    return new InputStreamAndFileLength(length, in);
161  }
162
163  private static long getStreamLength(HttpURLConnection connection,
164      Map<String, List<String>> headers) throws IOException {
165    String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
166    if (cl == null) {
167      // Try to get the content length by parsing the content range
168      // because HftpFileSystem does not return the content length
169      // if the content is partial.
170      if (connection.getResponseCode() == HttpStatus.SC_PARTIAL_CONTENT) {
171        cl = connection.getHeaderField(HttpHeaders.CONTENT_RANGE);
172        return getLengthFromRange(cl);
173      } else {
174        throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
175            + headers);
176      }
177    }
178    return Long.parseLong(cl);
179  }
180
181  private static long getLengthFromRange(String cl) throws IOException {
182    try {
183
184      String[] str = cl.substring(6).split("[-/]");
185      return Long.parseLong(str[1]) - Long.parseLong(str[0]) + 1;
186    } catch (Exception e) {
187      throw new IOException(
188          "failed to get content length by parsing the content range: " + cl
189              + " " + e.getMessage());
190    }
191  }
192
193  private static boolean isChunkedTransferEncoding(
194      final Map<String, List<String>> headers) {
195    return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
196        || contains(headers, HttpHeaders.TE, "chunked");
197  }
198
199  /** Does the HTTP header map contain the given key, value pair? */
200  private static boolean contains(final Map<String, List<String>> headers,
201      final String key, final String value) {
202    final List<String> values = headers.get(key);
203    if (values != null) {
204      for(String v : values) {
205        for(final StringTokenizer t = new StringTokenizer(v, ",");
206            t.hasMoreTokens(); ) {
207          if (value.equalsIgnoreCase(t.nextToken())) {
208            return true;
209          }
210        }
211      }
212    }
213    return false;
214  }
215
216  private int update(final int n) throws IOException {
217    if (n != -1) {
218      currentPos += n;
219    } else if (fileLength != null && currentPos < fileLength) {
220      throw new IOException("Got EOF but currentPos = " + currentPos
221          + " < filelength = " + fileLength);
222    }
223    return n;
224  }
225
226  @Override
227  public int read() throws IOException {
228    final int b = getInputStream().read();
229    update((b == -1) ? -1 : 1);
230    return b;
231  }
232
233  @Override
234  public int read(@Nonnull byte b[], int off, int len) throws IOException {
235    return update(getInputStream().read(b, off, len));
236  }
237
238  /**
239   * Seek to the given offset from the start of the file.
240   * The next read() will be from that location.  Can't
241   * seek past the end of the file.
242   */
243  @Override
244  public void seek(long pos) throws IOException {
245    if (pos != currentPos) {
246      startPos = pos;
247      currentPos = pos;
248      if (status != StreamStatus.CLOSED) {
249        status = StreamStatus.SEEK;
250      }
251    }
252  }
253
254  @Override
255  public int read(long position, byte[] buffer, int offset, int length)
256      throws IOException {
257    validatePositionedReadArgs(position, buffer, offset, length);
258    if (length == 0) {
259      return 0;
260    }
261    try (InputStream in = openInputStream(position).in) {
262      return in.read(buffer, offset, length);
263    }
264  }
265
266  @Override
267  public void readFully(long position, byte[] buffer, int offset, int length)
268      throws IOException {
269    validatePositionedReadArgs(position, buffer, offset, length);
270    if (length == 0) {
271      return;
272    }
273    final InputStreamAndFileLength fin = openInputStream(position);
274    try {
275      if (fin.length != null && length + position > fin.length) {
276        throw new EOFException("The length to read " + length
277            + " exceeds the file length " + fin.length);
278      }
279      int nread = 0;
280      while (nread < length) {
281        int nbytes = fin.in.read(buffer, offset + nread, length - nread);
282        if (nbytes < 0) {
283          throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
284        }
285        nread += nbytes;
286      }
287    } finally {
288      fin.in.close();
289    }
290  }
291
292  /**
293   * Return the current offset from the start of the file
294   */
295  @Override
296  public long getPos() throws IOException {
297    return currentPos;
298  }
299
300  /**
301   * Seeks a different copy of the data.  Returns true if
302   * found a new source, false otherwise.
303   */
304  @Override
305  public boolean seekToNewSource(long targetPos) throws IOException {
306    return false;
307  }
308
309  @Override
310  public void close() throws IOException {
311    if (in != null) {
312      in.close();
313      in = null;
314    }
315    status = StreamStatus.CLOSED;
316  }
317
318  @Override
319  public synchronized int available() throws IOException{
320    getInputStream();
321    if(fileLength != null){
322      long remaining = fileLength - currentPos;
323      return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
324    }else {
325      return Integer.MAX_VALUE;
326    }
327  }
328}