001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.net.Socket;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.nio.channels.WritableByteChannel;
026import java.nio.file.DirectoryStream;
027import java.nio.file.DirectoryIteratorException;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.nio.file.StandardOpenOption;
031import java.util.ArrayList;
032import java.util.List;
033
034import org.apache.commons.logging.Log;
035
036import org.apache.hadoop.classification.InterfaceAudience;
037import org.apache.hadoop.classification.InterfaceStability;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.util.Shell;
040
041/**
 * A utility class for I/O related functionality.
043 */
044@InterfaceAudience.Public
045@InterfaceStability.Evolving
046public class IOUtils {
047
048  /**
049   * Copies from one stream to another.
050   *
051   * @param in InputStrem to read from
052   * @param out OutputStream to write to
053   * @param buffSize the size of the buffer 
054   * @param close whether or not close the InputStream and 
055   * OutputStream at the end. The streams are closed in the finally clause.  
056   */
057  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
058    throws IOException {
059    try {
060      copyBytes(in, out, buffSize);
061      if(close) {
062        out.close();
063        out = null;
064        in.close();
065        in = null;
066      }
067    } finally {
068      if(close) {
069        closeStream(out);
070        closeStream(in);
071      }
072    }
073  }
074  
075  /**
076   * Copies from one stream to another.
077   * 
078   * @param in InputStrem to read from
079   * @param out OutputStream to write to
080   * @param buffSize the size of the buffer 
081   */
082  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
083    throws IOException {
084    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
085    byte buf[] = new byte[buffSize];
086    int bytesRead = in.read(buf);
087    while (bytesRead >= 0) {
088      out.write(buf, 0, bytesRead);
089      if ((ps != null) && ps.checkError()) {
090        throw new IOException("Unable to write to output stream.");
091      }
092      bytesRead = in.read(buf);
093    }
094  }
095
096  /**
097   * Copies from one stream to another. <strong>closes the input and output streams 
098   * at the end</strong>.
099   *
100   * @param in InputStrem to read from
101   * @param out OutputStream to write to
102   * @param conf the Configuration object 
103   */
104  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
105    throws IOException {
106    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
107  }
108  
109  /**
110   * Copies from one stream to another.
111   *
112   * @param in InputStream to read from
113   * @param out OutputStream to write to
114   * @param conf the Configuration object
115   * @param close whether or not close the InputStream and 
116   * OutputStream at the end. The streams are closed in the finally clause.
117   */
118  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
119    throws IOException {
120    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
121  }
122
123  /**
124   * Copies count bytes from one stream to another.
125   *
126   * @param in InputStream to read from
127   * @param out OutputStream to write to
128   * @param count number of bytes to copy
129   * @param close whether to close the streams
130   * @throws IOException if bytes can not be read or written
131   */
132  public static void copyBytes(InputStream in, OutputStream out, long count,
133      boolean close) throws IOException {
134    byte buf[] = new byte[4096];
135    long bytesRemaining = count;
136    int bytesRead;
137
138    try {
139      while (bytesRemaining > 0) {
140        int bytesToRead = (int)
141          (bytesRemaining < buf.length ? bytesRemaining : buf.length);
142
143        bytesRead = in.read(buf, 0, bytesToRead);
144        if (bytesRead == -1)
145          break;
146
147        out.write(buf, 0, bytesRead);
148        bytesRemaining -= bytesRead;
149      }
150      if (close) {
151        out.close();
152        out = null;
153        in.close();
154        in = null;
155      }
156    } finally {
157      if (close) {
158        closeStream(out);
159        closeStream(in);
160      }
161    }
162  }
163  
164  /**
165   * Utility wrapper for reading from {@link InputStream}. It catches any errors
166   * thrown by the underlying stream (either IO or decompression-related), and
167   * re-throws as an IOException.
168   * 
169   * @param is - InputStream to be read from
170   * @param buf - buffer the data is read into
171   * @param off - offset within buf
172   * @param len - amount of data to be read
173   * @return number of bytes read
174   */
175  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
176      int off, int len) throws IOException {
177    try {
178      return is.read(buf, off, len);
179    } catch (IOException ie) {
180      throw ie;
181    } catch (Throwable t) {
182      throw new IOException("Error while reading compressed data", t);
183    }
184  }
185
186  /**
187   * Reads len bytes in a loop.
188   *
189   * @param in InputStream to read from
190   * @param buf The buffer to fill
191   * @param off offset from the buffer
192   * @param len the length of bytes to read
193   * @throws IOException if it could not read requested number of bytes 
194   * for any reason (including EOF)
195   */
196  public static void readFully(InputStream in, byte buf[],
197      int off, int len) throws IOException {
198    int toRead = len;
199    while (toRead > 0) {
200      int ret = in.read(buf, off, toRead);
201      if (ret < 0) {
202        throw new IOException( "Premature EOF from inputStream");
203      }
204      toRead -= ret;
205      off += ret;
206    }
207  }
208  
209  /**
210   * Similar to readFully(). Skips bytes in a loop.
211   * @param in The InputStream to skip bytes from
212   * @param len number of bytes to skip.
213   * @throws IOException if it could not skip requested number of bytes 
214   * for any reason (including EOF)
215   */
216  public static void skipFully(InputStream in, long len) throws IOException {
217    long amt = len;
218    while (amt > 0) {
219      long ret = in.skip(amt);
220      if (ret == 0) {
221        // skip may return 0 even if we're not at EOF.  Luckily, we can 
222        // use the read() method to figure out if we're at the end.
223        int b = in.read();
224        if (b == -1) {
225          throw new EOFException( "Premature EOF from inputStream after " +
226              "skipping " + (len - amt) + " byte(s).");
227        }
228        ret = 1;
229      }
230      amt -= ret;
231    }
232  }
233  
234  /**
235   * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 
236   * null pointers. Must only be used for cleanup in exception handlers.
237   *
238   * @param log the log to record problems to at debug level. Can be null.
239   * @param closeables the objects to close
240   */
241  public static void cleanup(Log log, java.io.Closeable... closeables) {
242    for (java.io.Closeable c : closeables) {
243      if (c != null) {
244        try {
245          c.close();
246        } catch(IOException e) {
247          if (log != null && log.isDebugEnabled()) {
248            log.debug("Exception in closing " + c, e);
249          }
250        }
251      }
252    }
253  }
254
255  /**
256   * Closes the stream ignoring {@link IOException}.
257   * Must only be called in cleaning up from exception handlers.
258   *
259   * @param stream the Stream to close
260   */
261  public static void closeStream(java.io.Closeable stream) {
262    cleanup(null, stream);
263  }
264  
265  /**
266   * Closes the socket ignoring {@link IOException}
267   *
268   * @param sock the Socket to close
269   */
270  public static void closeSocket(Socket sock) {
271    if (sock != null) {
272      try {
273        sock.close();
274      } catch (IOException ignored) {
275      }
276    }
277  }
278  
279  /**
280   * The /dev/null of OutputStreams.
281   */
282  public static class NullOutputStream extends OutputStream {
283    @Override
284    public void write(byte[] b, int off, int len) throws IOException {
285    }
286
287    @Override
288    public void write(int b) throws IOException {
289    }
290  }  
291  
292  /**
293   * Write a ByteBuffer to a WritableByteChannel, handling short writes.
294   * 
295   * @param bc               The WritableByteChannel to write to
296   * @param buf              The input buffer
297   * @throws IOException     On I/O error
298   */
299  public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
300      throws IOException {
301    do {
302      bc.write(buf);
303    } while (buf.remaining() > 0);
304  }
305
306  /**
307   * Write a ByteBuffer to a FileChannel at a given offset, 
308   * handling short writes.
309   * 
310   * @param fc               The FileChannel to write to
311   * @param buf              The input buffer
312   * @param offset           The offset in the file to start writing at
313   * @throws IOException     On I/O error
314   */
315  public static void writeFully(FileChannel fc, ByteBuffer buf,
316      long offset) throws IOException {
317    do {
318      offset += fc.write(buf, offset);
319    } while (buf.remaining() > 0);
320  }
321
322  /**
323   * Return the complete list of files in a directory as strings.<p/>
324   *
325   * This is better than File#listDir because it does not ignore IOExceptions.
326   *
327   * @param dir              The directory to list.
328   * @param filter           If non-null, the filter to use when listing
329   *                         this directory.
330   * @return                 The list of files in the directory.
331   *
332   * @throws IOException     On I/O error
333   */
334  public static List<String> listDirectory(File dir, FilenameFilter filter)
335      throws IOException {
336    ArrayList<String> list = new ArrayList<String> ();
337    try (DirectoryStream<Path> stream =
338             Files.newDirectoryStream(dir.toPath())) {
339      for (Path entry: stream) {
340        String fileName = entry.getFileName().toString();
341        if ((filter == null) || filter.accept(dir, fileName)) {
342          list.add(fileName);
343        }
344      }
345    } catch (DirectoryIteratorException e) {
346      throw e.getCause();
347    }
348    return list;
349  }
350
351  /**
352   * Ensure that any writes to the given file is written to the storage device
353   * that contains it. This method opens channel on given File and closes it
354   * once the sync is done.<br>
355   * Borrowed from Uwe Schindler in LUCENE-5588
356   * @param fileToSync the file to fsync
357   */
358  public static void fsync(File fileToSync) throws IOException {
359    if (!fileToSync.exists()) {
360      throw new FileNotFoundException(
361          "File/Directory " + fileToSync.getAbsolutePath() + " does not exist");
362    }
363    boolean isDir = fileToSync.isDirectory();
364    // If the file is a directory we have to open read-only, for regular files
365    // we must open r/w for the fsync to have an effect. See
366    // http://blog.httrack.com/blog/2013/11/15/
367    // everything-you-always-wanted-to-know-about-fsync/
368    try(FileChannel channel = FileChannel.open(fileToSync.toPath(),
369        isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE)){
370      fsync(channel, isDir);
371    }
372  }
373
374  /**
375   * Ensure that any writes to the given file is written to the storage device
376   * that contains it. This method opens channel on given File and closes it
377   * once the sync is done.
378   * Borrowed from Uwe Schindler in LUCENE-5588
379   * @param channel Channel to sync
380   * @param isDir if true, the given file is a directory (Channel should be
381   *          opened for read and ignore IOExceptions, because not all file
382   *          systems and operating systems allow to fsync on a directory)
383   * @throws IOException
384   */
385  public static void fsync(FileChannel channel, boolean isDir)
386      throws IOException {
387    try {
388      channel.force(true);
389    } catch (IOException ioe) {
390      if (isDir) {
391        assert !(Shell.LINUX
392            || Shell.MAC) : "On Linux and MacOSX fsyncing a directory"
393                + " should not throw IOException, we just don't want to rely"
394                + " on that in production (undocumented)" + ". Got: " + ioe;
395        // Ignore exception if it is a directory
396        return;
397      }
398      // Throw original exception
399      throw ioe;
400    }
401  }
402}