001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.net.Socket;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.nio.channels.WritableByteChannel;
026
027import org.apache.commons.logging.Log;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032
033/**
034 * An utility class for I/O related functionality. 
035 */
036@InterfaceAudience.Public
037@InterfaceStability.Evolving
038public class IOUtils {
039
040  /**
041   * Copies from one stream to another.
042   *
043   * @param in InputStrem to read from
044   * @param out OutputStream to write to
045   * @param buffSize the size of the buffer 
046   * @param close whether or not close the InputStream and 
047   * OutputStream at the end. The streams are closed in the finally clause.  
048   */
049  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
050    throws IOException {
051    try {
052      copyBytes(in, out, buffSize);
053      if(close) {
054        out.close();
055        out = null;
056        in.close();
057        in = null;
058      }
059    } finally {
060      if(close) {
061        closeStream(out);
062        closeStream(in);
063      }
064    }
065  }
066  
067  /**
068   * Copies from one stream to another.
069   * 
070   * @param in InputStrem to read from
071   * @param out OutputStream to write to
072   * @param buffSize the size of the buffer 
073   */
074  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
075    throws IOException {
076    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
077    byte buf[] = new byte[buffSize];
078    int bytesRead = in.read(buf);
079    while (bytesRead >= 0) {
080      out.write(buf, 0, bytesRead);
081      if ((ps != null) && ps.checkError()) {
082        throw new IOException("Unable to write to output stream.");
083      }
084      bytesRead = in.read(buf);
085    }
086  }
087
088  /**
089   * Copies from one stream to another. <strong>closes the input and output streams 
090   * at the end</strong>.
091   *
092   * @param in InputStrem to read from
093   * @param out OutputStream to write to
094   * @param conf the Configuration object 
095   */
096  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
097    throws IOException {
098    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
099  }
100  
101  /**
102   * Copies from one stream to another.
103   *
104   * @param in InputStream to read from
105   * @param out OutputStream to write to
106   * @param conf the Configuration object
107   * @param close whether or not close the InputStream and 
108   * OutputStream at the end. The streams are closed in the finally clause.
109   */
110  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
111    throws IOException {
112    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
113  }
114
115  /**
116   * Copies count bytes from one stream to another.
117   *
118   * @param in InputStream to read from
119   * @param out OutputStream to write to
120   * @param count number of bytes to copy
121   * @param close whether to close the streams
122   * @throws IOException if bytes can not be read or written
123   */
124  public static void copyBytes(InputStream in, OutputStream out, long count,
125      boolean close) throws IOException {
126    byte buf[] = new byte[4096];
127    long bytesRemaining = count;
128    int bytesRead;
129
130    try {
131      while (bytesRemaining > 0) {
132        int bytesToRead = (int)
133          (bytesRemaining < buf.length ? bytesRemaining : buf.length);
134
135        bytesRead = in.read(buf, 0, bytesToRead);
136        if (bytesRead == -1)
137          break;
138
139        out.write(buf, 0, bytesRead);
140        bytesRemaining -= bytesRead;
141      }
142      if (close) {
143        out.close();
144        out = null;
145        in.close();
146        in = null;
147      }
148    } finally {
149      if (close) {
150        closeStream(out);
151        closeStream(in);
152      }
153    }
154  }
155  
156  /**
157   * Utility wrapper for reading from {@link InputStream}. It catches any errors
158   * thrown by the underlying stream (either IO or decompression-related), and
159   * re-throws as an IOException.
160   * 
161   * @param is - InputStream to be read from
162   * @param buf - buffer the data is read into
163   * @param off - offset within buf
164   * @param len - amount of data to be read
165   * @return number of bytes read
166   */
167  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
168      int off, int len) throws IOException {
169    try {
170      return is.read(buf, off, len);
171    } catch (IOException ie) {
172      throw ie;
173    } catch (Throwable t) {
174      throw new IOException("Error while reading compressed data", t);
175    }
176  }
177
178  /**
179   * Reads len bytes in a loop.
180   *
181   * @param in InputStream to read from
182   * @param buf The buffer to fill
183   * @param off offset from the buffer
184   * @param len the length of bytes to read
185   * @throws IOException if it could not read requested number of bytes 
186   * for any reason (including EOF)
187   */
188  public static void readFully(InputStream in, byte buf[],
189      int off, int len) throws IOException {
190    int toRead = len;
191    while (toRead > 0) {
192      int ret = in.read(buf, off, toRead);
193      if (ret < 0) {
194        throw new IOException( "Premature EOF from inputStream");
195      }
196      toRead -= ret;
197      off += ret;
198    }
199  }
200  
201  /**
202   * Similar to readFully(). Skips bytes in a loop.
203   * @param in The InputStream to skip bytes from
204   * @param len number of bytes to skip.
205   * @throws IOException if it could not skip requested number of bytes 
206   * for any reason (including EOF)
207   */
208  public static void skipFully(InputStream in, long len) throws IOException {
209    long amt = len;
210    while (amt > 0) {
211      long ret = in.skip(amt);
212      if (ret == 0) {
213        // skip may return 0 even if we're not at EOF.  Luckily, we can 
214        // use the read() method to figure out if we're at the end.
215        int b = in.read();
216        if (b == -1) {
217          throw new EOFException( "Premature EOF from inputStream after " +
218              "skipping " + (len - amt) + " byte(s).");
219        }
220        ret = 1;
221      }
222      amt -= ret;
223    }
224  }
225  
226  /**
227   * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 
228   * null pointers. Must only be used for cleanup in exception handlers.
229   *
230   * @param log the log to record problems to at debug level. Can be null.
231   * @param closeables the objects to close
232   */
233  public static void cleanup(Log log, java.io.Closeable... closeables) {
234    for (java.io.Closeable c : closeables) {
235      if (c != null) {
236        try {
237          c.close();
238        } catch(IOException e) {
239          if (log != null && log.isDebugEnabled()) {
240            log.debug("Exception in closing " + c, e);
241          }
242        }
243      }
244    }
245  }
246
247  /**
248   * Closes the stream ignoring {@link IOException}.
249   * Must only be called in cleaning up from exception handlers.
250   *
251   * @param stream the Stream to close
252   */
253  public static void closeStream(java.io.Closeable stream) {
254    cleanup(null, stream);
255  }
256  
257  /**
258   * Closes the socket ignoring {@link IOException}
259   *
260   * @param sock the Socket to close
261   */
262  public static void closeSocket(Socket sock) {
263    if (sock != null) {
264      try {
265        sock.close();
266      } catch (IOException ignored) {
267      }
268    }
269  }
270  
271  /**
272   * The /dev/null of OutputStreams.
273   */
274  public static class NullOutputStream extends OutputStream {
275    @Override
276    public void write(byte[] b, int off, int len) throws IOException {
277    }
278
279    @Override
280    public void write(int b) throws IOException {
281    }
282  }  
283  
284  /**
285   * Write a ByteBuffer to a WritableByteChannel, handling short writes.
286   * 
287   * @param bc               The WritableByteChannel to write to
288   * @param buf              The input buffer
289   * @throws IOException     On I/O error
290   */
291  public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
292      throws IOException {
293    do {
294      bc.write(buf);
295    } while (buf.remaining() > 0);
296  }
297
298  /**
299   * Write a ByteBuffer to a FileChannel at a given offset, 
300   * handling short writes.
301   * 
302   * @param fc               The FileChannel to write to
303   * @param buf              The input buffer
304   * @param offset           The offset in the file to start writing at
305   * @throws IOException     On I/O error
306   */
307  public static void writeFully(FileChannel fc, ByteBuffer buf,
308      long offset) throws IOException {
309    do {
310      offset += fc.write(buf, offset);
311    } while (buf.remaining() > 0);
312  }
313}