001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.util; 020 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.hadoop.classification.InterfaceAudience.Public; 028 import org.apache.hadoop.classification.InterfaceStability.Evolving; 029 import org.apache.hadoop.service.AbstractService; 030 031 /** 032 * A simple liveliness monitor with which clients can register, trust the 033 * component to monitor liveliness, get a call-back on expiry and then finally 034 * unregister. 035 */ 036 @Public 037 @Evolving 038 public abstract class AbstractLivelinessMonitor<O> extends AbstractService { 039 040 private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class); 041 042 //thread which runs periodically to see the last time since a heartbeat is 043 //received. 044 private Thread checkerThread; 045 private volatile boolean stopped; 046 public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins 047 private int expireInterval = DEFAULT_EXPIRE; 048 private int monitorInterval = expireInterval/3; 049 050 private final Clock clock; 051 052 private Map<O, Long> running = new HashMap<O, Long>(); 053 054 public AbstractLivelinessMonitor(String name, Clock clock) { 055 super(name); 056 this.clock = clock; 057 } 058 059 @Override 060 protected void serviceStart() throws Exception { 061 assert !stopped : "starting when already stopped"; 062 checkerThread = new Thread(new PingChecker()); 063 checkerThread.setName("Ping Checker"); 064 checkerThread.start(); 065 super.serviceStart(); 066 } 067 068 @Override 069 protected void serviceStop() throws Exception { 070 stopped = true; 071 if (checkerThread != null) { 072 checkerThread.interrupt(); 073 } 074 super.serviceStop(); 075 } 076 077 protected abstract void expire(O ob); 078 079 protected void setExpireInterval(int expireInterval) { 080 this.expireInterval = expireInterval; 081 } 082 083 protected void setMonitorInterval(int monitorInterval) { 084 this.monitorInterval = monitorInterval; 085 } 086 087 public synchronized void receivedPing(O ob) { 088 //only put for the registered objects 089 if (running.containsKey(ob)) { 090 running.put(ob, clock.getTime()); 091 } 092 } 093 094 public synchronized void register(O ob) { 095 running.put(ob, clock.getTime()); 096 } 097 098 public synchronized void unregister(O ob) { 099 running.remove(ob); 100 } 101 102 private class PingChecker implements Runnable { 103 104 @Override 105 public void run() { 106 while (!stopped && !Thread.currentThread().isInterrupted()) { 107 synchronized (AbstractLivelinessMonitor.this) { 108 Iterator<Map.Entry<O, Long>> iterator = 109 running.entrySet().iterator(); 110 111 //avoid calculating current time everytime in loop 112 long currentTime = clock.getTime(); 113 114 while (iterator.hasNext()) { 115 Map.Entry<O, Long> entry = iterator.next(); 116 if (currentTime > entry.getValue() + expireInterval) { 117 iterator.remove(); 118 expire(entry.getKey()); 119 LOG.info("Expired:" + entry.getKey().toString() + 120 " Timed out after " + expireInterval/1000 + " secs"); 121 } 122 } 123 } 124 try { 125 Thread.sleep(monitorInterval); 126 } catch (InterruptedException e) { 127 LOG.info(getName() + " thread interrupted"); 128 break; 129 } 130 } 131 } 132 } 133 134 }