001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.event; 020 021 import java.util.ArrayList; 022 import java.util.HashMap; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.concurrent.BlockingQueue; 026 import java.util.concurrent.LinkedBlockingQueue; 027 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 import org.apache.hadoop.classification.InterfaceAudience.Public; 031 import org.apache.hadoop.classification.InterfaceStability.Evolving; 032 import org.apache.hadoop.conf.Configuration; 033 import org.apache.hadoop.service.AbstractService; 034 import org.apache.hadoop.util.ShutdownHookManager; 035 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 036 037 /** 038 * Dispatches {@link Event}s in a separate thread. Currently only single thread 039 * does that. Potentially there could be multiple channels for each event type 040 * class and a thread pool can be used to dispatch the events. 041 */ 042 @SuppressWarnings("rawtypes") 043 @Public 044 @Evolving 045 public class AsyncDispatcher extends AbstractService implements Dispatcher { 046 047 private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class); 048 049 private final BlockingQueue<Event> eventQueue; 050 private volatile boolean stopped = false; 051 052 // Configuration flag for enabling/disabling draining dispatcher's events on 053 // stop functionality. 054 private volatile boolean drainEventsOnStop = false; 055 056 // Indicates all the remaining dispatcher's events on stop have been drained 057 // and processed. 058 private volatile boolean drained = true; 059 private Object waitForDrained = new Object(); 060 061 // For drainEventsOnStop enabled only, block newly coming events into the 062 // queue while stopping. 063 private volatile boolean blockNewEvents = false; 064 private EventHandler handlerInstance = null; 065 066 private Thread eventHandlingThread; 067 protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; 068 private boolean exitOnDispatchException; 069 070 public AsyncDispatcher() { 071 this(new LinkedBlockingQueue<Event>()); 072 } 073 074 public AsyncDispatcher(BlockingQueue<Event> eventQueue) { 075 super("Dispatcher"); 076 this.eventQueue = eventQueue; 077 this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>(); 078 } 079 080 Runnable createThread() { 081 return new Runnable() { 082 @Override 083 public void run() { 084 while (!stopped && !Thread.currentThread().isInterrupted()) { 085 drained = eventQueue.isEmpty(); 086 // blockNewEvents is only set when dispatcher is draining to stop, 087 // adding this check is to avoid the overhead of acquiring the lock 088 // and calling notify every time in the normal run of the loop. 089 if (blockNewEvents) { 090 synchronized (waitForDrained) { 091 if (drained) { 092 waitForDrained.notify(); 093 } 094 } 095 } 096 Event event; 097 try { 098 event = eventQueue.take(); 099 } catch(InterruptedException ie) { 100 if (!stopped) { 101 LOG.warn("AsyncDispatcher thread interrupted", ie); 102 } 103 return; 104 } 105 if (event != null) { 106 dispatch(event); 107 } 108 } 109 } 110 }; 111 } 112 113 @Override 114 protected void serviceInit(Configuration conf) throws Exception { 115 this.exitOnDispatchException = 116 conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, 117 Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); 118 super.serviceInit(conf); 119 } 120 121 @Override 122 protected void serviceStart() throws Exception { 123 //start all the components 124 super.serviceStart(); 125 eventHandlingThread = new Thread(createThread()); 126 eventHandlingThread.setName("AsyncDispatcher event handler"); 127 eventHandlingThread.start(); 128 } 129 130 public void setDrainEventsOnStop() { 131 drainEventsOnStop = true; 132 } 133 134 @Override 135 protected void serviceStop() throws Exception { 136 if (drainEventsOnStop) { 137 blockNewEvents = true; 138 LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); 139 synchronized (waitForDrained) { 140 while (!drained && eventHandlingThread.isAlive()) { 141 waitForDrained.wait(1000); 142 LOG.info("Waiting for AsyncDispatcher to drain."); 143 } 144 } 145 } 146 stopped = true; 147 if (eventHandlingThread != null) { 148 eventHandlingThread.interrupt(); 149 try { 150 eventHandlingThread.join(); 151 } catch (InterruptedException ie) { 152 LOG.warn("Interrupted Exception while stopping", ie); 153 } 154 } 155 156 // stop all the components 157 super.serviceStop(); 158 } 159 160 @SuppressWarnings("unchecked") 161 protected void dispatch(Event event) { 162 //all events go thru this loop 163 if (LOG.isDebugEnabled()) { 164 LOG.debug("Dispatching the event " + event.getClass().getName() + "." 165 + event.toString()); 166 } 167 168 Class<? extends Enum> type = event.getType().getDeclaringClass(); 169 170 try{ 171 EventHandler handler = eventDispatchers.get(type); 172 if(handler != null) { 173 handler.handle(event); 174 } else { 175 throw new Exception("No handler for registered for " + type); 176 } 177 } catch (Throwable t) { 178 //TODO Maybe log the state of the queue 179 LOG.fatal("Error in dispatcher thread", t); 180 // If serviceStop is called, we should exit this thread gracefully. 181 if (exitOnDispatchException 182 && (ShutdownHookManager.get().isShutdownInProgress()) == false 183 && stopped == false) { 184 LOG.info("Exiting, bbye.."); 185 System.exit(-1); 186 } 187 } 188 } 189 190 @SuppressWarnings("unchecked") 191 @Override 192 public void register(Class<? extends Enum> eventType, 193 EventHandler handler) { 194 /* check to see if we have a listener registered */ 195 EventHandler<Event> registeredHandler = (EventHandler<Event>) 196 eventDispatchers.get(eventType); 197 LOG.info("Registering " + eventType + " for " + handler.getClass()); 198 if (registeredHandler == null) { 199 eventDispatchers.put(eventType, handler); 200 } else if (!(registeredHandler instanceof MultiListenerHandler)){ 201 /* for multiple listeners of an event add the multiple listener handler */ 202 MultiListenerHandler multiHandler = new MultiListenerHandler(); 203 multiHandler.addHandler(registeredHandler); 204 multiHandler.addHandler(handler); 205 eventDispatchers.put(eventType, multiHandler); 206 } else { 207 /* already a multilistener, just add to it */ 208 MultiListenerHandler multiHandler 209 = (MultiListenerHandler) registeredHandler; 210 multiHandler.addHandler(handler); 211 } 212 } 213 214 @Override 215 public EventHandler getEventHandler() { 216 if (handlerInstance == null) { 217 handlerInstance = new GenericEventHandler(); 218 } 219 return handlerInstance; 220 } 221 222 class GenericEventHandler implements EventHandler<Event> { 223 public void handle(Event event) { 224 if (blockNewEvents) { 225 return; 226 } 227 drained = false; 228 229 /* all this method does is enqueue all the events onto the queue */ 230 int qSize = eventQueue.size(); 231 if (qSize !=0 && qSize %1000 == 0) { 232 LOG.info("Size of event-queue is " + qSize); 233 } 234 int remCapacity = eventQueue.remainingCapacity(); 235 if (remCapacity < 1000) { 236 LOG.warn("Very low remaining capacity in the event-queue: " 237 + remCapacity); 238 } 239 try { 240 eventQueue.put(event); 241 } catch (InterruptedException e) { 242 if (!stopped) { 243 LOG.warn("AsyncDispatcher thread interrupted", e); 244 } 245 throw new YarnRuntimeException(e); 246 } 247 }; 248 } 249 250 /** 251 * Multiplexing an event. Sending it to different handlers that 252 * are interested in the event. 253 * @param <T> the type of event these multiple handlers are interested in. 254 */ 255 static class MultiListenerHandler implements EventHandler<Event> { 256 List<EventHandler<Event>> listofHandlers; 257 258 public MultiListenerHandler() { 259 listofHandlers = new ArrayList<EventHandler<Event>>(); 260 } 261 262 @Override 263 public void handle(Event event) { 264 for (EventHandler<Event> handler: listofHandlers) { 265 handler.handle(event); 266 } 267 } 268 269 void addHandler(EventHandler<Event> handler) { 270 listofHandlers.add(handler); 271 } 272 273 } 274 }