001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.fs; 019 020import java.io.*; 021 022import org.apache.hadoop.classification.InterfaceAudience; 023import org.apache.hadoop.classification.InterfaceStability; 024 025/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}, 026 * buffers output through a {@link BufferedOutputStream} and creates a checksum 027 * file. */ 028@InterfaceAudience.Public 029@InterfaceStability.Stable 030public class FSDataOutputStream extends DataOutputStream implements Syncable { 031 private OutputStream wrappedStream; 032 033 private static class PositionCache extends FilterOutputStream { 034 private FileSystem.Statistics statistics; 035 long position; 036 037 public PositionCache(OutputStream out, 038 FileSystem.Statistics stats, 039 long pos) throws IOException { 040 super(out); 041 statistics = stats; 042 position = pos; 043 } 044 045 public void write(int b) throws IOException { 046 out.write(b); 047 position++; 048 if (statistics != null) { 049 statistics.incrementBytesWritten(1); 050 } 051 } 052 053 public void write(byte b[], int off, int len) throws IOException { 054 out.write(b, off, len); 055 position += len; // update position 056 if (statistics != null) { 057 statistics.incrementBytesWritten(len); 058 } 059 } 060 061 public long getPos() throws IOException { 062 return position; // return cached position 063 } 064 065 public void close() throws IOException { 066 out.close(); 067 } 068 } 069 070 @Deprecated 071 public FSDataOutputStream(OutputStream out) throws IOException { 072 this(out, null); 073 } 074 075 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats) 076 throws IOException { 077 this(out, stats, 0); 078 } 079 080 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats, 081 long startPosition) throws IOException { 082 super(new PositionCache(out, stats, startPosition)); 083 wrappedStream = out; 084 } 085 086 /** 087 * Get the current position in the output stream. 088 * 089 * @return the current position in the output stream 090 */ 091 public long getPos() throws IOException { 092 return ((PositionCache)out).getPos(); 093 } 094 095 /** 096 * Close the underlying output stream. 097 */ 098 public void close() throws IOException { 099 out.close(); // This invokes PositionCache.close() 100 } 101 102 /** 103 * Get a reference to the wrapped output stream. Used by unit tests. 104 * 105 * @return the underlying output stream 106 */ 107 @InterfaceAudience.LimitedPrivate({"HDFS"}) 108 public OutputStream getWrappedStream() { 109 return wrappedStream; 110 } 111 112 @Override // Syncable 113 @Deprecated 114 public void sync() throws IOException { 115 if (wrappedStream instanceof Syncable) { 116 ((Syncable)wrappedStream).sync(); 117 } 118 } 119 120 @Override // Syncable 121 public void hflush() throws IOException { 122 if (wrappedStream instanceof Syncable) { 123 ((Syncable)wrappedStream).hflush(); 124 } else { 125 wrappedStream.flush(); 126 } 127 } 128 129 @Override // Syncable 130 public void hsync() throws IOException { 131 if (wrappedStream instanceof Syncable) { 132 ((Syncable)wrappedStream).hsync(); 133 } else { 134 wrappedStream.flush(); 135 } 136 } 137}