001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hdfs.web; 020 021import java.io.EOFException; 022import java.io.IOException; 023import java.io.InputStream; 024import java.net.HttpURLConnection; 025import java.net.URL; 026import java.util.List; 027import java.util.Map; 028import java.util.StringTokenizer; 029 030import org.apache.commons.io.input.BoundedInputStream; 031import org.apache.hadoop.fs.FSExceptionMessages; 032import org.apache.hadoop.fs.FSInputStream; 033import org.apache.http.HttpStatus; 034 035import com.google.common.annotations.VisibleForTesting; 036import com.google.common.net.HttpHeaders; 037 038import javax.annotation.Nonnull; 039 040/** 041 * To support HTTP byte streams, a new connection to an HTTP server needs to be 042 * created each time. This class hides the complexity of those multiple 043 * connections from the client. Whenever seek() is called, a new connection 044 * is made on the successive read(). The normal input stream functions are 045 * connected to the currently active input stream. 046 */ 047public abstract class ByteRangeInputStream extends FSInputStream { 048 049 /** 050 * This class wraps a URL and provides method to open connection. 051 * It can be overridden to change how a connection is opened. 052 */ 053 public static abstract class URLOpener { 054 protected URL url; 055 056 public URLOpener(URL u) { 057 url = u; 058 } 059 060 public void setURL(URL u) { 061 url = u; 062 } 063 064 public URL getURL() { 065 return url; 066 } 067 068 /** Connect to server with a data offset. */ 069 protected abstract HttpURLConnection connect(final long offset, 070 final boolean resolved) throws IOException; 071 } 072 073 static class InputStreamAndFileLength { 074 final Long length; 075 final InputStream in; 076 077 InputStreamAndFileLength(Long length, InputStream in) { 078 this.length = length; 079 this.in = in; 080 } 081 } 082 083 enum StreamStatus { 084 NORMAL, SEEK, CLOSED 085 } 086 protected InputStream in; 087 protected final URLOpener originalURL; 088 protected final URLOpener resolvedURL; 089 protected long startPos = 0; 090 protected long currentPos = 0; 091 protected Long fileLength = null; 092 093 StreamStatus status = StreamStatus.SEEK; 094 095 /** 096 * Create with the specified URLOpeners. Original url is used to open the 097 * stream for the first time. Resolved url is used in subsequent requests. 098 * @param o Original url 099 * @param r Resolved url 100 */ 101 public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException { 102 this.originalURL = o; 103 this.resolvedURL = r; 104 getInputStream(); 105 } 106 107 protected abstract URL getResolvedUrl(final HttpURLConnection connection 108 ) throws IOException; 109 110 @VisibleForTesting 111 protected InputStream getInputStream() throws IOException { 112 switch (status) { 113 case NORMAL: 114 break; 115 case SEEK: 116 if (in != null) { 117 in.close(); 118 } 119 InputStreamAndFileLength fin = openInputStream(startPos); 120 in = fin.in; 121 fileLength = fin.length; 122 status = StreamStatus.NORMAL; 123 break; 124 case CLOSED: 125 throw new IOException("Stream closed"); 126 } 127 return in; 128 } 129 130 @VisibleForTesting 131 protected InputStreamAndFileLength openInputStream(long startOffset) 132 throws IOException { 133 if (startOffset < 0) { 134 throw new EOFException("Negative Position"); 135 } 136 // Use the original url if no resolved url exists, eg. if 137 // it's the first time a request is made. 138 final boolean resolved = resolvedURL.getURL() != null; 139 final URLOpener opener = resolved? resolvedURL: originalURL; 140 141 final HttpURLConnection connection = opener.connect(startOffset, resolved); 142 resolvedURL.setURL(getResolvedUrl(connection)); 143 144 InputStream in = connection.getInputStream(); 145 final Long length; 146 final Map<String, List<String>> headers = connection.getHeaderFields(); 147 if (isChunkedTransferEncoding(headers)) { 148 // file length is not known 149 length = null; 150 } else { 151 // for non-chunked transfer-encoding, get content-length 152 long streamlength = getStreamLength(connection, headers); 153 length = startOffset + streamlength; 154 155 // Java has a bug with >2GB request streams. It won't bounds check 156 // the reads so the transfer blocks until the server times out 157 in = new BoundedInputStream(in, streamlength); 158 } 159 160 return new InputStreamAndFileLength(length, in); 161 } 162 163 private static long getStreamLength(HttpURLConnection connection, 164 Map<String, List<String>> headers) throws IOException { 165 String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH); 166 if (cl == null) { 167 // Try to get the content length by parsing the content range 168 // because HftpFileSystem does not return the content length 169 // if the content is partial. 170 if (connection.getResponseCode() == HttpStatus.SC_PARTIAL_CONTENT) { 171 cl = connection.getHeaderField(HttpHeaders.CONTENT_RANGE); 172 return getLengthFromRange(cl); 173 } else { 174 throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: " 175 + headers); 176 } 177 } 178 return Long.parseLong(cl); 179 } 180 181 private static long getLengthFromRange(String cl) throws IOException { 182 try { 183 184 String[] str = cl.substring(6).split("[-/]"); 185 return Long.parseLong(str[1]) - Long.parseLong(str[0]) + 1; 186 } catch (Exception e) { 187 throw new IOException( 188 "failed to get content length by parsing the content range: " + cl 189 + " " + e.getMessage()); 190 } 191 } 192 193 private static boolean isChunkedTransferEncoding( 194 final Map<String, List<String>> headers) { 195 return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked") 196 || contains(headers, HttpHeaders.TE, "chunked"); 197 } 198 199 /** Does the HTTP header map contain the given key, value pair? */ 200 private static boolean contains(final Map<String, List<String>> headers, 201 final String key, final String value) { 202 final List<String> values = headers.get(key); 203 if (values != null) { 204 for(String v : values) { 205 for(final StringTokenizer t = new StringTokenizer(v, ","); 206 t.hasMoreTokens(); ) { 207 if (value.equalsIgnoreCase(t.nextToken())) { 208 return true; 209 } 210 } 211 } 212 } 213 return false; 214 } 215 216 private int update(final int n) throws IOException { 217 if (n != -1) { 218 currentPos += n; 219 } else if (fileLength != null && currentPos < fileLength) { 220 throw new IOException("Got EOF but currentPos = " + currentPos 221 + " < filelength = " + fileLength); 222 } 223 return n; 224 } 225 226 @Override 227 public int read() throws IOException { 228 final int b = getInputStream().read(); 229 update((b == -1) ? -1 : 1); 230 return b; 231 } 232 233 @Override 234 public int read(@Nonnull byte b[], int off, int len) throws IOException { 235 return update(getInputStream().read(b, off, len)); 236 } 237 238 /** 239 * Seek to the given offset from the start of the file. 240 * The next read() will be from that location. Can't 241 * seek past the end of the file. 242 */ 243 @Override 244 public void seek(long pos) throws IOException { 245 if (pos != currentPos) { 246 startPos = pos; 247 currentPos = pos; 248 if (status != StreamStatus.CLOSED) { 249 status = StreamStatus.SEEK; 250 } 251 } 252 } 253 254 @Override 255 public int read(long position, byte[] buffer, int offset, int length) 256 throws IOException { 257 validatePositionedReadArgs(position, buffer, offset, length); 258 if (length == 0) { 259 return 0; 260 } 261 try (InputStream in = openInputStream(position).in) { 262 return in.read(buffer, offset, length); 263 } 264 } 265 266 @Override 267 public void readFully(long position, byte[] buffer, int offset, int length) 268 throws IOException { 269 validatePositionedReadArgs(position, buffer, offset, length); 270 if (length == 0) { 271 return; 272 } 273 final InputStreamAndFileLength fin = openInputStream(position); 274 try { 275 if (fin.length != null && length + position > fin.length) { 276 throw new EOFException("The length to read " + length 277 + " exceeds the file length " + fin.length); 278 } 279 int nread = 0; 280 while (nread < length) { 281 int nbytes = fin.in.read(buffer, offset + nread, length - nread); 282 if (nbytes < 0) { 283 throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); 284 } 285 nread += nbytes; 286 } 287 } finally { 288 fin.in.close(); 289 } 290 } 291 292 /** 293 * Return the current offset from the start of the file 294 */ 295 @Override 296 public long getPos() throws IOException { 297 return currentPos; 298 } 299 300 /** 301 * Seeks a different copy of the data. Returns true if 302 * found a new source, false otherwise. 303 */ 304 @Override 305 public boolean seekToNewSource(long targetPos) throws IOException { 306 return false; 307 } 308 309 @Override 310 public void close() throws IOException { 311 if (in != null) { 312 in.close(); 313 in = null; 314 } 315 status = StreamStatus.CLOSED; 316 } 317 318 @Override 319 public synchronized int available() throws IOException{ 320 getInputStream(); 321 if(fileLength != null){ 322 long remaining = fileLength - currentPos; 323 return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; 324 }else { 325 return Integer.MAX_VALUE; 326 } 327 } 328}