001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.service;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.apache.hadoop.classification.InterfaceAudience.Public;
031    import org.apache.hadoop.classification.InterfaceStability.Evolving;
032    import org.apache.hadoop.conf.Configuration;
033    
034    import com.google.common.annotations.VisibleForTesting;
035    
036    /**
037     * This is the base implementation class for services.
038     */
039    @Public
040    @Evolving
041    public abstract class AbstractService implements Service {
042    
043      private static final Log LOG = LogFactory.getLog(AbstractService.class);
044    
045      /**
046       * Service name.
047       */
048      private final String name;
049    
050      /** service state */
051      private final ServiceStateModel stateModel;
052    
053      /**
054       * Service start time. Will be zero until the service is started.
055       */
056      private long startTime;
057    
058      /**
059       * The configuration. Will be null until the service is initialized.
060       */
061      private volatile Configuration config;
062    
063      /**
064       * List of state change listeners; it is final to ensure
065       * that it will never be null.
066       */
067      private final ServiceOperations.ServiceListeners listeners
068        = new ServiceOperations.ServiceListeners();
069      /**
070       * Static listeners to all events across all services
071       */
072      private static ServiceOperations.ServiceListeners globalListeners
073        = new ServiceOperations.ServiceListeners();
074    
075      /**
076       * The cause of any failure -will be null.
077       * if a service did not stop due to a failure.
078       */
079      private Exception failureCause;
080    
081      /**
082       * the state in which the service was when it failed.
083       * Only valid when the service is stopped due to a failure
084       */
085      private STATE failureState = null;
086    
087      /**
088       * object used to co-ordinate {@link #waitForServiceToStop(long)}
089       * across threads.
090       */
091      private final AtomicBoolean terminationNotification =
092        new AtomicBoolean(false);
093    
094      /**
095       * History of lifecycle transitions
096       */
097      private final List<LifecycleEvent> lifecycleHistory
098        = new ArrayList<LifecycleEvent>(5);
099    
100      /**
101       * Map of blocking dependencies
102       */
103      private final Map<String,String> blockerMap = new HashMap<String, String>();
104    
105      private final Object stateChangeLock = new Object();
106     
107      /**
108       * Construct the service.
109       * @param name service name
110       */
111      public AbstractService(String name) {
112        this.name = name;
113        stateModel = new ServiceStateModel(name);
114      }
115    
116      @Override
117      public final STATE getServiceState() {
118        return stateModel.getState();
119      }
120    
121      @Override
122      public final synchronized Throwable getFailureCause() {
123        return failureCause;
124      }
125    
126      @Override
127      public synchronized STATE getFailureState() {
128        return failureState;
129      }
130    
131      /**
132       * Set the configuration for this service.
133       * This method is called during {@link #init(Configuration)}
134       * and should only be needed if for some reason a service implementation
135       * needs to override that initial setting -for example replacing
136       * it with a new subclass of {@link Configuration}
137       * @param conf new configuration.
138       */
139      protected void setConfig(Configuration conf) {
140        this.config = conf;
141      }
142    
143      /**
144       * {@inheritDoc}
145       * This invokes {@link #serviceInit}
146       * @param conf the configuration of the service. This must not be null
147       * @throws ServiceStateException if the configuration was null,
148       * the state change not permitted, or something else went wrong
149       */
150      @Override
151      public void init(Configuration conf) {
152        if (conf == null) {
153          throw new ServiceStateException("Cannot initialize service "
154                                          + getName() + ": null configuration");
155        }
156        if (isInState(STATE.INITED)) {
157          return;
158        }
159        synchronized (stateChangeLock) {
160          if (enterState(STATE.INITED) != STATE.INITED) {
161            setConfig(conf);
162            try {
163              serviceInit(config);
164              if (isInState(STATE.INITED)) {
165                //if the service ended up here during init,
166                //notify the listeners
167                notifyListeners();
168              }
169            } catch (Exception e) {
170              noteFailure(e);
171              ServiceOperations.stopQuietly(LOG, this);
172              throw ServiceStateException.convert(e);
173            }
174          }
175        }
176      }
177    
178      /**
179       * {@inheritDoc}
180       * @throws ServiceStateException if the current service state does not permit
181       * this action
182       */
183      @Override
184      public void start() {
185        if (isInState(STATE.STARTED)) {
186          return;
187        }
188        //enter the started state
189        synchronized (stateChangeLock) {
190          if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
191            try {
192              startTime = System.currentTimeMillis();
193              serviceStart();
194              if (isInState(STATE.STARTED)) {
195                //if the service started (and isn't now in a later state), notify
196                if (LOG.isDebugEnabled()) {
197                  LOG.debug("Service " + getName() + " is started");
198                }
199                notifyListeners();
200              }
201            } catch (Exception e) {
202              noteFailure(e);
203              ServiceOperations.stopQuietly(LOG, this);
204              throw ServiceStateException.convert(e);
205            }
206          }
207        }
208      }
209    
210      /**
211       * {@inheritDoc}
212       */
213      @Override
214      public void stop() {
215        if (isInState(STATE.STOPPED)) {
216          return;
217        }
218        synchronized (stateChangeLock) {
219          if (enterState(STATE.STOPPED) != STATE.STOPPED) {
220            try {
221              serviceStop();
222            } catch (Exception e) {
223              //stop-time exceptions are logged if they are the first one,
224              noteFailure(e);
225              throw ServiceStateException.convert(e);
226            } finally {
227              //report that the service has terminated
228              terminationNotification.set(true);
229              synchronized (terminationNotification) {
230                terminationNotification.notifyAll();
231              }
232              //notify anything listening for events
233              notifyListeners();
234            }
235          } else {
236            //already stopped: note it
237            if (LOG.isDebugEnabled()) {
238              LOG.debug("Ignoring re-entrant call to stop()");
239            }
240          }
241        }
242      }
243    
244      /**
245       * Relay to {@link #stop()}
246       * @throws IOException
247       */
248      @Override
249      public final void close() throws IOException {
250        stop();
251      }
252    
253      /**
254       * Failure handling: record the exception
255       * that triggered it -if there was not one already.
256       * Services are free to call this themselves.
257       * @param exception the exception
258       */
259      protected final void noteFailure(Exception exception) {
260        if (LOG.isDebugEnabled()) {
261          LOG.debug("noteFailure " + exception, null);
262        }
263        if (exception == null) {
264          //make sure failure logic doesn't itself cause problems
265          return;
266        }
267        //record the failure details, and log it
268        synchronized (this) {
269          if (failureCause == null) {
270            failureCause = exception;
271            failureState = getServiceState();
272            LOG.info("Service " + getName()
273                     + " failed in state " + failureState
274                     + "; cause: " + exception,
275                     exception);
276          }
277        }
278      }
279    
280      @Override
281      public final boolean waitForServiceToStop(long timeout) {
282        boolean completed = terminationNotification.get();
283        while (!completed) {
284          try {
285            synchronized(terminationNotification) {
286              terminationNotification.wait(timeout);
287            }
288            // here there has been a timeout, the object has terminated,
289            // or there has been a spurious wakeup (which we ignore)
290            completed = true;
291          } catch (InterruptedException e) {
292            // interrupted; have another look at the flag
293            completed = terminationNotification.get();
294          }
295        }
296        return terminationNotification.get();
297      }
298    
299      /* ===================================================================== */
300      /* Override Points */
301      /* ===================================================================== */
302    
303      /**
304       * All initialization code needed by a service.
305       *
306       * This method will only ever be called once during the lifecycle of
307       * a specific service instance.
308       *
309       * Implementations do not need to be synchronized as the logic
310       * in {@link #init(Configuration)} prevents re-entrancy.
311       *
312       * The base implementation checks to see if the subclass has created
313       * a new configuration instance, and if so, updates the base class value
314       * @param conf configuration
315       * @throws Exception on a failure -these will be caught,
316       * possibly wrapped, and wil; trigger a service stop
317       */
318      protected void serviceInit(Configuration conf) throws Exception {
319        if (conf != config) {
320          LOG.debug("Config has been overridden during init");
321          setConfig(conf);
322        }
323      }
324    
325      /**
326       * Actions called during the INITED to STARTED transition.
327       *
328       * This method will only ever be called once during the lifecycle of
329       * a specific service instance.
330       *
331       * Implementations do not need to be synchronized as the logic
332       * in {@link #start()} prevents re-entrancy.
333       *
334       * @throws Exception if needed -these will be caught,
335       * wrapped, and trigger a service stop
336       */
337      protected void serviceStart() throws Exception {
338    
339      }
340    
341      /**
342       * Actions called during the transition to the STOPPED state.
343       *
344       * This method will only ever be called once during the lifecycle of
345       * a specific service instance.
346       *
347       * Implementations do not need to be synchronized as the logic
348       * in {@link #stop()} prevents re-entrancy.
349       *
350       * Implementations MUST write this to be robust against failures, including
351       * checks for null references -and for the first failure to not stop other
352       * attempts to shut down parts of the service.
353       *
354       * @throws Exception if needed -these will be caught and logged.
355       */
356      protected void serviceStop() throws Exception {
357    
358      }
359    
360      @Override
361      public void registerServiceListener(ServiceStateChangeListener l) {
362        listeners.add(l);
363      }
364    
365      @Override
366      public void unregisterServiceListener(ServiceStateChangeListener l) {
367        listeners.remove(l);
368      }
369    
370      /**
371       * Register a global listener, which receives notifications
372       * from the state change events of all services in the JVM
373       * @param l listener
374       */
375      public static void registerGlobalListener(ServiceStateChangeListener l) {
376        globalListeners.add(l);
377      }
378    
379      /**
380       * unregister a global listener.
381       * @param l listener to unregister
382       * @return true if the listener was found (and then deleted)
383       */
384      public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
385        return globalListeners.remove(l);
386      }
387    
388      /**
389       * Package-scoped method for testing -resets the global listener list
390       */
391      @VisibleForTesting
392      static void resetGlobalListeners() {
393        globalListeners.reset();
394      }
395    
396      @Override
397      public String getName() {
398        return name;
399      }
400    
401      @Override
402      public synchronized Configuration getConfig() {
403        return config;
404      }
405    
406      @Override
407      public long getStartTime() {
408        return startTime;
409      }
410    
411      /**
412       * Notify local and global listeners of state changes.
413       * Exceptions raised by listeners are NOT passed up.
414       */
415      private void notifyListeners() {
416        try {
417          listeners.notifyListeners(this);
418          globalListeners.notifyListeners(this);
419        } catch (Throwable e) {
420          LOG.warn("Exception while notifying listeners of " + this + ": " + e,
421                   e);
422        }
423      }
424    
425      /**
426       * Add a state change event to the lifecycle history
427       */
428      private void recordLifecycleEvent() {
429        LifecycleEvent event = new LifecycleEvent();
430        event.time = System.currentTimeMillis();
431        event.state = getServiceState();
432        lifecycleHistory.add(event);
433      }
434    
435      @Override
436      public synchronized List<LifecycleEvent> getLifecycleHistory() {
437        return new ArrayList<LifecycleEvent>(lifecycleHistory);
438      }
439    
440      /**
441       * Enter a state; record this via {@link #recordLifecycleEvent}
442       * and log at the info level.
443       * @param newState the proposed new state
444       * @return the original state
445       * it wasn't already in that state, and the state model permits state re-entrancy.
446       */
447      private STATE enterState(STATE newState) {
448        assert stateModel != null : "null state in " + name + " " + this.getClass();
449        STATE oldState = stateModel.enterState(newState);
450        if (oldState != newState) {
451          if (LOG.isDebugEnabled()) {
452            LOG.debug(
453              "Service: " + getName() + " entered state " + getServiceState());
454          }
455          recordLifecycleEvent();
456        }
457        return oldState;
458      }
459    
460      @Override
461      public final boolean isInState(Service.STATE expected) {
462        return stateModel.isInState(expected);
463      }
464    
465      @Override
466      public String toString() {
467        return "Service " + name + " in state " + stateModel;
468      }
469    
470      /**
471       * Put a blocker to the blocker map -replacing any
472       * with the same name.
473       * @param name blocker name
474       * @param details any specifics on the block. This must be non-null.
475       */
476      protected void putBlocker(String name, String details) {
477        synchronized (blockerMap) {
478          blockerMap.put(name, details);
479        }
480      }
481    
482      /**
483       * Remove a blocker from the blocker map -
484       * this is a no-op if the blocker is not present
485       * @param name the name of the blocker
486       */
487      public void removeBlocker(String name) {
488        synchronized (blockerMap) {
489          blockerMap.remove(name);
490        }
491      }
492    
493      @Override
494      public Map<String, String> getBlockers() {
495        synchronized (blockerMap) {
496          Map<String, String> map = new HashMap<String, String>(blockerMap);
497          return map;
498        }
499      }
500    }