001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.io; 019 020import com.google.common.collect.ComparisonChain; 021import org.apache.commons.lang.builder.HashCodeBuilder; 022 023import java.nio.ByteBuffer; 024import java.util.Map; 025import java.util.TreeMap; 026 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029 030/** 031 * This is a simple ByteBufferPool which just creates ByteBuffers as needed. 032 * It also caches ByteBuffers after they're released. It will always return 033 * the smallest cached buffer with at least the capacity you request. 034 * We don't try to do anything clever here like try to limit the maximum cache 035 * size. 036 */ 037@InterfaceAudience.Public 038@InterfaceStability.Stable 039public final class ElasticByteBufferPool implements ByteBufferPool { 040 private static final class Key implements Comparable<Key> { 041 private final int capacity; 042 private final long insertionTime; 043 044 Key(int capacity, long insertionTime) { 045 this.capacity = capacity; 046 this.insertionTime = insertionTime; 047 } 048 049 @Override 050 public int compareTo(Key other) { 051 return ComparisonChain.start(). 052 compare(capacity, other.capacity). 053 compare(insertionTime, other.insertionTime). 054 result(); 055 } 056 057 @Override 058 public boolean equals(Object rhs) { 059 if (rhs == null) { 060 return false; 061 } 062 try { 063 Key o = (Key)rhs; 064 return (compareTo(o) == 0); 065 } catch (ClassCastException e) { 066 return false; 067 } 068 } 069 070 @Override 071 public int hashCode() { 072 return new HashCodeBuilder(). 073 append(capacity). 074 append(insertionTime). 075 toHashCode(); 076 } 077 } 078 079 private final TreeMap<Key, ByteBuffer> buffers = 080 new TreeMap<Key, ByteBuffer>(); 081 082 private final TreeMap<Key, ByteBuffer> directBuffers = 083 new TreeMap<Key, ByteBuffer>(); 084 085 private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) { 086 return direct ? directBuffers : buffers; 087 } 088 089 @Override 090 public synchronized ByteBuffer getBuffer(boolean direct, int length) { 091 TreeMap<Key, ByteBuffer> tree = getBufferTree(direct); 092 Map.Entry<Key, ByteBuffer> entry = 093 tree.ceilingEntry(new Key(length, 0)); 094 if (entry == null) { 095 return direct ? ByteBuffer.allocateDirect(length) : 096 ByteBuffer.allocate(length); 097 } 098 tree.remove(entry.getKey()); 099 return entry.getValue(); 100 } 101 102 @Override 103 public synchronized void putBuffer(ByteBuffer buffer) { 104 TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect()); 105 while (true) { 106 Key key = new Key(buffer.capacity(), System.nanoTime()); 107 if (!tree.containsKey(key)) { 108 tree.put(key, buffer); 109 return; 110 } 111 // Buffers are indexed by (capacity, time). 112 // If our key is not unique on the first try, we try again, since the 113 // time will be different. Since we use nanoseconds, it's pretty 114 // unlikely that we'll loop even once, unless the system clock has a 115 // poor granularity. 116 } 117 } 118}