001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.event; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.LinkedBlockingQueue; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.hadoop.classification.InterfaceAudience.Public; 031import org.apache.hadoop.classification.InterfaceStability.Evolving; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.service.AbstractService; 034import org.apache.hadoop.util.ShutdownHookManager; 035import org.apache.hadoop.yarn.conf.YarnConfiguration; 036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 037 038/** 039 * Dispatches {@link Event}s in a separate thread. Currently only single thread 040 * does that. Potentially there could be multiple channels for each event type 041 * class and a thread pool can be used to dispatch the events. 042 */ 043@SuppressWarnings("rawtypes") 044@Public 045@Evolving 046public class AsyncDispatcher extends AbstractService implements Dispatcher { 047 048 private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class); 049 050 private final BlockingQueue<Event> eventQueue; 051 private volatile boolean stopped = false; 052 053 // Configuration flag for enabling/disabling draining dispatcher's events on 054 // stop functionality. 055 private volatile boolean drainEventsOnStop = false; 056 057 // Indicates all the remaining dispatcher's events on stop have been drained 058 // and processed. 059 private volatile boolean drained = true; 060 private Object waitForDrained = new Object(); 061 062 // For drainEventsOnStop enabled only, block newly coming events into the 063 // queue while stopping. 064 private volatile boolean blockNewEvents = false; 065 private EventHandler handlerInstance = null; 066 067 private Thread eventHandlingThread; 068 protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; 069 private boolean exitOnDispatchException; 070 071 public AsyncDispatcher() { 072 this(new LinkedBlockingQueue<Event>()); 073 } 074 075 public AsyncDispatcher(BlockingQueue<Event> eventQueue) { 076 super("Dispatcher"); 077 this.eventQueue = eventQueue; 078 this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>(); 079 } 080 081 Runnable createThread() { 082 return new Runnable() { 083 @Override 084 public void run() { 085 while (!stopped && !Thread.currentThread().isInterrupted()) { 086 drained = eventQueue.isEmpty(); 087 // blockNewEvents is only set when dispatcher is draining to stop, 088 // adding this check is to avoid the overhead of acquiring the lock 089 // and calling notify every time in the normal run of the loop. 090 if (blockNewEvents) { 091 synchronized (waitForDrained) { 092 if (drained) { 093 waitForDrained.notify(); 094 } 095 } 096 } 097 Event event; 098 try { 099 event = eventQueue.take(); 100 } catch(InterruptedException ie) { 101 if (!stopped) { 102 LOG.warn("AsyncDispatcher thread interrupted", ie); 103 } 104 return; 105 } 106 if (event != null) { 107 dispatch(event); 108 } 109 } 110 } 111 }; 112 } 113 114 @Override 115 protected void serviceInit(Configuration conf) throws Exception { 116 this.exitOnDispatchException = 117 conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, 118 Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); 119 super.serviceInit(conf); 120 } 121 122 @Override 123 protected void serviceStart() throws Exception { 124 //start all the components 125 super.serviceStart(); 126 eventHandlingThread = new Thread(createThread()); 127 eventHandlingThread.setName("AsyncDispatcher event handler"); 128 eventHandlingThread.start(); 129 } 130 131 public void setDrainEventsOnStop() { 132 drainEventsOnStop = true; 133 } 134 135 @Override 136 protected void serviceStop() throws Exception { 137 if (drainEventsOnStop) { 138 blockNewEvents = true; 139 LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); 140 long endTime = System.currentTimeMillis() + getConfig() 141 .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 142 YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); 143 144 synchronized (waitForDrained) { 145 while (!drained && eventHandlingThread != null 146 && eventHandlingThread.isAlive() 147 && System.currentTimeMillis() < endTime) { 148 waitForDrained.wait(1000); 149 LOG.info("Waiting for AsyncDispatcher to drain."); 150 } 151 } 152 } 153 stopped = true; 154 if (eventHandlingThread != null) { 155 eventHandlingThread.interrupt(); 156 try { 157 eventHandlingThread.join(); 158 } catch (InterruptedException ie) { 159 LOG.warn("Interrupted Exception while stopping", ie); 160 } 161 } 162 163 // stop all the components 164 super.serviceStop(); 165 } 166 167 @SuppressWarnings("unchecked") 168 protected void dispatch(Event event) { 169 //all events go thru this loop 170 if (LOG.isDebugEnabled()) { 171 LOG.debug("Dispatching the event " + event.getClass().getName() + "." 172 + event.toString()); 173 } 174 175 Class<? extends Enum> type = event.getType().getDeclaringClass(); 176 177 try{ 178 EventHandler handler = eventDispatchers.get(type); 179 if(handler != null) { 180 handler.handle(event); 181 } else { 182 throw new Exception("No handler for registered for " + type); 183 } 184 } catch (Throwable t) { 185 //TODO Maybe log the state of the queue 186 LOG.fatal("Error in dispatcher thread", t); 187 // If serviceStop is called, we should exit this thread gracefully. 188 if (exitOnDispatchException 189 && (ShutdownHookManager.get().isShutdownInProgress()) == false 190 && stopped == false) { 191 Thread shutDownThread = new Thread(createShutDownThread()); 192 shutDownThread.setName("AsyncDispatcher ShutDown handler"); 193 shutDownThread.start(); 194 } 195 } 196 } 197 198 @SuppressWarnings("unchecked") 199 @Override 200 public void register(Class<? extends Enum> eventType, 201 EventHandler handler) { 202 /* check to see if we have a listener registered */ 203 EventHandler<Event> registeredHandler = (EventHandler<Event>) 204 eventDispatchers.get(eventType); 205 LOG.info("Registering " + eventType + " for " + handler.getClass()); 206 if (registeredHandler == null) { 207 eventDispatchers.put(eventType, handler); 208 } else if (!(registeredHandler instanceof MultiListenerHandler)){ 209 /* for multiple listeners of an event add the multiple listener handler */ 210 MultiListenerHandler multiHandler = new MultiListenerHandler(); 211 multiHandler.addHandler(registeredHandler); 212 multiHandler.addHandler(handler); 213 eventDispatchers.put(eventType, multiHandler); 214 } else { 215 /* already a multilistener, just add to it */ 216 MultiListenerHandler multiHandler 217 = (MultiListenerHandler) registeredHandler; 218 multiHandler.addHandler(handler); 219 } 220 } 221 222 @Override 223 public EventHandler getEventHandler() { 224 if (handlerInstance == null) { 225 handlerInstance = new GenericEventHandler(); 226 } 227 return handlerInstance; 228 } 229 230 class GenericEventHandler implements EventHandler<Event> { 231 public void handle(Event event) { 232 if (blockNewEvents) { 233 return; 234 } 235 drained = false; 236 237 /* all this method does is enqueue all the events onto the queue */ 238 int qSize = eventQueue.size(); 239 if (qSize !=0 && qSize %1000 == 0) { 240 LOG.info("Size of event-queue is " + qSize); 241 } 242 int remCapacity = eventQueue.remainingCapacity(); 243 if (remCapacity < 1000) { 244 LOG.warn("Very low remaining capacity in the event-queue: " 245 + remCapacity); 246 } 247 try { 248 eventQueue.put(event); 249 } catch (InterruptedException e) { 250 if (!stopped) { 251 LOG.warn("AsyncDispatcher thread interrupted", e); 252 } 253 throw new YarnRuntimeException(e); 254 } 255 }; 256 } 257 258 /** 259 * Multiplexing an event. Sending it to different handlers that 260 * are interested in the event. 261 * @param <T> the type of event these multiple handlers are interested in. 262 */ 263 static class MultiListenerHandler implements EventHandler<Event> { 264 List<EventHandler<Event>> listofHandlers; 265 266 public MultiListenerHandler() { 267 listofHandlers = new ArrayList<EventHandler<Event>>(); 268 } 269 270 @Override 271 public void handle(Event event) { 272 for (EventHandler<Event> handler: listofHandlers) { 273 handler.handle(event); 274 } 275 } 276 277 void addHandler(EventHandler<Event> handler) { 278 listofHandlers.add(handler); 279 } 280 281 } 282 283 Runnable createShutDownThread() { 284 return new Runnable() { 285 @Override 286 public void run() { 287 LOG.info("Exiting, bbye.."); 288 System.exit(-1); 289 } 290 }; 291 } 292}