001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.crypto.key.kms; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashSet; 023import java.util.LinkedList; 024import java.util.List; 025import java.util.Map; 026import java.util.Queue; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032import com.google.common.base.Preconditions; 033import com.google.common.cache.CacheBuilder; 034import com.google.common.cache.CacheLoader; 035import com.google.common.cache.LoadingCache; 036import com.google.common.util.concurrent.ThreadFactoryBuilder; 037import org.apache.hadoop.classification.InterfaceAudience; 038 039/** 040 * A Utility class that maintains a Queue of entries for a given key. It tries 041 * to ensure that there is are always at-least <code>numValues</code> entries 042 * available for the client to consume for a particular key. 043 * It also uses an underlying Cache to evict queues for keys that have not been 044 * accessed for a configurable period of time. 045 * Implementing classes are required to implement the 046 * <code>QueueRefiller</code> interface that exposes a method to refill the 047 * queue, when empty 048 */ 049@InterfaceAudience.Private 050public class ValueQueue <E> { 051 052 /** 053 * QueueRefiller interface a client must implement to use this class 054 */ 055 public interface QueueRefiller <E> { 056 /** 057 * Method that has to be implemented by implementing classes to fill the 058 * Queue. 059 * @param keyName Key name 060 * @param keyQueue Queue that needs to be filled 061 * @param numValues number of Values to be added to the queue. 062 * @throws IOException 063 */ 064 public void fillQueueForKey(String keyName, 065 Queue<E> keyQueue, int numValues) throws IOException; 066 } 067 068 private static final String REFILL_THREAD = 069 ValueQueue.class.getName() + "_thread"; 070 071 private final LoadingCache<String, LinkedBlockingQueue<E>> keyQueues; 072 private final ThreadPoolExecutor executor; 073 private final UniqueKeyBlockingQueue queue = new UniqueKeyBlockingQueue(); 074 private final QueueRefiller<E> refiller; 075 private final SyncGenerationPolicy policy; 076 077 private final int numValues; 078 private final float lowWatermark; 079 080 private volatile boolean executorThreadsStarted = false; 081 082 /** 083 * A <code>Runnable</code> which takes a string name. 084 */ 085 private abstract static class NamedRunnable implements Runnable { 086 final String name; 087 private NamedRunnable(String keyName) { 088 this.name = keyName; 089 } 090 } 091 092 /** 093 * This backing blocking queue used in conjunction with the 094 * <code>ThreadPoolExecutor</code> used by the <code>ValueQueue</code>. This 095 * Queue accepts a task only if the task is not currently in the process 096 * of being run by a thread which is implied by the presence of the key 097 * in the <code>keysInProgress</code> set. 098 * 099 * NOTE: Only methods that ware explicitly called by the 100 * <code>ThreadPoolExecutor</code> need to be over-ridden. 101 */ 102 private static class UniqueKeyBlockingQueue extends 103 LinkedBlockingQueue<Runnable> { 104 105 private static final long serialVersionUID = -2152747693695890371L; 106 private HashSet<String> keysInProgress = new HashSet<String>(); 107 108 @Override 109 public synchronized void put(Runnable e) throws InterruptedException { 110 if (keysInProgress.add(((NamedRunnable)e).name)) { 111 super.put(e); 112 } 113 } 114 115 @Override 116 public Runnable take() throws InterruptedException { 117 Runnable k = super.take(); 118 if (k != null) { 119 keysInProgress.remove(((NamedRunnable)k).name); 120 } 121 return k; 122 } 123 124 @Override 125 public Runnable poll(long timeout, TimeUnit unit) 126 throws InterruptedException { 127 Runnable k = super.poll(timeout, unit); 128 if (k != null) { 129 keysInProgress.remove(((NamedRunnable)k).name); 130 } 131 return k; 132 } 133 134 } 135 136 /** 137 * Policy to decide how many values to return to client when client asks for 138 * "n" values and Queue is empty. 139 * This decides how many values to return when client calls "getAtMost" 140 */ 141 public static enum SyncGenerationPolicy { 142 ATLEAST_ONE, // Return atleast 1 value 143 LOW_WATERMARK, // Return min(n, lowWatermark * numValues) values 144 ALL // Return n values 145 } 146 147 /** 148 * Constructor takes the following tunable configuration parameters 149 * @param numValues The number of values cached in the Queue for a 150 * particular key. 151 * @param lowWatermark The ratio of (number of current entries/numValues) 152 * below which the <code>fillQueueForKey()</code> funciton will be 153 * invoked to fill the Queue. 154 * @param expiry Expiry time after which the Key and associated Queue are 155 * evicted from the cache. 156 * @param numFillerThreads Number of threads to use for the filler thread 157 * @param policy The SyncGenerationPolicy to use when client 158 * calls "getAtMost" 159 * @param refiller implementation of the QueueRefiller 160 */ 161 public ValueQueue(final int numValues, final float lowWatermark, 162 long expiry, int numFillerThreads, SyncGenerationPolicy policy, 163 final QueueRefiller<E> refiller) { 164 Preconditions.checkArgument(numValues > 0, "\"numValues\" must be > 0"); 165 Preconditions.checkArgument(((lowWatermark > 0)&&(lowWatermark <= 1)), 166 "\"lowWatermark\" must be > 0 and <= 1"); 167 Preconditions.checkArgument(expiry > 0, "\"expiry\" must be > 0"); 168 Preconditions.checkArgument(numFillerThreads > 0, 169 "\"numFillerThreads\" must be > 0"); 170 Preconditions.checkNotNull(policy, "\"policy\" must not be null"); 171 this.refiller = refiller; 172 this.policy = policy; 173 this.numValues = numValues; 174 this.lowWatermark = lowWatermark; 175 keyQueues = CacheBuilder.newBuilder() 176 .expireAfterAccess(expiry, TimeUnit.MILLISECONDS) 177 .build(new CacheLoader<String, LinkedBlockingQueue<E>>() { 178 @Override 179 public LinkedBlockingQueue<E> load(String keyName) 180 throws Exception { 181 LinkedBlockingQueue<E> keyQueue = 182 new LinkedBlockingQueue<E>(); 183 refiller.fillQueueForKey(keyName, keyQueue, 184 (int)(lowWatermark * numValues)); 185 return keyQueue; 186 } 187 }); 188 189 executor = 190 new ThreadPoolExecutor(numFillerThreads, numFillerThreads, 0L, 191 TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder() 192 .setDaemon(true) 193 .setNameFormat(REFILL_THREAD).build()); 194 } 195 196 public ValueQueue(final int numValues, final float lowWaterMark, long expiry, 197 int numFillerThreads, QueueRefiller<E> fetcher) { 198 this(numValues, lowWaterMark, expiry, numFillerThreads, 199 SyncGenerationPolicy.ALL, fetcher); 200 } 201 202 /** 203 * Initializes the Value Queues for the provided keys by calling the 204 * fill Method with "numInitValues" values 205 * @param keyNames Array of key Names 206 * @throws ExecutionException 207 */ 208 public void initializeQueuesForKeys(String... keyNames) 209 throws ExecutionException { 210 for (String keyName : keyNames) { 211 keyQueues.get(keyName); 212 } 213 } 214 215 /** 216 * This removes the value currently at the head of the Queue for the 217 * provided key. Will immediately fire the Queue filler function if key 218 * does not exist. 219 * If Queue exists but all values are drained, It will ask the generator 220 * function to add 1 value to Queue and then drain it. 221 * @param keyName String key name 222 * @return E the next value in the Queue 223 * @throws IOException 224 * @throws ExecutionException 225 */ 226 public E getNext(String keyName) 227 throws IOException, ExecutionException { 228 return getAtMost(keyName, 1).get(0); 229 } 230 231 /** 232 * Drains the Queue for the provided key. 233 * 234 * @param keyName the key to drain the Queue for 235 */ 236 public void drain(String keyName ) { 237 try { 238 keyQueues.get(keyName).clear(); 239 } catch (ExecutionException ex) { 240 //NOP 241 } 242 } 243 244 /** 245 * Get size of the Queue for keyName. This is only used in unit tests. 246 * @param keyName the key name 247 * @return int queue size 248 */ 249 public int getSize(String keyName) { 250 // We can't do keyQueues.get(keyName).size() here, 251 // since that will have the side effect of populating the cache. 252 Map<String, LinkedBlockingQueue<E>> map = 253 keyQueues.getAllPresent(Arrays.asList(keyName)); 254 if (map.get(keyName) == null) { 255 return 0; 256 } 257 return map.get(keyName).size(); 258 } 259 260 /** 261 * This removes the "num" values currently at the head of the Queue for the 262 * provided key. Will immediately fire the Queue filler function if key 263 * does not exist 264 * How many values are actually returned is governed by the 265 * <code>SyncGenerationPolicy</code> specified by the user. 266 * @param keyName String key name 267 * @param num Minimum number of values to return. 268 * @return List<E> values returned 269 * @throws IOException 270 * @throws ExecutionException 271 */ 272 public List<E> getAtMost(String keyName, int num) throws IOException, 273 ExecutionException { 274 LinkedBlockingQueue<E> keyQueue = keyQueues.get(keyName); 275 // Using poll to avoid race condition.. 276 LinkedList<E> ekvs = new LinkedList<E>(); 277 try { 278 for (int i = 0; i < num; i++) { 279 E val = keyQueue.poll(); 280 // If queue is empty now, Based on the provided SyncGenerationPolicy, 281 // figure out how many new values need to be generated synchronously 282 if (val == null) { 283 // Synchronous call to get remaining values 284 int numToFill = 0; 285 switch (policy) { 286 case ATLEAST_ONE: 287 numToFill = (ekvs.size() < 1) ? 1 : 0; 288 break; 289 case LOW_WATERMARK: 290 numToFill = 291 Math.min(num, (int) (lowWatermark * numValues)) - ekvs.size(); 292 break; 293 case ALL: 294 numToFill = num - ekvs.size(); 295 break; 296 } 297 // Synchronous fill if not enough values found 298 if (numToFill > 0) { 299 refiller.fillQueueForKey(keyName, ekvs, numToFill); 300 } 301 // Asynch task to fill > lowWatermark 302 if (i <= (int) (lowWatermark * numValues)) { 303 submitRefillTask(keyName, keyQueue); 304 } 305 return ekvs; 306 } 307 ekvs.add(val); 308 } 309 } catch (Exception e) { 310 throw new IOException("Exception while contacting value generator ", e); 311 } 312 return ekvs; 313 } 314 315 private void submitRefillTask(final String keyName, 316 final Queue<E> keyQueue) throws InterruptedException { 317 if (!executorThreadsStarted) { 318 synchronized (this) { 319 if (!executorThreadsStarted) { 320 // To ensure all requests are first queued, make coreThreads = 321 // maxThreads 322 // and pre-start all the Core Threads. 323 executor.prestartAllCoreThreads(); 324 executorThreadsStarted = true; 325 } 326 } 327 } 328 // The submit/execute method of the ThreadPoolExecutor is bypassed and 329 // the Runnable is directly put in the backing BlockingQueue so that we 330 // can control exactly how the runnable is inserted into the queue. 331 queue.put( 332 new NamedRunnable(keyName) { 333 @Override 334 public void run() { 335 int cacheSize = numValues; 336 int threshold = (int) (lowWatermark * (float) cacheSize); 337 // Need to ensure that only one refill task per key is executed 338 try { 339 if (keyQueue.size() < threshold) { 340 refiller.fillQueueForKey(name, keyQueue, 341 cacheSize - keyQueue.size()); 342 } 343 } catch (final Exception e) { 344 throw new RuntimeException(e); 345 } 346 } 347 } 348 ); 349 } 350 351 /** 352 * Cleanly shutdown 353 */ 354 public void shutdown() { 355 executor.shutdownNow(); 356 } 357 358}