001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.fs; 019 020import java.io.*; 021import java.io.DataOutputStream; 022import java.io.FilterOutputStream; 023import java.io.IOException; 024import java.io.OutputStream; 025 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.classification.InterfaceStability; 028 029/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}. 030 */ 031@InterfaceAudience.Public 032@InterfaceStability.Stable 033public class FSDataOutputStream extends DataOutputStream 034 implements Syncable, CanSetDropBehind { 035 private final OutputStream wrappedStream; 036 037 private static class PositionCache extends FilterOutputStream { 038 private FileSystem.Statistics statistics; 039 long position; 040 041 public PositionCache(OutputStream out, 042 FileSystem.Statistics stats, 043 long pos) throws IOException { 044 super(out); 045 statistics = stats; 046 position = pos; 047 } 048 049 public void write(int b) throws IOException { 050 out.write(b); 051 position++; 052 if (statistics != null) { 053 statistics.incrementBytesWritten(1); 054 } 055 } 056 057 public void write(byte b[], int off, int len) throws IOException { 058 out.write(b, off, len); 059 position += len; // update position 060 if (statistics != null) { 061 statistics.incrementBytesWritten(len); 062 } 063 } 064 065 public long getPos() throws IOException { 066 return position; // return cached position 067 } 068 069 public void close() throws IOException { 070 // ensure close works even if a null reference was passed in 071 if (out != null) { 072 out.close(); 073 } 074 } 075 } 076 077 @Deprecated 078 public FSDataOutputStream(OutputStream out) throws IOException { 079 this(out, null); 080 } 081 082 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats) 083 throws IOException { 084 this(out, stats, 0); 085 } 086 087 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats, 088 long startPosition) throws IOException { 089 super(new PositionCache(out, stats, startPosition)); 090 wrappedStream = out; 091 } 092 093 /** 094 * Get the current position in the output stream. 095 * 096 * @return the current position in the output stream 097 */ 098 public long getPos() throws IOException { 099 return ((PositionCache)out).getPos(); 100 } 101 102 /** 103 * Close the underlying output stream. 104 */ 105 public void close() throws IOException { 106 out.close(); // This invokes PositionCache.close() 107 } 108 109 /** 110 * Get a reference to the wrapped output stream. 111 * 112 * @return the underlying output stream 113 */ 114 @InterfaceAudience.LimitedPrivate({"HDFS"}) 115 public OutputStream getWrappedStream() { 116 return wrappedStream; 117 } 118 119 @Override // Syncable 120 @Deprecated 121 public void sync() throws IOException { 122 if (wrappedStream instanceof Syncable) { 123 ((Syncable)wrappedStream).sync(); 124 } 125 } 126 127 @Override // Syncable 128 public void hflush() throws IOException { 129 if (wrappedStream instanceof Syncable) { 130 ((Syncable)wrappedStream).hflush(); 131 } else { 132 wrappedStream.flush(); 133 } 134 } 135 136 @Override // Syncable 137 public void hsync() throws IOException { 138 if (wrappedStream instanceof Syncable) { 139 ((Syncable)wrappedStream).hsync(); 140 } else { 141 wrappedStream.flush(); 142 } 143 } 144 145 @Override 146 public void setDropBehind(Boolean dropBehind) throws IOException { 147 try { 148 ((CanSetDropBehind)wrappedStream).setDropBehind(dropBehind); 149 } catch (ClassCastException e) { 150 throw new UnsupportedOperationException("the wrapped stream does " + 151 "not support setting the drop-behind caching setting."); 152 } 153 } 154}