001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.*;
021
022import org.apache.hadoop.classification.InterfaceAudience;
023import org.apache.hadoop.classification.InterfaceStability;
024
025/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream},
026 * buffers output through a {@link BufferedOutputStream} and creates a checksum
027 * file. */
028@InterfaceAudience.Public
029@InterfaceStability.Stable
030public class FSDataOutputStream extends DataOutputStream implements Syncable {
031  private OutputStream wrappedStream;
032
033  private static class PositionCache extends FilterOutputStream {
034    private FileSystem.Statistics statistics;
035    long position;
036
037    public PositionCache(OutputStream out, 
038                         FileSystem.Statistics stats,
039                         long pos) throws IOException {
040      super(out);
041      statistics = stats;
042      position = pos;
043    }
044
045    public void write(int b) throws IOException {
046      out.write(b);
047      position++;
048      if (statistics != null) {
049        statistics.incrementBytesWritten(1);
050      }
051    }
052    
053    public void write(byte b[], int off, int len) throws IOException {
054      out.write(b, off, len);
055      position += len;                            // update position
056      if (statistics != null) {
057        statistics.incrementBytesWritten(len);
058      }
059    }
060      
061    public long getPos() throws IOException {
062      return position;                            // return cached position
063    }
064    
065    public void close() throws IOException {
066      out.close();
067    }
068  }
069
070  @Deprecated
071  public FSDataOutputStream(OutputStream out) throws IOException {
072    this(out, null);
073  }
074
075  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
076    throws IOException {
077    this(out, stats, 0);
078  }
079
080  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
081                            long startPosition) throws IOException {
082    super(new PositionCache(out, stats, startPosition));
083    wrappedStream = out;
084  }
085  
086  /**
087   * Get the current position in the output stream.
088   *
089   * @return the current position in the output stream
090   */
091  public long getPos() throws IOException {
092    return ((PositionCache)out).getPos();
093  }
094
095  /**
096   * Close the underlying output stream.
097   */
098  public void close() throws IOException {
099    out.close(); // This invokes PositionCache.close()
100  }
101
102  /**
103   * Get a reference to the wrapped output stream. Used by unit tests.
104   *
105   * @return the underlying output stream
106   */
107  @InterfaceAudience.LimitedPrivate({"HDFS"})
108  public OutputStream getWrappedStream() {
109    return wrappedStream;
110  }
111
112  @Override  // Syncable
113  @Deprecated
114  public void sync() throws IOException {
115    if (wrappedStream instanceof Syncable) {
116      ((Syncable)wrappedStream).sync();
117    }
118  }
119  
120  @Override  // Syncable
121  public void hflush() throws IOException {
122    if (wrappedStream instanceof Syncable) {
123      ((Syncable)wrappedStream).hflush();
124    } else {
125      wrappedStream.flush();
126    }
127  }
128  
129  @Override  // Syncable
130  public void hsync() throws IOException {
131    if (wrappedStream instanceof Syncable) {
132      ((Syncable)wrappedStream).hsync();
133    } else {
134      wrappedStream.flush();
135    }
136  }
137}