/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ChunkedArrayList;

/**
 * A utility class for I/O related functionality.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class IOUtils {

  /**
   * Copies from one stream to another.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param buffSize the size of the buffer; must be positive
   * @param close whether or not close the InputStream and
   * OutputStream at the end. The streams are closed in the finally clause.
   * @throws IOException if reading, writing or (on the success path)
   * closing a stream fails
   * @throws IllegalArgumentException if buffSize is not positive
   */
  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
    throws IOException {
    try {
      copyBytes(in, out, buffSize);
      if(close) {
        // Close explicitly on the success path so that a failure to close
        // is reported to the caller; nulling the reference turns the
        // matching closeStream() call in the finally clause into a no-op.
        out.close();
        out = null;
        in.close();
        in = null;
      }
    } finally {
      if(close) {
        // Best-effort cleanup of whatever is still open; IOExceptions
        // raised here are ignored by closeStream().
        closeStream(out);
        closeStream(in);
      }
    }
  }

  /**
   * Copies from one stream to another. Neither stream is closed.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param buffSize the size of the buffer; must be positive
   * @throws IOException if reading or writing fails, or if <code>out</code>
   * is a {@link PrintStream} that reports an error after a write
   * @throws IllegalArgumentException if buffSize is not positive
   */
  public static void copyBytes(InputStream in, OutputStream out, int buffSize)
    throws IOException {
    // A zero-length buffer makes InputStream.read(byte[]) return 0 instead
    // of -1, so the copy loop below would never terminate; a negative size
    // would fail with a bare NegativeArraySizeException.
    if (buffSize <= 0) {
      throw new IllegalArgumentException(
          "Buffer size must be positive: " + buffSize);
    }
    // PrintStream never throws IOException from write(); its error flag has
    // to be polled explicitly after each write.
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    byte[] buf = new byte[buffSize];
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) {
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
      }
      bytesRead = in.read(buf);
    }
  }

  /**
   * Copies from one stream to another. <strong>closes the input and output streams
   * at the end</strong>.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param conf the Configuration object; its "io.file.buffer.size" value
   * (4096 if unset) is used as the buffer size
   * @throws IOException if reading, writing or closing a stream fails
   */
  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
    throws IOException {
    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
  }

  /**
   * Copies from one stream to another.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param conf the Configuration object; its "io.file.buffer.size" value
   * (4096 if unset) is used as the buffer size
   * @param close whether or not close the InputStream and
   * OutputStream at the end. The streams are closed in the finally clause.
   * @throws IOException if reading, writing or (on the success path)
   * closing a stream fails
   */
  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
    throws IOException {
    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
  }

  /**
   * Copies count bytes from one stream to another. Copying stops early,
   * without an error, if the input reaches end of stream first.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param count number of bytes to copy
   * @param close whether to close the streams
   * @throws IOException if bytes can not be read or written
   */
  public static void copyBytes(InputStream in, OutputStream out, long count,
      boolean close) throws IOException {
    byte[] buf = new byte[4096];
    long bytesRemaining = count;
    int bytesRead;

    try {
      while (bytesRemaining > 0) {
        // Never request more than what is left to copy or than the buffer
        // can hold.
        int bytesToRead = (int)
          (bytesRemaining < buf.length ? bytesRemaining : buf.length);

        bytesRead = in.read(buf, 0, bytesToRead);
        if (bytesRead == -1) {
          break;
        }

        out.write(buf, 0, bytesRead);
        bytesRemaining -= bytesRead;
      }
      if (close) {
        // Close explicitly on the success path so failures are reported;
        // nulling makes the closeStream() calls below no-ops.
        out.close();
        out = null;
        in.close();
        in = null;
      }
    } finally {
      if (close) {
        closeStream(out);
        closeStream(in);
      }
    }
  }

  /**
   * Utility wrapper for reading from {@link InputStream}. It catches any errors
   * thrown by the underlying stream (either IO or decompression-related), and
   * re-throws as an IOException.
   *
   * @param is - InputStream to be read from
   * @param buf - buffer the data is read into
   * @param off - offset within buf
   * @param len - amount of data to be read
   * @return number of bytes read
   * @throws IOException if the underlying read fails for any reason; a
   * non-IOException failure is attached as the cause
   */
  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
      int off, int len) throws IOException {
    try {
      return is.read(buf, off, len);
    } catch (IOException ie) {
      // Already the right type: propagate unchanged rather than wrapping.
      throw ie;
    } catch (Throwable t) {
      throw new IOException("Error while reading compressed data", t);
    }
  }

  /**
   * Reads len bytes in a loop.
   *
   * @param in InputStream to read from
   * @param buf The buffer to fill
   * @param off offset from the buffer
   * @param len the length of bytes to read
   * @throws IOException if it could not read requested number of bytes
   * for any reason (including EOF)
   */
  public static void readFully(InputStream in, byte[] buf,
      int off, int len) throws IOException {
    int toRead = len;
    while (toRead > 0) {
      int ret = in.read(buf, off, toRead);
      if (ret < 0) {
        throw new IOException( "Premature EOF from inputStream");
      }
      toRead -= ret;
      off += ret;
    }
  }

  /**
   * Similar to readFully(). Skips bytes in a loop.
   * @param in The InputStream to skip bytes from
   * @param len number of bytes to skip.
   * @throws IOException if it could not skip requested number of bytes
   * for any reason (including EOF)
   */
  public static void skipFully(InputStream in, long len) throws IOException {
    long amt = len;
    while (amt > 0) {
      long ret = in.skip(amt);
      if (ret == 0) {
        // skip may return 0 even if we're not at EOF.  Luckily, we can
        // use the read() method to figure out if we're at the end.
        int b = in.read();
        if (b == -1) {
          throw new EOFException( "Premature EOF from inputStream after " +
              "skipping " + (len - amt) + " byte(s).");
        }
        // The single byte consumed by read() counts as skipped.
        ret = 1;
      }
      amt -= ret;
    }
  }

  /**
   * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
   * null pointers. Must only be used for cleanup in exception handlers.
   *
   * @param log the log to record problems to at debug level. Can be null.
   * @param closeables the objects to close; the array itself and any of its
   * elements may be null
   */
  public static void cleanup(Log log, java.io.Closeable... closeables) {
    // An explicit null array (e.g. cleanup(log, (Closeable[]) null)) must be
    // ignored like any other null reference instead of raising an NPE.
    if (closeables == null) {
      return;
    }
    for (java.io.Closeable c : closeables) {
      if (c != null) {
        try {
          c.close();
        } catch(IOException e) {
          if (log != null && log.isDebugEnabled()) {
            log.debug("Exception in closing " + c, e);
          }
        }
      }
    }
  }

  /**
   * Closes the stream ignoring {@link IOException}.
   * Must only be called in cleaning up from exception handlers.
   *
   * @param stream the Stream to close; may be null
   */
  public static void closeStream(java.io.Closeable stream) {
    cleanup(null, stream);
  }

  /**
   * Closes the socket ignoring {@link IOException}
   *
   * @param sock the Socket to close; may be null
   */
  public static void closeSocket(Socket sock) {
    if (sock != null) {
      try {
        sock.close();
      } catch (IOException ignored) {
        // Deliberately ignored: closing is best-effort.
      }
    }
  }

  /**
   * The /dev/null of OutputStreams.
   */
  public static class NullOutputStream extends OutputStream {
    /** Discards the given bytes. */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
    }

    /** Discards the given byte. */
    @Override
    public void write(int b) throws IOException {
    }
  }

  /**
   * Write a ByteBuffer to a WritableByteChannel, handling short writes.
   *
   * @param bc               The WritableByteChannel to write to
   * @param buf              The input buffer
   * @throws IOException     On I/O error
   */
  public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
      throws IOException {
    // A single write() may consume only part of the buffer; repeat until
    // nothing remains.
    do {
      bc.write(buf);
    } while (buf.remaining() > 0);
  }

  /**
   * Write a ByteBuffer to a FileChannel at a given offset,
   * handling short writes.
   *
   * @param fc               The FileChannel to write to
   * @param buf              The input buffer
   * @param offset           The offset in the file to start writing at
   * @throws IOException     On I/O error
   */
  public static void writeFully(FileChannel fc, ByteBuffer buf,
      long offset) throws IOException {
    // Advance the file offset by the number of bytes each write reports so
    // the next write continues where the previous one stopped.
    do {
      offset += fc.write(buf, offset);
    } while (buf.remaining() > 0);
  }

  /**
   * Return the complete list of files in a directory as strings.
   * <p>
   * This is better than {@link File#list()} because it does not ignore
   * IOExceptions.
   *
   * @param dir              The directory to list.
   * @param filter           If non-null, the filter to use when listing
   *                         this directory.
   * @return                 The list of files in the directory.
   *
   * @throws IOException     On I/O error
   */
  public static List<String> listDirectory(File dir, FilenameFilter filter)
      throws IOException {
    List<String> list = new ArrayList<>();
    try (DirectoryStream<Path> stream =
             Files.newDirectoryStream(dir.toPath())) {
      for (Path entry: stream) {
        String fileName = entry.getFileName().toString();
        if ((filter == null) || filter.accept(dir, fileName)) {
          list.add(fileName);
        }
      }
    } catch (DirectoryIteratorException e) {
      // Iteration failures are reported as this unchecked wrapper; its
      // cause is the underlying IOException, which is what callers expect.
      throw e.getCause();
    }
    return list;
  }
}