001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.event;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.util.ShutdownHookManager;
035import org.apache.hadoop.yarn.conf.YarnConfiguration;
036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
037
038import com.google.common.annotations.VisibleForTesting;
039
040/**
041 * Dispatches {@link Event}s in a separate thread. Currently only single thread
042 * does that. Potentially there could be multiple channels for each event type
043 * class and a thread pool can be used to dispatch the events.
044 */
045@SuppressWarnings("rawtypes")
046@Public
047@Evolving
048public class AsyncDispatcher extends AbstractService implements Dispatcher {
049
050  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
051
052  private final BlockingQueue<Event> eventQueue;
053  private volatile int lastEventQueueSizeLogged = 0;
054  private volatile boolean stopped = false;
055
056  // Configuration flag for enabling/disabling draining dispatcher's events on
057  // stop functionality.
058  private volatile boolean drainEventsOnStop = false;
059
060  // Indicates all the remaining dispatcher's events on stop have been drained
061  // and processed.
062  private volatile boolean drained = true;
063  private Object waitForDrained = new Object();
064
065  // For drainEventsOnStop enabled only, block newly coming events into the
066  // queue while stopping.
067  private volatile boolean blockNewEvents = false;
068  private EventHandler handlerInstance = null;
069
070  private Thread eventHandlingThread;
071  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
072  private boolean exitOnDispatchException;
073
074  public AsyncDispatcher() {
075    this(new LinkedBlockingQueue<Event>());
076  }
077
078  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
079    super("Dispatcher");
080    this.eventQueue = eventQueue;
081    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
082  }
083
084  Runnable createThread() {
085    return new Runnable() {
086      @Override
087      public void run() {
088        while (!stopped && !Thread.currentThread().isInterrupted()) {
089          drained = eventQueue.isEmpty();
090          // blockNewEvents is only set when dispatcher is draining to stop,
091          // adding this check is to avoid the overhead of acquiring the lock
092          // and calling notify every time in the normal run of the loop.
093          if (blockNewEvents) {
094            synchronized (waitForDrained) {
095              if (drained) {
096                waitForDrained.notify();
097              }
098            }
099          }
100          Event event;
101          try {
102            event = eventQueue.take();
103          } catch(InterruptedException ie) {
104            if (!stopped) {
105              LOG.warn("AsyncDispatcher thread interrupted", ie);
106            }
107            return;
108          }
109          if (event != null) {
110            dispatch(event);
111          }
112        }
113      }
114    };
115  }
116
117  @Override
118  protected void serviceInit(Configuration conf) throws Exception {
119    this.exitOnDispatchException =
120        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
121          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
122    super.serviceInit(conf);
123  }
124
125  @Override
126  protected void serviceStart() throws Exception {
127    //start all the components
128    super.serviceStart();
129    eventHandlingThread = new Thread(createThread());
130    eventHandlingThread.setName("AsyncDispatcher event handler");
131    eventHandlingThread.start();
132  }
133
134  public void setDrainEventsOnStop() {
135    drainEventsOnStop = true;
136  }
137
138  @Override
139  protected void serviceStop() throws Exception {
140    if (drainEventsOnStop) {
141      blockNewEvents = true;
142      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
143      long endTime = System.currentTimeMillis() + getConfig()
144          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
145              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
146
147      synchronized (waitForDrained) {
148        while (!drained && eventHandlingThread != null
149            && eventHandlingThread.isAlive()
150            && System.currentTimeMillis() < endTime) {
151          waitForDrained.wait(1000);
152          LOG.info("Waiting for AsyncDispatcher to drain.");
153        }
154      }
155    }
156    stopped = true;
157    if (eventHandlingThread != null) {
158      eventHandlingThread.interrupt();
159      try {
160        eventHandlingThread.join();
161      } catch (InterruptedException ie) {
162        LOG.warn("Interrupted Exception while stopping", ie);
163      }
164    }
165
166    // stop all the components
167    super.serviceStop();
168  }
169
170  @SuppressWarnings("unchecked")
171  protected void dispatch(Event event) {
172    //all events go thru this loop
173    if (LOG.isDebugEnabled()) {
174      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
175          + event.toString());
176    }
177
178    Class<? extends Enum> type = event.getType().getDeclaringClass();
179
180    try{
181      EventHandler handler = eventDispatchers.get(type);
182      if(handler != null) {
183        handler.handle(event);
184      } else {
185        throw new Exception("No handler for registered for " + type);
186      }
187    } catch (Throwable t) {
188      //TODO Maybe log the state of the queue
189      LOG.fatal("Error in dispatcher thread", t);
190      // If serviceStop is called, we should exit this thread gracefully.
191      if (exitOnDispatchException
192          && (ShutdownHookManager.get().isShutdownInProgress()) == false
193          && stopped == false) {
194        Thread shutDownThread = new Thread(createShutDownThread());
195        shutDownThread.setName("AsyncDispatcher ShutDown handler");
196        shutDownThread.start();
197      }
198    }
199  }
200
201  @SuppressWarnings("unchecked")
202  @Override
203  public void register(Class<? extends Enum> eventType,
204      EventHandler handler) {
205    /* check to see if we have a listener registered */
206    EventHandler<Event> registeredHandler = (EventHandler<Event>)
207    eventDispatchers.get(eventType);
208    LOG.info("Registering " + eventType + " for " + handler.getClass());
209    if (registeredHandler == null) {
210      eventDispatchers.put(eventType, handler);
211    } else if (!(registeredHandler instanceof MultiListenerHandler)){
212      /* for multiple listeners of an event add the multiple listener handler */
213      MultiListenerHandler multiHandler = new MultiListenerHandler();
214      multiHandler.addHandler(registeredHandler);
215      multiHandler.addHandler(handler);
216      eventDispatchers.put(eventType, multiHandler);
217    } else {
218      /* already a multilistener, just add to it */
219      MultiListenerHandler multiHandler
220      = (MultiListenerHandler) registeredHandler;
221      multiHandler.addHandler(handler);
222    }
223  }
224
225  @Override
226  public EventHandler getEventHandler() {
227    if (handlerInstance == null) {
228      handlerInstance = new GenericEventHandler();
229    }
230    return handlerInstance;
231  }
232
233  class GenericEventHandler implements EventHandler<Event> {
234    public void handle(Event event) {
235      if (blockNewEvents) {
236        return;
237      }
238      drained = false;
239
240      /* all this method does is enqueue all the events onto the queue */
241      int qSize = eventQueue.size();
242      if (qSize != 0 && qSize % 1000 == 0
243          && lastEventQueueSizeLogged != qSize) {
244        lastEventQueueSizeLogged = qSize;
245        LOG.info("Size of event-queue is " + qSize);
246      }
247      int remCapacity = eventQueue.remainingCapacity();
248      if (remCapacity < 1000) {
249        LOG.warn("Very low remaining capacity in the event-queue: "
250            + remCapacity);
251      }
252      try {
253        eventQueue.put(event);
254      } catch (InterruptedException e) {
255        if (!stopped) {
256          LOG.warn("AsyncDispatcher thread interrupted", e);
257        }
258        // Need to reset drained flag to true if event queue is empty,
259        // otherwise dispatcher will hang on stop.
260        drained = eventQueue.isEmpty();
261        throw new YarnRuntimeException(e);
262      }
263    };
264  }
265
266  @VisibleForTesting
267  protected boolean isEventThreadWaiting() {
268    return eventHandlingThread.getState() == Thread.State.WAITING;
269  }
270
271  /**
272   * Multiplexing an event. Sending it to different handlers that
273   * are interested in the event.
274   * @param <T> the type of event these multiple handlers are interested in.
275   */
276  static class MultiListenerHandler implements EventHandler<Event> {
277    List<EventHandler<Event>> listofHandlers;
278
279    public MultiListenerHandler() {
280      listofHandlers = new ArrayList<EventHandler<Event>>();
281    }
282
283    @Override
284    public void handle(Event event) {
285      for (EventHandler<Event> handler: listofHandlers) {
286        handler.handle(event);
287      }
288    }
289
290    void addHandler(EventHandler<Event> handler) {
291      listofHandlers.add(handler);
292    }
293
294  }
295
296  Runnable createShutDownThread() {
297    return new Runnable() {
298      @Override
299      public void run() {
300        LOG.info("Exiting, bbye..");
301        System.exit(-1);
302      }
303    };
304  }
305}