001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.util;
020
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.service.AbstractService;
030
031 /**
032 * A simple liveliness monitor with which clients can register, trust the
033 * component to monitor liveliness, get a call-back on expiry and then finally
034 * unregister.
035 */
036 @Public
037 @Evolving
038 public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
039
040 private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class);
041
042 //thread which runs periodically to see the last time since a heartbeat is
043 //received.
044 private Thread checkerThread;
045 private volatile boolean stopped;
046 public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
047 private int expireInterval = DEFAULT_EXPIRE;
048 private int monitorInterval = expireInterval/3;
049
050 private final Clock clock;
051
052 private Map<O, Long> running = new HashMap<O, Long>();
053
054 public AbstractLivelinessMonitor(String name, Clock clock) {
055 super(name);
056 this.clock = clock;
057 }
058
059 @Override
060 protected void serviceStart() throws Exception {
061 assert !stopped : "starting when already stopped";
062 checkerThread = new Thread(new PingChecker());
063 checkerThread.setName("Ping Checker");
064 checkerThread.start();
065 super.serviceStart();
066 }
067
068 @Override
069 protected void serviceStop() throws Exception {
070 stopped = true;
071 if (checkerThread != null) {
072 checkerThread.interrupt();
073 }
074 super.serviceStop();
075 }
076
077 protected abstract void expire(O ob);
078
079 protected void setExpireInterval(int expireInterval) {
080 this.expireInterval = expireInterval;
081 }
082
083 protected void setMonitorInterval(int monitorInterval) {
084 this.monitorInterval = monitorInterval;
085 }
086
087 public synchronized void receivedPing(O ob) {
088 //only put for the registered objects
089 if (running.containsKey(ob)) {
090 running.put(ob, clock.getTime());
091 }
092 }
093
094 public synchronized void register(O ob) {
095 running.put(ob, clock.getTime());
096 }
097
098 public synchronized void unregister(O ob) {
099 running.remove(ob);
100 }
101
102 private class PingChecker implements Runnable {
103
104 @Override
105 public void run() {
106 while (!stopped && !Thread.currentThread().isInterrupted()) {
107 synchronized (AbstractLivelinessMonitor.this) {
108 Iterator<Map.Entry<O, Long>> iterator =
109 running.entrySet().iterator();
110
111 //avoid calculating current time everytime in loop
112 long currentTime = clock.getTime();
113
114 while (iterator.hasNext()) {
115 Map.Entry<O, Long> entry = iterator.next();
116 if (currentTime > entry.getValue() + expireInterval) {
117 iterator.remove();
118 expire(entry.getKey());
119 LOG.info("Expired:" + entry.getKey().toString() +
120 " Timed out after " + expireInterval/1000 + " secs");
121 }
122 }
123 }
124 try {
125 Thread.sleep(monitorInterval);
126 } catch (InterruptedException e) {
127 LOG.info(getName() + " thread interrupted");
128 break;
129 }
130 }
131 }
132 }
133
134 }