001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.event; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.LinkedBlockingQueue; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.hadoop.classification.InterfaceAudience.Public; 031import org.apache.hadoop.classification.InterfaceStability.Evolving; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.service.AbstractService; 034import org.apache.hadoop.util.ShutdownHookManager; 035import org.apache.hadoop.yarn.conf.YarnConfiguration; 036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 037 038import com.google.common.annotations.VisibleForTesting; 039 040/** 041 * Dispatches {@link Event}s in a separate thread. Currently only single thread 042 * does that. Potentially there could be multiple channels for each event type 043 * class and a thread pool can be used to dispatch the events. 044 */ 045@SuppressWarnings("rawtypes") 046@Public 047@Evolving 048public class AsyncDispatcher extends AbstractService implements Dispatcher { 049 050 private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class); 051 052 private final BlockingQueue<Event> eventQueue; 053 private volatile int lastEventQueueSizeLogged = 0; 054 private volatile boolean stopped = false; 055 056 // Configuration flag for enabling/disabling draining dispatcher's events on 057 // stop functionality. 058 private volatile boolean drainEventsOnStop = false; 059 060 // Indicates all the remaining dispatcher's events on stop have been drained 061 // and processed. 062 private volatile boolean drained = true; 063 private Object waitForDrained = new Object(); 064 065 // For drainEventsOnStop enabled only, block newly coming events into the 066 // queue while stopping. 067 private volatile boolean blockNewEvents = false; 068 private final EventHandler handlerInstance = new GenericEventHandler(); 069 070 private Thread eventHandlingThread; 071 protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; 072 private boolean exitOnDispatchException; 073 074 public AsyncDispatcher() { 075 this(new LinkedBlockingQueue<Event>()); 076 } 077 078 public AsyncDispatcher(BlockingQueue<Event> eventQueue) { 079 super("Dispatcher"); 080 this.eventQueue = eventQueue; 081 this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>(); 082 } 083 084 Runnable createThread() { 085 return new Runnable() { 086 @Override 087 public void run() { 088 while (!stopped && !Thread.currentThread().isInterrupted()) { 089 drained = eventQueue.isEmpty(); 090 // blockNewEvents is only set when dispatcher is draining to stop, 091 // adding this check is to avoid the overhead of acquiring the lock 092 // and calling notify every time in the normal run of the loop. 093 if (blockNewEvents) { 094 synchronized (waitForDrained) { 095 if (drained) { 096 waitForDrained.notify(); 097 } 098 } 099 } 100 Event event; 101 try { 102 event = eventQueue.take(); 103 } catch(InterruptedException ie) { 104 if (!stopped) { 105 LOG.warn("AsyncDispatcher thread interrupted", ie); 106 } 107 return; 108 } 109 if (event != null) { 110 dispatch(event); 111 } 112 } 113 } 114 }; 115 } 116 117 @Override 118 protected void serviceInit(Configuration conf) throws Exception { 119 this.exitOnDispatchException = 120 conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, 121 Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); 122 super.serviceInit(conf); 123 } 124 125 @Override 126 protected void serviceStart() throws Exception { 127 //start all the components 128 super.serviceStart(); 129 eventHandlingThread = new Thread(createThread()); 130 eventHandlingThread.setName("AsyncDispatcher event handler"); 131 eventHandlingThread.start(); 132 } 133 134 public void setDrainEventsOnStop() { 135 drainEventsOnStop = true; 136 } 137 138 @Override 139 protected void serviceStop() throws Exception { 140 if (drainEventsOnStop) { 141 blockNewEvents = true; 142 LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); 143 long endTime = System.currentTimeMillis() + getConfig() 144 .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 145 YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); 146 147 synchronized (waitForDrained) { 148 while (!drained && eventHandlingThread != null 149 && eventHandlingThread.isAlive() 150 && System.currentTimeMillis() < endTime) { 151 waitForDrained.wait(1000); 152 LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + 153 eventHandlingThread.getState()); 154 } 155 } 156 } 157 stopped = true; 158 if (eventHandlingThread != null) { 159 eventHandlingThread.interrupt(); 160 try { 161 eventHandlingThread.join(); 162 } catch (InterruptedException ie) { 163 LOG.warn("Interrupted Exception while stopping", ie); 164 } 165 } 166 167 // stop all the components 168 super.serviceStop(); 169 } 170 171 @SuppressWarnings("unchecked") 172 protected void dispatch(Event event) { 173 //all events go thru this loop 174 if (LOG.isDebugEnabled()) { 175 LOG.debug("Dispatching the event " + event.getClass().getName() + "." 176 + event.toString()); 177 } 178 179 Class<? extends Enum> type = event.getType().getDeclaringClass(); 180 181 try{ 182 EventHandler handler = eventDispatchers.get(type); 183 if(handler != null) { 184 handler.handle(event); 185 } else { 186 throw new Exception("No handler for registered for " + type); 187 } 188 } catch (Throwable t) { 189 //TODO Maybe log the state of the queue 190 LOG.fatal("Error in dispatcher thread", t); 191 // If serviceStop is called, we should exit this thread gracefully. 192 if (exitOnDispatchException 193 && (ShutdownHookManager.get().isShutdownInProgress()) == false 194 && stopped == false) { 195 Thread shutDownThread = new Thread(createShutDownThread()); 196 shutDownThread.setName("AsyncDispatcher ShutDown handler"); 197 shutDownThread.start(); 198 } 199 } 200 } 201 202 @SuppressWarnings("unchecked") 203 @Override 204 public void register(Class<? extends Enum> eventType, 205 EventHandler handler) { 206 /* check to see if we have a listener registered */ 207 EventHandler<Event> registeredHandler = (EventHandler<Event>) 208 eventDispatchers.get(eventType); 209 LOG.info("Registering " + eventType + " for " + handler.getClass()); 210 if (registeredHandler == null) { 211 eventDispatchers.put(eventType, handler); 212 } else if (!(registeredHandler instanceof MultiListenerHandler)){ 213 /* for multiple listeners of an event add the multiple listener handler */ 214 MultiListenerHandler multiHandler = new MultiListenerHandler(); 215 multiHandler.addHandler(registeredHandler); 216 multiHandler.addHandler(handler); 217 eventDispatchers.put(eventType, multiHandler); 218 } else { 219 /* already a multilistener, just add to it */ 220 MultiListenerHandler multiHandler 221 = (MultiListenerHandler) registeredHandler; 222 multiHandler.addHandler(handler); 223 } 224 } 225 226 @Override 227 public EventHandler getEventHandler() { 228 return handlerInstance; 229 } 230 231 class GenericEventHandler implements EventHandler<Event> { 232 public void handle(Event event) { 233 if (blockNewEvents) { 234 return; 235 } 236 drained = false; 237 238 /* all this method does is enqueue all the events onto the queue */ 239 int qSize = eventQueue.size(); 240 if (qSize != 0 && qSize % 1000 == 0 241 && lastEventQueueSizeLogged != qSize) { 242 lastEventQueueSizeLogged = qSize; 243 LOG.info("Size of event-queue is " + qSize); 244 } 245 int remCapacity = eventQueue.remainingCapacity(); 246 if (remCapacity < 1000) { 247 LOG.warn("Very low remaining capacity in the event-queue: " 248 + remCapacity); 249 } 250 try { 251 eventQueue.put(event); 252 } catch (InterruptedException e) { 253 if (!stopped) { 254 LOG.warn("AsyncDispatcher thread interrupted", e); 255 } 256 // Need to reset drained flag to true if event queue is empty, 257 // otherwise dispatcher will hang on stop. 258 drained = eventQueue.isEmpty(); 259 throw new YarnRuntimeException(e); 260 } 261 }; 262 } 263 264 /** 265 * Multiplexing an event. Sending it to different handlers that 266 * are interested in the event. 267 * @param <T> the type of event these multiple handlers are interested in. 268 */ 269 static class MultiListenerHandler implements EventHandler<Event> { 270 List<EventHandler<Event>> listofHandlers; 271 272 public MultiListenerHandler() { 273 listofHandlers = new ArrayList<EventHandler<Event>>(); 274 } 275 276 @Override 277 public void handle(Event event) { 278 for (EventHandler<Event> handler: listofHandlers) { 279 handler.handle(event); 280 } 281 } 282 283 void addHandler(EventHandler<Event> handler) { 284 listofHandlers.add(handler); 285 } 286 287 } 288 289 Runnable createShutDownThread() { 290 return new Runnable() { 291 @Override 292 public void run() { 293 LOG.info("Exiting, bbye.."); 294 System.exit(-1); 295 } 296 }; 297 } 298 299 @VisibleForTesting 300 protected boolean isEventThreadWaiting() { 301 return eventHandlingThread.getState() == Thread.State.WAITING; 302 } 303 304 @VisibleForTesting 305 protected boolean isDrained() { 306 return this.drained; 307 } 308}