001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.event;
020
021 import java.util.ArrayList;
022 import java.util.HashMap;
023 import java.util.List;
024 import java.util.Map;
025 import java.util.concurrent.BlockingQueue;
026 import java.util.concurrent.LinkedBlockingQueue;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience.Public;
031 import org.apache.hadoop.classification.InterfaceStability.Evolving;
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.service.AbstractService;
034 import org.apache.hadoop.util.ShutdownHookManager;
035 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
036
037 /**
038 * Dispatches {@link Event}s in a separate thread. Currently only single thread
039 * does that. Potentially there could be multiple channels for each event type
040 * class and a thread pool can be used to dispatch the events.
041 */
042 @SuppressWarnings("rawtypes")
043 @Public
044 @Evolving
045 public class AsyncDispatcher extends AbstractService implements Dispatcher {
046
047 private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
048
049 private final BlockingQueue<Event> eventQueue;
050 private volatile boolean stopped = false;
051
052 // Configuration flag for enabling/disabling draining dispatcher's events on
053 // stop functionality.
054 private volatile boolean drainEventsOnStop = false;
055
056 // Indicates all the remaining dispatcher's events on stop have been drained
057 // and processed.
058 private volatile boolean drained = true;
059 private Object waitForDrained = new Object();
060
061 // For drainEventsOnStop enabled only, block newly coming events into the
062 // queue while stopping.
063 private volatile boolean blockNewEvents = false;
064 private EventHandler handlerInstance = null;
065
066 private Thread eventHandlingThread;
067 protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
068 private boolean exitOnDispatchException;
069
070 public AsyncDispatcher() {
071 this(new LinkedBlockingQueue<Event>());
072 }
073
074 public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
075 super("Dispatcher");
076 this.eventQueue = eventQueue;
077 this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
078 }
079
080 Runnable createThread() {
081 return new Runnable() {
082 @Override
083 public void run() {
084 while (!stopped && !Thread.currentThread().isInterrupted()) {
085 drained = eventQueue.isEmpty();
086 // blockNewEvents is only set when dispatcher is draining to stop,
087 // adding this check is to avoid the overhead of acquiring the lock
088 // and calling notify every time in the normal run of the loop.
089 if (blockNewEvents) {
090 synchronized (waitForDrained) {
091 if (drained) {
092 waitForDrained.notify();
093 }
094 }
095 }
096 Event event;
097 try {
098 event = eventQueue.take();
099 } catch(InterruptedException ie) {
100 if (!stopped) {
101 LOG.warn("AsyncDispatcher thread interrupted", ie);
102 }
103 return;
104 }
105 if (event != null) {
106 dispatch(event);
107 }
108 }
109 }
110 };
111 }
112
113 @Override
114 protected void serviceInit(Configuration conf) throws Exception {
115 this.exitOnDispatchException =
116 conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
117 Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
118 super.serviceInit(conf);
119 }
120
121 @Override
122 protected void serviceStart() throws Exception {
123 //start all the components
124 super.serviceStart();
125 eventHandlingThread = new Thread(createThread());
126 eventHandlingThread.setName("AsyncDispatcher event handler");
127 eventHandlingThread.start();
128 }
129
130 public void setDrainEventsOnStop() {
131 drainEventsOnStop = true;
132 }
133
134 @Override
135 protected void serviceStop() throws Exception {
136 if (drainEventsOnStop) {
137 blockNewEvents = true;
138 LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
139 synchronized (waitForDrained) {
140 while (!drained && eventHandlingThread.isAlive()) {
141 waitForDrained.wait(1000);
142 LOG.info("Waiting for AsyncDispatcher to drain.");
143 }
144 }
145 }
146 stopped = true;
147 if (eventHandlingThread != null) {
148 eventHandlingThread.interrupt();
149 try {
150 eventHandlingThread.join();
151 } catch (InterruptedException ie) {
152 LOG.warn("Interrupted Exception while stopping", ie);
153 }
154 }
155
156 // stop all the components
157 super.serviceStop();
158 }
159
160 @SuppressWarnings("unchecked")
161 protected void dispatch(Event event) {
162 //all events go thru this loop
163 if (LOG.isDebugEnabled()) {
164 LOG.debug("Dispatching the event " + event.getClass().getName() + "."
165 + event.toString());
166 }
167
168 Class<? extends Enum> type = event.getType().getDeclaringClass();
169
170 try{
171 EventHandler handler = eventDispatchers.get(type);
172 if(handler != null) {
173 handler.handle(event);
174 } else {
175 throw new Exception("No handler for registered for " + type);
176 }
177 } catch (Throwable t) {
178 //TODO Maybe log the state of the queue
179 LOG.fatal("Error in dispatcher thread", t);
180 // If serviceStop is called, we should exit this thread gracefully.
181 if (exitOnDispatchException
182 && (ShutdownHookManager.get().isShutdownInProgress()) == false
183 && stopped == false) {
184 LOG.info("Exiting, bbye..");
185 System.exit(-1);
186 }
187 }
188 }
189
190 @SuppressWarnings("unchecked")
191 @Override
192 public void register(Class<? extends Enum> eventType,
193 EventHandler handler) {
194 /* check to see if we have a listener registered */
195 EventHandler<Event> registeredHandler = (EventHandler<Event>)
196 eventDispatchers.get(eventType);
197 LOG.info("Registering " + eventType + " for " + handler.getClass());
198 if (registeredHandler == null) {
199 eventDispatchers.put(eventType, handler);
200 } else if (!(registeredHandler instanceof MultiListenerHandler)){
201 /* for multiple listeners of an event add the multiple listener handler */
202 MultiListenerHandler multiHandler = new MultiListenerHandler();
203 multiHandler.addHandler(registeredHandler);
204 multiHandler.addHandler(handler);
205 eventDispatchers.put(eventType, multiHandler);
206 } else {
207 /* already a multilistener, just add to it */
208 MultiListenerHandler multiHandler
209 = (MultiListenerHandler) registeredHandler;
210 multiHandler.addHandler(handler);
211 }
212 }
213
214 @Override
215 public EventHandler getEventHandler() {
216 if (handlerInstance == null) {
217 handlerInstance = new GenericEventHandler();
218 }
219 return handlerInstance;
220 }
221
222 class GenericEventHandler implements EventHandler<Event> {
223 public void handle(Event event) {
224 if (blockNewEvents) {
225 return;
226 }
227 drained = false;
228
229 /* all this method does is enqueue all the events onto the queue */
230 int qSize = eventQueue.size();
231 if (qSize !=0 && qSize %1000 == 0) {
232 LOG.info("Size of event-queue is " + qSize);
233 }
234 int remCapacity = eventQueue.remainingCapacity();
235 if (remCapacity < 1000) {
236 LOG.warn("Very low remaining capacity in the event-queue: "
237 + remCapacity);
238 }
239 try {
240 eventQueue.put(event);
241 } catch (InterruptedException e) {
242 if (!stopped) {
243 LOG.warn("AsyncDispatcher thread interrupted", e);
244 }
245 throw new YarnRuntimeException(e);
246 }
247 };
248 }
249
250 /**
251 * Multiplexing an event. Sending it to different handlers that
252 * are interested in the event.
253 * @param <T> the type of event these multiple handlers are interested in.
254 */
255 static class MultiListenerHandler implements EventHandler<Event> {
256 List<EventHandler<Event>> listofHandlers;
257
258 public MultiListenerHandler() {
259 listofHandlers = new ArrayList<EventHandler<Event>>();
260 }
261
262 @Override
263 public void handle(Event event) {
264 for (EventHandler<Event> handler: listofHandlers) {
265 handler.handle(event);
266 }
267 }
268
269 void addHandler(EventHandler<Event> handler) {
270 listofHandlers.add(handler);
271 }
272
273 }
274 }