001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.event;
020    
021    import java.util.ArrayList;
022    import java.util.HashMap;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.BlockingQueue;
026    import java.util.concurrent.LinkedBlockingQueue;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.apache.hadoop.classification.InterfaceAudience.Public;
031    import org.apache.hadoop.classification.InterfaceStability.Evolving;
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.hadoop.service.AbstractService;
034    import org.apache.hadoop.util.ShutdownHookManager;
035    import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
036    
037    /**
038     * Dispatches {@link Event}s in a separate thread. Currently only single thread
039     * does that. Potentially there could be multiple channels for each event type
040     * class and a thread pool can be used to dispatch the events.
041     */
042    @SuppressWarnings("rawtypes")
043    @Public
044    @Evolving
045    public class AsyncDispatcher extends AbstractService implements Dispatcher {
046    
047      private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
048    
049      private final BlockingQueue<Event> eventQueue;
050      private volatile boolean stopped = false;
051    
052      // Configuration flag for enabling/disabling draining dispatcher's events on
053      // stop functionality.
054      private volatile boolean drainEventsOnStop = false;
055    
056      // Indicates all the remaining dispatcher's events on stop have been drained
057      // and processed.
058      private volatile boolean drained = true;
059      private Object waitForDrained = new Object();
060    
061      // For drainEventsOnStop enabled only, block newly coming events into the
062      // queue while stopping.
063      private volatile boolean blockNewEvents = false;
064      private EventHandler handlerInstance = null;
065    
066      private Thread eventHandlingThread;
067      protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
068      private boolean exitOnDispatchException;
069    
070      public AsyncDispatcher() {
071        this(new LinkedBlockingQueue<Event>());
072      }
073    
074      public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
075        super("Dispatcher");
076        this.eventQueue = eventQueue;
077        this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
078      }
079    
080      Runnable createThread() {
081        return new Runnable() {
082          @Override
083          public void run() {
084            while (!stopped && !Thread.currentThread().isInterrupted()) {
085              drained = eventQueue.isEmpty();
086              // blockNewEvents is only set when dispatcher is draining to stop,
087              // adding this check is to avoid the overhead of acquiring the lock
088              // and calling notify every time in the normal run of the loop.
089              if (blockNewEvents) {
090                synchronized (waitForDrained) {
091                  if (drained) {
092                    waitForDrained.notify();
093                  }
094                }
095              }
096              Event event;
097              try {
098                event = eventQueue.take();
099              } catch(InterruptedException ie) {
100                if (!stopped) {
101                  LOG.warn("AsyncDispatcher thread interrupted", ie);
102                }
103                return;
104              }
105              if (event != null) {
106                dispatch(event);
107              }
108            }
109          }
110        };
111      }
112    
113      @Override
114      protected void serviceInit(Configuration conf) throws Exception {
115        this.exitOnDispatchException =
116            conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
117              Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
118        super.serviceInit(conf);
119      }
120    
121      @Override
122      protected void serviceStart() throws Exception {
123        //start all the components
124        super.serviceStart();
125        eventHandlingThread = new Thread(createThread());
126        eventHandlingThread.setName("AsyncDispatcher event handler");
127        eventHandlingThread.start();
128      }
129    
130      public void setDrainEventsOnStop() {
131        drainEventsOnStop = true;
132      }
133    
134      @Override
135      protected void serviceStop() throws Exception {
136        if (drainEventsOnStop) {
137          blockNewEvents = true;
138          LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
139          synchronized (waitForDrained) {
140            while (!drained && eventHandlingThread.isAlive()) {
141              waitForDrained.wait(1000);
142              LOG.info("Waiting for AsyncDispatcher to drain.");
143            }
144          }
145        }
146        stopped = true;
147        if (eventHandlingThread != null) {
148          eventHandlingThread.interrupt();
149          try {
150            eventHandlingThread.join();
151          } catch (InterruptedException ie) {
152            LOG.warn("Interrupted Exception while stopping", ie);
153          }
154        }
155    
156        // stop all the components
157        super.serviceStop();
158      }
159    
160      @SuppressWarnings("unchecked")
161      protected void dispatch(Event event) {
162        //all events go thru this loop
163        if (LOG.isDebugEnabled()) {
164          LOG.debug("Dispatching the event " + event.getClass().getName() + "."
165              + event.toString());
166        }
167    
168        Class<? extends Enum> type = event.getType().getDeclaringClass();
169    
170        try{
171          EventHandler handler = eventDispatchers.get(type);
172          if(handler != null) {
173            handler.handle(event);
174          } else {
175            throw new Exception("No handler for registered for " + type);
176          }
177        } catch (Throwable t) {
178          //TODO Maybe log the state of the queue
179          LOG.fatal("Error in dispatcher thread", t);
180          // If serviceStop is called, we should exit this thread gracefully.
181          if (exitOnDispatchException
182              && (ShutdownHookManager.get().isShutdownInProgress()) == false
183              && stopped == false) {
184            LOG.info("Exiting, bbye..");
185            System.exit(-1);
186          }
187        }
188      }
189    
190      @SuppressWarnings("unchecked")
191      @Override
192      public void register(Class<? extends Enum> eventType,
193          EventHandler handler) {
194        /* check to see if we have a listener registered */
195        EventHandler<Event> registeredHandler = (EventHandler<Event>)
196        eventDispatchers.get(eventType);
197        LOG.info("Registering " + eventType + " for " + handler.getClass());
198        if (registeredHandler == null) {
199          eventDispatchers.put(eventType, handler);
200        } else if (!(registeredHandler instanceof MultiListenerHandler)){
201          /* for multiple listeners of an event add the multiple listener handler */
202          MultiListenerHandler multiHandler = new MultiListenerHandler();
203          multiHandler.addHandler(registeredHandler);
204          multiHandler.addHandler(handler);
205          eventDispatchers.put(eventType, multiHandler);
206        } else {
207          /* already a multilistener, just add to it */
208          MultiListenerHandler multiHandler
209          = (MultiListenerHandler) registeredHandler;
210          multiHandler.addHandler(handler);
211        }
212      }
213    
214      @Override
215      public EventHandler getEventHandler() {
216        if (handlerInstance == null) {
217          handlerInstance = new GenericEventHandler();
218        }
219        return handlerInstance;
220      }
221    
222      class GenericEventHandler implements EventHandler<Event> {
223        public void handle(Event event) {
224          if (blockNewEvents) {
225            return;
226          }
227          drained = false;
228    
229          /* all this method does is enqueue all the events onto the queue */
230          int qSize = eventQueue.size();
231          if (qSize !=0 && qSize %1000 == 0) {
232            LOG.info("Size of event-queue is " + qSize);
233          }
234          int remCapacity = eventQueue.remainingCapacity();
235          if (remCapacity < 1000) {
236            LOG.warn("Very low remaining capacity in the event-queue: "
237                + remCapacity);
238          }
239          try {
240            eventQueue.put(event);
241          } catch (InterruptedException e) {
242            if (!stopped) {
243              LOG.warn("AsyncDispatcher thread interrupted", e);
244            }
245            throw new YarnRuntimeException(e);
246          }
247        };
248      }
249    
250      /**
251       * Multiplexing an event. Sending it to different handlers that
252       * are interested in the event.
253       * @param <T> the type of event these multiple handlers are interested in.
254       */
255      static class MultiListenerHandler implements EventHandler<Event> {
256        List<EventHandler<Event>> listofHandlers;
257    
258        public MultiListenerHandler() {
259          listofHandlers = new ArrayList<EventHandler<Event>>();
260        }
261    
262        @Override
263        public void handle(Event event) {
264          for (EventHandler<Event> handler: listofHandlers) {
265            handler.handle(event);
266          }
267        }
268    
269        void addHandler(EventHandler<Event> handler) {
270          listofHandlers.add(handler);
271        }
272    
273      }
274    }