001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.util;
019
020import java.util.HashMap;
021import java.util.LinkedList;
022import java.util.Map;
023import java.util.Queue;
024
025import org.apache.hadoop.HadoopIllegalArgumentException;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.util.Time;
028
029import com.google.common.base.Preconditions;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Manage byte array creation and release.
035 */
036@InterfaceAudience.Private
037public abstract class ByteArrayManager {
038  static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class);
039  private static final ThreadLocal<StringBuilder> DEBUG_MESSAGE =
040      new ThreadLocal<StringBuilder>() {
041    protected StringBuilder initialValue() {
042      return new StringBuilder();
043    }
044  };
045
046  private static void logDebugMessage() {
047    final StringBuilder b = DEBUG_MESSAGE.get();
048    LOG.debug(b.toString());
049    b.setLength(0);
050  }
051
052  static final int MIN_ARRAY_LENGTH = 32;
053  static final byte[] EMPTY_BYTE_ARRAY = {};
054
055  /**
056   * @return the least power of two greater than or equal to n, i.e. return
057   *         the least integer x with x >= n and x a power of two.
058   *
059   * @throws HadoopIllegalArgumentException
060   *           if n <= 0.
061   */
062  public static int leastPowerOfTwo(final int n) {
063    if (n <= 0) {
064      throw new HadoopIllegalArgumentException("n = " + n + " <= 0");
065    }
066
067    final int highestOne = Integer.highestOneBit(n);
068    if (highestOne == n) {
069      return n; // n is a power of two.
070    }
071    final int roundUp = highestOne << 1;
072    if (roundUp < 0) {
073      final long overflow = ((long) highestOne) << 1;
074      throw new ArithmeticException(
075          "Overflow: for n = " + n + ", the least power of two (the least"
076          + " integer x with x >= n and x a power of two) = "
077          + overflow + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
078    }
079    return roundUp;
080  }
081
082  /**
083   * A counter with a time stamp so that it is reset automatically
084   * if there is no increment for the time period.
085   */
086  static class Counter {
087    private final long countResetTimePeriodMs;
088    private long count = 0L;
089    private long timestamp = Time.monotonicNow();
090
091    Counter(long countResetTimePeriodMs) {
092      this.countResetTimePeriodMs = countResetTimePeriodMs;
093    }
094
095    synchronized long getCount() {
096      return count;
097    }
098
099    /**
100     * Increment the counter, and reset it if there is no increment
101     * for a certain time period.
102     *
103     * @return the new count.
104     */
105    synchronized long increment() {
106      final long now = Time.monotonicNow();
107      if (now - timestamp > countResetTimePeriodMs) {
108        count = 0; // reset the counter
109      }
110      timestamp = now;
111      return ++count;
112    }
113  }
114
115  /** A map from integers to counters. */
116  static final class CounterMap {
117    /** @see ByteArrayManager.Conf#countResetTimePeriodMs */
118    private final long countResetTimePeriodMs;
119    private final Map<Integer, Counter> map = new HashMap<>();
120
121    private CounterMap(long countResetTimePeriodMs) {
122      this.countResetTimePeriodMs = countResetTimePeriodMs;
123    }
124
125    /**
126     * @return the counter for the given key;
127     *         and create a new counter if it does not exist.
128     */
129    synchronized Counter get(final Integer key, final boolean
130        createIfNotExist) {
131      Counter count = map.get(key);
132      if (count == null && createIfNotExist) {
133        count = new Counter(countResetTimePeriodMs);
134        map.put(key, count);
135      }
136      return count;
137    }
138  }
139
140  /** Manage byte arrays with the same fixed length. */
141  static class FixedLengthManager {
142    private final int byteArrayLength;
143    private final int maxAllocated;
144    private final Queue<byte[]> freeQueue = new LinkedList<>();
145
146    private int numAllocated = 0;
147
148    FixedLengthManager(int arrayLength, int maxAllocated) {
149      this.byteArrayLength = arrayLength;
150      this.maxAllocated = maxAllocated;
151    }
152
153    /**
154     * Allocate a byte array.
155     *
156     * If the number of allocated arrays >= maximum, the current thread is
157     * blocked until the number of allocated arrays drops to below the maximum.
158     *
159     * The byte array allocated by this method must be returned for recycling
160     * via the {@link FixedLengthManager#recycle(byte[])} method.
161     */
162    synchronized byte[] allocate() throws InterruptedException {
163      if (LOG.isDebugEnabled()) {
164        DEBUG_MESSAGE.get().append(", ").append(this);
165      }
166      for(; numAllocated >= maxAllocated;) {
167        if (LOG.isDebugEnabled()) {
168          DEBUG_MESSAGE.get().append(": wait ...");
169          logDebugMessage();
170        }
171
172        wait();
173
174        if (LOG.isDebugEnabled()) {
175          DEBUG_MESSAGE.get().append("wake up: ").append(this);
176        }
177      }
178      numAllocated++;
179
180      final byte[] array = freeQueue.poll();
181      if (LOG.isDebugEnabled()) {
182        DEBUG_MESSAGE.get().append(", recycled? ").append(array != null);
183      }
184      return array != null? array : new byte[byteArrayLength];
185    }
186
187    /**
188     * Recycle the given byte array, which must have the same length as the
189     * array length managed by this object.
190     *
191     * The byte array may or may not be allocated
192     * by the {@link FixedLengthManager#allocate()} method.
193     */
194    synchronized int recycle(byte[] array) {
195      Preconditions.checkNotNull(array);
196      Preconditions.checkArgument(array.length == byteArrayLength);
197      if (LOG.isDebugEnabled()) {
198        DEBUG_MESSAGE.get().append(", ").append(this);
199      }
200
201      notify();
202      numAllocated--;
203      if (numAllocated < 0) {
204        // it is possible to drop below 0 since
205        // some byte arrays may not be created by the allocate() method.
206        numAllocated = 0;
207      }
208
209      if (freeQueue.size() < maxAllocated - numAllocated) {
210        if (LOG.isDebugEnabled()) {
211          DEBUG_MESSAGE.get().append(", freeQueue.offer");
212        }
213        freeQueue.offer(array);
214      }
215      return freeQueue.size();
216    }
217
218    @Override
219    public synchronized String toString() {
220      return "[" + byteArrayLength + ": " + numAllocated + "/"
221          + maxAllocated + ", free=" + freeQueue.size() + "]";
222    }
223  }
224
225  /** A map from array lengths to byte array managers. */
226  static class ManagerMap {
227    private final int countLimit;
228    private final Map<Integer, FixedLengthManager> map = new HashMap<>();
229
230    ManagerMap(int countLimit) {
231      this.countLimit = countLimit;
232    }
233
234    /** @return the manager for the given array length. */
235    synchronized FixedLengthManager get(final Integer arrayLength,
236        final boolean createIfNotExist) {
237      FixedLengthManager manager = map.get(arrayLength);
238      if (manager == null && createIfNotExist) {
239        manager = new FixedLengthManager(arrayLength, countLimit);
240        map.put(arrayLength, manager);
241      }
242      return manager;
243    }
244  }
245
246  /**
247   * Configuration for ByteArrayManager.
248   */
249  public static class Conf {
250    /**
251     * The count threshold for each array length so that a manager is created
252     * only after the allocation count exceeds the threshold.
253     */
254    private final int countThreshold;
255    /**
256     * The maximum number of arrays allowed for each array length.
257     */
258    private final int countLimit;
259    /**
260     * The time period in milliseconds that the allocation count for each array
261     * length is reset to zero if there is no increment.
262     */
263    private final long countResetTimePeriodMs;
264
265    public Conf(int countThreshold, int countLimit, long
266        countResetTimePeriodMs) {
267      this.countThreshold = countThreshold;
268      this.countLimit = countLimit;
269      this.countResetTimePeriodMs = countResetTimePeriodMs;
270    }
271  }
272
273  /**
274   * Create a byte array for the given length, where the length of
275   * the returned array is larger than or equal to the given length.
276   *
277   * The current thread may be blocked if some resource is unavailable.
278   *
279   * The byte array created by this method must be released
280   * via the {@link ByteArrayManager#release(byte[])} method.
281   *
282   * @return a byte array with length larger than or equal to the given length.
283   */
284  public abstract byte[] newByteArray(int size) throws InterruptedException;
285
286  /**
287   * Release the given byte array.
288   *
289   * The byte array may or may not be created
290   * by the {@link ByteArrayManager#newByteArray(int)} method.
291   *
292   * @return the number of free array.
293   */
294  public abstract int release(byte[] array);
295
296  public static ByteArrayManager newInstance(Conf conf) {
297    return conf == null? new NewByteArrayWithoutLimit(): new Impl(conf);
298  }
299
300  /**
301   * A dummy implementation which simply calls new byte[].
302   */
303  static class NewByteArrayWithoutLimit extends ByteArrayManager {
304    @Override
305    public byte[] newByteArray(int size) throws InterruptedException {
306      return new byte[size];
307    }
308
309    @Override
310    public int release(byte[] array) {
311      return 0;
312    }
313  }
314
315  /**
316   * Manage byte array allocation and provide a mechanism for recycling the byte
317   * array objects.
318   */
319  static class Impl extends ByteArrayManager {
320    private final Conf conf;
321
322    private final CounterMap counters;
323    private final ManagerMap managers;
324
325    Impl(Conf conf) {
326      this.conf = conf;
327      this.counters = new CounterMap(conf.countResetTimePeriodMs);
328      this.managers = new ManagerMap(conf.countLimit);
329    }
330
331    /**
332     * Allocate a byte array, where the length of the allocated array
333     * is the least power of two of the given length
334     * unless the given length is less than {@link #MIN_ARRAY_LENGTH}.
335     * In such case, the returned array length is equal to {@link
336     * #MIN_ARRAY_LENGTH}.
337     *
338     * If the number of allocated arrays exceeds the capacity,
339     * the current thread is blocked until
340     * the number of allocated arrays drops to below the capacity.
341     *
342     * The byte array allocated by this method must be returned for recycling
343     * via the {@link Impl#release(byte[])} method.
344     *
345     * @return a byte array with length larger than or equal to the given
346     * length.
347     */
348    @Override
349    public byte[] newByteArray(final int arrayLength)
350        throws InterruptedException {
351      Preconditions.checkArgument(arrayLength >= 0);
352      if (LOG.isDebugEnabled()) {
353        DEBUG_MESSAGE.get().append("allocate(").append(arrayLength).append(")");
354      }
355
356      final byte[] array;
357      if (arrayLength == 0) {
358        array = EMPTY_BYTE_ARRAY;
359      } else {
360        final int powerOfTwo = arrayLength <= MIN_ARRAY_LENGTH?
361            MIN_ARRAY_LENGTH: leastPowerOfTwo(arrayLength);
362        final long count = counters.get(powerOfTwo, true).increment();
363        final boolean aboveThreshold = count > conf.countThreshold;
364        // create a new manager only if the count is above threshold.
365        final FixedLengthManager manager =
366            managers.get(powerOfTwo, aboveThreshold);
367
368        if (LOG.isDebugEnabled()) {
369          DEBUG_MESSAGE.get().append(": count=").append(count)
370              .append(aboveThreshold? ", aboveThreshold": ", belowThreshold");
371        }
372        array = manager != null? manager.allocate(): new byte[powerOfTwo];
373      }
374
375      if (LOG.isDebugEnabled()) {
376        DEBUG_MESSAGE.get().append(", return byte[")
377            .append(array.length).append("]");
378        logDebugMessage();
379      }
380      return array;
381    }
382
383    /**
384     * Recycle the given byte array.
385     *
386     * The byte array may or may not be allocated
387     * by the {@link Impl#newByteArray(int)} method.
388     *
389     * This is a non-blocking call.
390     */
391    @Override
392    public int release(final byte[] array) {
393      Preconditions.checkNotNull(array);
394      if (LOG.isDebugEnabled()) {
395        DEBUG_MESSAGE.get()
396            .append("recycle: array.length=").append(array.length);
397      }
398
399      final int freeQueueSize;
400      if (array.length == 0) {
401        freeQueueSize = -1;
402      } else {
403        final FixedLengthManager manager = managers.get(array.length, false);
404        freeQueueSize = manager == null? -1: manager.recycle(array);
405      }
406
407      if (LOG.isDebugEnabled()) {
408        DEBUG_MESSAGE.get().append(", freeQueueSize=").append(freeQueueSize);
409        logDebugMessage();
410      }
411      return freeQueueSize;
412    }
413
414    CounterMap getCounters() {
415      return counters;
416    }
417
418    ManagerMap getManagers() {
419      return managers;
420    }
421  }
422}