001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.service;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033
034import com.google.common.annotations.VisibleForTesting;
035
036/**
037 * This is the base implementation class for services.
038 */
039@Public
040@Evolving
041public abstract class AbstractService implements Service {
042
043  private static final Log LOG = LogFactory.getLog(AbstractService.class);
044
045  /**
046   * Service name.
047   */
048  private final String name;
049
050  /** service state */
051  private final ServiceStateModel stateModel;
052
053  /**
054   * Service start time. Will be zero until the service is started.
055   */
056  private long startTime;
057
058  /**
059   * The configuration. Will be null until the service is initialized.
060   */
061  private volatile Configuration config;
062
063  /**
064   * List of state change listeners; it is final to ensure
065   * that it will never be null.
066   */
067  private final ServiceOperations.ServiceListeners listeners
068    = new ServiceOperations.ServiceListeners();
069  /**
070   * Static listeners to all events across all services
071   */
072  private static ServiceOperations.ServiceListeners globalListeners
073    = new ServiceOperations.ServiceListeners();
074
  /**
   * The cause of any failure; this will be null
   * if the service did not stop due to a failure.
   */
079  private Exception failureCause;
080
081  /**
082   * the state in which the service was when it failed.
083   * Only valid when the service is stopped due to a failure
084   */
085  private STATE failureState = null;
086
087  /**
088   * object used to co-ordinate {@link #waitForServiceToStop(long)}
089   * across threads.
090   */
091  private final AtomicBoolean terminationNotification =
092    new AtomicBoolean(false);
093
094  /**
095   * History of lifecycle transitions
096   */
097  private final List<LifecycleEvent> lifecycleHistory
098    = new ArrayList<LifecycleEvent>(5);
099
100  /**
101   * Map of blocking dependencies
102   */
103  private final Map<String,String> blockerMap = new HashMap<String, String>();
104
105  private final Object stateChangeLock = new Object();
106 
107  /**
108   * Construct the service.
109   * @param name service name
110   */
111  public AbstractService(String name) {
112    this.name = name;
113    stateModel = new ServiceStateModel(name);
114  }
115
116  @Override
117  public final STATE getServiceState() {
118    return stateModel.getState();
119  }
120
121  @Override
122  public final synchronized Throwable getFailureCause() {
123    return failureCause;
124  }
125
126  @Override
127  public synchronized STATE getFailureState() {
128    return failureState;
129  }
130
131  /**
132   * Set the configuration for this service.
133   * This method is called during {@link #init(Configuration)}
134   * and should only be needed if for some reason a service implementation
135   * needs to override that initial setting -for example replacing
136   * it with a new subclass of {@link Configuration}
137   * @param conf new configuration.
138   */
139  protected void setConfig(Configuration conf) {
140    this.config = conf;
141  }
142
143  /**
144   * {@inheritDoc}
145   * This invokes {@link #serviceInit}
146   * @param conf the configuration of the service. This must not be null
147   * @throws ServiceStateException if the configuration was null,
148   * the state change not permitted, or something else went wrong
149   */
150  @Override
151  public void init(Configuration conf) {
152    if (conf == null) {
153      throw new ServiceStateException("Cannot initialize service "
154                                      + getName() + ": null configuration");
155    }
156    if (isInState(STATE.INITED)) {
157      return;
158    }
159    synchronized (stateChangeLock) {
160      if (enterState(STATE.INITED) != STATE.INITED) {
161        setConfig(conf);
162        try {
163          serviceInit(config);
164          if (isInState(STATE.INITED)) {
165            //if the service ended up here during init,
166            //notify the listeners
167            notifyListeners();
168          }
169        } catch (Exception e) {
170          noteFailure(e);
171          ServiceOperations.stopQuietly(LOG, this);
172          throw ServiceStateException.convert(e);
173        }
174      }
175    }
176  }
177
178  /**
179   * {@inheritDoc}
180   * @throws ServiceStateException if the current service state does not permit
181   * this action
182   */
183  @Override
184  public void start() {
185    if (isInState(STATE.STARTED)) {
186      return;
187    }
188    //enter the started state
189    synchronized (stateChangeLock) {
190      if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
191        try {
192          startTime = System.currentTimeMillis();
193          serviceStart();
194          if (isInState(STATE.STARTED)) {
195            //if the service started (and isn't now in a later state), notify
196            if (LOG.isDebugEnabled()) {
197              LOG.debug("Service " + getName() + " is started");
198            }
199            notifyListeners();
200          }
201        } catch (Exception e) {
202          noteFailure(e);
203          ServiceOperations.stopQuietly(LOG, this);
204          throw ServiceStateException.convert(e);
205        }
206      }
207    }
208  }
209
210  /**
211   * {@inheritDoc}
212   */
213  @Override
214  public void stop() {
215    if (isInState(STATE.STOPPED)) {
216      return;
217    }
218    synchronized (stateChangeLock) {
219      if (enterState(STATE.STOPPED) != STATE.STOPPED) {
220        try {
221          serviceStop();
222        } catch (Exception e) {
223          //stop-time exceptions are logged if they are the first one,
224          noteFailure(e);
225          throw ServiceStateException.convert(e);
226        } finally {
227          //report that the service has terminated
228          terminationNotification.set(true);
229          synchronized (terminationNotification) {
230            terminationNotification.notifyAll();
231          }
232          //notify anything listening for events
233          notifyListeners();
234        }
235      } else {
236        //already stopped: note it
237        if (LOG.isDebugEnabled()) {
238          LOG.debug("Ignoring re-entrant call to stop()");
239        }
240      }
241    }
242  }
243
244  /**
245   * Relay to {@link #stop()}
246   * @throws IOException
247   */
248  @Override
249  public final void close() throws IOException {
250    stop();
251  }
252
253  /**
254   * Failure handling: record the exception
255   * that triggered it -if there was not one already.
256   * Services are free to call this themselves.
257   * @param exception the exception
258   */
259  protected final void noteFailure(Exception exception) {
260    if (LOG.isDebugEnabled()) {
261      LOG.debug("noteFailure " + exception, null);
262    }
263    if (exception == null) {
264      //make sure failure logic doesn't itself cause problems
265      return;
266    }
267    //record the failure details, and log it
268    synchronized (this) {
269      if (failureCause == null) {
270        failureCause = exception;
271        failureState = getServiceState();
272        LOG.info("Service " + getName()
273                 + " failed in state " + failureState
274                 + "; cause: " + exception,
275                 exception);
276      }
277    }
278  }
279
280  @Override
281  public final boolean waitForServiceToStop(long timeout) {
282    boolean completed = terminationNotification.get();
283    while (!completed) {
284      try {
285        synchronized(terminationNotification) {
286          terminationNotification.wait(timeout);
287        }
288        // here there has been a timeout, the object has terminated,
289        // or there has been a spurious wakeup (which we ignore)
290        completed = true;
291      } catch (InterruptedException e) {
292        // interrupted; have another look at the flag
293        completed = terminationNotification.get();
294      }
295    }
296    return terminationNotification.get();
297  }
298
299  /* ===================================================================== */
300  /* Override Points */
301  /* ===================================================================== */
302
303  /**
304   * All initialization code needed by a service.
305   *
306   * This method will only ever be called once during the lifecycle of
307   * a specific service instance.
308   *
309   * Implementations do not need to be synchronized as the logic
310   * in {@link #init(Configuration)} prevents re-entrancy.
311   *
312   * The base implementation checks to see if the subclass has created
313   * a new configuration instance, and if so, updates the base class value
314   * @param conf configuration
315   * @throws Exception on a failure -these will be caught,
316   * possibly wrapped, and wil; trigger a service stop
317   */
318  protected void serviceInit(Configuration conf) throws Exception {
319    if (conf != config) {
320      LOG.debug("Config has been overridden during init");
321      setConfig(conf);
322    }
323  }
324
325  /**
326   * Actions called during the INITED to STARTED transition.
327   *
328   * This method will only ever be called once during the lifecycle of
329   * a specific service instance.
330   *
331   * Implementations do not need to be synchronized as the logic
332   * in {@link #start()} prevents re-entrancy.
333   *
334   * @throws Exception if needed -these will be caught,
335   * wrapped, and trigger a service stop
336   */
337  protected void serviceStart() throws Exception {
338
339  }
340
341  /**
342   * Actions called during the transition to the STOPPED state.
343   *
344   * This method will only ever be called once during the lifecycle of
345   * a specific service instance.
346   *
347   * Implementations do not need to be synchronized as the logic
348   * in {@link #stop()} prevents re-entrancy.
349   *
350   * Implementations MUST write this to be robust against failures, including
351   * checks for null references -and for the first failure to not stop other
352   * attempts to shut down parts of the service.
353   *
354   * @throws Exception if needed -these will be caught and logged.
355   */
356  protected void serviceStop() throws Exception {
357
358  }
359
360  @Override
361  public void registerServiceListener(ServiceStateChangeListener l) {
362    listeners.add(l);
363  }
364
365  @Override
366  public void unregisterServiceListener(ServiceStateChangeListener l) {
367    listeners.remove(l);
368  }
369
370  /**
371   * Register a global listener, which receives notifications
372   * from the state change events of all services in the JVM
373   * @param l listener
374   */
375  public static void registerGlobalListener(ServiceStateChangeListener l) {
376    globalListeners.add(l);
377  }
378
379  /**
380   * unregister a global listener.
381   * @param l listener to unregister
382   * @return true if the listener was found (and then deleted)
383   */
384  public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
385    return globalListeners.remove(l);
386  }
387
388  /**
389   * Package-scoped method for testing -resets the global listener list
390   */
391  @VisibleForTesting
392  static void resetGlobalListeners() {
393    globalListeners.reset();
394  }
395
396  @Override
397  public String getName() {
398    return name;
399  }
400
401  @Override
402  public synchronized Configuration getConfig() {
403    return config;
404  }
405
406  @Override
407  public long getStartTime() {
408    return startTime;
409  }
410
411  /**
412   * Notify local and global listeners of state changes.
413   * Exceptions raised by listeners are NOT passed up.
414   */
415  private void notifyListeners() {
416    try {
417      listeners.notifyListeners(this);
418      globalListeners.notifyListeners(this);
419    } catch (Throwable e) {
420      LOG.warn("Exception while notifying listeners of " + this + ": " + e,
421               e);
422    }
423  }
424
425  /**
426   * Add a state change event to the lifecycle history
427   */
428  private void recordLifecycleEvent() {
429    LifecycleEvent event = new LifecycleEvent();
430    event.time = System.currentTimeMillis();
431    event.state = getServiceState();
432    lifecycleHistory.add(event);
433  }
434
435  @Override
436  public synchronized List<LifecycleEvent> getLifecycleHistory() {
437    return new ArrayList<LifecycleEvent>(lifecycleHistory);
438  }
439
440  /**
441   * Enter a state; record this via {@link #recordLifecycleEvent}
442   * and log at the info level.
443   * @param newState the proposed new state
444   * @return the original state
445   * it wasn't already in that state, and the state model permits state re-entrancy.
446   */
447  private STATE enterState(STATE newState) {
448    assert stateModel != null : "null state in " + name + " " + this.getClass();
449    STATE oldState = stateModel.enterState(newState);
450    if (oldState != newState) {
451      if (LOG.isDebugEnabled()) {
452        LOG.debug(
453          "Service: " + getName() + " entered state " + getServiceState());
454      }
455      recordLifecycleEvent();
456    }
457    return oldState;
458  }
459
460  @Override
461  public final boolean isInState(Service.STATE expected) {
462    return stateModel.isInState(expected);
463  }
464
465  @Override
466  public String toString() {
467    return "Service " + name + " in state " + stateModel;
468  }
469
470  /**
471   * Put a blocker to the blocker map -replacing any
472   * with the same name.
473   * @param name blocker name
474   * @param details any specifics on the block. This must be non-null.
475   */
476  protected void putBlocker(String name, String details) {
477    synchronized (blockerMap) {
478      blockerMap.put(name, details);
479    }
480  }
481
482  /**
483   * Remove a blocker from the blocker map -
484   * this is a no-op if the blocker is not present
485   * @param name the name of the blocker
486   */
487  public void removeBlocker(String name) {
488    synchronized (blockerMap) {
489      blockerMap.remove(name);
490    }
491  }
492
493  @Override
494  public Map<String, String> getBlockers() {
495    synchronized (blockerMap) {
496      Map<String, String> map = new HashMap<String, String>(blockerMap);
497      return map;
498    }
499  }
500}