001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.*; 022import java.net.Socket; 023import java.nio.ByteBuffer; 024import java.nio.channels.FileChannel; 025import java.nio.channels.WritableByteChannel; 026import java.nio.file.DirectoryStream; 027import java.nio.file.DirectoryIteratorException; 028import java.nio.file.Files; 029import java.nio.file.Path; 030import java.util.ArrayList; 031import java.util.List; 032 033import org.apache.commons.logging.Log; 034 035import org.apache.hadoop.classification.InterfaceAudience; 036import org.apache.hadoop.classification.InterfaceStability; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.util.ChunkedArrayList; 039 040import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; 041import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; 042 043/** 044 * An utility class for I/O related functionality. 045 */ 046@InterfaceAudience.Public 047@InterfaceStability.Evolving 048public class IOUtils { 049 050 /** 051 * Copies from one stream to another. 052 * 053 * @param in InputStrem to read from 054 * @param out OutputStream to write to 055 * @param buffSize the size of the buffer 056 * @param close whether or not close the InputStream and 057 * OutputStream at the end. The streams are closed in the finally clause. 058 */ 059 public static void copyBytes(InputStream in, OutputStream out, 060 int buffSize, boolean close) 061 throws IOException { 062 try { 063 copyBytes(in, out, buffSize); 064 if(close) { 065 out.close(); 066 out = null; 067 in.close(); 068 in = null; 069 } 070 } finally { 071 if(close) { 072 closeStream(out); 073 closeStream(in); 074 } 075 } 076 } 077 078 /** 079 * Copies from one stream to another. 080 * 081 * @param in InputStrem to read from 082 * @param out OutputStream to write to 083 * @param buffSize the size of the buffer 084 */ 085 public static void copyBytes(InputStream in, OutputStream out, int buffSize) 086 throws IOException { 087 PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; 088 byte buf[] = new byte[buffSize]; 089 int bytesRead = in.read(buf); 090 while (bytesRead >= 0) { 091 out.write(buf, 0, bytesRead); 092 if ((ps != null) && ps.checkError()) { 093 throw new IOException("Unable to write to output stream."); 094 } 095 bytesRead = in.read(buf); 096 } 097 } 098 099 /** 100 * Copies from one stream to another. <strong>closes the input and output streams 101 * at the end</strong>. 102 * 103 * @param in InputStrem to read from 104 * @param out OutputStream to write to 105 * @param conf the Configuration object 106 */ 107 public static void copyBytes(InputStream in, OutputStream out, Configuration conf) 108 throws IOException { 109 copyBytes(in, out, conf.getInt( 110 IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), true); 111 } 112 113 /** 114 * Copies from one stream to another. 115 * 116 * @param in InputStream to read from 117 * @param out OutputStream to write to 118 * @param conf the Configuration object 119 * @param close whether or not close the InputStream and 120 * OutputStream at the end. The streams are closed in the finally clause. 121 */ 122 public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close) 123 throws IOException { 124 copyBytes(in, out, conf.getInt( 125 IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), close); 126 } 127 128 /** 129 * Copies count bytes from one stream to another. 130 * 131 * @param in InputStream to read from 132 * @param out OutputStream to write to 133 * @param count number of bytes to copy 134 * @param close whether to close the streams 135 * @throws IOException if bytes can not be read or written 136 */ 137 public static void copyBytes(InputStream in, OutputStream out, long count, 138 boolean close) throws IOException { 139 byte buf[] = new byte[4096]; 140 long bytesRemaining = count; 141 int bytesRead; 142 143 try { 144 while (bytesRemaining > 0) { 145 int bytesToRead = (int) 146 (bytesRemaining < buf.length ? bytesRemaining : buf.length); 147 148 bytesRead = in.read(buf, 0, bytesToRead); 149 if (bytesRead == -1) 150 break; 151 152 out.write(buf, 0, bytesRead); 153 bytesRemaining -= bytesRead; 154 } 155 if (close) { 156 out.close(); 157 out = null; 158 in.close(); 159 in = null; 160 } 161 } finally { 162 if (close) { 163 closeStream(out); 164 closeStream(in); 165 } 166 } 167 } 168 169 /** 170 * Utility wrapper for reading from {@link InputStream}. It catches any errors 171 * thrown by the underlying stream (either IO or decompression-related), and 172 * re-throws as an IOException. 173 * 174 * @param is - InputStream to be read from 175 * @param buf - buffer the data is read into 176 * @param off - offset within buf 177 * @param len - amount of data to be read 178 * @return number of bytes read 179 */ 180 public static int wrappedReadForCompressedData(InputStream is, byte[] buf, 181 int off, int len) throws IOException { 182 try { 183 return is.read(buf, off, len); 184 } catch (IOException ie) { 185 throw ie; 186 } catch (Throwable t) { 187 throw new IOException("Error while reading compressed data", t); 188 } 189 } 190 191 /** 192 * Reads len bytes in a loop. 193 * 194 * @param in InputStream to read from 195 * @param buf The buffer to fill 196 * @param off offset from the buffer 197 * @param len the length of bytes to read 198 * @throws IOException if it could not read requested number of bytes 199 * for any reason (including EOF) 200 */ 201 public static void readFully(InputStream in, byte[] buf, 202 int off, int len) throws IOException { 203 int toRead = len; 204 while (toRead > 0) { 205 int ret = in.read(buf, off, toRead); 206 if (ret < 0) { 207 throw new IOException( "Premature EOF from inputStream"); 208 } 209 toRead -= ret; 210 off += ret; 211 } 212 } 213 214 /** 215 * Similar to readFully(). Skips bytes in a loop. 216 * @param in The InputStream to skip bytes from 217 * @param len number of bytes to skip. 218 * @throws IOException if it could not skip requested number of bytes 219 * for any reason (including EOF) 220 */ 221 public static void skipFully(InputStream in, long len) throws IOException { 222 long amt = len; 223 while (amt > 0) { 224 long ret = in.skip(amt); 225 if (ret == 0) { 226 // skip may return 0 even if we're not at EOF. Luckily, we can 227 // use the read() method to figure out if we're at the end. 228 int b = in.read(); 229 if (b == -1) { 230 throw new EOFException( "Premature EOF from inputStream after " + 231 "skipping " + (len - amt) + " byte(s)."); 232 } 233 ret = 1; 234 } 235 amt -= ret; 236 } 237 } 238 239 /** 240 * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 241 * null pointers. Must only be used for cleanup in exception handlers. 242 * 243 * @param log the log to record problems to at debug level. Can be null. 244 * @param closeables the objects to close 245 */ 246 public static void cleanup(Log log, java.io.Closeable... closeables) { 247 for (java.io.Closeable c : closeables) { 248 if (c != null) { 249 try { 250 c.close(); 251 } catch(IOException e) { 252 if (log != null && log.isDebugEnabled()) { 253 log.debug("Exception in closing " + c, e); 254 } 255 } 256 } 257 } 258 } 259 260 /** 261 * Closes the stream ignoring {@link IOException}. 262 * Must only be called in cleaning up from exception handlers. 263 * 264 * @param stream the Stream to close 265 */ 266 public static void closeStream(java.io.Closeable stream) { 267 cleanup(null, stream); 268 } 269 270 /** 271 * Closes the socket ignoring {@link IOException} 272 * 273 * @param sock the Socket to close 274 */ 275 public static void closeSocket(Socket sock) { 276 if (sock != null) { 277 try { 278 sock.close(); 279 } catch (IOException ignored) { 280 } 281 } 282 } 283 284 /** 285 * The /dev/null of OutputStreams. 286 */ 287 public static class NullOutputStream extends OutputStream { 288 @Override 289 public void write(byte[] b, int off, int len) throws IOException { 290 } 291 292 @Override 293 public void write(int b) throws IOException { 294 } 295 } 296 297 /** 298 * Write a ByteBuffer to a WritableByteChannel, handling short writes. 299 * 300 * @param bc The WritableByteChannel to write to 301 * @param buf The input buffer 302 * @throws IOException On I/O error 303 */ 304 public static void writeFully(WritableByteChannel bc, ByteBuffer buf) 305 throws IOException { 306 do { 307 bc.write(buf); 308 } while (buf.remaining() > 0); 309 } 310 311 /** 312 * Write a ByteBuffer to a FileChannel at a given offset, 313 * handling short writes. 314 * 315 * @param fc The FileChannel to write to 316 * @param buf The input buffer 317 * @param offset The offset in the file to start writing at 318 * @throws IOException On I/O error 319 */ 320 public static void writeFully(FileChannel fc, ByteBuffer buf, 321 long offset) throws IOException { 322 do { 323 offset += fc.write(buf, offset); 324 } while (buf.remaining() > 0); 325 } 326 327 /** 328 * Return the complete list of files in a directory as strings.<p/> 329 * 330 * This is better than File#listDir because it does not ignore IOExceptions. 331 * 332 * @param dir The directory to list. 333 * @param filter If non-null, the filter to use when listing 334 * this directory. 335 * @return The list of files in the directory. 336 * 337 * @throws IOException On I/O error 338 */ 339 public static List<String> listDirectory(File dir, FilenameFilter filter) 340 throws IOException { 341 ArrayList<String> list = new ArrayList<String> (); 342 try (DirectoryStream<Path> stream = 343 Files.newDirectoryStream(dir.toPath())) { 344 for (Path entry: stream) { 345 String fileName = entry.getFileName().toString(); 346 if ((filter == null) || filter.accept(dir, fileName)) { 347 list.add(fileName); 348 } 349 } 350 } catch (DirectoryIteratorException e) { 351 throw e.getCause(); 352 } 353 return list; 354 } 355}