001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.util;
019
020import java.util.Comparator;
021import java.util.Iterator;
022import java.util.PriorityQueue;
023
024import org.apache.hadoop.HadoopIllegalArgumentException;
025import org.apache.hadoop.classification.InterfaceAudience;
026
027import com.google.common.annotations.VisibleForTesting;
028import com.google.common.base.Preconditions;
029
030/**
031 * A low memory footprint Cache which extends {@link LightWeightGSet}.
032 * An entry in the cache is expired if
033 * (1) it is added to the cache longer than the creation-expiration period, and
034 * (2) it is not accessed for the access-expiration period.
035 * When an entry is expired, it may be evicted from the cache.
036 * When the size limit of the cache is set, the cache will evict the entries
037 * with earliest expiration time, even if they are not expired.
038 * 
039 * It is guaranteed that number of entries in the cache is less than or equal
040 * to the size limit.  However, It is not guaranteed that expired entries are
041 * evicted from the cache. An expired entry may possibly be accessed after its
042 * expiration time. In such case, the expiration time may be updated.
043 *
044 * This class does not support null entry.
045 *
046 * This class is not thread safe.
047 *
048 * @param <K> Key type for looking up the entries
049 * @param <E> Entry type, which must be
050 *       (1) a subclass of K, and
051 *       (2) implementing {@link Entry} interface, and
052 */
053@InterfaceAudience.Private
054public class LightWeightCache<K, E extends K> extends LightWeightGSet<K, E> {
055  /** Limit the number of entries in each eviction. */
056  private static final int EVICTION_LIMIT = 1 << 16;
057
058  /**
059   * Entries of {@link LightWeightCache}.
060   */
061  public static interface Entry extends LinkedElement {
062    /** Set the expiration time. */
063    public void setExpirationTime(long timeNano);
064
065    /** Get the expiration time. */
066    public long getExpirationTime();
067  }
068
069  /** Comparator for sorting entries by expiration time in ascending order. */
070  private static final Comparator<Entry> expirationTimeComparator
071      = new Comparator<Entry>() {
072    @Override
073    public int compare(Entry left, Entry right) {
074      final long l = left.getExpirationTime();
075      final long r = right.getExpirationTime();
076      return l > r? 1: l < r? -1: 0;
077    }
078  };
079
080  /** A clock for measuring time so that it can be mocked in unit tests. */
081  static class Clock {
082    /** @return the current time. */
083    long currentTime() {
084      return System.nanoTime();
085    }
086  }
087  
088  private static int updateRecommendedLength(int recommendedLength,
089      int sizeLimit) {
090    return sizeLimit > 0 && sizeLimit < recommendedLength?
091        (sizeLimit/4*3) // 0.75 load factor
092        : recommendedLength;
093  }
094
095  /*
096   * The memory footprint for java.util.PriorityQueue is low but the
097   * remove(Object) method runs in linear time. We may improve it by using a
098   * balanced tree. However, we do not yet have a low memory footprint balanced
099   * tree implementation.
100   */
101  private final PriorityQueue<Entry> queue;
102  private final long creationExpirationPeriod;
103  private final long accessExpirationPeriod;
104  private final int sizeLimit;
105  private final Clock clock;
106
107  /**
108   * @param recommendedLength Recommended size of the internal array.
109   * @param sizeLimit the limit of the size of the cache.
110   *            The limit is disabled if it is <= 0.
111   * @param creationExpirationPeriod the time period C > 0 in nanoseconds that
112   *            the creation of an entry is expired if it is added to the cache
113   *            longer than C.
114   * @param accessExpirationPeriod the time period A >= 0 in nanoseconds that
115   *            the access of an entry is expired if it is not accessed
116   *            longer than A. 
117   */
118  public LightWeightCache(final int recommendedLength,
119      final int sizeLimit,
120      final long creationExpirationPeriod,
121      final long accessExpirationPeriod) {
122    this(recommendedLength, sizeLimit,
123        creationExpirationPeriod, accessExpirationPeriod, new Clock());
124  }
125
126  @VisibleForTesting
127  LightWeightCache(final int recommendedLength,
128      final int sizeLimit,
129      final long creationExpirationPeriod,
130      final long accessExpirationPeriod,
131      final Clock clock) {
132    super(updateRecommendedLength(recommendedLength, sizeLimit));
133
134    this.sizeLimit = sizeLimit;
135
136    if (creationExpirationPeriod <= 0) {
137      throw new IllegalArgumentException("creationExpirationPeriod = "
138          + creationExpirationPeriod + " <= 0");
139    }
140    this.creationExpirationPeriod = creationExpirationPeriod;
141
142    if (accessExpirationPeriod < 0) {
143      throw new IllegalArgumentException("accessExpirationPeriod = "
144          + accessExpirationPeriod + " < 0");
145    }
146    this.accessExpirationPeriod = accessExpirationPeriod;
147
148    this.queue = new PriorityQueue<Entry>(
149        sizeLimit > 0? sizeLimit + 1: 1 << 10, expirationTimeComparator);
150    this.clock = clock;
151  }
152
153  void setExpirationTime(final Entry e, final long expirationPeriod) {
154    e.setExpirationTime(clock.currentTime() + expirationPeriod);
155  }
156
157  boolean isExpired(final Entry e, final long now) {
158    return now > e.getExpirationTime();
159  }
160
161  private E evict() {
162    @SuppressWarnings("unchecked")
163    final E polled = (E)queue.poll();
164    final E removed = super.remove(polled);
165    Preconditions.checkState(removed == polled);
166    return polled;
167  }
168
169  /** Evict expired entries. */
170  private void evictExpiredEntries() {
171    final long now = clock.currentTime();
172    for(int i = 0; i < EVICTION_LIMIT; i++) {
173      final Entry peeked = queue.peek();
174      if (peeked == null || !isExpired(peeked, now)) {
175        return;
176      }
177
178      final E evicted = evict();
179      Preconditions.checkState(evicted == peeked);
180    }
181  }
182
183  /** Evict entries in order to enforce the size limit of the cache. */
184  private void evictEntries() {
185    if (sizeLimit > 0) {
186      for(int i = size(); i > sizeLimit; i--) {
187        evict();
188      }
189    }
190  }
191  
192  @Override
193  public E get(K key) {
194    final E entry = super.get(key);
195    if (entry != null) {
196      if (accessExpirationPeriod > 0) {
197        // update expiration time
198        final Entry existing = (Entry)entry;
199        Preconditions.checkState(queue.remove(existing));
200        setExpirationTime(existing, accessExpirationPeriod);
201        queue.offer(existing);
202      }
203    }
204    return entry;
205  }
206
207  @Override
208  public E put(final E entry) {
209    if (!(entry instanceof Entry)) {
210      throw new HadoopIllegalArgumentException(
211          "!(entry instanceof Entry), entry.getClass()=" + entry.getClass());
212    }
213
214    evictExpiredEntries();
215
216    final E existing = super.put(entry);
217    if (existing != null) {
218      queue.remove(existing);
219    }
220
221    final Entry e = (Entry)entry;
222    setExpirationTime(e, creationExpirationPeriod);
223    queue.offer(e);
224    
225    evictEntries();
226    return existing;
227  }
228
229  @Override
230  public E remove(K key) {
231    evictExpiredEntries();
232
233    final E removed = super.remove(key);
234    if (removed != null) {
235      Preconditions.checkState(queue.remove(removed));
236    }
237    return removed;
238  }
239
240  @Override
241  public Iterator<E> iterator() {
242    final Iterator<E> iter = super.iterator();
243    return new Iterator<E>() {
244      @Override
245      public boolean hasNext() {
246        return iter.hasNext();
247      }
248
249      @Override
250      public E next() {
251        return iter.next();
252      }
253
254      @Override
255      public void remove() {
256        // It would be tricky to support this because LightWeightCache#remove
257        // may evict multiple elements via evictExpiredEntries.
258        throw new UnsupportedOperationException("Remove via iterator is " +
259            "not supported for LightWeightCache");
260      }
261    };
262  }
263}