001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.service;
020    
021    import java.util.ArrayList;
022    import java.util.List;
023    
024    import org.apache.commons.logging.Log;
025    import org.apache.commons.logging.LogFactory;
026    import org.apache.hadoop.classification.InterfaceAudience.Public;
027    import org.apache.hadoop.classification.InterfaceStability.Evolving;
028    import org.apache.hadoop.conf.Configuration;
029    
030    /**
031     * Composition of services.
032     */
033    @Public
034    @Evolving
035    public class CompositeService extends AbstractService {
036    
037      private static final Log LOG = LogFactory.getLog(CompositeService.class);
038    
039      /**
040       * Policy on shutdown: attempt to close everything (purest) or
041       * only try to close started services (which assumes
042       * that the service implementations may not handle the stop() operation
043       * except when started.
044       * Irrespective of this policy, if a child service fails during
045       * its init() or start() operations, it will have stop() called on it.
046       */
047      protected static final boolean STOP_ONLY_STARTED_SERVICES = false;
048    
049      private final List<Service> serviceList = new ArrayList<Service>();
050    
051      public CompositeService(String name) {
052        super(name);
053      }
054    
055      /**
056       * Get a cloned list of services
057       * @return a list of child services at the time of invocation -
058       * added services will not be picked up.
059       */
060      public List<Service> getServices() {
061        synchronized (serviceList) {
062          return new ArrayList<Service>(serviceList);
063        }
064      }
065    
066      /**
067       * Add the passed {@link Service} to the list of services managed by this
068       * {@link CompositeService}
069       * @param service the {@link Service} to be added
070       */
071      protected void addService(Service service) {
072        if (LOG.isDebugEnabled()) {
073          LOG.debug("Adding service " + service.getName());
074        }
075        synchronized (serviceList) {
076          serviceList.add(service);
077        }
078      }
079    
080      /**
081       * If the passed object is an instance of {@link Service},
082       * add it to the list of services managed by this {@link CompositeService}
083       * @param object
084       * @return true if a service is added, false otherwise.
085       */
086      protected boolean addIfService(Object object) {
087        if (object instanceof Service) {
088          addService((Service) object);
089          return true;
090        } else {
091          return false;
092        }
093      }
094    
095      protected synchronized boolean removeService(Service service) {
096        synchronized (serviceList) {
097          return serviceList.remove(service);
098        }
099      }
100    
101      protected void serviceInit(Configuration conf) throws Exception {
102        List<Service> services = getServices();
103        if (LOG.isDebugEnabled()) {
104          LOG.debug(getName() + ": initing services, size=" + services.size());
105        }
106        for (Service service : services) {
107          service.init(conf);
108        }
109        super.serviceInit(conf);
110      }
111    
112      protected void serviceStart() throws Exception {
113        List<Service> services = getServices();
114        if (LOG.isDebugEnabled()) {
115          LOG.debug(getName() + ": starting services, size=" + services.size());
116        }
117        for (Service service : services) {
118          // start the service. If this fails that service
119          // will be stopped and an exception raised
120          service.start();
121        }
122        super.serviceStart();
123      }
124    
125      protected void serviceStop() throws Exception {
126        //stop all services that were started
127        int numOfServicesToStop = serviceList.size();
128        if (LOG.isDebugEnabled()) {
129          LOG.debug(getName() + ": stopping services, size=" + numOfServicesToStop);
130        }
131        stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES);
132        super.serviceStop();
133      }
134    
135      /**
136       * Stop the services in reverse order
137       *
138       * @param numOfServicesStarted index from where the stop should work
139       * @param stopOnlyStartedServices flag to say "only start services that are
140       * started, not those that are NOTINITED or INITED.
141       * @throws RuntimeException the first exception raised during the
142       * stop process -<i>after all services are stopped</i>
143       */
144      private void stop(int numOfServicesStarted, boolean stopOnlyStartedServices) {
145        // stop in reverse order of start
146        Exception firstException = null;
147        List<Service> services = getServices();
148        for (int i = numOfServicesStarted - 1; i >= 0; i--) {
149          Service service = services.get(i);
150          if (LOG.isDebugEnabled()) {
151            LOG.debug("Stopping service #" + i + ": " + service);
152          }
153          STATE state = service.getServiceState();
154          //depending on the stop police
155          if (state == STATE.STARTED 
156             || (!stopOnlyStartedServices && state == STATE.INITED)) {
157            Exception ex = ServiceOperations.stopQuietly(LOG, service);
158            if (ex != null && firstException == null) {
159              firstException = ex;
160            }
161          }
162        }
163        //after stopping all services, rethrow the first exception raised
164        if (firstException != null) {
165          throw ServiceStateException.convert(firstException);
166        }
167      }
168    
169      /**
170       * JVM Shutdown hook for CompositeService which will stop the give
171       * CompositeService gracefully in case of JVM shutdown.
172       */
173      public static class CompositeServiceShutdownHook implements Runnable {
174    
175        private CompositeService compositeService;
176    
177        public CompositeServiceShutdownHook(CompositeService compositeService) {
178          this.compositeService = compositeService;
179        }
180    
181        @Override
182        public void run() {
183          ServiceOperations.stopQuietly(compositeService);
184        }
185      }
186    
187    }