001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io;
020    
021    import java.io.*;
022    import java.net.Socket;
023    import java.nio.ByteBuffer;
024    import java.nio.channels.FileChannel;
025    import java.nio.channels.WritableByteChannel;
026    
027    import org.apache.commons.logging.Log;
028    
029    import org.apache.hadoop.classification.InterfaceAudience;
030    import org.apache.hadoop.classification.InterfaceStability;
031    import org.apache.hadoop.conf.Configuration;
032    
033    /**
034     * An utility class for I/O related functionality. 
035     */
036    @InterfaceAudience.Public
037    @InterfaceStability.Evolving
038    public class IOUtils {
039    
040      /**
041       * Copies from one stream to another.
042       *
043       * @param in InputStrem to read from
044       * @param out OutputStream to write to
045       * @param buffSize the size of the buffer 
046       * @param close whether or not close the InputStream and 
047       * OutputStream at the end. The streams are closed in the finally clause.  
048       */
049      public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
050        throws IOException {
051        try {
052          copyBytes(in, out, buffSize);
053          if(close) {
054            out.close();
055            out = null;
056            in.close();
057            in = null;
058          }
059        } finally {
060          if(close) {
061            closeStream(out);
062            closeStream(in);
063          }
064        }
065      }
066      
067      /**
068       * Copies from one stream to another.
069       * 
070       * @param in InputStrem to read from
071       * @param out OutputStream to write to
072       * @param buffSize the size of the buffer 
073       */
074      public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
075        throws IOException {
076        PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
077        byte buf[] = new byte[buffSize];
078        int bytesRead = in.read(buf);
079        while (bytesRead >= 0) {
080          out.write(buf, 0, bytesRead);
081          if ((ps != null) && ps.checkError()) {
082            throw new IOException("Unable to write to output stream.");
083          }
084          bytesRead = in.read(buf);
085        }
086      }
087    
088      /**
089       * Copies from one stream to another. <strong>closes the input and output streams 
090       * at the end</strong>.
091       *
092       * @param in InputStrem to read from
093       * @param out OutputStream to write to
094       * @param conf the Configuration object 
095       */
096      public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
097        throws IOException {
098        copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
099      }
100      
101      /**
102       * Copies from one stream to another.
103       *
104       * @param in InputStream to read from
105       * @param out OutputStream to write to
106       * @param conf the Configuration object
107       * @param close whether or not close the InputStream and 
108       * OutputStream at the end. The streams are closed in the finally clause.
109       */
110      public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
111        throws IOException {
112        copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
113      }
114    
115      /**
116       * Copies count bytes from one stream to another.
117       *
118       * @param in InputStream to read from
119       * @param out OutputStream to write to
120       * @param count number of bytes to copy
121       * @param close whether to close the streams
122       * @throws IOException if bytes can not be read or written
123       */
124      public static void copyBytes(InputStream in, OutputStream out, long count,
125          boolean close) throws IOException {
126        byte buf[] = new byte[4096];
127        long bytesRemaining = count;
128        int bytesRead;
129    
130        try {
131          while (bytesRemaining > 0) {
132            int bytesToRead = (int)
133              (bytesRemaining < buf.length ? bytesRemaining : buf.length);
134    
135            bytesRead = in.read(buf, 0, bytesToRead);
136            if (bytesRead == -1)
137              break;
138    
139            out.write(buf, 0, bytesRead);
140            bytesRemaining -= bytesRead;
141          }
142          if (close) {
143            out.close();
144            out = null;
145            in.close();
146            in = null;
147          }
148        } finally {
149          if (close) {
150            closeStream(out);
151            closeStream(in);
152          }
153        }
154      }
155      
156      /**
157       * Utility wrapper for reading from {@link InputStream}. It catches any errors
158       * thrown by the underlying stream (either IO or decompression-related), and
159       * re-throws as an IOException.
160       * 
161       * @param is - InputStream to be read from
162       * @param buf - buffer the data is read into
163       * @param off - offset within buf
164       * @param len - amount of data to be read
165       * @return number of bytes read
166       */
167      public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
168          int off, int len) throws IOException {
169        try {
170          return is.read(buf, off, len);
171        } catch (IOException ie) {
172          throw ie;
173        } catch (Throwable t) {
174          throw new IOException("Error while reading compressed data", t);
175        }
176      }
177    
178      /**
179       * Reads len bytes in a loop.
180       *
181       * @param in InputStream to read from
182       * @param buf The buffer to fill
183       * @param off offset from the buffer
184       * @param len the length of bytes to read
185       * @throws IOException if it could not read requested number of bytes 
186       * for any reason (including EOF)
187       */
188      public static void readFully(InputStream in, byte buf[],
189          int off, int len) throws IOException {
190        int toRead = len;
191        while (toRead > 0) {
192          int ret = in.read(buf, off, toRead);
193          if (ret < 0) {
194            throw new IOException( "Premature EOF from inputStream");
195          }
196          toRead -= ret;
197          off += ret;
198        }
199      }
200      
201      /**
202       * Similar to readFully(). Skips bytes in a loop.
203       * @param in The InputStream to skip bytes from
204       * @param len number of bytes to skip.
205       * @throws IOException if it could not skip requested number of bytes 
206       * for any reason (including EOF)
207       */
208      public static void skipFully(InputStream in, long len) throws IOException {
209        long amt = len;
210        while (amt > 0) {
211          long ret = in.skip(amt);
212          if (ret == 0) {
213            // skip may return 0 even if we're not at EOF.  Luckily, we can 
214            // use the read() method to figure out if we're at the end.
215            int b = in.read();
216            if (b == -1) {
217              throw new EOFException( "Premature EOF from inputStream after " +
218                  "skipping " + (len - amt) + " byte(s).");
219            }
220            ret = 1;
221          }
222          amt -= ret;
223        }
224      }
225      
226      /**
227       * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 
228       * null pointers. Must only be used for cleanup in exception handlers.
229       *
230       * @param log the log to record problems to at debug level. Can be null.
231       * @param closeables the objects to close
232       */
233      public static void cleanup(Log log, java.io.Closeable... closeables) {
234        for (java.io.Closeable c : closeables) {
235          if (c != null) {
236            try {
237              c.close();
238            } catch(IOException e) {
239              if (log != null && log.isDebugEnabled()) {
240                log.debug("Exception in closing " + c, e);
241              }
242            }
243          }
244        }
245      }
246    
247      /**
248       * Closes the stream ignoring {@link IOException}.
249       * Must only be called in cleaning up from exception handlers.
250       *
251       * @param stream the Stream to close
252       */
253      public static void closeStream(java.io.Closeable stream) {
254        cleanup(null, stream);
255      }
256      
257      /**
258       * Closes the socket ignoring {@link IOException}
259       *
260       * @param sock the Socket to close
261       */
262      public static void closeSocket(Socket sock) {
263        if (sock != null) {
264          try {
265            sock.close();
266          } catch (IOException ignored) {
267          }
268        }
269      }
270      
271      /**
272       * The /dev/null of OutputStreams.
273       */
274      public static class NullOutputStream extends OutputStream {
275        @Override
276        public void write(byte[] b, int off, int len) throws IOException {
277        }
278    
279        @Override
280        public void write(int b) throws IOException {
281        }
282      }  
283      
284      /**
285       * Write a ByteBuffer to a WritableByteChannel, handling short writes.
286       * 
287       * @param bc               The WritableByteChannel to write to
288       * @param buf              The input buffer
289       * @throws IOException     On I/O error
290       */
291      public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
292          throws IOException {
293        do {
294          bc.write(buf);
295        } while (buf.remaining() > 0);
296      }
297    
298      /**
299       * Write a ByteBuffer to a FileChannel at a given offset, 
300       * handling short writes.
301       * 
302       * @param fc               The FileChannel to write to
303       * @param buf              The input buffer
304       * @param offset           The offset in the file to start writing at
305       * @throws IOException     On I/O error
306       */
307      public static void writeFully(FileChannel fc, ByteBuffer buf,
308          long offset) throws IOException {
309        do {
310          offset += fc.write(buf, offset);
311        } while (buf.remaining() > 0);
312      }
313    }