001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs;
019
020 import java.io.*;
021 import java.io.DataOutputStream;
022 import java.io.FilterOutputStream;
023 import java.io.IOException;
024 import java.io.OutputStream;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028
029 /** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
030 */
031 @InterfaceAudience.Public
032 @InterfaceStability.Stable
033 public class FSDataOutputStream extends DataOutputStream
034 implements Syncable, CanSetDropBehind {
035 private final OutputStream wrappedStream;
036
037 private static class PositionCache extends FilterOutputStream {
038 private FileSystem.Statistics statistics;
039 long position;
040
041 public PositionCache(OutputStream out,
042 FileSystem.Statistics stats,
043 long pos) throws IOException {
044 super(out);
045 statistics = stats;
046 position = pos;
047 }
048
049 public void write(int b) throws IOException {
050 out.write(b);
051 position++;
052 if (statistics != null) {
053 statistics.incrementBytesWritten(1);
054 }
055 }
056
057 public void write(byte b[], int off, int len) throws IOException {
058 out.write(b, off, len);
059 position += len; // update position
060 if (statistics != null) {
061 statistics.incrementBytesWritten(len);
062 }
063 }
064
065 public long getPos() throws IOException {
066 return position; // return cached position
067 }
068
069 public void close() throws IOException {
070 // ensure close works even if a null reference was passed in
071 if (out != null) {
072 out.close();
073 }
074 }
075 }
076
077 @Deprecated
078 public FSDataOutputStream(OutputStream out) throws IOException {
079 this(out, null);
080 }
081
082 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
083 throws IOException {
084 this(out, stats, 0);
085 }
086
087 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
088 long startPosition) throws IOException {
089 super(new PositionCache(out, stats, startPosition));
090 wrappedStream = out;
091 }
092
093 /**
094 * Get the current position in the output stream.
095 *
096 * @return the current position in the output stream
097 */
098 public long getPos() throws IOException {
099 return ((PositionCache)out).getPos();
100 }
101
102 /**
103 * Close the underlying output stream.
104 */
105 public void close() throws IOException {
106 out.close(); // This invokes PositionCache.close()
107 }
108
109 /**
110 * Get a reference to the wrapped output stream. Used by unit tests.
111 *
112 * @return the underlying output stream
113 */
114 @InterfaceAudience.LimitedPrivate({"HDFS"})
115 public OutputStream getWrappedStream() {
116 return wrappedStream;
117 }
118
119 @Override // Syncable
120 @Deprecated
121 public void sync() throws IOException {
122 if (wrappedStream instanceof Syncable) {
123 ((Syncable)wrappedStream).sync();
124 }
125 }
126
127 @Override // Syncable
128 public void hflush() throws IOException {
129 if (wrappedStream instanceof Syncable) {
130 ((Syncable)wrappedStream).hflush();
131 } else {
132 wrappedStream.flush();
133 }
134 }
135
136 @Override // Syncable
137 public void hsync() throws IOException {
138 if (wrappedStream instanceof Syncable) {
139 ((Syncable)wrappedStream).hsync();
140 } else {
141 wrappedStream.flush();
142 }
143 }
144
145 @Override
146 public void setDropBehind(Boolean dropBehind) throws IOException {
147 try {
148 ((CanSetDropBehind)wrappedStream).setDropBehind(dropBehind);
149 } catch (ClassCastException e) {
150 throw new UnsupportedOperationException("the wrapped stream does " +
151 "not support setting the drop-behind caching setting.");
152 }
153 }
154 }