001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.event;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.util.ShutdownHookManager;
035import org.apache.hadoop.yarn.conf.YarnConfiguration;
036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
037
038/**
039 * Dispatches {@link Event}s in a separate thread. Currently only single thread
040 * does that. Potentially there could be multiple channels for each event type
041 * class and a thread pool can be used to dispatch the events.
042 */
043@SuppressWarnings("rawtypes")
044@Public
045@Evolving
046public class AsyncDispatcher extends AbstractService implements Dispatcher {
047
048  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
049
050  private final BlockingQueue<Event> eventQueue;
051  private volatile boolean stopped = false;
052
053  // Configuration flag for enabling/disabling draining dispatcher's events on
054  // stop functionality.
055  private volatile boolean drainEventsOnStop = false;
056
057  // Indicates all the remaining dispatcher's events on stop have been drained
058  // and processed.
059  private volatile boolean drained = true;
060  private Object waitForDrained = new Object();
061
062  // For drainEventsOnStop enabled only, block newly coming events into the
063  // queue while stopping.
064  private volatile boolean blockNewEvents = false;
065  private EventHandler handlerInstance = null;
066
067  private Thread eventHandlingThread;
068  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
069  private boolean exitOnDispatchException;
070
071  public AsyncDispatcher() {
072    this(new LinkedBlockingQueue<Event>());
073  }
074
075  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
076    super("Dispatcher");
077    this.eventQueue = eventQueue;
078    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
079  }
080
081  Runnable createThread() {
082    return new Runnable() {
083      @Override
084      public void run() {
085        while (!stopped && !Thread.currentThread().isInterrupted()) {
086          drained = eventQueue.isEmpty();
087          // blockNewEvents is only set when dispatcher is draining to stop,
088          // adding this check is to avoid the overhead of acquiring the lock
089          // and calling notify every time in the normal run of the loop.
090          if (blockNewEvents) {
091            synchronized (waitForDrained) {
092              if (drained) {
093                waitForDrained.notify();
094              }
095            }
096          }
097          Event event;
098          try {
099            event = eventQueue.take();
100          } catch(InterruptedException ie) {
101            if (!stopped) {
102              LOG.warn("AsyncDispatcher thread interrupted", ie);
103            }
104            return;
105          }
106          if (event != null) {
107            dispatch(event);
108          }
109        }
110      }
111    };
112  }
113
114  @Override
115  protected void serviceInit(Configuration conf) throws Exception {
116    this.exitOnDispatchException =
117        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
118          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
119    super.serviceInit(conf);
120  }
121
122  @Override
123  protected void serviceStart() throws Exception {
124    //start all the components
125    super.serviceStart();
126    eventHandlingThread = new Thread(createThread());
127    eventHandlingThread.setName("AsyncDispatcher event handler");
128    eventHandlingThread.start();
129  }
130
131  public void setDrainEventsOnStop() {
132    drainEventsOnStop = true;
133  }
134
135  @Override
136  protected void serviceStop() throws Exception {
137    if (drainEventsOnStop) {
138      blockNewEvents = true;
139      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
140      long endTime = System.currentTimeMillis() + getConfig()
141          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
142              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
143
144      synchronized (waitForDrained) {
145        while (!drained && eventHandlingThread != null
146            && eventHandlingThread.isAlive()
147            && System.currentTimeMillis() < endTime) {
148          waitForDrained.wait(1000);
149          LOG.info("Waiting for AsyncDispatcher to drain.");
150        }
151      }
152    }
153    stopped = true;
154    if (eventHandlingThread != null) {
155      eventHandlingThread.interrupt();
156      try {
157        eventHandlingThread.join();
158      } catch (InterruptedException ie) {
159        LOG.warn("Interrupted Exception while stopping", ie);
160      }
161    }
162
163    // stop all the components
164    super.serviceStop();
165  }
166
167  @SuppressWarnings("unchecked")
168  protected void dispatch(Event event) {
169    //all events go thru this loop
170    if (LOG.isDebugEnabled()) {
171      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
172          + event.toString());
173    }
174
175    Class<? extends Enum> type = event.getType().getDeclaringClass();
176
177    try{
178      EventHandler handler = eventDispatchers.get(type);
179      if(handler != null) {
180        handler.handle(event);
181      } else {
182        throw new Exception("No handler for registered for " + type);
183      }
184    } catch (Throwable t) {
185      //TODO Maybe log the state of the queue
186      LOG.fatal("Error in dispatcher thread", t);
187      // If serviceStop is called, we should exit this thread gracefully.
188      if (exitOnDispatchException
189          && (ShutdownHookManager.get().isShutdownInProgress()) == false
190          && stopped == false) {
191        Thread shutDownThread = new Thread(createShutDownThread());
192        shutDownThread.setName("AsyncDispatcher ShutDown handler");
193        shutDownThread.start();
194      }
195    }
196  }
197
198  @SuppressWarnings("unchecked")
199  @Override
200  public void register(Class<? extends Enum> eventType,
201      EventHandler handler) {
202    /* check to see if we have a listener registered */
203    EventHandler<Event> registeredHandler = (EventHandler<Event>)
204    eventDispatchers.get(eventType);
205    LOG.info("Registering " + eventType + " for " + handler.getClass());
206    if (registeredHandler == null) {
207      eventDispatchers.put(eventType, handler);
208    } else if (!(registeredHandler instanceof MultiListenerHandler)){
209      /* for multiple listeners of an event add the multiple listener handler */
210      MultiListenerHandler multiHandler = new MultiListenerHandler();
211      multiHandler.addHandler(registeredHandler);
212      multiHandler.addHandler(handler);
213      eventDispatchers.put(eventType, multiHandler);
214    } else {
215      /* already a multilistener, just add to it */
216      MultiListenerHandler multiHandler
217      = (MultiListenerHandler) registeredHandler;
218      multiHandler.addHandler(handler);
219    }
220  }
221
222  @Override
223  public EventHandler getEventHandler() {
224    if (handlerInstance == null) {
225      handlerInstance = new GenericEventHandler();
226    }
227    return handlerInstance;
228  }
229
230  class GenericEventHandler implements EventHandler<Event> {
231    public void handle(Event event) {
232      if (blockNewEvents) {
233        return;
234      }
235      drained = false;
236
237      /* all this method does is enqueue all the events onto the queue */
238      int qSize = eventQueue.size();
239      if (qSize !=0 && qSize %1000 == 0) {
240        LOG.info("Size of event-queue is " + qSize);
241      }
242      int remCapacity = eventQueue.remainingCapacity();
243      if (remCapacity < 1000) {
244        LOG.warn("Very low remaining capacity in the event-queue: "
245            + remCapacity);
246      }
247      try {
248        eventQueue.put(event);
249      } catch (InterruptedException e) {
250        if (!stopped) {
251          LOG.warn("AsyncDispatcher thread interrupted", e);
252        }
253        throw new YarnRuntimeException(e);
254      }
255    };
256  }
257
258  /**
259   * Multiplexing an event. Sending it to different handlers that
260   * are interested in the event.
261   * @param <T> the type of event these multiple handlers are interested in.
262   */
263  static class MultiListenerHandler implements EventHandler<Event> {
264    List<EventHandler<Event>> listofHandlers;
265
266    public MultiListenerHandler() {
267      listofHandlers = new ArrayList<EventHandler<Event>>();
268    }
269
270    @Override
271    public void handle(Event event) {
272      for (EventHandler<Event> handler: listofHandlers) {
273        handler.handle(event);
274      }
275    }
276
277    void addHandler(EventHandler<Event> handler) {
278      listofHandlers.add(handler);
279    }
280
281  }
282
283  Runnable createShutDownThread() {
284    return new Runnable() {
285      @Override
286      public void run() {
287        LOG.info("Exiting, bbye..");
288        System.exit(-1);
289      }
290    };
291  }
292}