001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.service; 020 021 import java.io.IOException; 022 import java.util.ArrayList; 023 import java.util.HashMap; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 import org.apache.hadoop.classification.InterfaceAudience.Public; 031 import org.apache.hadoop.classification.InterfaceStability.Evolving; 032 import org.apache.hadoop.conf.Configuration; 033 034 import com.google.common.annotations.VisibleForTesting; 035 036 /** 037 * This is the base implementation class for services. 038 */ 039 @Public 040 @Evolving 041 public abstract class AbstractService implements Service { 042 043 private static final Log LOG = LogFactory.getLog(AbstractService.class); 044 045 /** 046 * Service name. 047 */ 048 private final String name; 049 050 /** service state */ 051 private final ServiceStateModel stateModel; 052 053 /** 054 * Service start time. Will be zero until the service is started. 055 */ 056 private long startTime; 057 058 /** 059 * The configuration. Will be null until the service is initialized. 060 */ 061 private volatile Configuration config; 062 063 /** 064 * List of state change listeners; it is final to ensure 065 * that it will never be null. 066 */ 067 private final ServiceOperations.ServiceListeners listeners 068 = new ServiceOperations.ServiceListeners(); 069 /** 070 * Static listeners to all events across all services 071 */ 072 private static ServiceOperations.ServiceListeners globalListeners 073 = new ServiceOperations.ServiceListeners(); 074 075 /** 076 * The cause of any failure -will be null. 077 * if a service did not stop due to a failure. 078 */ 079 private Exception failureCause; 080 081 /** 082 * the state in which the service was when it failed. 083 * Only valid when the service is stopped due to a failure 084 */ 085 private STATE failureState = null; 086 087 /** 088 * object used to co-ordinate {@link #waitForServiceToStop(long)} 089 * across threads. 090 */ 091 private final AtomicBoolean terminationNotification = 092 new AtomicBoolean(false); 093 094 /** 095 * History of lifecycle transitions 096 */ 097 private final List<LifecycleEvent> lifecycleHistory 098 = new ArrayList<LifecycleEvent>(5); 099 100 /** 101 * Map of blocking dependencies 102 */ 103 private final Map<String,String> blockerMap = new HashMap<String, String>(); 104 105 private final Object stateChangeLock = new Object(); 106 107 /** 108 * Construct the service. 109 * @param name service name 110 */ 111 public AbstractService(String name) { 112 this.name = name; 113 stateModel = new ServiceStateModel(name); 114 } 115 116 @Override 117 public final STATE getServiceState() { 118 return stateModel.getState(); 119 } 120 121 @Override 122 public final synchronized Throwable getFailureCause() { 123 return failureCause; 124 } 125 126 @Override 127 public synchronized STATE getFailureState() { 128 return failureState; 129 } 130 131 /** 132 * Set the configuration for this service. 133 * This method is called during {@link #init(Configuration)} 134 * and should only be needed if for some reason a service implementation 135 * needs to override that initial setting -for example replacing 136 * it with a new subclass of {@link Configuration} 137 * @param conf new configuration. 138 */ 139 protected void setConfig(Configuration conf) { 140 this.config = conf; 141 } 142 143 /** 144 * {@inheritDoc} 145 * This invokes {@link #serviceInit} 146 * @param conf the configuration of the service. This must not be null 147 * @throws ServiceStateException if the configuration was null, 148 * the state change not permitted, or something else went wrong 149 */ 150 @Override 151 public void init(Configuration conf) { 152 if (conf == null) { 153 throw new ServiceStateException("Cannot initialize service " 154 + getName() + ": null configuration"); 155 } 156 if (isInState(STATE.INITED)) { 157 return; 158 } 159 synchronized (stateChangeLock) { 160 if (enterState(STATE.INITED) != STATE.INITED) { 161 setConfig(conf); 162 try { 163 serviceInit(config); 164 if (isInState(STATE.INITED)) { 165 //if the service ended up here during init, 166 //notify the listeners 167 notifyListeners(); 168 } 169 } catch (Exception e) { 170 noteFailure(e); 171 ServiceOperations.stopQuietly(LOG, this); 172 throw ServiceStateException.convert(e); 173 } 174 } 175 } 176 } 177 178 /** 179 * {@inheritDoc} 180 * @throws ServiceStateException if the current service state does not permit 181 * this action 182 */ 183 @Override 184 public void start() { 185 if (isInState(STATE.STARTED)) { 186 return; 187 } 188 //enter the started state 189 synchronized (stateChangeLock) { 190 if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) { 191 try { 192 startTime = System.currentTimeMillis(); 193 serviceStart(); 194 if (isInState(STATE.STARTED)) { 195 //if the service started (and isn't now in a later state), notify 196 if (LOG.isDebugEnabled()) { 197 LOG.debug("Service " + getName() + " is started"); 198 } 199 notifyListeners(); 200 } 201 } catch (Exception e) { 202 noteFailure(e); 203 ServiceOperations.stopQuietly(LOG, this); 204 throw ServiceStateException.convert(e); 205 } 206 } 207 } 208 } 209 210 /** 211 * {@inheritDoc} 212 */ 213 @Override 214 public void stop() { 215 if (isInState(STATE.STOPPED)) { 216 return; 217 } 218 synchronized (stateChangeLock) { 219 if (enterState(STATE.STOPPED) != STATE.STOPPED) { 220 try { 221 serviceStop(); 222 } catch (Exception e) { 223 //stop-time exceptions are logged if they are the first one, 224 noteFailure(e); 225 throw ServiceStateException.convert(e); 226 } finally { 227 //report that the service has terminated 228 terminationNotification.set(true); 229 synchronized (terminationNotification) { 230 terminationNotification.notifyAll(); 231 } 232 //notify anything listening for events 233 notifyListeners(); 234 } 235 } else { 236 //already stopped: note it 237 if (LOG.isDebugEnabled()) { 238 LOG.debug("Ignoring re-entrant call to stop()"); 239 } 240 } 241 } 242 } 243 244 /** 245 * Relay to {@link #stop()} 246 * @throws IOException 247 */ 248 @Override 249 public final void close() throws IOException { 250 stop(); 251 } 252 253 /** 254 * Failure handling: record the exception 255 * that triggered it -if there was not one already. 256 * Services are free to call this themselves. 257 * @param exception the exception 258 */ 259 protected final void noteFailure(Exception exception) { 260 if (LOG.isDebugEnabled()) { 261 LOG.debug("noteFailure " + exception, null); 262 } 263 if (exception == null) { 264 //make sure failure logic doesn't itself cause problems 265 return; 266 } 267 //record the failure details, and log it 268 synchronized (this) { 269 if (failureCause == null) { 270 failureCause = exception; 271 failureState = getServiceState(); 272 LOG.info("Service " + getName() 273 + " failed in state " + failureState 274 + "; cause: " + exception, 275 exception); 276 } 277 } 278 } 279 280 @Override 281 public final boolean waitForServiceToStop(long timeout) { 282 boolean completed = terminationNotification.get(); 283 while (!completed) { 284 try { 285 synchronized(terminationNotification) { 286 terminationNotification.wait(timeout); 287 } 288 // here there has been a timeout, the object has terminated, 289 // or there has been a spurious wakeup (which we ignore) 290 completed = true; 291 } catch (InterruptedException e) { 292 // interrupted; have another look at the flag 293 completed = terminationNotification.get(); 294 } 295 } 296 return terminationNotification.get(); 297 } 298 299 /* ===================================================================== */ 300 /* Override Points */ 301 /* ===================================================================== */ 302 303 /** 304 * All initialization code needed by a service. 305 * 306 * This method will only ever be called once during the lifecycle of 307 * a specific service instance. 308 * 309 * Implementations do not need to be synchronized as the logic 310 * in {@link #init(Configuration)} prevents re-entrancy. 311 * 312 * The base implementation checks to see if the subclass has created 313 * a new configuration instance, and if so, updates the base class value 314 * @param conf configuration 315 * @throws Exception on a failure -these will be caught, 316 * possibly wrapped, and wil; trigger a service stop 317 */ 318 protected void serviceInit(Configuration conf) throws Exception { 319 if (conf != config) { 320 LOG.debug("Config has been overridden during init"); 321 setConfig(conf); 322 } 323 } 324 325 /** 326 * Actions called during the INITED to STARTED transition. 327 * 328 * This method will only ever be called once during the lifecycle of 329 * a specific service instance. 330 * 331 * Implementations do not need to be synchronized as the logic 332 * in {@link #start()} prevents re-entrancy. 333 * 334 * @throws Exception if needed -these will be caught, 335 * wrapped, and trigger a service stop 336 */ 337 protected void serviceStart() throws Exception { 338 339 } 340 341 /** 342 * Actions called during the transition to the STOPPED state. 343 * 344 * This method will only ever be called once during the lifecycle of 345 * a specific service instance. 346 * 347 * Implementations do not need to be synchronized as the logic 348 * in {@link #stop()} prevents re-entrancy. 349 * 350 * Implementations MUST write this to be robust against failures, including 351 * checks for null references -and for the first failure to not stop other 352 * attempts to shut down parts of the service. 353 * 354 * @throws Exception if needed -these will be caught and logged. 355 */ 356 protected void serviceStop() throws Exception { 357 358 } 359 360 @Override 361 public void registerServiceListener(ServiceStateChangeListener l) { 362 listeners.add(l); 363 } 364 365 @Override 366 public void unregisterServiceListener(ServiceStateChangeListener l) { 367 listeners.remove(l); 368 } 369 370 /** 371 * Register a global listener, which receives notifications 372 * from the state change events of all services in the JVM 373 * @param l listener 374 */ 375 public static void registerGlobalListener(ServiceStateChangeListener l) { 376 globalListeners.add(l); 377 } 378 379 /** 380 * unregister a global listener. 381 * @param l listener to unregister 382 * @return true if the listener was found (and then deleted) 383 */ 384 public static boolean unregisterGlobalListener(ServiceStateChangeListener l) { 385 return globalListeners.remove(l); 386 } 387 388 /** 389 * Package-scoped method for testing -resets the global listener list 390 */ 391 @VisibleForTesting 392 static void resetGlobalListeners() { 393 globalListeners.reset(); 394 } 395 396 @Override 397 public String getName() { 398 return name; 399 } 400 401 @Override 402 public synchronized Configuration getConfig() { 403 return config; 404 } 405 406 @Override 407 public long getStartTime() { 408 return startTime; 409 } 410 411 /** 412 * Notify local and global listeners of state changes. 413 * Exceptions raised by listeners are NOT passed up. 414 */ 415 private void notifyListeners() { 416 try { 417 listeners.notifyListeners(this); 418 globalListeners.notifyListeners(this); 419 } catch (Throwable e) { 420 LOG.warn("Exception while notifying listeners of " + this + ": " + e, 421 e); 422 } 423 } 424 425 /** 426 * Add a state change event to the lifecycle history 427 */ 428 private void recordLifecycleEvent() { 429 LifecycleEvent event = new LifecycleEvent(); 430 event.time = System.currentTimeMillis(); 431 event.state = getServiceState(); 432 lifecycleHistory.add(event); 433 } 434 435 @Override 436 public synchronized List<LifecycleEvent> getLifecycleHistory() { 437 return new ArrayList<LifecycleEvent>(lifecycleHistory); 438 } 439 440 /** 441 * Enter a state; record this via {@link #recordLifecycleEvent} 442 * and log at the info level. 443 * @param newState the proposed new state 444 * @return the original state 445 * it wasn't already in that state, and the state model permits state re-entrancy. 446 */ 447 private STATE enterState(STATE newState) { 448 assert stateModel != null : "null state in " + name + " " + this.getClass(); 449 STATE oldState = stateModel.enterState(newState); 450 if (oldState != newState) { 451 if (LOG.isDebugEnabled()) { 452 LOG.debug( 453 "Service: " + getName() + " entered state " + getServiceState()); 454 } 455 recordLifecycleEvent(); 456 } 457 return oldState; 458 } 459 460 @Override 461 public final boolean isInState(Service.STATE expected) { 462 return stateModel.isInState(expected); 463 } 464 465 @Override 466 public String toString() { 467 return "Service " + name + " in state " + stateModel; 468 } 469 470 /** 471 * Put a blocker to the blocker map -replacing any 472 * with the same name. 473 * @param name blocker name 474 * @param details any specifics on the block. This must be non-null. 475 */ 476 protected void putBlocker(String name, String details) { 477 synchronized (blockerMap) { 478 blockerMap.put(name, details); 479 } 480 } 481 482 /** 483 * Remove a blocker from the blocker map - 484 * this is a no-op if the blocker is not present 485 * @param name the name of the blocker 486 */ 487 public void removeBlocker(String name) { 488 synchronized (blockerMap) { 489 blockerMap.remove(name); 490 } 491 } 492 493 @Override 494 public Map<String, String> getBlockers() { 495 synchronized (blockerMap) { 496 Map<String, String> map = new HashMap<String, String>(blockerMap); 497 return map; 498 } 499 } 500 }