001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.*;
022 import java.net.Socket;
023 import java.nio.ByteBuffer;
024 import java.nio.channels.FileChannel;
025 import java.nio.channels.WritableByteChannel;
026
027 import org.apache.commons.logging.Log;
028
029 import org.apache.hadoop.classification.InterfaceAudience;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.conf.Configuration;
032
033 /**
 * A utility class for I/O related functionality.
035 */
036 @InterfaceAudience.Public
037 @InterfaceStability.Evolving
038 public class IOUtils {
039
040 /**
041 * Copies from one stream to another.
042 *
043 * @param in InputStrem to read from
044 * @param out OutputStream to write to
045 * @param buffSize the size of the buffer
046 * @param close whether or not close the InputStream and
047 * OutputStream at the end. The streams are closed in the finally clause.
048 */
049 public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
050 throws IOException {
051 try {
052 copyBytes(in, out, buffSize);
053 if(close) {
054 out.close();
055 out = null;
056 in.close();
057 in = null;
058 }
059 } finally {
060 if(close) {
061 closeStream(out);
062 closeStream(in);
063 }
064 }
065 }
066
067 /**
068 * Copies from one stream to another.
069 *
070 * @param in InputStrem to read from
071 * @param out OutputStream to write to
072 * @param buffSize the size of the buffer
073 */
074 public static void copyBytes(InputStream in, OutputStream out, int buffSize)
075 throws IOException {
076 PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
077 byte buf[] = new byte[buffSize];
078 int bytesRead = in.read(buf);
079 while (bytesRead >= 0) {
080 out.write(buf, 0, bytesRead);
081 if ((ps != null) && ps.checkError()) {
082 throw new IOException("Unable to write to output stream.");
083 }
084 bytesRead = in.read(buf);
085 }
086 }
087
088 /**
089 * Copies from one stream to another. <strong>closes the input and output streams
090 * at the end</strong>.
091 *
092 * @param in InputStrem to read from
093 * @param out OutputStream to write to
094 * @param conf the Configuration object
095 */
096 public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
097 throws IOException {
098 copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
099 }
100
101 /**
102 * Copies from one stream to another.
103 *
104 * @param in InputStream to read from
105 * @param out OutputStream to write to
106 * @param conf the Configuration object
107 * @param close whether or not close the InputStream and
108 * OutputStream at the end. The streams are closed in the finally clause.
109 */
110 public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
111 throws IOException {
112 copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close);
113 }
114
115 /**
116 * Copies count bytes from one stream to another.
117 *
118 * @param in InputStream to read from
119 * @param out OutputStream to write to
120 * @param count number of bytes to copy
121 * @param close whether to close the streams
122 * @throws IOException if bytes can not be read or written
123 */
124 public static void copyBytes(InputStream in, OutputStream out, long count,
125 boolean close) throws IOException {
126 byte buf[] = new byte[4096];
127 long bytesRemaining = count;
128 int bytesRead;
129
130 try {
131 while (bytesRemaining > 0) {
132 int bytesToRead = (int)
133 (bytesRemaining < buf.length ? bytesRemaining : buf.length);
134
135 bytesRead = in.read(buf, 0, bytesToRead);
136 if (bytesRead == -1)
137 break;
138
139 out.write(buf, 0, bytesRead);
140 bytesRemaining -= bytesRead;
141 }
142 if (close) {
143 out.close();
144 out = null;
145 in.close();
146 in = null;
147 }
148 } finally {
149 if (close) {
150 closeStream(out);
151 closeStream(in);
152 }
153 }
154 }
155
156 /**
157 * Utility wrapper for reading from {@link InputStream}. It catches any errors
158 * thrown by the underlying stream (either IO or decompression-related), and
159 * re-throws as an IOException.
160 *
161 * @param is - InputStream to be read from
162 * @param buf - buffer the data is read into
163 * @param off - offset within buf
164 * @param len - amount of data to be read
165 * @return number of bytes read
166 */
167 public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
168 int off, int len) throws IOException {
169 try {
170 return is.read(buf, off, len);
171 } catch (IOException ie) {
172 throw ie;
173 } catch (Throwable t) {
174 throw new IOException("Error while reading compressed data", t);
175 }
176 }
177
178 /**
179 * Reads len bytes in a loop.
180 *
181 * @param in InputStream to read from
182 * @param buf The buffer to fill
183 * @param off offset from the buffer
184 * @param len the length of bytes to read
185 * @throws IOException if it could not read requested number of bytes
186 * for any reason (including EOF)
187 */
188 public static void readFully(InputStream in, byte buf[],
189 int off, int len) throws IOException {
190 int toRead = len;
191 while (toRead > 0) {
192 int ret = in.read(buf, off, toRead);
193 if (ret < 0) {
194 throw new IOException( "Premature EOF from inputStream");
195 }
196 toRead -= ret;
197 off += ret;
198 }
199 }
200
201 /**
202 * Similar to readFully(). Skips bytes in a loop.
203 * @param in The InputStream to skip bytes from
204 * @param len number of bytes to skip.
205 * @throws IOException if it could not skip requested number of bytes
206 * for any reason (including EOF)
207 */
208 public static void skipFully(InputStream in, long len) throws IOException {
209 long amt = len;
210 while (amt > 0) {
211 long ret = in.skip(amt);
212 if (ret == 0) {
213 // skip may return 0 even if we're not at EOF. Luckily, we can
214 // use the read() method to figure out if we're at the end.
215 int b = in.read();
216 if (b == -1) {
217 throw new EOFException( "Premature EOF from inputStream after " +
218 "skipping " + (len - amt) + " byte(s).");
219 }
220 ret = 1;
221 }
222 amt -= ret;
223 }
224 }
225
226 /**
227 * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
228 * null pointers. Must only be used for cleanup in exception handlers.
229 *
230 * @param log the log to record problems to at debug level. Can be null.
231 * @param closeables the objects to close
232 */
233 public static void cleanup(Log log, java.io.Closeable... closeables) {
234 for (java.io.Closeable c : closeables) {
235 if (c != null) {
236 try {
237 c.close();
238 } catch(IOException e) {
239 if (log != null && log.isDebugEnabled()) {
240 log.debug("Exception in closing " + c, e);
241 }
242 }
243 }
244 }
245 }
246
247 /**
248 * Closes the stream ignoring {@link IOException}.
249 * Must only be called in cleaning up from exception handlers.
250 *
251 * @param stream the Stream to close
252 */
253 public static void closeStream(java.io.Closeable stream) {
254 cleanup(null, stream);
255 }
256
257 /**
258 * Closes the socket ignoring {@link IOException}
259 *
260 * @param sock the Socket to close
261 */
262 public static void closeSocket(Socket sock) {
263 if (sock != null) {
264 try {
265 sock.close();
266 } catch (IOException ignored) {
267 }
268 }
269 }
270
271 /**
272 * The /dev/null of OutputStreams.
273 */
274 public static class NullOutputStream extends OutputStream {
275 @Override
276 public void write(byte[] b, int off, int len) throws IOException {
277 }
278
279 @Override
280 public void write(int b) throws IOException {
281 }
282 }
283
284 /**
285 * Write a ByteBuffer to a WritableByteChannel, handling short writes.
286 *
287 * @param bc The WritableByteChannel to write to
288 * @param buf The input buffer
289 * @throws IOException On I/O error
290 */
291 public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
292 throws IOException {
293 do {
294 bc.write(buf);
295 } while (buf.remaining() > 0);
296 }
297
298 /**
299 * Write a ByteBuffer to a FileChannel at a given offset,
300 * handling short writes.
301 *
302 * @param fc The FileChannel to write to
303 * @param buf The input buffer
304 * @param offset The offset in the file to start writing at
305 * @throws IOException On I/O error
306 */
307 public static void writeFully(FileChannel fc, ByteBuffer buf,
308 long offset) throws IOException {
309 do {
310 offset += fc.write(buf, offset);
311 } while (buf.remaining() > 0);
312 }
313 }