001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.conf; 020 021import com.google.common.annotations.VisibleForTesting; 022import com.google.common.base.Optional; 023import com.google.common.base.Preconditions; 024import com.google.common.collect.Maps; 025import org.apache.commons.logging.*; 026import org.apache.hadoop.util.Time; 027import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; 028 029import java.io.IOException; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.Map; 033 034/** 035 * Utility base class for implementing the Reconfigurable interface. 036 * 037 * Subclasses should override reconfigurePropertyImpl to change individual 038 * properties and getReconfigurableProperties to get all properties that 039 * can be changed at run time. 040 */ 041public abstract class ReconfigurableBase 042 extends Configured implements Reconfigurable { 043 044 private static final Log LOG = 045 LogFactory.getLog(ReconfigurableBase.class); 046 // Use for testing purpose. 047 private ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil(); 048 049 /** Background thread to reload configuration. */ 050 private Thread reconfigThread = null; 051 private volatile boolean shouldRun = true; 052 private Object reconfigLock = new Object(); 053 054 /** 055 * The timestamp when the <code>reconfigThread</code> starts. 056 */ 057 private long startTime = 0; 058 059 /** 060 * The timestamp when the <code>reconfigThread</code> finishes. 061 */ 062 private long endTime = 0; 063 064 /** 065 * A map of <changed property, error message>. If error message is present, 066 * it contains the messages about the error occurred when applies the particular 067 * change. Otherwise, it indicates that the change has been successfully applied. 068 */ 069 private Map<PropertyChange, Optional<String>> status = null; 070 071 /** 072 * Construct a ReconfigurableBase. 073 */ 074 public ReconfigurableBase() { 075 super(new Configuration()); 076 } 077 078 /** 079 * Construct a ReconfigurableBase with the {@link Configuration} 080 * conf. 081 */ 082 public ReconfigurableBase(Configuration conf) { 083 super((conf == null) ? new Configuration() : conf); 084 } 085 086 @VisibleForTesting 087 public void setReconfigurationUtil(ReconfigurationUtil ru) { 088 reconfigurationUtil = Preconditions.checkNotNull(ru); 089 } 090 091 /** 092 * Create a new configuration. 093 */ 094 protected abstract Configuration getNewConf(); 095 096 @VisibleForTesting 097 public Collection<PropertyChange> getChangedProperties( 098 Configuration newConf, Configuration oldConf) { 099 return reconfigurationUtil.parseChangedProperties(newConf, oldConf); 100 } 101 102 /** 103 * A background thread to apply configuration changes. 104 */ 105 private static class ReconfigurationThread extends Thread { 106 private ReconfigurableBase parent; 107 108 ReconfigurationThread(ReconfigurableBase base) { 109 this.parent = base; 110 } 111 112 // See {@link ReconfigurationServlet#applyChanges} 113 public void run() { 114 LOG.info("Starting reconfiguration task."); 115 final Configuration oldConf = parent.getConf(); 116 final Configuration newConf = parent.getNewConf(); 117 final Collection<PropertyChange> changes = 118 parent.getChangedProperties(newConf, oldConf); 119 Map<PropertyChange, Optional<String>> results = Maps.newHashMap(); 120 ConfigRedactor oldRedactor = new ConfigRedactor(oldConf); 121 ConfigRedactor newRedactor = new ConfigRedactor(newConf); 122 for (PropertyChange change : changes) { 123 String errorMessage = null; 124 String oldValRedacted = oldRedactor.redact(change.prop, change.oldVal); 125 String newValRedacted = newRedactor.redact(change.prop, change.newVal); 126 if (!parent.isPropertyReconfigurable(change.prop)) { 127 LOG.info(String.format( 128 "Property %s is not configurable: old value: %s, new value: %s", 129 change.prop, 130 oldValRedacted, 131 newValRedacted)); 132 continue; 133 } 134 LOG.info("Change property: " + change.prop + " from \"" 135 + ((change.oldVal == null) ? "<default>" : oldValRedacted) 136 + "\" to \"" 137 + ((change.newVal == null) ? "<default>" : newValRedacted) 138 + "\"."); 139 try { 140 String effectiveValue = 141 parent.reconfigurePropertyImpl(change.prop, change.newVal); 142 if (change.newVal != null) { 143 oldConf.set(change.prop, effectiveValue); 144 } else { 145 oldConf.unset(change.prop); 146 } 147 } catch (ReconfigurationException e) { 148 errorMessage = e.getCause().getMessage(); 149 } 150 results.put(change, Optional.fromNullable(errorMessage)); 151 } 152 153 synchronized (parent.reconfigLock) { 154 parent.endTime = Time.now(); 155 parent.status = Collections.unmodifiableMap(results); 156 parent.reconfigThread = null; 157 } 158 } 159 } 160 161 /** 162 * Start a reconfiguration task to reload configuration in background. 163 */ 164 public void startReconfigurationTask() throws IOException { 165 synchronized (reconfigLock) { 166 if (!shouldRun) { 167 String errorMessage = "The server is stopped."; 168 LOG.warn(errorMessage); 169 throw new IOException(errorMessage); 170 } 171 if (reconfigThread != null) { 172 String errorMessage = "Another reconfiguration task is running."; 173 LOG.warn(errorMessage); 174 throw new IOException(errorMessage); 175 } 176 reconfigThread = new ReconfigurationThread(this); 177 reconfigThread.setDaemon(true); 178 reconfigThread.setName("Reconfiguration Task"); 179 reconfigThread.start(); 180 startTime = Time.now(); 181 } 182 } 183 184 public ReconfigurationTaskStatus getReconfigurationTaskStatus() { 185 synchronized (reconfigLock) { 186 if (reconfigThread != null) { 187 return new ReconfigurationTaskStatus(startTime, 0, null); 188 } 189 return new ReconfigurationTaskStatus(startTime, endTime, status); 190 } 191 } 192 193 public void shutdownReconfigurationTask() { 194 Thread tempThread; 195 synchronized (reconfigLock) { 196 shouldRun = false; 197 if (reconfigThread == null) { 198 return; 199 } 200 tempThread = reconfigThread; 201 reconfigThread = null; 202 } 203 204 try { 205 tempThread.join(); 206 } catch (InterruptedException e) { 207 } 208 } 209 210 /** 211 * {@inheritDoc} 212 * 213 * This method makes the change to this objects {@link Configuration} 214 * and calls reconfigurePropertyImpl to update internal data structures. 215 * This method cannot be overridden, subclasses should instead override 216 * reconfigureProperty. 217 */ 218 @Override 219 public final void reconfigureProperty(String property, String newVal) 220 throws ReconfigurationException { 221 if (isPropertyReconfigurable(property)) { 222 LOG.info("changing property " + property + " to " + newVal); 223 synchronized(getConf()) { 224 getConf().get(property); 225 String effectiveValue = reconfigurePropertyImpl(property, newVal); 226 if (newVal != null) { 227 getConf().set(property, effectiveValue); 228 } else { 229 getConf().unset(property); 230 } 231 } 232 } else { 233 throw new ReconfigurationException(property, newVal, 234 getConf().get(property)); 235 } 236 } 237 238 /** 239 * {@inheritDoc} 240 * 241 * Subclasses must override this. 242 */ 243 @Override 244 public abstract Collection<String> getReconfigurableProperties(); 245 246 247 /** 248 * {@inheritDoc} 249 * 250 * Subclasses may wish to override this with a more efficient implementation. 251 */ 252 @Override 253 public boolean isPropertyReconfigurable(String property) { 254 return getReconfigurableProperties().contains(property); 255 } 256 257 /** 258 * Change a configuration property. 259 * 260 * Subclasses must override this. This method applies the change to 261 * all internal data structures derived from the configuration property 262 * that is being changed. If this object owns other Reconfigurable objects 263 * reconfigureProperty should be called recursively to make sure that 264 * to make sure that the configuration of these objects is updated. 265 * 266 * @param property Name of the property that is being reconfigured. 267 * @param newVal Proposed new value of the property. 268 * @return Effective new value of the property. This may be different from 269 * newVal. 270 * 271 * @throws ReconfigurationException if there was an error applying newVal. 272 */ 273 protected abstract String reconfigurePropertyImpl( 274 String property, String newVal) throws ReconfigurationException; 275 276}