001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.util;
020
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience.Public;
028import org.apache.hadoop.classification.InterfaceStability.Evolving;
029import org.apache.hadoop.service.AbstractService;
030
031/**
032 * A simple liveliness monitor with which clients can register, trust the
033 * component to monitor liveliness, get a call-back on expiry and then finally
034 * unregister.
035 */
036@Public
037@Evolving
038public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
039
040  private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class);
041
042  //thread which runs periodically to see the last time since a heartbeat is
043  //received.
044  private Thread checkerThread;
045  private volatile boolean stopped;
046  public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
047  private int expireInterval = DEFAULT_EXPIRE;
048  private int monitorInterval = expireInterval/3;
049
050  private final Clock clock;
051
052  private Map<O, Long> running = new HashMap<O, Long>();
053
054  public AbstractLivelinessMonitor(String name, Clock clock) {
055    super(name);
056    this.clock = clock;
057  }
058
059  public AbstractLivelinessMonitor(String name) {
060    this(name, new MonotonicClock());
061  }
062
063  @Override
064  protected void serviceStart() throws Exception {
065    assert !stopped : "starting when already stopped";
066    resetTimer();
067    checkerThread = new Thread(new PingChecker());
068    checkerThread.setName("Ping Checker");
069    checkerThread.start();
070    super.serviceStart();
071  }
072
073  @Override
074  protected void serviceStop() throws Exception {
075    stopped = true;
076    if (checkerThread != null) {
077      checkerThread.interrupt();
078    }
079    super.serviceStop();
080  }
081
082  protected abstract void expire(O ob);
083
084  protected void setExpireInterval(int expireInterval) {
085    this.expireInterval = expireInterval;
086  }
087
088  protected void setMonitorInterval(int monitorInterval) {
089    this.monitorInterval = monitorInterval;
090  }
091
092  public synchronized void receivedPing(O ob) {
093    //only put for the registered objects
094    if (running.containsKey(ob)) {
095      running.put(ob, clock.getTime());
096    }
097  }
098
099  public synchronized void register(O ob) {
100    running.put(ob, clock.getTime());
101  }
102
103  public synchronized void unregister(O ob) {
104    running.remove(ob);
105  }
106
107  public synchronized void resetTimer() {
108    long time = clock.getTime();
109    for (O ob : running.keySet()) {
110      running.put(ob, time);
111    }
112  }
113
114  private class PingChecker implements Runnable {
115
116    @Override
117    public void run() {
118      while (!stopped && !Thread.currentThread().isInterrupted()) {
119        synchronized (AbstractLivelinessMonitor.this) {
120          Iterator<Map.Entry<O, Long>> iterator = 
121            running.entrySet().iterator();
122
123          //avoid calculating current time everytime in loop
124          long currentTime = clock.getTime();
125
126          while (iterator.hasNext()) {
127            Map.Entry<O, Long> entry = iterator.next();
128            if (currentTime > entry.getValue() + expireInterval) {
129              iterator.remove();
130              expire(entry.getKey());
131              LOG.info("Expired:" + entry.getKey().toString() + 
132                      " Timed out after " + expireInterval/1000 + " secs");
133            }
134          }
135        }
136        try {
137          Thread.sleep(monitorInterval);
138        } catch (InterruptedException e) {
139          LOG.info(getName() + " thread interrupted");
140          break;
141        }
142      }
143    }
144  }
145
146}