001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.util;
020
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience.Public;
028import org.apache.hadoop.classification.InterfaceStability.Evolving;
029import org.apache.hadoop.service.AbstractService;
030
031/**
032 * A simple liveliness monitor with which clients can register, trust the
033 * component to monitor liveliness, get a call-back on expiry and then finally
034 * unregister.
035 */
036@Public
037@Evolving
038public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
039
040  private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class);
041
042  //thread which runs periodically to see the last time since a heartbeat is
043  //received.
044  private Thread checkerThread;
045  private volatile boolean stopped;
046  public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
047  private int expireInterval = DEFAULT_EXPIRE;
048  private int monitorInterval = expireInterval/3;
049
050  private final Clock clock;
051
052  private Map<O, Long> running = new HashMap<O, Long>();
053
054  public AbstractLivelinessMonitor(String name, Clock clock) {
055    super(name);
056    this.clock = clock;
057  }
058
059  @Override
060  protected void serviceStart() throws Exception {
061    assert !stopped : "starting when already stopped";
062    resetTimer();
063    checkerThread = new Thread(new PingChecker());
064    checkerThread.setName("Ping Checker");
065    checkerThread.start();
066    super.serviceStart();
067  }
068
069  @Override
070  protected void serviceStop() throws Exception {
071    stopped = true;
072    if (checkerThread != null) {
073      checkerThread.interrupt();
074    }
075    super.serviceStop();
076  }
077
078  protected abstract void expire(O ob);
079
080  protected void setExpireInterval(int expireInterval) {
081    this.expireInterval = expireInterval;
082  }
083
084  protected void setMonitorInterval(int monitorInterval) {
085    this.monitorInterval = monitorInterval;
086  }
087
088  public synchronized void receivedPing(O ob) {
089    //only put for the registered objects
090    if (running.containsKey(ob)) {
091      running.put(ob, clock.getTime());
092    }
093  }
094
095  public synchronized void register(O ob) {
096    running.put(ob, clock.getTime());
097  }
098
099  public synchronized void unregister(O ob) {
100    running.remove(ob);
101  }
102
103  public synchronized void resetTimer() {
104    long time = clock.getTime();
105    for (O ob : running.keySet()) {
106      running.put(ob, time);
107    }
108  }
109
110  private class PingChecker implements Runnable {
111
112    @Override
113    public void run() {
114      while (!stopped && !Thread.currentThread().isInterrupted()) {
115        synchronized (AbstractLivelinessMonitor.this) {
116          Iterator<Map.Entry<O, Long>> iterator = 
117            running.entrySet().iterator();
118
119          //avoid calculating current time everytime in loop
120          long currentTime = clock.getTime();
121
122          while (iterator.hasNext()) {
123            Map.Entry<O, Long> entry = iterator.next();
124            if (currentTime > entry.getValue() + expireInterval) {
125              iterator.remove();
126              expire(entry.getKey());
127              LOG.info("Expired:" + entry.getKey().toString() + 
128                      " Timed out after " + expireInterval/1000 + " secs");
129            }
130          }
131        }
132        try {
133          Thread.sleep(monitorInterval);
134        } catch (InterruptedException e) {
135          LOG.info(getName() + " thread interrupted");
136          break;
137        }
138      }
139    }
140  }
141
142}