001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.event;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.util.ShutdownHookManager;
035import org.apache.hadoop.yarn.conf.YarnConfiguration;
036import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
037
038import com.google.common.annotations.VisibleForTesting;
039
/**
 * Dispatches {@link Event}s in a separate thread. Currently only a single
 * thread does that. Potentially there could be multiple channels for each
 * event type class and a thread pool could be used to dispatch the events.
 */
045@SuppressWarnings("rawtypes")
046@Public
047@Evolving
048public class AsyncDispatcher extends AbstractService implements Dispatcher {
049
050  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
051
052  private final BlockingQueue<Event> eventQueue;
053  private volatile boolean stopped = false;
054
055  // Configuration flag for enabling/disabling draining dispatcher's events on
056  // stop functionality.
057  private volatile boolean drainEventsOnStop = false;
058
059  // Indicates all the remaining dispatcher's events on stop have been drained
060  // and processed.
061  private volatile boolean drained = true;
062  private Object waitForDrained = new Object();
063
064  // For drainEventsOnStop enabled only, block newly coming events into the
065  // queue while stopping.
066  private volatile boolean blockNewEvents = false;
067  private EventHandler handlerInstance = null;
068
069  private Thread eventHandlingThread;
070  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
071  private boolean exitOnDispatchException;
072
073  public AsyncDispatcher() {
074    this(new LinkedBlockingQueue<Event>());
075  }
076
077  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
078    super("Dispatcher");
079    this.eventQueue = eventQueue;
080    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
081  }
082
083  Runnable createThread() {
084    return new Runnable() {
085      @Override
086      public void run() {
087        while (!stopped && !Thread.currentThread().isInterrupted()) {
088          drained = eventQueue.isEmpty();
089          // blockNewEvents is only set when dispatcher is draining to stop,
090          // adding this check is to avoid the overhead of acquiring the lock
091          // and calling notify every time in the normal run of the loop.
092          if (blockNewEvents) {
093            synchronized (waitForDrained) {
094              if (drained) {
095                waitForDrained.notify();
096              }
097            }
098          }
099          Event event;
100          try {
101            event = eventQueue.take();
102          } catch(InterruptedException ie) {
103            if (!stopped) {
104              LOG.warn("AsyncDispatcher thread interrupted", ie);
105            }
106            return;
107          }
108          if (event != null) {
109            dispatch(event);
110          }
111        }
112      }
113    };
114  }
115
116  @Override
117  protected void serviceInit(Configuration conf) throws Exception {
118    this.exitOnDispatchException =
119        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
120          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
121    super.serviceInit(conf);
122  }
123
124  @Override
125  protected void serviceStart() throws Exception {
126    //start all the components
127    super.serviceStart();
128    eventHandlingThread = new Thread(createThread());
129    eventHandlingThread.setName("AsyncDispatcher event handler");
130    eventHandlingThread.start();
131  }
132
133  public void setDrainEventsOnStop() {
134    drainEventsOnStop = true;
135  }
136
137  @Override
138  protected void serviceStop() throws Exception {
139    if (drainEventsOnStop) {
140      blockNewEvents = true;
141      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
142      long endTime = System.currentTimeMillis() + getConfig()
143          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
144              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
145
146      synchronized (waitForDrained) {
147        while (!drained && eventHandlingThread != null
148            && eventHandlingThread.isAlive()
149            && System.currentTimeMillis() < endTime) {
150          waitForDrained.wait(1000);
151          LOG.info("Waiting for AsyncDispatcher to drain.");
152        }
153      }
154    }
155    stopped = true;
156    if (eventHandlingThread != null) {
157      eventHandlingThread.interrupt();
158      try {
159        eventHandlingThread.join();
160      } catch (InterruptedException ie) {
161        LOG.warn("Interrupted Exception while stopping", ie);
162      }
163    }
164
165    // stop all the components
166    super.serviceStop();
167  }
168
169  @SuppressWarnings("unchecked")
170  protected void dispatch(Event event) {
171    //all events go thru this loop
172    if (LOG.isDebugEnabled()) {
173      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
174          + event.toString());
175    }
176
177    Class<? extends Enum> type = event.getType().getDeclaringClass();
178
179    try{
180      EventHandler handler = eventDispatchers.get(type);
181      if(handler != null) {
182        handler.handle(event);
183      } else {
184        throw new Exception("No handler for registered for " + type);
185      }
186    } catch (Throwable t) {
187      //TODO Maybe log the state of the queue
188      LOG.fatal("Error in dispatcher thread", t);
189      // If serviceStop is called, we should exit this thread gracefully.
190      if (exitOnDispatchException
191          && (ShutdownHookManager.get().isShutdownInProgress()) == false
192          && stopped == false) {
193        Thread shutDownThread = new Thread(createShutDownThread());
194        shutDownThread.setName("AsyncDispatcher ShutDown handler");
195        shutDownThread.start();
196      }
197    }
198  }
199
200  @SuppressWarnings("unchecked")
201  @Override
202  public void register(Class<? extends Enum> eventType,
203      EventHandler handler) {
204    /* check to see if we have a listener registered */
205    EventHandler<Event> registeredHandler = (EventHandler<Event>)
206    eventDispatchers.get(eventType);
207    LOG.info("Registering " + eventType + " for " + handler.getClass());
208    if (registeredHandler == null) {
209      eventDispatchers.put(eventType, handler);
210    } else if (!(registeredHandler instanceof MultiListenerHandler)){
211      /* for multiple listeners of an event add the multiple listener handler */
212      MultiListenerHandler multiHandler = new MultiListenerHandler();
213      multiHandler.addHandler(registeredHandler);
214      multiHandler.addHandler(handler);
215      eventDispatchers.put(eventType, multiHandler);
216    } else {
217      /* already a multilistener, just add to it */
218      MultiListenerHandler multiHandler
219      = (MultiListenerHandler) registeredHandler;
220      multiHandler.addHandler(handler);
221    }
222  }
223
224  @Override
225  public EventHandler getEventHandler() {
226    if (handlerInstance == null) {
227      handlerInstance = new GenericEventHandler();
228    }
229    return handlerInstance;
230  }
231
232  class GenericEventHandler implements EventHandler<Event> {
233    public void handle(Event event) {
234      if (blockNewEvents) {
235        return;
236      }
237      drained = false;
238
239      /* all this method does is enqueue all the events onto the queue */
240      int qSize = eventQueue.size();
241      if (qSize !=0 && qSize %1000 == 0) {
242        LOG.info("Size of event-queue is " + qSize);
243      }
244      int remCapacity = eventQueue.remainingCapacity();
245      if (remCapacity < 1000) {
246        LOG.warn("Very low remaining capacity in the event-queue: "
247            + remCapacity);
248      }
249      try {
250        eventQueue.put(event);
251      } catch (InterruptedException e) {
252        if (!stopped) {
253          LOG.warn("AsyncDispatcher thread interrupted", e);
254        }
255        // Need to reset drained flag to true if event queue is empty,
256        // otherwise dispatcher will hang on stop.
257        drained = eventQueue.isEmpty();
258        throw new YarnRuntimeException(e);
259      }
260    };
261  }
262
263  @VisibleForTesting
264  protected boolean isEventThreadWaiting() {
265    return eventHandlingThread.getState() == Thread.State.WAITING;
266  }
267
268  /**
269   * Multiplexing an event. Sending it to different handlers that
270   * are interested in the event.
271   * @param <T> the type of event these multiple handlers are interested in.
272   */
273  static class MultiListenerHandler implements EventHandler<Event> {
274    List<EventHandler<Event>> listofHandlers;
275
276    public MultiListenerHandler() {
277      listofHandlers = new ArrayList<EventHandler<Event>>();
278    }
279
280    @Override
281    public void handle(Event event) {
282      for (EventHandler<Event> handler: listofHandlers) {
283        handler.handle(event);
284      }
285    }
286
287    void addHandler(EventHandler<Event> handler) {
288      listofHandlers.add(handler);
289    }
290
291  }
292
293  Runnable createShutDownThread() {
294    return new Runnable() {
295      @Override
296      public void run() {
297        LOG.info("Exiting, bbye..");
298        System.exit(-1);
299      }
300    };
301  }
302}