001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.event;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.util.ShutdownHookManager;
035import org.apache.hadoop.yarn.conf.YarnConfiguration;
036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
037
038import com.google.common.annotations.VisibleForTesting;
039
040/**
041 * Dispatches {@link Event}s in a separate thread. Currently only single thread
042 * does that. Potentially there could be multiple channels for each event type
043 * class and a thread pool can be used to dispatch the events.
044 */
045@SuppressWarnings("rawtypes")
046@Public
047@Evolving
048public class AsyncDispatcher extends AbstractService implements Dispatcher {
049
050  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
051
052  private final BlockingQueue<Event> eventQueue;
053  private volatile int lastEventQueueSizeLogged = 0;
054  private volatile boolean stopped = false;
055
056  // Configuration flag for enabling/disabling draining dispatcher's events on
057  // stop functionality.
058  private volatile boolean drainEventsOnStop = false;
059
060  // Indicates all the remaining dispatcher's events on stop have been drained
061  // and processed.
062  private volatile boolean drained = true;
063  private Object waitForDrained = new Object();
064
065  // For drainEventsOnStop enabled only, block newly coming events into the
066  // queue while stopping.
067  private volatile boolean blockNewEvents = false;
068  private final EventHandler handlerInstance = new GenericEventHandler();
069
070  private Thread eventHandlingThread;
071  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
072  private boolean exitOnDispatchException;
073
074  public AsyncDispatcher() {
075    this(new LinkedBlockingQueue<Event>());
076  }
077
078  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
079    super("Dispatcher");
080    this.eventQueue = eventQueue;
081    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
082  }
083
084  Runnable createThread() {
085    return new Runnable() {
086      @Override
087      public void run() {
088        while (!stopped && !Thread.currentThread().isInterrupted()) {
089          drained = eventQueue.isEmpty();
090          // blockNewEvents is only set when dispatcher is draining to stop,
091          // adding this check is to avoid the overhead of acquiring the lock
092          // and calling notify every time in the normal run of the loop.
093          if (blockNewEvents) {
094            synchronized (waitForDrained) {
095              if (drained) {
096                waitForDrained.notify();
097              }
098            }
099          }
100          Event event;
101          try {
102            event = eventQueue.take();
103          } catch(InterruptedException ie) {
104            if (!stopped) {
105              LOG.warn("AsyncDispatcher thread interrupted", ie);
106            }
107            return;
108          }
109          if (event != null) {
110            dispatch(event);
111          }
112        }
113      }
114    };
115  }
116
117  @Override
118  protected void serviceInit(Configuration conf) throws Exception {
119    this.exitOnDispatchException =
120        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
121          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
122    super.serviceInit(conf);
123  }
124
125  @Override
126  protected void serviceStart() throws Exception {
127    //start all the components
128    super.serviceStart();
129    eventHandlingThread = new Thread(createThread());
130    eventHandlingThread.setName("AsyncDispatcher event handler");
131    eventHandlingThread.start();
132  }
133
134  public void setDrainEventsOnStop() {
135    drainEventsOnStop = true;
136  }
137
138  @Override
139  protected void serviceStop() throws Exception {
140    if (drainEventsOnStop) {
141      blockNewEvents = true;
142      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
143      long endTime = System.currentTimeMillis() + getConfig()
144          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
145              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
146
147      synchronized (waitForDrained) {
148        while (!drained && eventHandlingThread != null
149            && eventHandlingThread.isAlive()
150            && System.currentTimeMillis() < endTime) {
151          waitForDrained.wait(1000);
152          LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
153              eventHandlingThread.getState());
154        }
155      }
156    }
157    stopped = true;
158    if (eventHandlingThread != null) {
159      eventHandlingThread.interrupt();
160      try {
161        eventHandlingThread.join();
162      } catch (InterruptedException ie) {
163        LOG.warn("Interrupted Exception while stopping", ie);
164      }
165    }
166
167    // stop all the components
168    super.serviceStop();
169  }
170
171  @SuppressWarnings("unchecked")
172  protected void dispatch(Event event) {
173    //all events go thru this loop
174    if (LOG.isDebugEnabled()) {
175      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
176          + event.toString());
177    }
178
179    Class<? extends Enum> type = event.getType().getDeclaringClass();
180
181    try{
182      EventHandler handler = eventDispatchers.get(type);
183      if(handler != null) {
184        handler.handle(event);
185      } else {
186        throw new Exception("No handler for registered for " + type);
187      }
188    } catch (Throwable t) {
189      //TODO Maybe log the state of the queue
190      LOG.fatal("Error in dispatcher thread", t);
191      // If serviceStop is called, we should exit this thread gracefully.
192      if (exitOnDispatchException
193          && (ShutdownHookManager.get().isShutdownInProgress()) == false
194          && stopped == false) {
195        Thread shutDownThread = new Thread(createShutDownThread());
196        shutDownThread.setName("AsyncDispatcher ShutDown handler");
197        shutDownThread.start();
198      }
199    }
200  }
201
202  @SuppressWarnings("unchecked")
203  @Override
204  public void register(Class<? extends Enum> eventType,
205      EventHandler handler) {
206    /* check to see if we have a listener registered */
207    EventHandler<Event> registeredHandler = (EventHandler<Event>)
208    eventDispatchers.get(eventType);
209    LOG.info("Registering " + eventType + " for " + handler.getClass());
210    if (registeredHandler == null) {
211      eventDispatchers.put(eventType, handler);
212    } else if (!(registeredHandler instanceof MultiListenerHandler)){
213      /* for multiple listeners of an event add the multiple listener handler */
214      MultiListenerHandler multiHandler = new MultiListenerHandler();
215      multiHandler.addHandler(registeredHandler);
216      multiHandler.addHandler(handler);
217      eventDispatchers.put(eventType, multiHandler);
218    } else {
219      /* already a multilistener, just add to it */
220      MultiListenerHandler multiHandler
221      = (MultiListenerHandler) registeredHandler;
222      multiHandler.addHandler(handler);
223    }
224  }
225
226  @Override
227  public EventHandler getEventHandler() {
228    return handlerInstance;
229  }
230
231  class GenericEventHandler implements EventHandler<Event> {
232    public void handle(Event event) {
233      if (blockNewEvents) {
234        return;
235      }
236      drained = false;
237
238      /* all this method does is enqueue all the events onto the queue */
239      int qSize = eventQueue.size();
240      if (qSize != 0 && qSize % 1000 == 0
241          && lastEventQueueSizeLogged != qSize) {
242        lastEventQueueSizeLogged = qSize;
243        LOG.info("Size of event-queue is " + qSize);
244      }
245      int remCapacity = eventQueue.remainingCapacity();
246      if (remCapacity < 1000) {
247        LOG.warn("Very low remaining capacity in the event-queue: "
248            + remCapacity);
249      }
250      try {
251        eventQueue.put(event);
252      } catch (InterruptedException e) {
253        if (!stopped) {
254          LOG.warn("AsyncDispatcher thread interrupted", e);
255        }
256        // Need to reset drained flag to true if event queue is empty,
257        // otherwise dispatcher will hang on stop.
258        drained = eventQueue.isEmpty();
259        throw new YarnRuntimeException(e);
260      }
261    };
262  }
263
264  /**
265   * Multiplexing an event. Sending it to different handlers that
266   * are interested in the event.
267   * @param <T> the type of event these multiple handlers are interested in.
268   */
269  static class MultiListenerHandler implements EventHandler<Event> {
270    List<EventHandler<Event>> listofHandlers;
271
272    public MultiListenerHandler() {
273      listofHandlers = new ArrayList<EventHandler<Event>>();
274    }
275
276    @Override
277    public void handle(Event event) {
278      for (EventHandler<Event> handler: listofHandlers) {
279        handler.handle(event);
280      }
281    }
282
283    void addHandler(EventHandler<Event> handler) {
284      listofHandlers.add(handler);
285    }
286
287  }
288
289  Runnable createShutDownThread() {
290    return new Runnable() {
291      @Override
292      public void run() {
293        LOG.info("Exiting, bbye..");
294        System.exit(-1);
295      }
296    };
297  }
298
299  @VisibleForTesting
300  protected boolean isEventThreadWaiting() {
301    return eventHandlingThread.getState() == Thread.State.WAITING;
302  }
303
304  @VisibleForTesting
305  protected boolean isDrained() {
306    return this.drained;
307  }
308}