001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.DataOutputStream; 022import java.io.OutputStream; 023import java.nio.ByteBuffer; 024import java.util.List; 025import java.util.ArrayList; 026import java.util.LinkedList; 027 028public class DataOutputByteBuffer extends DataOutputStream { 029 030 static class Buffer extends OutputStream { 031 032 final byte[] b = new byte[1]; 033 final boolean direct; 034 final List<ByteBuffer> active = new ArrayList<ByteBuffer>(); 035 final List<ByteBuffer> inactive = new LinkedList<ByteBuffer>(); 036 int size; 037 int length; 038 ByteBuffer current; 039 040 Buffer(int size, boolean direct) { 041 this.direct = direct; 042 this.size = size; 043 current = direct 044 ? ByteBuffer.allocateDirect(size) 045 : ByteBuffer.allocate(size); 046 } 047 @Override 048 public void write(int b) { 049 this.b[0] = (byte)(b & 0xFF); 050 write(this.b); 051 } 052 @Override 053 public void write(byte[] b) { 054 write(b, 0, b.length); 055 } 056 @Override 057 public void write(byte[] b, int off, int len) { 058 int rem = current.remaining(); 059 while (len > rem) { 060 current.put(b, off, rem); 061 length += rem; 062 current.flip(); 063 active.add(current); 064 off += rem; 065 len -= rem; 066 rem = getBuffer(len); 067 } 068 current.put(b, off, len); 069 length += len; 070 } 071 int getBuffer(int newsize) { 072 if (inactive.isEmpty()) { 073 size = Math.max(size << 1, newsize); 074 current = direct 075 ? ByteBuffer.allocateDirect(size) 076 : ByteBuffer.allocate(size); 077 } else { 078 current = inactive.remove(0); 079 } 080 return current.remaining(); 081 } 082 ByteBuffer[] getData() { 083 ByteBuffer[] ret = active.toArray(new ByteBuffer[active.size() + 1]); 084 ByteBuffer tmp = current.duplicate(); 085 tmp.flip(); 086 ret[ret.length - 1] = tmp.slice(); 087 return ret; 088 } 089 int getLength() { 090 return length; 091 } 092 void reset() { 093 length = 0; 094 current.rewind(); 095 inactive.add(0, current); 096 for (int i = active.size() - 1; i >= 0; --i) { 097 ByteBuffer b = active.remove(i); 098 b.rewind(); 099 inactive.add(0, b); 100 } 101 current = inactive.remove(0); 102 } 103 } 104 105 private final Buffer buffers; 106 107 public DataOutputByteBuffer() { 108 this(32); 109 } 110 111 public DataOutputByteBuffer(int size) { 112 this(size, false); 113 } 114 115 public DataOutputByteBuffer(int size, boolean direct) { 116 this(new Buffer(size, direct)); 117 } 118 119 private DataOutputByteBuffer(Buffer buffers) { 120 super(buffers); 121 this.buffers = buffers; 122 } 123 124 public ByteBuffer[] getData() { 125 return buffers.getData(); 126 } 127 128 public int getLength() { 129 return buffers.getLength(); 130 } 131 132 public void reset() { 133 this.written = 0; 134 buffers.reset(); 135 } 136 137}