001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.conf;
020
021import com.google.common.annotations.VisibleForTesting;
022import com.google.common.base.Optional;
023import com.google.common.base.Preconditions;
024import com.google.common.collect.Maps;
025import org.apache.commons.logging.*;
026import org.apache.hadoop.util.Time;
027import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
028
029import java.io.IOException;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.Map;
033
034/**
035 * Utility base class for implementing the Reconfigurable interface.
036 *
037 * Subclasses should override reconfigurePropertyImpl to change individual
038 * properties and getReconfigurableProperties to get all properties that
039 * can be changed at run time.
040 */
041public abstract class ReconfigurableBase 
042  extends Configured implements Reconfigurable {
043  
044  private static final Log LOG =
045    LogFactory.getLog(ReconfigurableBase.class);
046  // Use for testing purpose.
047  private ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();
048
049  /** Background thread to reload configuration. */
050  private Thread reconfigThread = null;
051  private volatile boolean shouldRun = true;
052  private Object reconfigLock = new Object();
053
054  /**
055   * The timestamp when the <code>reconfigThread</code> starts.
056   */
057  private long startTime = 0;
058
059  /**
060   * The timestamp when the <code>reconfigThread</code> finishes.
061   */
062  private long endTime = 0;
063
064  /**
065   * A map of <changed property, error message>. If error message is present,
066   * it contains the messages about the error occurred when applies the particular
067   * change. Otherwise, it indicates that the change has been successfully applied.
068   */
069  private Map<PropertyChange, Optional<String>> status = null;
070
071  /**
072   * Construct a ReconfigurableBase.
073   */
074  public ReconfigurableBase() {
075    super(new Configuration());
076  }
077
078  /**
079   * Construct a ReconfigurableBase with the {@link Configuration}
080   * conf.
081   */
082  public ReconfigurableBase(Configuration conf) {
083    super((conf == null) ? new Configuration() : conf);
084  }
085
086  @VisibleForTesting
087  public void setReconfigurationUtil(ReconfigurationUtil ru) {
088    reconfigurationUtil = Preconditions.checkNotNull(ru);
089  }
090
091  /**
092   * Create a new configuration.
093   */
094  protected abstract Configuration getNewConf();
095
096  @VisibleForTesting
097  public Collection<PropertyChange> getChangedProperties(
098      Configuration newConf, Configuration oldConf) {
099    return reconfigurationUtil.parseChangedProperties(newConf, oldConf);
100  }
101
102  /**
103   * A background thread to apply configuration changes.
104   */
105  private static class ReconfigurationThread extends Thread {
106    private ReconfigurableBase parent;
107
108    ReconfigurationThread(ReconfigurableBase base) {
109      this.parent = base;
110    }
111
112    // See {@link ReconfigurationServlet#applyChanges}
113    public void run() {
114      LOG.info("Starting reconfiguration task.");
115      final Configuration oldConf = parent.getConf();
116      final Configuration newConf = parent.getNewConf();
117      final Collection<PropertyChange> changes =
118          parent.getChangedProperties(newConf, oldConf);
119      Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
120      ConfigRedactor oldRedactor = new ConfigRedactor(oldConf);
121      ConfigRedactor newRedactor = new ConfigRedactor(newConf);
122      for (PropertyChange change : changes) {
123        String errorMessage = null;
124        String oldValRedacted = oldRedactor.redact(change.prop, change.oldVal);
125        String newValRedacted = newRedactor.redact(change.prop, change.newVal);
126        if (!parent.isPropertyReconfigurable(change.prop)) {
127          LOG.info(String.format(
128              "Property %s is not configurable: old value: %s, new value: %s",
129              change.prop,
130              oldValRedacted,
131              newValRedacted));
132          continue;
133        }
134        LOG.info("Change property: " + change.prop + " from \""
135            + ((change.oldVal == null) ? "<default>" : oldValRedacted)
136            + "\" to \""
137            + ((change.newVal == null) ? "<default>" : newValRedacted)
138            + "\".");
139        try {
140          String effectiveValue =
141              parent.reconfigurePropertyImpl(change.prop, change.newVal);
142          if (change.newVal != null) {
143            oldConf.set(change.prop, effectiveValue);
144          } else {
145            oldConf.unset(change.prop);
146          }
147        } catch (ReconfigurationException e) {
148          errorMessage = e.getCause().getMessage();
149        }
150        results.put(change, Optional.fromNullable(errorMessage));
151      }
152
153      synchronized (parent.reconfigLock) {
154        parent.endTime = Time.now();
155        parent.status = Collections.unmodifiableMap(results);
156        parent.reconfigThread = null;
157      }
158    }
159  }
160
161  /**
162   * Start a reconfiguration task to reload configuration in background.
163   */
164  public void startReconfigurationTask() throws IOException {
165    synchronized (reconfigLock) {
166      if (!shouldRun) {
167        String errorMessage = "The server is stopped.";
168        LOG.warn(errorMessage);
169        throw new IOException(errorMessage);
170      }
171      if (reconfigThread != null) {
172        String errorMessage = "Another reconfiguration task is running.";
173        LOG.warn(errorMessage);
174        throw new IOException(errorMessage);
175      }
176      reconfigThread = new ReconfigurationThread(this);
177      reconfigThread.setDaemon(true);
178      reconfigThread.setName("Reconfiguration Task");
179      reconfigThread.start();
180      startTime = Time.now();
181    }
182  }
183
184  public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
185    synchronized (reconfigLock) {
186      if (reconfigThread != null) {
187        return new ReconfigurationTaskStatus(startTime, 0, null);
188      }
189      return new ReconfigurationTaskStatus(startTime, endTime, status);
190    }
191  }
192
193  public void shutdownReconfigurationTask() {
194    Thread tempThread;
195    synchronized (reconfigLock) {
196      shouldRun = false;
197      if (reconfigThread == null) {
198        return;
199      }
200      tempThread = reconfigThread;
201      reconfigThread = null;
202    }
203
204    try {
205      tempThread.join();
206    } catch (InterruptedException e) {
207    }
208  }
209
210  /**
211   * {@inheritDoc}
212   *
213   * This method makes the change to this objects {@link Configuration}
214   * and calls reconfigurePropertyImpl to update internal data structures.
215   * This method cannot be overridden, subclasses should instead override
216   * reconfigureProperty.
217   */
218  @Override
219  public final void reconfigureProperty(String property, String newVal)
220    throws ReconfigurationException {
221    if (isPropertyReconfigurable(property)) {
222      LOG.info("changing property " + property + " to " + newVal);
223      synchronized(getConf()) {
224        getConf().get(property);
225        String effectiveValue = reconfigurePropertyImpl(property, newVal);
226        if (newVal != null) {
227          getConf().set(property, effectiveValue);
228        } else {
229          getConf().unset(property);
230        }
231      }
232    } else {
233      throw new ReconfigurationException(property, newVal,
234                                             getConf().get(property));
235    }
236  }
237
238  /**
239   * {@inheritDoc}
240   *
241   * Subclasses must override this.
242   */
243  @Override 
244  public abstract Collection<String> getReconfigurableProperties();
245
246
247  /**
248   * {@inheritDoc}
249   *
250   * Subclasses may wish to override this with a more efficient implementation.
251   */
252  @Override
253  public boolean isPropertyReconfigurable(String property) {
254    return getReconfigurableProperties().contains(property);
255  }
256
257  /**
258   * Change a configuration property.
259   *
260   * Subclasses must override this. This method applies the change to
261   * all internal data structures derived from the configuration property
262   * that is being changed. If this object owns other Reconfigurable objects
263   * reconfigureProperty should be called recursively to make sure that
264   * to make sure that the configuration of these objects is updated.
265   *
266   * @param property Name of the property that is being reconfigured.
267   * @param newVal Proposed new value of the property.
268   * @return Effective new value of the property. This may be different from
269   *         newVal.
270   *
271   * @throws ReconfigurationException if there was an error applying newVal.
272   */
273  protected abstract String reconfigurePropertyImpl(
274      String property, String newVal) throws ReconfigurationException;
275
276}