001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.event;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.util.ShutdownHookManager;
035import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
036
037import com.google.common.annotations.VisibleForTesting;
038
039/**
040 * Dispatches {@link Event}s in a separate thread. Currently only single thread
041 * does that. Potentially there could be multiple channels for each event type
042 * class and a thread pool can be used to dispatch the events.
043 */
044@SuppressWarnings("rawtypes")
045@Public
046@Evolving
047public class AsyncDispatcher extends AbstractService implements Dispatcher {
048
049  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
050
051  private final BlockingQueue<Event> eventQueue;
052  private volatile boolean stopped = false;
053
054  // Configuration flag for enabling/disabling draining dispatcher's events on
055  // stop functionality.
056  private volatile boolean drainEventsOnStop = false;
057
058  // Indicates all the remaining dispatcher's events on stop have been drained
059  // and processed.
060  private volatile boolean drained = true;
061  private Object waitForDrained = new Object();
062
063  // For drainEventsOnStop enabled only, block newly coming events into the
064  // queue while stopping.
065  private volatile boolean blockNewEvents = false;
066  private EventHandler handlerInstance = null;
067
068  private Thread eventHandlingThread;
069  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
070  private boolean exitOnDispatchException;
071
072  public AsyncDispatcher() {
073    this(new LinkedBlockingQueue<Event>());
074  }
075
076  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
077    super("Dispatcher");
078    this.eventQueue = eventQueue;
079    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
080  }
081
082  Runnable createThread() {
083    return new Runnable() {
084      @Override
085      public void run() {
086        while (!stopped && !Thread.currentThread().isInterrupted()) {
087          drained = eventQueue.isEmpty();
088          // blockNewEvents is only set when dispatcher is draining to stop,
089          // adding this check is to avoid the overhead of acquiring the lock
090          // and calling notify every time in the normal run of the loop.
091          if (blockNewEvents) {
092            synchronized (waitForDrained) {
093              if (drained) {
094                waitForDrained.notify();
095              }
096            }
097          }
098          Event event;
099          try {
100            event = eventQueue.take();
101          } catch(InterruptedException ie) {
102            if (!stopped) {
103              LOG.warn("AsyncDispatcher thread interrupted", ie);
104            }
105            return;
106          }
107          if (event != null) {
108            dispatch(event);
109          }
110        }
111      }
112    };
113  }
114
115  @Override
116  protected void serviceInit(Configuration conf) throws Exception {
117    this.exitOnDispatchException =
118        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
119          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
120    super.serviceInit(conf);
121  }
122
123  @Override
124  protected void serviceStart() throws Exception {
125    //start all the components
126    super.serviceStart();
127    eventHandlingThread = new Thread(createThread());
128    eventHandlingThread.setName("AsyncDispatcher event handler");
129    eventHandlingThread.start();
130  }
131
132  public void setDrainEventsOnStop() {
133    drainEventsOnStop = true;
134  }
135
136  @Override
137  protected void serviceStop() throws Exception {
138    if (drainEventsOnStop) {
139      blockNewEvents = true;
140      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
141      synchronized (waitForDrained) {
142        while (!drained && eventHandlingThread.isAlive()) {
143          waitForDrained.wait(1000);
144          LOG.info("Waiting for AsyncDispatcher to drain.");
145        }
146      }
147    }
148    stopped = true;
149    if (eventHandlingThread != null) {
150      eventHandlingThread.interrupt();
151      try {
152        eventHandlingThread.join();
153      } catch (InterruptedException ie) {
154        LOG.warn("Interrupted Exception while stopping", ie);
155      }
156    }
157
158    // stop all the components
159    super.serviceStop();
160  }
161
162  @SuppressWarnings("unchecked")
163  protected void dispatch(Event event) {
164    //all events go thru this loop
165    if (LOG.isDebugEnabled()) {
166      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
167          + event.toString());
168    }
169
170    Class<? extends Enum> type = event.getType().getDeclaringClass();
171
172    try{
173      EventHandler handler = eventDispatchers.get(type);
174      if(handler != null) {
175        handler.handle(event);
176      } else {
177        throw new Exception("No handler for registered for " + type);
178      }
179    } catch (Throwable t) {
180      //TODO Maybe log the state of the queue
181      LOG.fatal("Error in dispatcher thread", t);
182      // If serviceStop is called, we should exit this thread gracefully.
183      if (exitOnDispatchException
184          && (ShutdownHookManager.get().isShutdownInProgress()) == false
185          && stopped == false) {
186        Thread shutDownThread = new Thread(createShutDownThread());
187        shutDownThread.setName("AsyncDispatcher ShutDown handler");
188        shutDownThread.start();
189      }
190    }
191  }
192
193  @SuppressWarnings("unchecked")
194  @Override
195  public void register(Class<? extends Enum> eventType,
196      EventHandler handler) {
197    /* check to see if we have a listener registered */
198    EventHandler<Event> registeredHandler = (EventHandler<Event>)
199    eventDispatchers.get(eventType);
200    LOG.info("Registering " + eventType + " for " + handler.getClass());
201    if (registeredHandler == null) {
202      eventDispatchers.put(eventType, handler);
203    } else if (!(registeredHandler instanceof MultiListenerHandler)){
204      /* for multiple listeners of an event add the multiple listener handler */
205      MultiListenerHandler multiHandler = new MultiListenerHandler();
206      multiHandler.addHandler(registeredHandler);
207      multiHandler.addHandler(handler);
208      eventDispatchers.put(eventType, multiHandler);
209    } else {
210      /* already a multilistener, just add to it */
211      MultiListenerHandler multiHandler
212      = (MultiListenerHandler) registeredHandler;
213      multiHandler.addHandler(handler);
214    }
215  }
216
217  @Override
218  public EventHandler getEventHandler() {
219    if (handlerInstance == null) {
220      handlerInstance = new GenericEventHandler();
221    }
222    return handlerInstance;
223  }
224
225  class GenericEventHandler implements EventHandler<Event> {
226    public void handle(Event event) {
227      if (blockNewEvents) {
228        return;
229      }
230      drained = false;
231
232      /* all this method does is enqueue all the events onto the queue */
233      int qSize = eventQueue.size();
234      if (qSize !=0 && qSize %1000 == 0) {
235        LOG.info("Size of event-queue is " + qSize);
236      }
237      int remCapacity = eventQueue.remainingCapacity();
238      if (remCapacity < 1000) {
239        LOG.warn("Very low remaining capacity in the event-queue: "
240            + remCapacity);
241      }
242      try {
243        eventQueue.put(event);
244      } catch (InterruptedException e) {
245        if (!stopped) {
246          LOG.warn("AsyncDispatcher thread interrupted", e);
247        }
248        throw new YarnRuntimeException(e);
249      }
250    };
251  }
252
253  /**
254   * Multiplexing an event. Sending it to different handlers that
255   * are interested in the event.
256   * @param <T> the type of event these multiple handlers are interested in.
257   */
258  static class MultiListenerHandler implements EventHandler<Event> {
259    List<EventHandler<Event>> listofHandlers;
260
261    public MultiListenerHandler() {
262      listofHandlers = new ArrayList<EventHandler<Event>>();
263    }
264
265    @Override
266    public void handle(Event event) {
267      for (EventHandler<Event> handler: listofHandlers) {
268        handler.handle(event);
269      }
270    }
271
272    void addHandler(EventHandler<Event> handler) {
273      listofHandlers.add(handler);
274    }
275
276  }
277
278  Runnable createShutDownThread() {
279    return new Runnable() {
280      @Override
281      public void run() {
282        LOG.info("Exiting, bbye..");
283        System.exit(-1);
284      }
285    };
286  }
287
288  @VisibleForTesting
289  protected boolean isDrained() {
290    return this.drained;
291  }
292}