/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

import com.google.common.annotations.VisibleForTesting;

/**
 * Dispatches {@link Event}s in a separate thread. Currently only a single
 * thread does that. Potentially there could be multiple channels for each
 * event type class and a thread pool can be used to dispatch the events.
 *
 * <p>Events are enqueued through the handler returned by
 * {@link #getEventHandler()} and are taken off the queue by the event
 * handling thread, which routes each one to the handler(s) registered for
 * the enum class that declares the event's type.
 */
@SuppressWarnings("rawtypes")
@Public
@Evolving
public class AsyncDispatcher extends AbstractService implements Dispatcher {

  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);

  private final BlockingQueue<Event> eventQueue;
  private volatile boolean stopped = false;

  // Configuration flag for enabling/disabling draining dispatcher's events on
  // stop functionality.
  private volatile boolean drainEventsOnStop = false;

  // Indicates all the remaining dispatcher's events on stop have been drained
  // and processed.
  private volatile boolean drained = true;

  // Monitor used by serviceStop() to wait for the queue to drain. It is final
  // because the waiting side and the notifying side must always synchronize
  // on the very same object.
  private final Object waitForDrained = new Object();

  // For drainEventsOnStop enabled only, block newly coming events into the
  // queue while stopping.
  private volatile boolean blockNewEvents = false;

  // Created eagerly so that getEventHandler() hands out the same instance to
  // every caller without needing any synchronization.
  private final EventHandler handlerInstance = new GenericEventHandler();

  private Thread eventHandlingThread;
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  private boolean exitOnDispatchException;

  /**
   * Creates a dispatcher backed by a new {@link LinkedBlockingQueue}.
   */
  public AsyncDispatcher() {
    this(new LinkedBlockingQueue<Event>());
  }

  /**
   * Creates a dispatcher that uses the supplied queue to hold pending events.
   *
   * @param eventQueue the queue events are put on and taken from
   */
  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
  }

  /**
   * Builds the body of the event handling thread. The returned runnable loops
   * until the dispatcher is stopped or the thread is interrupted, taking one
   * event at a time from the queue and passing it to {@link #dispatch(Event)}.
   *
   * @return the runnable executed by the event handling thread
   */
  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          drained = eventQueue.isEmpty();
          // blockNewEvents is only set when dispatcher is draining to stop,
          // adding this check is to avoid the overhead of acquiring the lock
          // and calling notify every time in the normal run of the loop.
          if (blockNewEvents) {
            synchronized (waitForDrained) {
              if (drained) {
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
            event = eventQueue.take();
          } catch (InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }

  /**
   * Reads the exit-on-dispatch-error setting from the configuration and then
   * delegates to the superclass.
   *
   * @param conf the configuration to read
   *     {@link Dispatcher#DISPATCHER_EXIT_ON_ERROR_KEY} from
   * @throws Exception if the superclass initialization fails
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    this.exitOnDispatchException =
        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
    super.serviceInit(conf);
  }

  /**
   * Starts the superclass and then creates and starts the event handling
   * thread.
   *
   * @throws Exception if the superclass start fails
   */
  @Override
  protected void serviceStart() throws Exception {
    //start all the components
    super.serviceStart();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
  }

  /**
   * Enables draining of the already queued events when the dispatcher is
   * stopped.
   */
  public void setDrainEventsOnStop() {
    drainEventsOnStop = true;
  }

  /**
   * Stops the dispatcher. When draining on stop is enabled, new events are
   * rejected and this method waits until the queue has been drained or the
   * event handling thread is no longer alive. Afterwards the event handling
   * thread is interrupted and joined, and the superclass is stopped.
   *
   * @throws Exception if waiting for the drain is interrupted or the
   *     superclass stop fails
   */
  @Override
  protected void serviceStop() throws Exception {
    if (drainEventsOnStop) {
      blockNewEvents = true;
      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
      synchronized (waitForDrained) {
        // The thread is only created in serviceStart(); guard against a stop
        // without a start, where events may have been queued (drained ==
        // false) but nobody will ever process them.
        while (!drained && eventHandlingThread != null
            && eventHandlingThread.isAlive()) {
          waitForDrained.wait(1000);
          LOG.info("Waiting for AsyncDispatcher to drain.");
        }
      }
    }
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.serviceStop();
  }

  /**
   * Delivers one event to the handler registered for the enum class that
   * declares the event's type. Any {@link Throwable} raised while doing so,
   * including the absence of a handler, is logged as fatal. If exiting on
   * dispatch errors is enabled, no JVM shutdown is in progress and the
   * dispatcher has not been stopped, a separate thread is started to run
   * {@link #createShutDownThread()}.
   *
   * @param event the event to deliver
   */
  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try {
      EventHandler handler = eventDispatchers.get(type);
      if (handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && !ShutdownHookManager.get().isShutdownInProgress()
          && !stopped) {
        Thread shutDownThread = new Thread(createShutDownThread());
        shutDownThread.setName("AsyncDispatcher ShutDown handler");
        shutDownThread.start();
      }
    }
  }

  /**
   * Registers a handler for all events whose type is declared by the given
   * enum class. The first handler for a class is stored directly; when a
   * second one is registered both are wrapped in a
   * {@link MultiListenerHandler}, and any further handlers are appended to
   * that wrapper.
   *
   * @param eventType the enum class declaring the event types of interest
   * @param handler the handler to invoke for those events
   */
  @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>)
        eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)) {
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
          = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

  /**
   * Returns the handler through which events are enqueued on this
   * dispatcher. The same instance is returned on every call.
   *
   * @return the enqueueing event handler
   */
  @Override
  public EventHandler getEventHandler() {
    return handlerInstance;
  }

  /**
   * Handler that enqueues every event it receives onto the dispatcher's
   * queue, unless new events are currently being blocked.
   */
  class GenericEventHandler implements EventHandler<Event> {
    /**
     * Puts the event on the queue. Nothing is enqueued while new events are
     * blocked.
     *
     * @param event the event to enqueue
     * @throws YarnRuntimeException if the calling thread is interrupted while
     *     waiting to put the event on the queue
     */
    @Override
    public void handle(Event event) {
      if (blockNewEvents) {
        return;
      }
      drained = false;

      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize != 0 && qSize % 1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        // The event may not have been enqueued, yet drained was already
        // cleared above. Recompute it from the queue so that a draining
        // serviceStop() does not wait for an event that never arrived.
        drained = eventQueue.isEmpty();
        throw new YarnRuntimeException(e);
      }
    }
  }

  /**
   * Multiplexing an event. Sending it to different handlers that
   * are interested in the event.
   */
  static class MultiListenerHandler implements EventHandler<Event> {
    List<EventHandler<Event>> listofHandlers;

    public MultiListenerHandler() {
      listofHandlers = new ArrayList<EventHandler<Event>>();
    }

    /**
     * Passes the event to every added handler, in the order they were added.
     *
     * @param event the event to forward
     */
    @Override
    public void handle(Event event) {
      for (EventHandler<Event> handler : listofHandlers) {
        handler.handle(event);
      }
    }

    /**
     * Appends a handler to the list of handlers that receive each event.
     *
     * @param handler the handler to add
     */
    void addHandler(EventHandler<Event> handler) {
      listofHandlers.add(handler);
    }

  }

  /**
   * Builds the runnable used when a dispatch error must terminate the
   * process: it logs a message and calls {@code System.exit(-1)}.
   *
   * @return the runnable that exits the JVM
   */
  Runnable createShutDownThread() {
    return new Runnable() {
      @Override
      public void run() {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    };
  }

  /**
   * Reports the current value of the drained flag.
   *
   * @return {@code true} if the dispatcher currently considers its queue
   *     drained
   */
  @VisibleForTesting
  protected boolean isDrained() {
    return this.drained;
  }
}