/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;

/**
 * A utility class for I/O related functionality.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class IOUtils {

  /**
   * Copies from one stream to another.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param buffSize the size of the buffer; must be positive
   * @param close whether or not close the InputStream and
   * OutputStream at the end. The streams are closed in the finally clause.
   * @throws IOException if reading, writing, or (on the success path)
   *         closing either stream fails
   * @throws IllegalArgumentException if buffSize is not positive
   */
  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
      throws IOException {
    try {
      copyBytes(in, out, buffSize);
      if (close) {
        // Close explicitly on the success path so that a failing close()
        // is reported to the caller; null the references so the finally
        // clause does not close the same stream a second time.
        out.close();
        out = null;
        in.close();
        in = null;
      }
    } finally {
      if (close) {
        // Only reached with non-null streams when something above threw;
        // close quietly so the original exception is the one propagated.
        closeStream(out);
        closeStream(in);
      }
    }
  }

  /**
   * Copies from one stream to another. Neither stream is closed.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param buffSize the size of the buffer; must be positive
   * @throws IOException if reading or writing fails, or if out is a
   *         {@link PrintStream} that reports an error after a write
   * @throws IllegalArgumentException if buffSize is not positive
   */
  public static void copyBytes(InputStream in, OutputStream out, int buffSize)
      throws IOException {
    // A zero-length buffer makes read(byte[]) return 0 on every call, so the
    // loop below would never terminate; reject it (and negative sizes) early.
    if (buffSize <= 0) {
      throw new IllegalArgumentException(
          "Buffer size must be positive, got " + buffSize);
    }
    // PrintStream swallows IOExceptions, so its error flag has to be polled.
    PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
    byte[] buf = new byte[buffSize];
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) {
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
      }
      bytesRead = in.read(buf);
    }
  }

  /**
   * Copies from one stream to another. <strong>closes the input and output streams
   * at the end</strong>.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param conf the Configuration object; the buffer size is read from
   *        <code>io.file.buffer.size</code> (4096 if unset)
   * @throws IOException if reading, writing or closing fails
   */
  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
      throws IOException {
    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
  }

  /**
   * Copies from one stream to another.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param conf the Configuration object; the buffer size is read from
   *        <code>io.file.buffer.size</code> (4096 if unset)
   * @param close whether or not close the InputStream and
   * OutputStream at the end. The streams are closed in the finally clause.
   * @throws IOException if reading, writing or closing fails
   */
  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
      throws IOException {
    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close);
  }

  /**
   * Copies count bytes from one stream to another. If the input reaches
   * end of stream before count bytes have been read, copying stops there
   * without raising an error.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param count number of bytes to copy
   * @param close whether to close the streams
   * @throws IOException if bytes can not be read or written
   */
  public static void copyBytes(InputStream in, OutputStream out, long count,
      boolean close) throws IOException {
    byte[] buf = new byte[4096];
    long bytesRemaining = count;
    int bytesRead;

    try {
      while (bytesRemaining > 0) {
        // Never request more than what is left, nor more than the buffer holds.
        int bytesToRead = (int)
            (bytesRemaining < buf.length ? bytesRemaining : buf.length);

        bytesRead = in.read(buf, 0, bytesToRead);
        if (bytesRead == -1) {
          break;
        }

        out.write(buf, 0, bytesRead);
        bytesRemaining -= bytesRead;
      }
      if (close) {
        // Same pattern as the buffer-size overload: explicit close on
        // success, references nulled to avoid a second close in finally.
        out.close();
        out = null;
        in.close();
        in = null;
      }
    } finally {
      if (close) {
        closeStream(out);
        closeStream(in);
      }
    }
  }

  /**
   * Utility wrapper for reading from {@link InputStream}. It catches any errors
   * thrown by the underlying stream (either IO or decompression-related), and
   * re-throws as an IOException.
   *
   * @param is - InputStream to be read from
   * @param buf - buffer the data is read into
   * @param off - offset within buf
   * @param len - amount of data to be read
   * @return number of bytes read
   * @throws IOException if the underlying read throws; a non-IOException
   *         failure is wrapped and kept as the cause
   */
  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
      int off, int len) throws IOException {
    try {
      return is.read(buf, off, len);
    } catch (IOException ie) {
      // Already the right type: pass through unwrapped.
      throw ie;
    } catch (Throwable t) {
      throw new IOException("Error while reading compressed data", t);
    }
  }

  /**
   * Reads len bytes in a loop.
   *
   * @param in InputStream to read from
   * @param buf The buffer to fill
   * @param off offset from the buffer
   * @param len the length of bytes to read
   * @throws IOException if it could not read requested number of bytes
   * for any reason (including EOF)
   */
  public static void readFully(InputStream in, byte[] buf,
      int off, int len) throws IOException {
    int toRead = len;
    while (toRead > 0) {
      int ret = in.read(buf, off, toRead);
      if (ret < 0) {
        throw new IOException( "Premature EOF from inputStream");
      }
      toRead -= ret;
      off += ret;
    }
  }

  /**
   * Similar to readFully(). Skips bytes in a loop.
   * @param in The InputStream to skip bytes from
   * @param len number of bytes to skip.
   * @throws IOException if it could not skip requested number of bytes
   * for any reason (including EOF)
   */
  public static void skipFully(InputStream in, long len) throws IOException {
    long amt = len;
    while (amt > 0) {
      long ret = in.skip(amt);
      if (ret == 0) {
        // skip may return 0 even if we're not at EOF.  Luckily, we can
        // use the read() method to figure out if we're at the end.
        int b = in.read();
        if (b == -1) {
          throw new EOFException( "Premature EOF from inputStream after " +
              "skipping " + (len - amt) + " byte(s).");
        }
        // The single byte consumed by read() counts as skipped.
        ret = 1;
      }
      amt -= ret;
    }
  }

  /**
   * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
   * null pointers. Must only be used for cleanup in exception handlers.
   *
   * @param log the log to record problems to at debug level. Can be null.
   * @param closeables the objects to close
   */
  public static void cleanup(Log log, java.io.Closeable... closeables) {
    for (java.io.Closeable c : closeables) {
      if (c != null) {
        try {
          c.close();
        } catch (IOException e) {
          if (log != null && log.isDebugEnabled()) {
            log.debug("Exception in closing " + c, e);
          }
        }
      }
    }
  }

  /**
   * Closes the stream ignoring {@link IOException}.
   * Must only be called in cleaning up from exception handlers.
   *
   * @param stream the Stream to close
   */
  public static void closeStream(java.io.Closeable stream) {
    cleanup(null, stream);
  }

  /**
   * Closes the socket ignoring {@link IOException}
   *
   * @param sock the Socket to close
   */
  public static void closeSocket(Socket sock) {
    if (sock != null) {
      try {
        sock.close();
      } catch (IOException ignored) {
        // Deliberately ignored: this is a best-effort close.
      }
    }
  }

  /**
   * The /dev/null of OutputStreams.
   */
  public static class NullOutputStream extends OutputStream {
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
    }

    @Override
    public void write(int b) throws IOException {
    }
  }

  /**
   * Write a ByteBuffer to a WritableByteChannel, handling short writes.
   *
   * @param bc               The WritableByteChannel to write to
   * @param buf              The input buffer
   * @throws IOException     On I/O error
   */
  public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
      throws IOException {
    do {
      bc.write(buf);
    } while (buf.remaining() > 0);
  }

  /**
   * Write a ByteBuffer to a FileChannel at a given offset,
   * handling short writes.
   *
   * @param fc               The FileChannel to write to
   * @param buf              The input buffer
   * @param offset           The offset in the file to start writing at
   * @throws IOException     On I/O error
   */
  public static void writeFully(FileChannel fc, ByteBuffer buf,
      long offset) throws IOException {
    do {
      // Advance the file position by however many bytes this write took.
      offset += fc.write(buf, offset);
    } while (buf.remaining() > 0);
  }

  /**
   * Return the complete list of files in a directory as strings.
   * <p>
   * Unlike {@link File#list()}, I/O errors hit while opening or iterating
   * the directory are reported as an IOException rather than ignored.
   *
   * @param dir              The directory to list.
   * @param filter           If non-null, the filter to use when listing
   *                         this directory.
   * @return                 The list of files in the directory.
   *
   * @throws IOException     On I/O error
   */
  public static List<String> listDirectory(File dir, FilenameFilter filter)
      throws IOException {
    List<String> list = new ArrayList<>();
    try (DirectoryStream<Path> stream =
             Files.newDirectoryStream(dir.toPath())) {
      for (Path entry: stream) {
        String fileName = entry.getFileName().toString();
        if ((filter == null) || filter.accept(dir, fileName)) {
          list.add(fileName);
        }
      }
    } catch (DirectoryIteratorException e) {
      // The iterator cannot throw checked exceptions, so it wraps the
      // IOException; unwrap it to honour this method's contract.
      throw e.getCause();
    }
    return list;
  }

  /**
   * Ensure that any writes to the given file is written to the storage device
   * that contains it. This method opens channel on given File and closes it
   * once the sync is done.<br>
   * Borrowed from Uwe Schindler in LUCENE-5588
   * @param fileToSync the file to fsync
   * @throws FileNotFoundException if fileToSync does not exist
   * @throws IOException if the channel cannot be opened, or if syncing a
   *         regular file fails
   */
  public static void fsync(File fileToSync) throws IOException {
    if (!fileToSync.exists()) {
      throw new FileNotFoundException(
          "File/Directory " + fileToSync.getAbsolutePath() + " does not exist");
    }
    boolean isDir = fileToSync.isDirectory();
    // If the file is a directory we have to open read-only, for regular files
    // we must open r/w for the fsync to have an effect. See
    // http://blog.httrack.com/blog/2013/11/15/
    // everything-you-always-wanted-to-know-about-fsync/
    try (FileChannel channel = FileChannel.open(fileToSync.toPath(),
        isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE)) {
      fsync(channel, isDir);
    }
  }

  /**
   * Ensure that any writes to the file behind the given channel are written
   * to the storage device that contains it. The channel is supplied by the
   * caller and is <b>not</b> closed by this method.
   * Borrowed from Uwe Schindler in LUCENE-5588
   * @param channel Channel to sync
   * @param isDir if true, the given file is a directory (Channel should be
   *          opened for read and ignore IOExceptions, because not all file
   *          systems and operating systems allow to fsync on a directory)
   * @throws IOException if forcing the channel fails and isDir is false
   */
  public static void fsync(FileChannel channel, boolean isDir)
      throws IOException {
    try {
      channel.force(true);
    } catch (IOException ioe) {
      if (isDir) {
        assert !(Shell.LINUX
            || Shell.MAC) : "On Linux and MacOSX fsyncing a directory"
                + " should not throw IOException, we just don't want to rely"
                + " on that in production (undocumented)" + ". Got: " + ioe;
        // Ignore exception if it is a directory
        return;
      }
      // Throw original exception
      throw ioe;
    }
  }
}