001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.io; 020 021 import java.io.*; 022 import java.net.Socket; 023 import java.nio.ByteBuffer; 024 import java.nio.channels.FileChannel; 025 import java.nio.channels.WritableByteChannel; 026 027 import org.apache.commons.logging.Log; 028 029 import org.apache.hadoop.classification.InterfaceAudience; 030 import org.apache.hadoop.classification.InterfaceStability; 031 import org.apache.hadoop.conf.Configuration; 032 033 /** 034 * An utility class for I/O related functionality. 035 */ 036 @InterfaceAudience.Public 037 @InterfaceStability.Evolving 038 public class IOUtils { 039 040 /** 041 * Copies from one stream to another. 042 * 043 * @param in InputStrem to read from 044 * @param out OutputStream to write to 045 * @param buffSize the size of the buffer 046 * @param close whether or not close the InputStream and 047 * OutputStream at the end. The streams are closed in the finally clause. 048 */ 049 public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 050 throws IOException { 051 try { 052 copyBytes(in, out, buffSize); 053 if(close) { 054 out.close(); 055 out = null; 056 in.close(); 057 in = null; 058 } 059 } finally { 060 if(close) { 061 closeStream(out); 062 closeStream(in); 063 } 064 } 065 } 066 067 /** 068 * Copies from one stream to another. 069 * 070 * @param in InputStrem to read from 071 * @param out OutputStream to write to 072 * @param buffSize the size of the buffer 073 */ 074 public static void copyBytes(InputStream in, OutputStream out, int buffSize) 075 throws IOException { 076 PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; 077 byte buf[] = new byte[buffSize]; 078 int bytesRead = in.read(buf); 079 while (bytesRead >= 0) { 080 out.write(buf, 0, bytesRead); 081 if ((ps != null) && ps.checkError()) { 082 throw new IOException("Unable to write to output stream."); 083 } 084 bytesRead = in.read(buf); 085 } 086 } 087 088 /** 089 * Copies from one stream to another. <strong>closes the input and output streams 090 * at the end</strong>. 091 * 092 * @param in InputStrem to read from 093 * @param out OutputStream to write to 094 * @param conf the Configuration object 095 */ 096 public static void copyBytes(InputStream in, OutputStream out, Configuration conf) 097 throws IOException { 098 copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true); 099 } 100 101 /** 102 * Copies from one stream to another. 103 * 104 * @param in InputStream to read from 105 * @param out OutputStream to write to 106 * @param conf the Configuration object 107 * @param close whether or not close the InputStream and 108 * OutputStream at the end. The streams are closed in the finally clause. 109 */ 110 public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close) 111 throws IOException { 112 copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close); 113 } 114 115 /** 116 * Copies count bytes from one stream to another. 117 * 118 * @param in InputStream to read from 119 * @param out OutputStream to write to 120 * @param count number of bytes to copy 121 * @param close whether to close the streams 122 * @throws IOException if bytes can not be read or written 123 */ 124 public static void copyBytes(InputStream in, OutputStream out, long count, 125 boolean close) throws IOException { 126 byte buf[] = new byte[4096]; 127 long bytesRemaining = count; 128 int bytesRead; 129 130 try { 131 while (bytesRemaining > 0) { 132 int bytesToRead = (int) 133 (bytesRemaining < buf.length ? bytesRemaining : buf.length); 134 135 bytesRead = in.read(buf, 0, bytesToRead); 136 if (bytesRead == -1) 137 break; 138 139 out.write(buf, 0, bytesRead); 140 bytesRemaining -= bytesRead; 141 } 142 if (close) { 143 out.close(); 144 out = null; 145 in.close(); 146 in = null; 147 } 148 } finally { 149 if (close) { 150 closeStream(out); 151 closeStream(in); 152 } 153 } 154 } 155 156 /** 157 * Utility wrapper for reading from {@link InputStream}. It catches any errors 158 * thrown by the underlying stream (either IO or decompression-related), and 159 * re-throws as an IOException. 160 * 161 * @param is - InputStream to be read from 162 * @param buf - buffer the data is read into 163 * @param off - offset within buf 164 * @param len - amount of data to be read 165 * @return number of bytes read 166 */ 167 public static int wrappedReadForCompressedData(InputStream is, byte[] buf, 168 int off, int len) throws IOException { 169 try { 170 return is.read(buf, off, len); 171 } catch (IOException ie) { 172 throw ie; 173 } catch (Throwable t) { 174 throw new IOException("Error while reading compressed data", t); 175 } 176 } 177 178 /** 179 * Reads len bytes in a loop. 180 * 181 * @param in InputStream to read from 182 * @param buf The buffer to fill 183 * @param off offset from the buffer 184 * @param len the length of bytes to read 185 * @throws IOException if it could not read requested number of bytes 186 * for any reason (including EOF) 187 */ 188 public static void readFully(InputStream in, byte buf[], 189 int off, int len) throws IOException { 190 int toRead = len; 191 while (toRead > 0) { 192 int ret = in.read(buf, off, toRead); 193 if (ret < 0) { 194 throw new IOException( "Premature EOF from inputStream"); 195 } 196 toRead -= ret; 197 off += ret; 198 } 199 } 200 201 /** 202 * Similar to readFully(). Skips bytes in a loop. 203 * @param in The InputStream to skip bytes from 204 * @param len number of bytes to skip. 205 * @throws IOException if it could not skip requested number of bytes 206 * for any reason (including EOF) 207 */ 208 public static void skipFully(InputStream in, long len) throws IOException { 209 long amt = len; 210 while (amt > 0) { 211 long ret = in.skip(amt); 212 if (ret == 0) { 213 // skip may return 0 even if we're not at EOF. Luckily, we can 214 // use the read() method to figure out if we're at the end. 215 int b = in.read(); 216 if (b == -1) { 217 throw new EOFException( "Premature EOF from inputStream after " + 218 "skipping " + (len - amt) + " byte(s)."); 219 } 220 ret = 1; 221 } 222 amt -= ret; 223 } 224 } 225 226 /** 227 * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 228 * null pointers. Must only be used for cleanup in exception handlers. 229 * 230 * @param log the log to record problems to at debug level. Can be null. 231 * @param closeables the objects to close 232 */ 233 public static void cleanup(Log log, java.io.Closeable... closeables) { 234 for (java.io.Closeable c : closeables) { 235 if (c != null) { 236 try { 237 c.close(); 238 } catch(IOException e) { 239 if (log != null && log.isDebugEnabled()) { 240 log.debug("Exception in closing " + c, e); 241 } 242 } 243 } 244 } 245 } 246 247 /** 248 * Closes the stream ignoring {@link IOException}. 249 * Must only be called in cleaning up from exception handlers. 250 * 251 * @param stream the Stream to close 252 */ 253 public static void closeStream(java.io.Closeable stream) { 254 cleanup(null, stream); 255 } 256 257 /** 258 * Closes the socket ignoring {@link IOException} 259 * 260 * @param sock the Socket to close 261 */ 262 public static void closeSocket(Socket sock) { 263 if (sock != null) { 264 try { 265 sock.close(); 266 } catch (IOException ignored) { 267 } 268 } 269 } 270 271 /** 272 * The /dev/null of OutputStreams. 273 */ 274 public static class NullOutputStream extends OutputStream { 275 @Override 276 public void write(byte[] b, int off, int len) throws IOException { 277 } 278 279 @Override 280 public void write(int b) throws IOException { 281 } 282 } 283 284 /** 285 * Write a ByteBuffer to a WritableByteChannel, handling short writes. 286 * 287 * @param bc The WritableByteChannel to write to 288 * @param buf The input buffer 289 * @throws IOException On I/O error 290 */ 291 public static void writeFully(WritableByteChannel bc, ByteBuffer buf) 292 throws IOException { 293 do { 294 bc.write(buf); 295 } while (buf.remaining() > 0); 296 } 297 298 /** 299 * Write a ByteBuffer to a FileChannel at a given offset, 300 * handling short writes. 301 * 302 * @param fc The FileChannel to write to 303 * @param buf The input buffer 304 * @param offset The offset in the file to start writing at 305 * @throws IOException On I/O error 306 */ 307 public static void writeFully(FileChannel fc, ByteBuffer buf, 308 long offset) throws IOException { 309 do { 310 offset += fc.write(buf, offset); 311 } while (buf.remaining() > 0); 312 } 313 }