001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.net.Socket;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.nio.channels.WritableByteChannel;
026import java.nio.file.DirectoryStream;
027import java.nio.file.DirectoryIteratorException;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.util.ArrayList;
031import java.util.List;
032
033import org.apache.commons.logging.Log;
034
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceStability;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.util.ChunkedArrayList;
039
040import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
041import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
042
/**
 * A utility class for I/O-related functionality.
 */
046@InterfaceAudience.Public
047@InterfaceStability.Evolving
048public class IOUtils {
049
050  /**
051   * Copies from one stream to another.
052   *
053   * @param in InputStrem to read from
054   * @param out OutputStream to write to
055   * @param buffSize the size of the buffer 
056   * @param close whether or not close the InputStream and 
057   * OutputStream at the end. The streams are closed in the finally clause.  
058   */
059  public static void copyBytes(InputStream in, OutputStream out,
060                               int buffSize, boolean close)
061    throws IOException {
062    try {
063      copyBytes(in, out, buffSize);
064      if(close) {
065        out.close();
066        out = null;
067        in.close();
068        in = null;
069      }
070    } finally {
071      if(close) {
072        closeStream(out);
073        closeStream(in);
074      }
075    }
076  }
077  
078  /**
079   * Copies from one stream to another.
080   * 
081   * @param in InputStrem to read from
082   * @param out OutputStream to write to
083   * @param buffSize the size of the buffer 
084   */
085  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
086    throws IOException {
087    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
088    byte buf[] = new byte[buffSize];
089    int bytesRead = in.read(buf);
090    while (bytesRead >= 0) {
091      out.write(buf, 0, bytesRead);
092      if ((ps != null) && ps.checkError()) {
093        throw new IOException("Unable to write to output stream.");
094      }
095      bytesRead = in.read(buf);
096    }
097  }
098
099  /**
100   * Copies from one stream to another. <strong>closes the input and output streams 
101   * at the end</strong>.
102   *
103   * @param in InputStrem to read from
104   * @param out OutputStream to write to
105   * @param conf the Configuration object 
106   */
107  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
108    throws IOException {
109    copyBytes(in, out, conf.getInt(
110        IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), true);
111  }
112  
113  /**
114   * Copies from one stream to another.
115   *
116   * @param in InputStream to read from
117   * @param out OutputStream to write to
118   * @param conf the Configuration object
119   * @param close whether or not close the InputStream and 
120   * OutputStream at the end. The streams are closed in the finally clause.
121   */
122  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
123    throws IOException {
124    copyBytes(in, out, conf.getInt(
125        IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),  close);
126  }
127
128  /**
129   * Copies count bytes from one stream to another.
130   *
131   * @param in InputStream to read from
132   * @param out OutputStream to write to
133   * @param count number of bytes to copy
134   * @param close whether to close the streams
135   * @throws IOException if bytes can not be read or written
136   */
137  public static void copyBytes(InputStream in, OutputStream out, long count,
138      boolean close) throws IOException {
139    byte buf[] = new byte[4096];
140    long bytesRemaining = count;
141    int bytesRead;
142
143    try {
144      while (bytesRemaining > 0) {
145        int bytesToRead = (int)
146          (bytesRemaining < buf.length ? bytesRemaining : buf.length);
147
148        bytesRead = in.read(buf, 0, bytesToRead);
149        if (bytesRead == -1)
150          break;
151
152        out.write(buf, 0, bytesRead);
153        bytesRemaining -= bytesRead;
154      }
155      if (close) {
156        out.close();
157        out = null;
158        in.close();
159        in = null;
160      }
161    } finally {
162      if (close) {
163        closeStream(out);
164        closeStream(in);
165      }
166    }
167  }
168  
169  /**
170   * Utility wrapper for reading from {@link InputStream}. It catches any errors
171   * thrown by the underlying stream (either IO or decompression-related), and
172   * re-throws as an IOException.
173   * 
174   * @param is - InputStream to be read from
175   * @param buf - buffer the data is read into
176   * @param off - offset within buf
177   * @param len - amount of data to be read
178   * @return number of bytes read
179   */
180  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
181      int off, int len) throws IOException {
182    try {
183      return is.read(buf, off, len);
184    } catch (IOException ie) {
185      throw ie;
186    } catch (Throwable t) {
187      throw new IOException("Error while reading compressed data", t);
188    }
189  }
190
191  /**
192   * Reads len bytes in a loop.
193   *
194   * @param in InputStream to read from
195   * @param buf The buffer to fill
196   * @param off offset from the buffer
197   * @param len the length of bytes to read
198   * @throws IOException if it could not read requested number of bytes 
199   * for any reason (including EOF)
200   */
201  public static void readFully(InputStream in, byte[] buf,
202      int off, int len) throws IOException {
203    int toRead = len;
204    while (toRead > 0) {
205      int ret = in.read(buf, off, toRead);
206      if (ret < 0) {
207        throw new IOException( "Premature EOF from inputStream");
208      }
209      toRead -= ret;
210      off += ret;
211    }
212  }
213  
214  /**
215   * Similar to readFully(). Skips bytes in a loop.
216   * @param in The InputStream to skip bytes from
217   * @param len number of bytes to skip.
218   * @throws IOException if it could not skip requested number of bytes 
219   * for any reason (including EOF)
220   */
221  public static void skipFully(InputStream in, long len) throws IOException {
222    long amt = len;
223    while (amt > 0) {
224      long ret = in.skip(amt);
225      if (ret == 0) {
226        // skip may return 0 even if we're not at EOF.  Luckily, we can 
227        // use the read() method to figure out if we're at the end.
228        int b = in.read();
229        if (b == -1) {
230          throw new EOFException( "Premature EOF from inputStream after " +
231              "skipping " + (len - amt) + " byte(s).");
232        }
233        ret = 1;
234      }
235      amt -= ret;
236    }
237  }
238  
239  /**
240   * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 
241   * null pointers. Must only be used for cleanup in exception handlers.
242   *
243   * @param log the log to record problems to at debug level. Can be null.
244   * @param closeables the objects to close
245   */
246  public static void cleanup(Log log, java.io.Closeable... closeables) {
247    for (java.io.Closeable c : closeables) {
248      if (c != null) {
249        try {
250          c.close();
251        } catch(IOException e) {
252          if (log != null && log.isDebugEnabled()) {
253            log.debug("Exception in closing " + c, e);
254          }
255        }
256      }
257    }
258  }
259
260  /**
261   * Closes the stream ignoring {@link IOException}.
262   * Must only be called in cleaning up from exception handlers.
263   *
264   * @param stream the Stream to close
265   */
266  public static void closeStream(java.io.Closeable stream) {
267    cleanup(null, stream);
268  }
269  
270  /**
271   * Closes the socket ignoring {@link IOException}
272   *
273   * @param sock the Socket to close
274   */
275  public static void closeSocket(Socket sock) {
276    if (sock != null) {
277      try {
278        sock.close();
279      } catch (IOException ignored) {
280      }
281    }
282  }
283  
284  /**
285   * The /dev/null of OutputStreams.
286   */
287  public static class NullOutputStream extends OutputStream {
288    @Override
289    public void write(byte[] b, int off, int len) throws IOException {
290    }
291
292    @Override
293    public void write(int b) throws IOException {
294    }
295  }  
296  
297  /**
298   * Write a ByteBuffer to a WritableByteChannel, handling short writes.
299   * 
300   * @param bc               The WritableByteChannel to write to
301   * @param buf              The input buffer
302   * @throws IOException     On I/O error
303   */
304  public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
305      throws IOException {
306    do {
307      bc.write(buf);
308    } while (buf.remaining() > 0);
309  }
310
311  /**
312   * Write a ByteBuffer to a FileChannel at a given offset, 
313   * handling short writes.
314   * 
315   * @param fc               The FileChannel to write to
316   * @param buf              The input buffer
317   * @param offset           The offset in the file to start writing at
318   * @throws IOException     On I/O error
319   */
320  public static void writeFully(FileChannel fc, ByteBuffer buf,
321      long offset) throws IOException {
322    do {
323      offset += fc.write(buf, offset);
324    } while (buf.remaining() > 0);
325  }
326
327  /**
328   * Return the complete list of files in a directory as strings.<p/>
329   *
330   * This is better than File#listDir because it does not ignore IOExceptions.
331   *
332   * @param dir              The directory to list.
333   * @param filter           If non-null, the filter to use when listing
334   *                         this directory.
335   * @return                 The list of files in the directory.
336   *
337   * @throws IOException     On I/O error
338   */
339  public static List<String> listDirectory(File dir, FilenameFilter filter)
340      throws IOException {
341    ArrayList<String> list = new ArrayList<String> ();
342    try (DirectoryStream<Path> stream =
343             Files.newDirectoryStream(dir.toPath())) {
344      for (Path entry: stream) {
345        String fileName = entry.getFileName().toString();
346        if ((filter == null) || filter.accept(dir, fileName)) {
347          list.add(fileName);
348        }
349      }
350    } catch (DirectoryIteratorException e) {
351      throw e.getCause();
352    }
353    return list;
354  }
355}