001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.protocol.datatransfer;
019
020import org.apache.hadoop.HadoopIllegalArgumentException;
021import org.apache.hadoop.classification.InterfaceAudience;
022import org.apache.hadoop.classification.InterfaceStability;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
025import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
026
027/**
028 * The setting of replace-datanode-on-failure feature.
029 */
030@InterfaceAudience.Private
031@InterfaceStability.Evolving
032public class ReplaceDatanodeOnFailure {
033  /**
034   * DEFAULT condition:
035   *   Let r be the replication number.
036   *   Let n be the number of existing datanodes.
037   *   Add a new datanode only if r >= 3 and either
038   *   (1) floor(r/2) >= n or (2) the block is hflushed/appended.
039   */
040  private static final Condition CONDITION_DEFAULT = new Condition() {
041    @Override
042    public boolean satisfy(final short replication,
043        final DatanodeInfo[] existings, final int n, final boolean isAppend,
044        final boolean isHflushed) {
045      return replication >= 3 &&
046          (n <= (replication / 2) || isAppend || isHflushed);
047    }
048  };
049  /** Return false unconditionally. */
050  private static final Condition CONDITION_FALSE = new Condition() {
051    @Override
052    public boolean satisfy(short replication, DatanodeInfo[] existings,
053        int nExistings, boolean isAppend, boolean isHflushed) {
054      return false;
055    }
056  };
057  /** Return true unconditionally. */
058  private static final Condition CONDITION_TRUE = new Condition() {
059    @Override
060    public boolean satisfy(short replication, DatanodeInfo[] existings,
061        int nExistings, boolean isAppend, boolean isHflushed) {
062      return true;
063    }
064  };
065
066  /** The replacement policies */
067  public enum Policy {
068    /** The feature is disabled in the entire site. */
069    DISABLE(CONDITION_FALSE),
070    /** Never add a new datanode. */
071    NEVER(CONDITION_FALSE),
072    /** @see ReplaceDatanodeOnFailure#CONDITION_DEFAULT */
073    DEFAULT(CONDITION_DEFAULT),
074    /** Always add a new datanode when an existing datanode is removed. */
075    ALWAYS(CONDITION_TRUE);
076
077    private final Condition condition;
078
079    Policy(Condition condition) {
080      this.condition = condition;
081    }
082
083    Condition getCondition() {
084      return condition;
085    }
086  }
087
088  /** Datanode replacement condition */
089  private interface Condition {
090
091    /** Is the condition satisfied? */
092    boolean satisfy(short replication, DatanodeInfo[] existings, int nExistings,
093                    boolean isAppend, boolean isHflushed);
094  }
095
096  private final Policy policy;
097  private final boolean bestEffort;
098
099  public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) {
100    this.policy = policy;
101    this.bestEffort = bestEffort;
102  }
103
104  /** Check if the feature is enabled. */
105  public void checkEnabled() {
106    if (policy == Policy.DISABLE) {
107      throw new UnsupportedOperationException(
108          "This feature is disabled.  Please refer to "
109          + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY
110          + " configuration property.");
111    }
112  }
113
114  /**
115   * Best effort means that the client will try to replace the failed datanode
116   * (provided that the policy is satisfied), however, it will continue the
117   * write operation in case that the datanode replacement also fails.
118   *
119   * @return Suppose the datanode replacement fails.
120   *     false: An exception should be thrown so that the write will fail.
121   *     true : The write should be resumed with the remaining datandoes.
122   */
123  public boolean isBestEffort() {
124    return bestEffort;
125  }
126
127  /** Does it need a replacement according to the policy? */
128  public boolean satisfy(
129      final short replication, final DatanodeInfo[] existings,
130      final boolean isAppend, final boolean isHflushed) {
131    final int n = existings == null ? 0 : existings.length;
132    //don't need to add datanode for any policy.
133    return !(n == 0 || n >= replication) &&
134        policy.getCondition().satisfy(replication, existings, n, isAppend,
135            isHflushed);
136  }
137
138  @Override
139  public String toString() {
140    return policy.toString();
141  }
142
143  /** Get the setting from configuration. */
144  public static ReplaceDatanodeOnFailure get(final Configuration conf) {
145    final Policy policy = getPolicy(conf);
146    final boolean bestEffort = conf.getBoolean(
147        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY,
148        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT);
149
150    return new ReplaceDatanodeOnFailure(policy, bestEffort);
151  }
152
153  private static Policy getPolicy(final Configuration conf) {
154    final boolean enabled = conf.getBoolean(
155        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
156        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_DEFAULT);
157    if (!enabled) {
158      return Policy.DISABLE;
159    }
160
161    final String policy = conf.get(
162        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
163        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_DEFAULT);
164    for(int i = 1; i < Policy.values().length; i++) {
165      final Policy p = Policy.values()[i];
166      if (p.name().equalsIgnoreCase(policy)) {
167        return p;
168      }
169    }
170    throw new HadoopIllegalArgumentException("Illegal configuration value for "
171        + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY
172        + ": " + policy);
173  }
174
175  /** Write the setting to configuration. */
176  public static void write(final Policy policy,
177      final boolean bestEffort, final Configuration conf) {
178    conf.setBoolean(
179        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
180        policy != Policy.DISABLE);
181    conf.set(
182        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
183        policy.name());
184    conf.setBoolean(
185        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY,
186        bestEffort);
187  }
188}