001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.net.Socket;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.nio.channels.WritableByteChannel;
026import java.nio.file.DirectoryStream;
027import java.nio.file.DirectoryIteratorException;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.util.ArrayList;
031import java.util.List;
032
033import org.apache.commons.logging.Log;
034
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceStability;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.util.ChunkedArrayList;
039
040/**
041 * An utility class for I/O related functionality. 
042 */
043@InterfaceAudience.Public
044@InterfaceStability.Evolving
045public class IOUtils {
046
047  /**
048   * Copies from one stream to another.
049   *
050   * @param in InputStrem to read from
051   * @param out OutputStream to write to
052   * @param buffSize the size of the buffer 
053   * @param close whether or not close the InputStream and 
054   * OutputStream at the end. The streams are closed in the finally clause.  
055   */
056  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
057    throws IOException {
058    try {
059      copyBytes(in, out, buffSize);
060      if(close) {
061        out.close();
062        out = null;
063        in.close();
064        in = null;
065      }
066    } finally {
067      if(close) {
068        closeStream(out);
069        closeStream(in);
070      }
071    }
072  }
073  
074  /**
075   * Copies from one stream to another.
076   * 
077   * @param in InputStrem to read from
078   * @param out OutputStream to write to
079   * @param buffSize the size of the buffer 
080   */
081  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
082    throws IOException {
083    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
084    byte buf[] = new byte[buffSize];
085    int bytesRead = in.read(buf);
086    while (bytesRead >= 0) {
087      out.write(buf, 0, bytesRead);
088      if ((ps != null) && ps.checkError()) {
089        throw new IOException("Unable to write to output stream.");
090      }
091      bytesRead = in.read(buf);
092    }
093  }
094
095  /**
096   * Copies from one stream to another. <strong>closes the input and output streams 
097   * at the end</strong>.
098   *
099   * @param in InputStrem to read from
100   * @param out OutputStream to write to
101   * @param conf the Configuration object 
102   */
103  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
104    throws IOException {
105    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
106  }
107  
108  /**
109   * Copies from one stream to another.
110   *
111   * @param in InputStream to read from
112   * @param out OutputStream to write to
113   * @param conf the Configuration object
114   * @param close whether or not close the InputStream and 
115   * OutputStream at the end. The streams are closed in the finally clause.
116   */
117  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
118    throws IOException {
119    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
120  }
121
122  /**
123   * Copies count bytes from one stream to another.
124   *
125   * @param in InputStream to read from
126   * @param out OutputStream to write to
127   * @param count number of bytes to copy
128   * @param close whether to close the streams
129   * @throws IOException if bytes can not be read or written
130   */
131  public static void copyBytes(InputStream in, OutputStream out, long count,
132      boolean close) throws IOException {
133    byte buf[] = new byte[4096];
134    long bytesRemaining = count;
135    int bytesRead;
136
137    try {
138      while (bytesRemaining > 0) {
139        int bytesToRead = (int)
140          (bytesRemaining < buf.length ? bytesRemaining : buf.length);
141
142        bytesRead = in.read(buf, 0, bytesToRead);
143        if (bytesRead == -1)
144          break;
145
146        out.write(buf, 0, bytesRead);
147        bytesRemaining -= bytesRead;
148      }
149      if (close) {
150        out.close();
151        out = null;
152        in.close();
153        in = null;
154      }
155    } finally {
156      if (close) {
157        closeStream(out);
158        closeStream(in);
159      }
160    }
161  }
162  
163  /**
164   * Utility wrapper for reading from {@link InputStream}. It catches any errors
165   * thrown by the underlying stream (either IO or decompression-related), and
166   * re-throws as an IOException.
167   * 
168   * @param is - InputStream to be read from
169   * @param buf - buffer the data is read into
170   * @param off - offset within buf
171   * @param len - amount of data to be read
172   * @return number of bytes read
173   */
174  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
175      int off, int len) throws IOException {
176    try {
177      return is.read(buf, off, len);
178    } catch (IOException ie) {
179      throw ie;
180    } catch (Throwable t) {
181      throw new IOException("Error while reading compressed data", t);
182    }
183  }
184
185  /**
186   * Reads len bytes in a loop.
187   *
188   * @param in InputStream to read from
189   * @param buf The buffer to fill
190   * @param off offset from the buffer
191   * @param len the length of bytes to read
192   * @throws IOException if it could not read requested number of bytes 
193   * for any reason (including EOF)
194   */
195  public static void readFully(InputStream in, byte buf[],
196      int off, int len) throws IOException {
197    int toRead = len;
198    while (toRead > 0) {
199      int ret = in.read(buf, off, toRead);
200      if (ret < 0) {
201        throw new IOException( "Premature EOF from inputStream");
202      }
203      toRead -= ret;
204      off += ret;
205    }
206  }
207  
208  /**
209   * Similar to readFully(). Skips bytes in a loop.
210   * @param in The InputStream to skip bytes from
211   * @param len number of bytes to skip.
212   * @throws IOException if it could not skip requested number of bytes 
213   * for any reason (including EOF)
214   */
215  public static void skipFully(InputStream in, long len) throws IOException {
216    long amt = len;
217    while (amt > 0) {
218      long ret = in.skip(amt);
219      if (ret == 0) {
220        // skip may return 0 even if we're not at EOF.  Luckily, we can 
221        // use the read() method to figure out if we're at the end.
222        int b = in.read();
223        if (b == -1) {
224          throw new EOFException( "Premature EOF from inputStream after " +
225              "skipping " + (len - amt) + " byte(s).");
226        }
227        ret = 1;
228      }
229      amt -= ret;
230    }
231  }
232  
233  /**
234   * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 
235   * null pointers. Must only be used for cleanup in exception handlers.
236   *
237   * @param log the log to record problems to at debug level. Can be null.
238   * @param closeables the objects to close
239   */
240  public static void cleanup(Log log, java.io.Closeable... closeables) {
241    for (java.io.Closeable c : closeables) {
242      if (c != null) {
243        try {
244          c.close();
245        } catch(IOException e) {
246          if (log != null && log.isDebugEnabled()) {
247            log.debug("Exception in closing " + c, e);
248          }
249        }
250      }
251    }
252  }
253
254  /**
255   * Closes the stream ignoring {@link IOException}.
256   * Must only be called in cleaning up from exception handlers.
257   *
258   * @param stream the Stream to close
259   */
260  public static void closeStream(java.io.Closeable stream) {
261    cleanup(null, stream);
262  }
263  
264  /**
265   * Closes the socket ignoring {@link IOException}
266   *
267   * @param sock the Socket to close
268   */
269  public static void closeSocket(Socket sock) {
270    if (sock != null) {
271      try {
272        sock.close();
273      } catch (IOException ignored) {
274      }
275    }
276  }
277  
278  /**
279   * The /dev/null of OutputStreams.
280   */
281  public static class NullOutputStream extends OutputStream {
282    @Override
283    public void write(byte[] b, int off, int len) throws IOException {
284    }
285
286    @Override
287    public void write(int b) throws IOException {
288    }
289  }  
290  
291  /**
292   * Write a ByteBuffer to a WritableByteChannel, handling short writes.
293   * 
294   * @param bc               The WritableByteChannel to write to
295   * @param buf              The input buffer
296   * @throws IOException     On I/O error
297   */
298  public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
299      throws IOException {
300    do {
301      bc.write(buf);
302    } while (buf.remaining() > 0);
303  }
304
305  /**
306   * Write a ByteBuffer to a FileChannel at a given offset, 
307   * handling short writes.
308   * 
309   * @param fc               The FileChannel to write to
310   * @param buf              The input buffer
311   * @param offset           The offset in the file to start writing at
312   * @throws IOException     On I/O error
313   */
314  public static void writeFully(FileChannel fc, ByteBuffer buf,
315      long offset) throws IOException {
316    do {
317      offset += fc.write(buf, offset);
318    } while (buf.remaining() > 0);
319  }
320
321  /**
322   * Return the complete list of files in a directory as strings.<p/>
323   *
324   * This is better than File#listDir because it does not ignore IOExceptions.
325   *
326   * @param dir              The directory to list.
327   * @param filter           If non-null, the filter to use when listing
328   *                         this directory.
329   * @return                 The list of files in the directory.
330   *
331   * @throws IOException     On I/O error
332   */
333  public static List<String> listDirectory(File dir, FilenameFilter filter)
334      throws IOException {
335    ArrayList<String> list = new ArrayList<String> ();
336    try (DirectoryStream<Path> stream =
337             Files.newDirectoryStream(dir.toPath())) {
338      for (Path entry: stream) {
339        String fileName = entry.getFileName().toString();
340        if ((filter == null) || filter.accept(dir, fileName)) {
341          list.add(fileName);
342        }
343      }
344    } catch (DirectoryIteratorException e) {
345      throw e.getCause();
346    }
347    return list;
348  }
349}