001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.event;
020    
021    import java.util.ArrayList;
022    import java.util.HashMap;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.BlockingQueue;
026    import java.util.concurrent.LinkedBlockingQueue;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.apache.hadoop.classification.InterfaceAudience.Public;
031    import org.apache.hadoop.classification.InterfaceStability.Evolving;
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.hadoop.service.AbstractService;
034    import org.apache.hadoop.util.ShutdownHookManager;
035    import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
036    
037    /**
038     * Dispatches {@link Event}s in a separate thread. Currently only single thread
039     * does that. Potentially there could be multiple channels for each event type
040     * class and a thread pool can be used to dispatch the events.
041     */
042    @SuppressWarnings("rawtypes")
043    @Public
044    @Evolving
045    public class AsyncDispatcher extends AbstractService implements Dispatcher {
046    
047      private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
048    
049      private final BlockingQueue<Event> eventQueue;
050      private volatile boolean stopped = false;
051    
052      // Configuration flag for enabling/disabling draining dispatcher's events on
053      // stop functionality.
054      private volatile boolean drainEventsOnStop = false;
055    
056      // Indicates all the remaining dispatcher's events on stop have been drained
057      // and processed.
058      private volatile boolean drained = true;
059      private Object waitForDrained = new Object();
060    
061      // For drainEventsOnStop enabled only, block newly coming events into the
062      // queue while stopping.
063      private volatile boolean blockNewEvents = false;
064      private EventHandler handlerInstance = null;
065    
066      private Thread eventHandlingThread;
067      protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
068      private boolean exitOnDispatchException;
069    
070      public AsyncDispatcher() {
071        this(new LinkedBlockingQueue<Event>());
072      }
073    
074      public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
075        super("Dispatcher");
076        this.eventQueue = eventQueue;
077        this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
078      }
079    
080      Runnable createThread() {
081        return new Runnable() {
082          @Override
083          public void run() {
084            while (!stopped && !Thread.currentThread().isInterrupted()) {
085              drained = eventQueue.isEmpty();
086              // blockNewEvents is only set when dispatcher is draining to stop,
087              // adding this check is to avoid the overhead of acquiring the lock
088              // and calling notify every time in the normal run of the loop.
089              if (blockNewEvents) {
090                synchronized (waitForDrained) {
091                  if (drained) {
092                    waitForDrained.notify();
093                  }
094                }
095              }
096              Event event;
097              try {
098                event = eventQueue.take();
099              } catch(InterruptedException ie) {
100                if (!stopped) {
101                  LOG.warn("AsyncDispatcher thread interrupted", ie);
102                }
103                return;
104              }
105              if (event != null) {
106                dispatch(event);
107              }
108            }
109          }
110        };
111      }
112    
113      @Override
114      protected void serviceInit(Configuration conf) throws Exception {
115        this.exitOnDispatchException =
116            conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
117              Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
118        super.serviceInit(conf);
119      }
120    
121      @Override
122      protected void serviceStart() throws Exception {
123        //start all the components
124        super.serviceStart();
125        eventHandlingThread = new Thread(createThread());
126        eventHandlingThread.setName("AsyncDispatcher event handler");
127        eventHandlingThread.start();
128      }
129    
130      public void setDrainEventsOnStop() {
131        drainEventsOnStop = true;
132      }
133    
134      @Override
135      protected void serviceStop() throws Exception {
136        if (drainEventsOnStop) {
137          blockNewEvents = true;
138          LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
139          synchronized (waitForDrained) {
140            while (!drained && eventHandlingThread.isAlive()) {
141              waitForDrained.wait(1000);
142              LOG.info("Waiting for AsyncDispatcher to drain.");
143            }
144          }
145        }
146        stopped = true;
147        if (eventHandlingThread != null) {
148          eventHandlingThread.interrupt();
149          try {
150            eventHandlingThread.join();
151          } catch (InterruptedException ie) {
152            LOG.warn("Interrupted Exception while stopping", ie);
153          }
154        }
155    
156        // stop all the components
157        super.serviceStop();
158      }
159    
160      @SuppressWarnings("unchecked")
161      protected void dispatch(Event event) {
162        //all events go thru this loop
163        if (LOG.isDebugEnabled()) {
164          LOG.debug("Dispatching the event " + event.getClass().getName() + "."
165              + event.toString());
166        }
167    
168        Class<? extends Enum> type = event.getType().getDeclaringClass();
169    
170        try{
171          EventHandler handler = eventDispatchers.get(type);
172          if(handler != null) {
173            handler.handle(event);
174          } else {
175            throw new Exception("No handler for registered for " + type);
176          }
177        }
178        catch (Throwable t) {
179          //TODO Maybe log the state of the queue
180          LOG.fatal("Error in dispatcher thread", t);
181          if (exitOnDispatchException
182              && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
183            LOG.info("Exiting, bbye..");
184            System.exit(-1);
185          }
186        }
187      }
188    
189      @SuppressWarnings("unchecked")
190      @Override
191      public void register(Class<? extends Enum> eventType,
192          EventHandler handler) {
193        /* check to see if we have a listener registered */
194        EventHandler<Event> registeredHandler = (EventHandler<Event>)
195        eventDispatchers.get(eventType);
196        LOG.info("Registering " + eventType + " for " + handler.getClass());
197        if (registeredHandler == null) {
198          eventDispatchers.put(eventType, handler);
199        } else if (!(registeredHandler instanceof MultiListenerHandler)){
200          /* for multiple listeners of an event add the multiple listener handler */
201          MultiListenerHandler multiHandler = new MultiListenerHandler();
202          multiHandler.addHandler(registeredHandler);
203          multiHandler.addHandler(handler);
204          eventDispatchers.put(eventType, multiHandler);
205        } else {
206          /* already a multilistener, just add to it */
207          MultiListenerHandler multiHandler
208          = (MultiListenerHandler) registeredHandler;
209          multiHandler.addHandler(handler);
210        }
211      }
212    
213      @Override
214      public EventHandler getEventHandler() {
215        if (handlerInstance == null) {
216          handlerInstance = new GenericEventHandler();
217        }
218        return handlerInstance;
219      }
220    
221      class GenericEventHandler implements EventHandler<Event> {
222        public void handle(Event event) {
223          if (blockNewEvents) {
224            return;
225          }
226          drained = false;
227    
228          /* all this method does is enqueue all the events onto the queue */
229          int qSize = eventQueue.size();
230          if (qSize !=0 && qSize %1000 == 0) {
231            LOG.info("Size of event-queue is " + qSize);
232          }
233          int remCapacity = eventQueue.remainingCapacity();
234          if (remCapacity < 1000) {
235            LOG.warn("Very low remaining capacity in the event-queue: "
236                + remCapacity);
237          }
238          try {
239            eventQueue.put(event);
240          } catch (InterruptedException e) {
241            if (!stopped) {
242              LOG.warn("AsyncDispatcher thread interrupted", e);
243            }
244            throw new YarnRuntimeException(e);
245          }
246        };
247      }
248    
249      /**
250       * Multiplexing an event. Sending it to different handlers that
251       * are interested in the event.
252       * @param <T> the type of event these multiple handlers are interested in.
253       */
254      static class MultiListenerHandler implements EventHandler<Event> {
255        List<EventHandler<Event>> listofHandlers;
256    
257        public MultiListenerHandler() {
258          listofHandlers = new ArrayList<EventHandler<Event>>();
259        }
260    
261        @Override
262        public void handle(Event event) {
263          for (EventHandler<Event> handler: listofHandlers) {
264            handler.handle(event);
265          }
266        }
267    
268        void addHandler(EventHandler<Event> handler) {
269          listofHandlers.add(handler);
270        }
271    
272      }
273    }