001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.service;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.HashMap;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.concurrent.atomic.AtomicBoolean;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience.Public;
031 import org.apache.hadoop.classification.InterfaceStability.Evolving;
032 import org.apache.hadoop.conf.Configuration;
033
034 import com.google.common.annotations.VisibleForTesting;
035
036 /**
037 * This is the base implementation class for services.
038 */
039 @Public
040 @Evolving
041 public abstract class AbstractService implements Service {
042
043 private static final Log LOG = LogFactory.getLog(AbstractService.class);
044
045 /**
046 * Service name.
047 */
048 private final String name;
049
050 /** service state */
051 private final ServiceStateModel stateModel;
052
053 /**
054 * Service start time. Will be zero until the service is started.
055 */
056 private long startTime;
057
058 /**
059 * The configuration. Will be null until the service is initialized.
060 */
061 private volatile Configuration config;
062
063 /**
064 * List of state change listeners; it is final to ensure
065 * that it will never be null.
066 */
067 private final ServiceOperations.ServiceListeners listeners
068 = new ServiceOperations.ServiceListeners();
069 /**
070 * Static listeners to all events across all services
071 */
072 private static ServiceOperations.ServiceListeners globalListeners
073 = new ServiceOperations.ServiceListeners();
074
075 /**
076 * The cause of any failure -will be null.
077 * if a service did not stop due to a failure.
078 */
079 private Exception failureCause;
080
081 /**
082 * the state in which the service was when it failed.
083 * Only valid when the service is stopped due to a failure
084 */
085 private STATE failureState = null;
086
087 /**
088 * object used to co-ordinate {@link #waitForServiceToStop(long)}
089 * across threads.
090 */
091 private final AtomicBoolean terminationNotification =
092 new AtomicBoolean(false);
093
094 /**
095 * History of lifecycle transitions
096 */
097 private final List<LifecycleEvent> lifecycleHistory
098 = new ArrayList<LifecycleEvent>(5);
099
100 /**
101 * Map of blocking dependencies
102 */
103 private final Map<String,String> blockerMap = new HashMap<String, String>();
104
105 private final Object stateChangeLock = new Object();
106
/**
 * Create a service. The name is stored and is also handed to the
 * newly created state model.
 * @param name service name
 */
public AbstractService(String name) {
  this.stateModel = new ServiceStateModel(name);
  this.name = name;
}
115
116 @Override
117 public final STATE getServiceState() {
118 return stateModel.getState();
119 }
120
121 @Override
122 public final synchronized Throwable getFailureCause() {
123 return failureCause;
124 }
125
126 @Override
127 public synchronized STATE getFailureState() {
128 return failureState;
129 }
130
131 /**
132 * Set the configuration for this service.
133 * This method is called during {@link #init(Configuration)}
134 * and should only be needed if for some reason a service implementation
135 * needs to override that initial setting -for example replacing
136 * it with a new subclass of {@link Configuration}
137 * @param conf new configuration.
138 */
139 protected void setConfig(Configuration conf) {
140 this.config = conf;
141 }
142
143 /**
144 * {@inheritDoc}
145 * This invokes {@link #serviceInit}
146 * @param conf the configuration of the service. This must not be null
147 * @throws ServiceStateException if the configuration was null,
148 * the state change not permitted, or something else went wrong
149 */
150 @Override
151 public void init(Configuration conf) {
152 if (conf == null) {
153 throw new ServiceStateException("Cannot initialize service "
154 + getName() + ": null configuration");
155 }
156 if (isInState(STATE.INITED)) {
157 return;
158 }
159 synchronized (stateChangeLock) {
160 if (enterState(STATE.INITED) != STATE.INITED) {
161 setConfig(conf);
162 try {
163 serviceInit(config);
164 if (isInState(STATE.INITED)) {
165 //if the service ended up here during init,
166 //notify the listeners
167 notifyListeners();
168 }
169 } catch (Exception e) {
170 noteFailure(e);
171 ServiceOperations.stopQuietly(LOG, this);
172 throw ServiceStateException.convert(e);
173 }
174 }
175 }
176 }
177
178 /**
179 * {@inheritDoc}
180 * @throws ServiceStateException if the current service state does not permit
181 * this action
182 */
183 @Override
184 public void start() {
185 if (isInState(STATE.STARTED)) {
186 return;
187 }
188 //enter the started state
189 synchronized (stateChangeLock) {
190 if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
191 try {
192 startTime = System.currentTimeMillis();
193 serviceStart();
194 if (isInState(STATE.STARTED)) {
195 //if the service started (and isn't now in a later state), notify
196 if (LOG.isDebugEnabled()) {
197 LOG.debug("Service " + getName() + " is started");
198 }
199 notifyListeners();
200 }
201 } catch (Exception e) {
202 noteFailure(e);
203 ServiceOperations.stopQuietly(LOG, this);
204 throw ServiceStateException.convert(e);
205 }
206 }
207 }
208 }
209
210 /**
211 * {@inheritDoc}
212 */
213 @Override
214 public void stop() {
215 if (isInState(STATE.STOPPED)) {
216 return;
217 }
218 synchronized (stateChangeLock) {
219 if (enterState(STATE.STOPPED) != STATE.STOPPED) {
220 try {
221 serviceStop();
222 } catch (Exception e) {
223 //stop-time exceptions are logged if they are the first one,
224 noteFailure(e);
225 throw ServiceStateException.convert(e);
226 } finally {
227 //report that the service has terminated
228 terminationNotification.set(true);
229 synchronized (terminationNotification) {
230 terminationNotification.notifyAll();
231 }
232 //notify anything listening for events
233 notifyListeners();
234 }
235 } else {
236 //already stopped: note it
237 if (LOG.isDebugEnabled()) {
238 LOG.debug("Ignoring re-entrant call to stop()");
239 }
240 }
241 }
242 }
243
244 /**
245 * Relay to {@link #stop()}
246 * @throws IOException
247 */
248 @Override
249 public final void close() throws IOException {
250 stop();
251 }
252
253 /**
254 * Failure handling: record the exception
255 * that triggered it -if there was not one already.
256 * Services are free to call this themselves.
257 * @param exception the exception
258 */
259 protected final void noteFailure(Exception exception) {
260 if (LOG.isDebugEnabled()) {
261 LOG.debug("noteFailure " + exception, null);
262 }
263 if (exception == null) {
264 //make sure failure logic doesn't itself cause problems
265 return;
266 }
267 //record the failure details, and log it
268 synchronized (this) {
269 if (failureCause == null) {
270 failureCause = exception;
271 failureState = getServiceState();
272 LOG.info("Service " + getName()
273 + " failed in state " + failureState
274 + "; cause: " + exception,
275 exception);
276 }
277 }
278 }
279
280 @Override
281 public final boolean waitForServiceToStop(long timeout) {
282 boolean completed = terminationNotification.get();
283 while (!completed) {
284 try {
285 synchronized(terminationNotification) {
286 terminationNotification.wait(timeout);
287 }
288 // here there has been a timeout, the object has terminated,
289 // or there has been a spurious wakeup (which we ignore)
290 completed = true;
291 } catch (InterruptedException e) {
292 // interrupted; have another look at the flag
293 completed = terminationNotification.get();
294 }
295 }
296 return terminationNotification.get();
297 }
298
299 /* ===================================================================== */
300 /* Override Points */
301 /* ===================================================================== */
302
303 /**
304 * All initialization code needed by a service.
305 *
306 * This method will only ever be called once during the lifecycle of
307 * a specific service instance.
308 *
309 * Implementations do not need to be synchronized as the logic
310 * in {@link #init(Configuration)} prevents re-entrancy.
311 *
312 * The base implementation checks to see if the subclass has created
313 * a new configuration instance, and if so, updates the base class value
314 * @param conf configuration
315 * @throws Exception on a failure -these will be caught,
316 * possibly wrapped, and wil; trigger a service stop
317 */
318 protected void serviceInit(Configuration conf) throws Exception {
319 if (conf != config) {
320 LOG.debug("Config has been overridden during init");
321 setConfig(conf);
322 }
323 }
324
325 /**
326 * Actions called during the INITED to STARTED transition.
327 *
328 * This method will only ever be called once during the lifecycle of
329 * a specific service instance.
330 *
331 * Implementations do not need to be synchronized as the logic
332 * in {@link #start()} prevents re-entrancy.
333 *
334 * @throws Exception if needed -these will be caught,
335 * wrapped, and trigger a service stop
336 */
337 protected void serviceStart() throws Exception {
338
339 }
340
341 /**
342 * Actions called during the transition to the STOPPED state.
343 *
344 * This method will only ever be called once during the lifecycle of
345 * a specific service instance.
346 *
347 * Implementations do not need to be synchronized as the logic
348 * in {@link #stop()} prevents re-entrancy.
349 *
350 * Implementations MUST write this to be robust against failures, including
351 * checks for null references -and for the first failure to not stop other
352 * attempts to shut down parts of the service.
353 *
354 * @throws Exception if needed -these will be caught and logged.
355 */
356 protected void serviceStop() throws Exception {
357
358 }
359
360 @Override
361 public void registerServiceListener(ServiceStateChangeListener l) {
362 listeners.add(l);
363 }
364
365 @Override
366 public void unregisterServiceListener(ServiceStateChangeListener l) {
367 listeners.remove(l);
368 }
369
370 /**
371 * Register a global listener, which receives notifications
372 * from the state change events of all services in the JVM
373 * @param l listener
374 */
375 public static void registerGlobalListener(ServiceStateChangeListener l) {
376 globalListeners.add(l);
377 }
378
379 /**
380 * unregister a global listener.
381 * @param l listener to unregister
382 * @return true if the listener was found (and then deleted)
383 */
384 public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
385 return globalListeners.remove(l);
386 }
387
388 /**
389 * Package-scoped method for testing -resets the global listener list
390 */
391 @VisibleForTesting
392 static void resetGlobalListeners() {
393 globalListeners.reset();
394 }
395
396 @Override
397 public String getName() {
398 return name;
399 }
400
401 @Override
402 public synchronized Configuration getConfig() {
403 return config;
404 }
405
406 @Override
407 public long getStartTime() {
408 return startTime;
409 }
410
411 /**
412 * Notify local and global listeners of state changes.
413 * Exceptions raised by listeners are NOT passed up.
414 */
415 private void notifyListeners() {
416 try {
417 listeners.notifyListeners(this);
418 globalListeners.notifyListeners(this);
419 } catch (Throwable e) {
420 LOG.warn("Exception while notifying listeners of " + this + ": " + e,
421 e);
422 }
423 }
424
425 /**
426 * Add a state change event to the lifecycle history
427 */
428 private void recordLifecycleEvent() {
429 LifecycleEvent event = new LifecycleEvent();
430 event.time = System.currentTimeMillis();
431 event.state = getServiceState();
432 lifecycleHistory.add(event);
433 }
434
435 @Override
436 public synchronized List<LifecycleEvent> getLifecycleHistory() {
437 return new ArrayList<LifecycleEvent>(lifecycleHistory);
438 }
439
440 /**
441 * Enter a state; record this via {@link #recordLifecycleEvent}
442 * and log at the info level.
443 * @param newState the proposed new state
444 * @return the original state
445 * it wasn't already in that state, and the state model permits state re-entrancy.
446 */
447 private STATE enterState(STATE newState) {
448 assert stateModel != null : "null state in " + name + " " + this.getClass();
449 STATE oldState = stateModel.enterState(newState);
450 if (oldState != newState) {
451 if (LOG.isDebugEnabled()) {
452 LOG.debug(
453 "Service: " + getName() + " entered state " + getServiceState());
454 }
455 recordLifecycleEvent();
456 }
457 return oldState;
458 }
459
460 @Override
461 public final boolean isInState(Service.STATE expected) {
462 return stateModel.isInState(expected);
463 }
464
465 @Override
466 public String toString() {
467 return "Service " + name + " in state " + stateModel;
468 }
469
470 /**
471 * Put a blocker to the blocker map -replacing any
472 * with the same name.
473 * @param name blocker name
474 * @param details any specifics on the block. This must be non-null.
475 */
476 protected void putBlocker(String name, String details) {
477 synchronized (blockerMap) {
478 blockerMap.put(name, details);
479 }
480 }
481
482 /**
483 * Remove a blocker from the blocker map -
484 * this is a no-op if the blocker is not present
485 * @param name the name of the blocker
486 */
487 public void removeBlocker(String name) {
488 synchronized (blockerMap) {
489 blockerMap.remove(name);
490 }
491 }
492
493 @Override
494 public Map<String, String> getBlockers() {
495 synchronized (blockerMap) {
496 Map<String, String> map = new HashMap<String, String>(blockerMap);
497 return map;
498 }
499 }
500 }