001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.util;
020    
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience.Public;
028    import org.apache.hadoop.classification.InterfaceStability.Evolving;
029    import org.apache.hadoop.service.AbstractService;
030    
031    /**
032     * A simple liveliness monitor with which clients can register, trust the
033     * component to monitor liveliness, get a call-back on expiry and then finally
034     * unregister.
035     */
036    @Public
037    @Evolving
038    public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
039    
040      private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class);
041    
042      //thread which runs periodically to see the last time since a heartbeat is
043      //received.
044      private Thread checkerThread;
045      private volatile boolean stopped;
046      public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
047      private int expireInterval = DEFAULT_EXPIRE;
048      private int monitorInterval = expireInterval/3;
049    
050      private final Clock clock;
051    
052      private Map<O, Long> running = new HashMap<O, Long>();
053    
054      public AbstractLivelinessMonitor(String name, Clock clock) {
055        super(name);
056        this.clock = clock;
057      }
058    
059      @Override
060      protected void serviceStart() throws Exception {
061        assert !stopped : "starting when already stopped";
062        checkerThread = new Thread(new PingChecker());
063        checkerThread.setName("Ping Checker");
064        checkerThread.start();
065        super.serviceStart();
066      }
067    
068      @Override
069      protected void serviceStop() throws Exception {
070        stopped = true;
071        if (checkerThread != null) {
072          checkerThread.interrupt();
073        }
074        super.serviceStop();
075      }
076    
077      protected abstract void expire(O ob);
078    
079      protected void setExpireInterval(int expireInterval) {
080        this.expireInterval = expireInterval;
081      }
082    
083      protected void setMonitorInterval(int monitorInterval) {
084        this.monitorInterval = monitorInterval;
085      }
086    
087      public synchronized void receivedPing(O ob) {
088        //only put for the registered objects
089        if (running.containsKey(ob)) {
090          running.put(ob, clock.getTime());
091        }
092      }
093    
094      public synchronized void register(O ob) {
095        running.put(ob, clock.getTime());
096      }
097    
098      public synchronized void unregister(O ob) {
099        running.remove(ob);
100      }
101    
102      private class PingChecker implements Runnable {
103    
104        @Override
105        public void run() {
106          while (!stopped && !Thread.currentThread().isInterrupted()) {
107            synchronized (AbstractLivelinessMonitor.this) {
108              Iterator<Map.Entry<O, Long>> iterator = 
109                running.entrySet().iterator();
110    
111              //avoid calculating current time everytime in loop
112              long currentTime = clock.getTime();
113    
114              while (iterator.hasNext()) {
115                Map.Entry<O, Long> entry = iterator.next();
116                if (currentTime > entry.getValue() + expireInterval) {
117                  iterator.remove();
118                  expire(entry.getKey());
119                  LOG.info("Expired:" + entry.getKey().toString() + 
120                          " Timed out after " + expireInterval/1000 + " secs");
121                }
122              }
123            }
124            try {
125              Thread.sleep(monitorInterval);
126            } catch (InterruptedException e) {
127              LOG.info(getName() + " thread interrupted");
128              break;
129            }
130          }
131        }
132      }
133    
134    }