001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.event;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.util.ShutdownHookManager;
035import org.apache.hadoop.yarn.conf.YarnConfiguration;
036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
037
038import com.google.common.annotations.VisibleForTesting;
039
040/**
041 * Dispatches {@link Event}s in a separate thread. Currently only single thread
042 * does that. Potentially there could be multiple channels for each event type
043 * class and a thread pool can be used to dispatch the events.
044 */
045@SuppressWarnings("rawtypes")
046@Public
047@Evolving
048public class AsyncDispatcher extends AbstractService implements Dispatcher {
049
050  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
051
052  private final BlockingQueue<Event> eventQueue;
053  private volatile boolean stopped = false;
054
055  // Configuration flag for enabling/disabling draining dispatcher's events on
056  // stop functionality.
057  private volatile boolean drainEventsOnStop = false;
058
059  // Indicates all the remaining dispatcher's events on stop have been drained
060  // and processed.
061  private volatile boolean drained = true;
062  private Object waitForDrained = new Object();
063
064  // For drainEventsOnStop enabled only, block newly coming events into the
065  // queue while stopping.
066  private volatile boolean blockNewEvents = false;
067  private EventHandler handlerInstance = null;
068
069  private Thread eventHandlingThread;
070  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
071  private boolean exitOnDispatchException;
072
073  public AsyncDispatcher() {
074    this(new LinkedBlockingQueue<Event>());
075  }
076
077  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
078    super("Dispatcher");
079    this.eventQueue = eventQueue;
080    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
081  }
082
083  Runnable createThread() {
084    return new Runnable() {
085      @Override
086      public void run() {
087        while (!stopped && !Thread.currentThread().isInterrupted()) {
088          drained = eventQueue.isEmpty();
089          // blockNewEvents is only set when dispatcher is draining to stop,
090          // adding this check is to avoid the overhead of acquiring the lock
091          // and calling notify every time in the normal run of the loop.
092          if (blockNewEvents) {
093            synchronized (waitForDrained) {
094              if (drained) {
095                waitForDrained.notify();
096              }
097            }
098          }
099          Event event;
100          try {
101            event = eventQueue.take();
102          } catch(InterruptedException ie) {
103            if (!stopped) {
104              LOG.warn("AsyncDispatcher thread interrupted", ie);
105            }
106            return;
107          }
108          if (event != null) {
109            dispatch(event);
110          }
111        }
112      }
113    };
114  }
115
116  @Override
117  protected void serviceInit(Configuration conf) throws Exception {
118    this.exitOnDispatchException =
119        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
120          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
121    super.serviceInit(conf);
122  }
123
124  @Override
125  protected void serviceStart() throws Exception {
126    //start all the components
127    super.serviceStart();
128    eventHandlingThread = new Thread(createThread());
129    eventHandlingThread.setName("AsyncDispatcher event handler");
130    eventHandlingThread.start();
131  }
132
133  public void setDrainEventsOnStop() {
134    drainEventsOnStop = true;
135  }
136
137  @Override
138  protected void serviceStop() throws Exception {
139    if (drainEventsOnStop) {
140      blockNewEvents = true;
141      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
142      long endTime = System.currentTimeMillis() + getConfig()
143          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
144              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
145
146      synchronized (waitForDrained) {
147        while (!drained && eventHandlingThread != null
148            && eventHandlingThread.isAlive()
149            && System.currentTimeMillis() < endTime) {
150          waitForDrained.wait(1000);
151          LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
152              eventHandlingThread.getState());
153        }
154      }
155    }
156    stopped = true;
157    if (eventHandlingThread != null) {
158      eventHandlingThread.interrupt();
159      try {
160        eventHandlingThread.join();
161      } catch (InterruptedException ie) {
162        LOG.warn("Interrupted Exception while stopping", ie);
163      }
164    }
165
166    // stop all the components
167    super.serviceStop();
168  }
169
170  @SuppressWarnings("unchecked")
171  protected void dispatch(Event event) {
172    //all events go thru this loop
173    if (LOG.isDebugEnabled()) {
174      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
175          + event.toString());
176    }
177
178    Class<? extends Enum> type = event.getType().getDeclaringClass();
179
180    try{
181      EventHandler handler = eventDispatchers.get(type);
182      if(handler != null) {
183        handler.handle(event);
184      } else {
185        throw new Exception("No handler for registered for " + type);
186      }
187    } catch (Throwable t) {
188      //TODO Maybe log the state of the queue
189      LOG.fatal("Error in dispatcher thread", t);
190      // If serviceStop is called, we should exit this thread gracefully.
191      if (exitOnDispatchException
192          && (ShutdownHookManager.get().isShutdownInProgress()) == false
193          && stopped == false) {
194        Thread shutDownThread = new Thread(createShutDownThread());
195        shutDownThread.setName("AsyncDispatcher ShutDown handler");
196        shutDownThread.start();
197      }
198    }
199  }
200
201  @SuppressWarnings("unchecked")
202  @Override
203  public void register(Class<? extends Enum> eventType,
204      EventHandler handler) {
205    /* check to see if we have a listener registered */
206    EventHandler<Event> registeredHandler = (EventHandler<Event>)
207    eventDispatchers.get(eventType);
208    LOG.info("Registering " + eventType + " for " + handler.getClass());
209    if (registeredHandler == null) {
210      eventDispatchers.put(eventType, handler);
211    } else if (!(registeredHandler instanceof MultiListenerHandler)){
212      /* for multiple listeners of an event add the multiple listener handler */
213      MultiListenerHandler multiHandler = new MultiListenerHandler();
214      multiHandler.addHandler(registeredHandler);
215      multiHandler.addHandler(handler);
216      eventDispatchers.put(eventType, multiHandler);
217    } else {
218      /* already a multilistener, just add to it */
219      MultiListenerHandler multiHandler
220      = (MultiListenerHandler) registeredHandler;
221      multiHandler.addHandler(handler);
222    }
223  }
224
225  @Override
226  public EventHandler getEventHandler() {
227    if (handlerInstance == null) {
228      handlerInstance = new GenericEventHandler();
229    }
230    return handlerInstance;
231  }
232
233  class GenericEventHandler implements EventHandler<Event> {
234    public void handle(Event event) {
235      if (blockNewEvents) {
236        return;
237      }
238      drained = false;
239
240      /* all this method does is enqueue all the events onto the queue */
241      int qSize = eventQueue.size();
242      if (qSize !=0 && qSize %1000 == 0) {
243        LOG.info("Size of event-queue is " + qSize);
244      }
245      int remCapacity = eventQueue.remainingCapacity();
246      if (remCapacity < 1000) {
247        LOG.warn("Very low remaining capacity in the event-queue: "
248            + remCapacity);
249      }
250      try {
251        eventQueue.put(event);
252      } catch (InterruptedException e) {
253        if (!stopped) {
254          LOG.warn("AsyncDispatcher thread interrupted", e);
255        }
256        // Need to reset drained flag to true if event queue is empty,
257        // otherwise dispatcher will hang on stop.
258        drained = eventQueue.isEmpty();
259        throw new YarnRuntimeException(e);
260      }
261    };
262  }
263
264  /**
265   * Multiplexing an event. Sending it to different handlers that
266   * are interested in the event.
267   * @param <T> the type of event these multiple handlers are interested in.
268   */
269  static class MultiListenerHandler implements EventHandler<Event> {
270    List<EventHandler<Event>> listofHandlers;
271
272    public MultiListenerHandler() {
273      listofHandlers = new ArrayList<EventHandler<Event>>();
274    }
275
276    @Override
277    public void handle(Event event) {
278      for (EventHandler<Event> handler: listofHandlers) {
279        handler.handle(event);
280      }
281    }
282
283    void addHandler(EventHandler<Event> handler) {
284      listofHandlers.add(handler);
285    }
286
287  }
288
289  Runnable createShutDownThread() {
290    return new Runnable() {
291      @Override
292      public void run() {
293        LOG.info("Exiting, bbye..");
294        System.exit(-1);
295      }
296    };
297  }
298
299  @VisibleForTesting
300  protected boolean isEventThreadWaiting() {
301    return eventHandlingThread.getState() == Thread.State.WAITING;
302  }
303
304  @VisibleForTesting
305  protected boolean isDrained() {
306    return this.drained;
307  }
308}