001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.crypto.key.kms;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashSet;
023import java.util.LinkedList;
024import java.util.List;
025import java.util.Map;
026import java.util.Queue;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import com.google.common.base.Preconditions;
033import com.google.common.cache.CacheBuilder;
034import com.google.common.cache.CacheLoader;
035import com.google.common.cache.LoadingCache;
036import com.google.common.util.concurrent.ThreadFactoryBuilder;
037import org.apache.hadoop.classification.InterfaceAudience;
038
039/**
040 * A Utility class that maintains a Queue of entries for a given key. It tries
041 * to ensure that there is are always at-least <code>numValues</code> entries
042 * available for the client to consume for a particular key.
043 * It also uses an underlying Cache to evict queues for keys that have not been
044 * accessed for a configurable period of time.
045 * Implementing classes are required to implement the
046 * <code>QueueRefiller</code> interface that exposes a method to refill the
047 * queue, when empty
048 */
049@InterfaceAudience.Private
050public class ValueQueue <E> {
051
052  /**
053   * QueueRefiller interface a client must implement to use this class
054   */
055  public interface QueueRefiller <E> {
056    /**
057     * Method that has to be implemented by implementing classes to fill the
058     * Queue.
059     * @param keyName Key name
060     * @param keyQueue Queue that needs to be filled
061     * @param numValues number of Values to be added to the queue.
062     * @throws IOException
063     */
064    public void fillQueueForKey(String keyName,
065        Queue<E> keyQueue, int numValues) throws IOException;
066  }
067
068  private static final String REFILL_THREAD =
069      ValueQueue.class.getName() + "_thread";
070
071  private final LoadingCache<String, LinkedBlockingQueue<E>> keyQueues;
072  private final ThreadPoolExecutor executor;
073  private final UniqueKeyBlockingQueue queue = new UniqueKeyBlockingQueue();
074  private final QueueRefiller<E> refiller;
075  private final SyncGenerationPolicy policy;
076
077  private final int numValues;
078  private final float lowWatermark;
079
080  private volatile boolean executorThreadsStarted = false;
081
082  /**
083   * A <code>Runnable</code> which takes a string name.
084   */
085  private abstract static class NamedRunnable implements Runnable {
086    final String name;
087    private NamedRunnable(String keyName) {
088      this.name = keyName;
089    }
090  }
091
092  /**
093   * This backing blocking queue used in conjunction with the
094   * <code>ThreadPoolExecutor</code> used by the <code>ValueQueue</code>. This
095   * Queue accepts a task only if the task is not currently in the process
096   * of being run by a thread which is implied by the presence of the key
097   * in the <code>keysInProgress</code> set.
098   *
099   * NOTE: Only methods that ware explicitly called by the
100   * <code>ThreadPoolExecutor</code> need to be over-ridden.
101   */
102  private static class UniqueKeyBlockingQueue extends
103      LinkedBlockingQueue<Runnable> {
104
105    private static final long serialVersionUID = -2152747693695890371L;
106    private HashSet<String> keysInProgress = new HashSet<String>();
107
108    @Override
109    public synchronized void put(Runnable e) throws InterruptedException {
110      if (keysInProgress.add(((NamedRunnable)e).name)) {
111        super.put(e);
112      }
113    }
114
115    @Override
116    public Runnable take() throws InterruptedException {
117      Runnable k = super.take();
118      if (k != null) {
119        keysInProgress.remove(((NamedRunnable)k).name);
120      }
121      return k;
122    }
123
124    @Override
125    public Runnable poll(long timeout, TimeUnit unit)
126        throws InterruptedException {
127      Runnable k = super.poll(timeout, unit);
128      if (k != null) {
129        keysInProgress.remove(((NamedRunnable)k).name);
130      }
131      return k;
132    }
133
134  }
135
136  /**
137   * Policy to decide how many values to return to client when client asks for
138   * "n" values and Queue is empty.
139   * This decides how many values to return when client calls "getAtMost"
140   */
141  public static enum SyncGenerationPolicy {
142    ATLEAST_ONE, // Return atleast 1 value
143    LOW_WATERMARK, // Return min(n, lowWatermark * numValues) values
144    ALL // Return n values
145  }
146
147  /**
148   * Constructor takes the following tunable configuration parameters
149   * @param numValues The number of values cached in the Queue for a
150   *    particular key.
151   * @param lowWatermark The ratio of (number of current entries/numValues)
152   *    below which the <code>fillQueueForKey()</code> funciton will be
153   *    invoked to fill the Queue.
154   * @param expiry Expiry time after which the Key and associated Queue are
155   *    evicted from the cache.
156   * @param numFillerThreads Number of threads to use for the filler thread
157   * @param policy The SyncGenerationPolicy to use when client
158   *    calls "getAtMost"
159   * @param refiller implementation of the QueueRefiller
160   */
161  public ValueQueue(final int numValues, final float lowWatermark,
162      long expiry, int numFillerThreads, SyncGenerationPolicy policy,
163      final QueueRefiller<E> refiller) {
164    Preconditions.checkArgument(numValues > 0, "\"numValues\" must be > 0");
165    Preconditions.checkArgument(((lowWatermark > 0)&&(lowWatermark <= 1)),
166        "\"lowWatermark\" must be > 0 and <= 1");
167    Preconditions.checkArgument(expiry > 0, "\"expiry\" must be > 0");
168    Preconditions.checkArgument(numFillerThreads > 0,
169        "\"numFillerThreads\" must be > 0");
170    Preconditions.checkNotNull(policy, "\"policy\" must not be null");
171    this.refiller = refiller;
172    this.policy = policy;
173    this.numValues = numValues;
174    this.lowWatermark = lowWatermark;
175    keyQueues = CacheBuilder.newBuilder()
176            .expireAfterAccess(expiry, TimeUnit.MILLISECONDS)
177            .build(new CacheLoader<String, LinkedBlockingQueue<E>>() {
178                  @Override
179                  public LinkedBlockingQueue<E> load(String keyName)
180                      throws Exception {
181                    LinkedBlockingQueue<E> keyQueue =
182                        new LinkedBlockingQueue<E>();
183                    refiller.fillQueueForKey(keyName, keyQueue,
184                        (int)(lowWatermark * numValues));
185                    return keyQueue;
186                  }
187                });
188
189    executor =
190        new ThreadPoolExecutor(numFillerThreads, numFillerThreads, 0L,
191            TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder()
192                .setDaemon(true)
193                .setNameFormat(REFILL_THREAD).build());
194  }
195
196  public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
197      int numFillerThreads, QueueRefiller<E> fetcher) {
198    this(numValues, lowWaterMark, expiry, numFillerThreads,
199        SyncGenerationPolicy.ALL, fetcher);
200  }
201
202  /**
203   * Initializes the Value Queues for the provided keys by calling the
204   * fill Method with "numInitValues" values
205   * @param keyNames Array of key Names
206   * @throws ExecutionException
207   */
208  public void initializeQueuesForKeys(String... keyNames)
209      throws ExecutionException {
210    for (String keyName : keyNames) {
211      keyQueues.get(keyName);
212    }
213  }
214
215  /**
216   * This removes the value currently at the head of the Queue for the
217   * provided key. Will immediately fire the Queue filler function if key
218   * does not exist.
219   * If Queue exists but all values are drained, It will ask the generator
220   * function to add 1 value to Queue and then drain it.
221   * @param keyName String key name
222   * @return E the next value in the Queue
223   * @throws IOException
224   * @throws ExecutionException
225   */
226  public E getNext(String keyName)
227      throws IOException, ExecutionException {
228    return getAtMost(keyName, 1).get(0);
229  }
230
231  /**
232   * Drains the Queue for the provided key.
233   *
234   * @param keyName the key to drain the Queue for
235   */
236  public void drain(String keyName ) {
237    try {
238      keyQueues.get(keyName).clear();
239    } catch (ExecutionException ex) {
240      //NOP
241    }
242  }
243
244  /**
245   * Get size of the Queue for keyName. This is only used in unit tests.
246   * @param keyName the key name
247   * @return int queue size
248   */
249  public int getSize(String keyName) {
250    // We can't do keyQueues.get(keyName).size() here,
251    // since that will have the side effect of populating the cache.
252    Map<String, LinkedBlockingQueue<E>> map =
253        keyQueues.getAllPresent(Arrays.asList(keyName));
254    if (map.get(keyName) == null) {
255      return 0;
256    }
257    return map.get(keyName).size();
258  }
259
260  /**
261   * This removes the "num" values currently at the head of the Queue for the
262   * provided key. Will immediately fire the Queue filler function if key
263   * does not exist
264   * How many values are actually returned is governed by the
265   * <code>SyncGenerationPolicy</code> specified by the user.
266   * @param keyName String key name
267   * @param num Minimum number of values to return.
268   * @return List<E> values returned
269   * @throws IOException
270   * @throws ExecutionException
271   */
272  public List<E> getAtMost(String keyName, int num) throws IOException,
273      ExecutionException {
274    LinkedBlockingQueue<E> keyQueue = keyQueues.get(keyName);
275    // Using poll to avoid race condition..
276    LinkedList<E> ekvs = new LinkedList<E>();
277    try {
278      for (int i = 0; i < num; i++) {
279        E val = keyQueue.poll();
280        // If queue is empty now, Based on the provided SyncGenerationPolicy,
281        // figure out how many new values need to be generated synchronously
282        if (val == null) {
283          // Synchronous call to get remaining values
284          int numToFill = 0;
285          switch (policy) {
286          case ATLEAST_ONE:
287            numToFill = (ekvs.size() < 1) ? 1 : 0;
288            break;
289          case LOW_WATERMARK:
290            numToFill =
291                Math.min(num, (int) (lowWatermark * numValues)) - ekvs.size();
292            break;
293          case ALL:
294            numToFill = num - ekvs.size();
295            break;
296          }
297          // Synchronous fill if not enough values found
298          if (numToFill > 0) {
299            refiller.fillQueueForKey(keyName, ekvs, numToFill);
300          }
301          // Asynch task to fill > lowWatermark
302          if (i <= (int) (lowWatermark * numValues)) {
303            submitRefillTask(keyName, keyQueue);
304          }
305          return ekvs;
306        }
307        ekvs.add(val);
308      }
309    } catch (Exception e) {
310      throw new IOException("Exception while contacting value generator ", e);
311    }
312    return ekvs;
313  }
314
315  private void submitRefillTask(final String keyName,
316      final Queue<E> keyQueue) throws InterruptedException {
317    if (!executorThreadsStarted) {
318      synchronized (this) {
319        if (!executorThreadsStarted) {
320          // To ensure all requests are first queued, make coreThreads =
321          // maxThreads
322          // and pre-start all the Core Threads.
323          executor.prestartAllCoreThreads();
324          executorThreadsStarted = true;
325        }
326      }
327    }
328    // The submit/execute method of the ThreadPoolExecutor is bypassed and
329    // the Runnable is directly put in the backing BlockingQueue so that we
330    // can control exactly how the runnable is inserted into the queue.
331    queue.put(
332        new NamedRunnable(keyName) {
333          @Override
334          public void run() {
335            int cacheSize = numValues;
336            int threshold = (int) (lowWatermark * (float) cacheSize);
337            // Need to ensure that only one refill task per key is executed
338            try {
339              if (keyQueue.size() < threshold) {
340                refiller.fillQueueForKey(name, keyQueue,
341                    cacheSize - keyQueue.size());
342              }
343            } catch (final Exception e) {
344              throw new RuntimeException(e);
345            }
346          }
347        }
348        );
349  }
350
351  /**
352   * Cleanly shutdown
353   */
354  public void shutdown() {
355    executor.shutdownNow();
356  }
357
358}