001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.util; 019 020import java.util.HashMap; 021import java.util.LinkedList; 022import java.util.Map; 023import java.util.Queue; 024 025import org.apache.hadoop.HadoopIllegalArgumentException; 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.util.Time; 028 029import com.google.common.base.Preconditions; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Manage byte array creation and release. 035 */ 036@InterfaceAudience.Private 037public abstract class ByteArrayManager { 038 static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class); 039 private static final ThreadLocal<StringBuilder> DEBUG_MESSAGE = 040 new ThreadLocal<StringBuilder>() { 041 protected StringBuilder initialValue() { 042 return new StringBuilder(); 043 } 044 }; 045 046 private static void logDebugMessage() { 047 final StringBuilder b = DEBUG_MESSAGE.get(); 048 LOG.debug(b.toString()); 049 b.setLength(0); 050 } 051 052 static final int MIN_ARRAY_LENGTH = 32; 053 static final byte[] EMPTY_BYTE_ARRAY = {}; 054 055 /** 056 * @return the least power of two greater than or equal to n, i.e. return 057 * the least integer x with x >= n and x a power of two. 058 * 059 * @throws HadoopIllegalArgumentException 060 * if n <= 0. 061 */ 062 public static int leastPowerOfTwo(final int n) { 063 if (n <= 0) { 064 throw new HadoopIllegalArgumentException("n = " + n + " <= 0"); 065 } 066 067 final int highestOne = Integer.highestOneBit(n); 068 if (highestOne == n) { 069 return n; // n is a power of two. 070 } 071 final int roundUp = highestOne << 1; 072 if (roundUp < 0) { 073 final long overflow = ((long) highestOne) << 1; 074 throw new ArithmeticException( 075 "Overflow: for n = " + n + ", the least power of two (the least" 076 + " integer x with x >= n and x a power of two) = " 077 + overflow + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE); 078 } 079 return roundUp; 080 } 081 082 /** 083 * A counter with a time stamp so that it is reset automatically 084 * if there is no increment for the time period. 085 */ 086 static class Counter { 087 private final long countResetTimePeriodMs; 088 private long count = 0L; 089 private long timestamp = Time.monotonicNow(); 090 091 Counter(long countResetTimePeriodMs) { 092 this.countResetTimePeriodMs = countResetTimePeriodMs; 093 } 094 095 synchronized long getCount() { 096 return count; 097 } 098 099 /** 100 * Increment the counter, and reset it if there is no increment 101 * for a certain time period. 102 * 103 * @return the new count. 104 */ 105 synchronized long increment() { 106 final long now = Time.monotonicNow(); 107 if (now - timestamp > countResetTimePeriodMs) { 108 count = 0; // reset the counter 109 } 110 timestamp = now; 111 return ++count; 112 } 113 } 114 115 /** A map from integers to counters. */ 116 static final class CounterMap { 117 /** @see ByteArrayManager.Conf#countResetTimePeriodMs */ 118 private final long countResetTimePeriodMs; 119 private final Map<Integer, Counter> map = new HashMap<>(); 120 121 private CounterMap(long countResetTimePeriodMs) { 122 this.countResetTimePeriodMs = countResetTimePeriodMs; 123 } 124 125 /** 126 * @return the counter for the given key; 127 * and create a new counter if it does not exist. 128 */ 129 synchronized Counter get(final Integer key, final boolean 130 createIfNotExist) { 131 Counter count = map.get(key); 132 if (count == null && createIfNotExist) { 133 count = new Counter(countResetTimePeriodMs); 134 map.put(key, count); 135 } 136 return count; 137 } 138 } 139 140 /** Manage byte arrays with the same fixed length. */ 141 static class FixedLengthManager { 142 private final int byteArrayLength; 143 private final int maxAllocated; 144 private final Queue<byte[]> freeQueue = new LinkedList<>(); 145 146 private int numAllocated = 0; 147 148 FixedLengthManager(int arrayLength, int maxAllocated) { 149 this.byteArrayLength = arrayLength; 150 this.maxAllocated = maxAllocated; 151 } 152 153 /** 154 * Allocate a byte array. 155 * 156 * If the number of allocated arrays >= maximum, the current thread is 157 * blocked until the number of allocated arrays drops to below the maximum. 158 * 159 * The byte array allocated by this method must be returned for recycling 160 * via the {@link FixedLengthManager#recycle(byte[])} method. 161 */ 162 synchronized byte[] allocate() throws InterruptedException { 163 if (LOG.isDebugEnabled()) { 164 DEBUG_MESSAGE.get().append(", ").append(this); 165 } 166 for(; numAllocated >= maxAllocated;) { 167 if (LOG.isDebugEnabled()) { 168 DEBUG_MESSAGE.get().append(": wait ..."); 169 logDebugMessage(); 170 } 171 172 wait(); 173 174 if (LOG.isDebugEnabled()) { 175 DEBUG_MESSAGE.get().append("wake up: ").append(this); 176 } 177 } 178 numAllocated++; 179 180 final byte[] array = freeQueue.poll(); 181 if (LOG.isDebugEnabled()) { 182 DEBUG_MESSAGE.get().append(", recycled? ").append(array != null); 183 } 184 return array != null? array : new byte[byteArrayLength]; 185 } 186 187 /** 188 * Recycle the given byte array, which must have the same length as the 189 * array length managed by this object. 190 * 191 * The byte array may or may not be allocated 192 * by the {@link FixedLengthManager#allocate()} method. 193 */ 194 synchronized int recycle(byte[] array) { 195 Preconditions.checkNotNull(array); 196 Preconditions.checkArgument(array.length == byteArrayLength); 197 if (LOG.isDebugEnabled()) { 198 DEBUG_MESSAGE.get().append(", ").append(this); 199 } 200 201 notify(); 202 numAllocated--; 203 if (numAllocated < 0) { 204 // it is possible to drop below 0 since 205 // some byte arrays may not be created by the allocate() method. 206 numAllocated = 0; 207 } 208 209 if (freeQueue.size() < maxAllocated - numAllocated) { 210 if (LOG.isDebugEnabled()) { 211 DEBUG_MESSAGE.get().append(", freeQueue.offer"); 212 } 213 freeQueue.offer(array); 214 } 215 return freeQueue.size(); 216 } 217 218 @Override 219 public synchronized String toString() { 220 return "[" + byteArrayLength + ": " + numAllocated + "/" 221 + maxAllocated + ", free=" + freeQueue.size() + "]"; 222 } 223 } 224 225 /** A map from array lengths to byte array managers. */ 226 static class ManagerMap { 227 private final int countLimit; 228 private final Map<Integer, FixedLengthManager> map = new HashMap<>(); 229 230 ManagerMap(int countLimit) { 231 this.countLimit = countLimit; 232 } 233 234 /** @return the manager for the given array length. */ 235 synchronized FixedLengthManager get(final Integer arrayLength, 236 final boolean createIfNotExist) { 237 FixedLengthManager manager = map.get(arrayLength); 238 if (manager == null && createIfNotExist) { 239 manager = new FixedLengthManager(arrayLength, countLimit); 240 map.put(arrayLength, manager); 241 } 242 return manager; 243 } 244 } 245 246 /** 247 * Configuration for ByteArrayManager. 248 */ 249 public static class Conf { 250 /** 251 * The count threshold for each array length so that a manager is created 252 * only after the allocation count exceeds the threshold. 253 */ 254 private final int countThreshold; 255 /** 256 * The maximum number of arrays allowed for each array length. 257 */ 258 private final int countLimit; 259 /** 260 * The time period in milliseconds that the allocation count for each array 261 * length is reset to zero if there is no increment. 262 */ 263 private final long countResetTimePeriodMs; 264 265 public Conf(int countThreshold, int countLimit, long 266 countResetTimePeriodMs) { 267 this.countThreshold = countThreshold; 268 this.countLimit = countLimit; 269 this.countResetTimePeriodMs = countResetTimePeriodMs; 270 } 271 } 272 273 /** 274 * Create a byte array for the given length, where the length of 275 * the returned array is larger than or equal to the given length. 276 * 277 * The current thread may be blocked if some resource is unavailable. 278 * 279 * The byte array created by this method must be released 280 * via the {@link ByteArrayManager#release(byte[])} method. 281 * 282 * @return a byte array with length larger than or equal to the given length. 283 */ 284 public abstract byte[] newByteArray(int size) throws InterruptedException; 285 286 /** 287 * Release the given byte array. 288 * 289 * The byte array may or may not be created 290 * by the {@link ByteArrayManager#newByteArray(int)} method. 291 * 292 * @return the number of free array. 293 */ 294 public abstract int release(byte[] array); 295 296 public static ByteArrayManager newInstance(Conf conf) { 297 return conf == null? new NewByteArrayWithoutLimit(): new Impl(conf); 298 } 299 300 /** 301 * A dummy implementation which simply calls new byte[]. 302 */ 303 static class NewByteArrayWithoutLimit extends ByteArrayManager { 304 @Override 305 public byte[] newByteArray(int size) throws InterruptedException { 306 return new byte[size]; 307 } 308 309 @Override 310 public int release(byte[] array) { 311 return 0; 312 } 313 } 314 315 /** 316 * Manage byte array allocation and provide a mechanism for recycling the byte 317 * array objects. 318 */ 319 static class Impl extends ByteArrayManager { 320 private final Conf conf; 321 322 private final CounterMap counters; 323 private final ManagerMap managers; 324 325 Impl(Conf conf) { 326 this.conf = conf; 327 this.counters = new CounterMap(conf.countResetTimePeriodMs); 328 this.managers = new ManagerMap(conf.countLimit); 329 } 330 331 /** 332 * Allocate a byte array, where the length of the allocated array 333 * is the least power of two of the given length 334 * unless the given length is less than {@link #MIN_ARRAY_LENGTH}. 335 * In such case, the returned array length is equal to {@link 336 * #MIN_ARRAY_LENGTH}. 337 * 338 * If the number of allocated arrays exceeds the capacity, 339 * the current thread is blocked until 340 * the number of allocated arrays drops to below the capacity. 341 * 342 * The byte array allocated by this method must be returned for recycling 343 * via the {@link Impl#release(byte[])} method. 344 * 345 * @return a byte array with length larger than or equal to the given 346 * length. 347 */ 348 @Override 349 public byte[] newByteArray(final int arrayLength) 350 throws InterruptedException { 351 Preconditions.checkArgument(arrayLength >= 0); 352 if (LOG.isDebugEnabled()) { 353 DEBUG_MESSAGE.get().append("allocate(").append(arrayLength).append(")"); 354 } 355 356 final byte[] array; 357 if (arrayLength == 0) { 358 array = EMPTY_BYTE_ARRAY; 359 } else { 360 final int powerOfTwo = arrayLength <= MIN_ARRAY_LENGTH? 361 MIN_ARRAY_LENGTH: leastPowerOfTwo(arrayLength); 362 final long count = counters.get(powerOfTwo, true).increment(); 363 final boolean aboveThreshold = count > conf.countThreshold; 364 // create a new manager only if the count is above threshold. 365 final FixedLengthManager manager = 366 managers.get(powerOfTwo, aboveThreshold); 367 368 if (LOG.isDebugEnabled()) { 369 DEBUG_MESSAGE.get().append(": count=").append(count) 370 .append(aboveThreshold? ", aboveThreshold": ", belowThreshold"); 371 } 372 array = manager != null? manager.allocate(): new byte[powerOfTwo]; 373 } 374 375 if (LOG.isDebugEnabled()) { 376 DEBUG_MESSAGE.get().append(", return byte[") 377 .append(array.length).append("]"); 378 logDebugMessage(); 379 } 380 return array; 381 } 382 383 /** 384 * Recycle the given byte array. 385 * 386 * The byte array may or may not be allocated 387 * by the {@link Impl#newByteArray(int)} method. 388 * 389 * This is a non-blocking call. 390 */ 391 @Override 392 public int release(final byte[] array) { 393 Preconditions.checkNotNull(array); 394 if (LOG.isDebugEnabled()) { 395 DEBUG_MESSAGE.get() 396 .append("recycle: array.length=").append(array.length); 397 } 398 399 final int freeQueueSize; 400 if (array.length == 0) { 401 freeQueueSize = -1; 402 } else { 403 final FixedLengthManager manager = managers.get(array.length, false); 404 freeQueueSize = manager == null? -1: manager.recycle(array); 405 } 406 407 if (LOG.isDebugEnabled()) { 408 DEBUG_MESSAGE.get().append(", freeQueueSize=").append(freeQueueSize); 409 logDebugMessage(); 410 } 411 return freeQueueSize; 412 } 413 414 CounterMap getCounters() { 415 return counters; 416 } 417 418 ManagerMap getManagers() { 419 return managers; 420 } 421 } 422}