/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.*;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
 *
 * <p>Every write goes through an internal position-tracking filter, so the
 * stream can report its current position via {@link #getPos()} and, when a
 * {@link FileSystem.Statistics} instance was supplied, record the number of
 * bytes written. The {@link Syncable} and {@link CanSetDropBehind} operations
 * are forwarded to the wrapped stream when it implements those interfaces.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
    implements Syncable, CanSetDropBehind {

  /** The stream handed to the constructor, before position tracking is added. */
  private final OutputStream wrappedStream;

  /**
   * Filter that keeps a running count of the bytes written through it and
   * optionally reports each write to a {@link FileSystem.Statistics} object.
   */
  private static class PositionCache extends FilterOutputStream {
    /** Receives byte counts for each write; may be null to disable reporting. */
    private final FileSystem.Statistics statistics;

    /** Start position plus the number of bytes written so far. */
    private long position;

    /**
     * @param out   stream that receives the bytes
     * @param stats statistics to update on each write, or null for none
     * @param pos   initial value of the cached position
     */
    public PositionCache(OutputStream out,
                         FileSystem.Statistics stats,
                         long pos) throws IOException {
      super(out);
      statistics = stats;
      position = pos;
    }

    /** Writes one byte, then advances the position and statistics by one. */
    @Override
    public void write(int b) throws IOException {
      out.write(b);
      position++;
      if (statistics != null) {
        statistics.incrementBytesWritten(1);
      }
    }

    /**
     * Writes {@code len} bytes in a single call to the wrapped stream, then
     * advances the position and statistics by {@code len}.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      position += len;
      if (statistics != null) {
        statistics.incrementBytesWritten(len);
      }
    }

    /**
     * @return the cached position; the wrapped stream is not consulted
     */
    public long getPos() throws IOException {
      return position;
    }

    /** Closes the wrapped stream by calling its {@code close()} directly. */
    @Override
    public void close() throws IOException {
      out.close();
    }
  }

  /**
   * Wraps {@code out} with no statistics and a start position of zero.
   *
   * @param out the stream to wrap
   * @deprecated this constructor passes null statistics, so bytes written
   *     are not recorded anywhere
   */
  @Deprecated
  public FSDataOutputStream(OutputStream out) throws IOException {
    this(out, null);
  }

  /**
   * Wraps {@code out} with a start position of zero.
   *
   * @param out   the stream to wrap
   * @param stats statistics to update on each write, or null for none
   */
  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
      throws IOException {
    this(out, stats, 0);
  }

  /**
   * Wraps {@code out}, reporting positions relative to {@code startPosition}.
   *
   * @param out           the stream to wrap
   * @param stats         statistics to update on each write, or null for none
   * @param startPosition value {@link #getPos()} returns before any write
   */
  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
                            long startPosition) throws IOException {
    super(new PositionCache(out, stats, startPosition));
    wrappedStream = out;
  }

  /**
   * Get the current position in the output stream.
   *
   * @return the current position in the output stream
   */
  public long getPos() throws IOException {
    // 'out' is always the PositionCache installed by the constructor.
    return ((PositionCache) out).getPos();
  }

  /**
   * Close the underlying output stream.
   */
  @Override
  public void close() throws IOException {
    out.close(); // This invokes PositionCache.close()
  }

  /**
   * Get a reference to the wrapped output stream. Used by unit tests.
   *
   * @return the underlying output stream
   */
  @InterfaceAudience.LimitedPrivate({"HDFS"})
  public OutputStream getWrappedStream() {
    return wrappedStream;
  }

  /**
   * Forwards to the wrapped stream's {@code sync()} when it implements
   * {@link Syncable}; otherwise does nothing. Unlike {@link #hflush()} and
   * {@link #hsync()}, there is no fallback to {@code flush()}.
   */
  @Override  // Syncable
  @Deprecated
  public void sync() throws IOException {
    if (wrappedStream instanceof Syncable) {
      ((Syncable) wrappedStream).sync();
    }
  }

  /**
   * Forwards to the wrapped stream's {@code hflush()} when it implements
   * {@link Syncable}; otherwise falls back to a plain {@code flush()}.
   */
  @Override  // Syncable
  public void hflush() throws IOException {
    if (wrappedStream instanceof Syncable) {
      ((Syncable) wrappedStream).hflush();
    } else {
      wrappedStream.flush();
    }
  }

  /**
   * Forwards to the wrapped stream's {@code hsync()} when it implements
   * {@link Syncable}; otherwise falls back to a plain {@code flush()}.
   */
  @Override  // Syncable
  public void hsync() throws IOException {
    if (wrappedStream instanceof Syncable) {
      ((Syncable) wrappedStream).hsync();
    } else {
      wrappedStream.flush();
    }
  }

  /**
   * Forwards the drop-behind setting to the wrapped stream.
   *
   * @param dropBehind the value passed through to the wrapped stream
   * @throws UnsupportedOperationException if the wrapped stream does not
   *     implement {@link CanSetDropBehind}
   */
  @Override
  public void setDropBehind(Boolean dropBehind) throws IOException {
    // Test the capability up front instead of catching ClassCastException:
    // a catch would also swallow a ClassCastException raised inside the
    // wrapped stream's own implementation and misreport it as "unsupported".
    if (!(wrappedStream instanceof CanSetDropBehind)) {
      throw new UnsupportedOperationException("the wrapped stream does " +
          "not support setting the drop-behind caching setting.");
    }
    ((CanSetDropBehind) wrappedStream).setDropBehind(dropBehind);
  }
}