001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.protocol.datatransfer; 019 020import org.apache.hadoop.HadoopIllegalArgumentException; 021import org.apache.hadoop.classification.InterfaceAudience; 022import org.apache.hadoop.classification.InterfaceStability; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; 025import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 026 027/** 028 * The setting of replace-datanode-on-failure feature. 029 */ 030@InterfaceAudience.Private 031@InterfaceStability.Evolving 032public class ReplaceDatanodeOnFailure { 033 /** 034 * DEFAULT condition: 035 * Let r be the replication number. 036 * Let n be the number of existing datanodes. 037 * Add a new datanode only if r >= 3 and either 038 * (1) floor(r/2) >= n or (2) the block is hflushed/appended. 039 */ 040 private static final Condition CONDITION_DEFAULT = new Condition() { 041 @Override 042 public boolean satisfy(final short replication, 043 final DatanodeInfo[] existings, final int n, final boolean isAppend, 044 final boolean isHflushed) { 045 return replication >= 3 && 046 (n <= (replication / 2) || isAppend || isHflushed); 047 } 048 }; 049 /** Return false unconditionally. */ 050 private static final Condition CONDITION_FALSE = new Condition() { 051 @Override 052 public boolean satisfy(short replication, DatanodeInfo[] existings, 053 int nExistings, boolean isAppend, boolean isHflushed) { 054 return false; 055 } 056 }; 057 /** Return true unconditionally. */ 058 private static final Condition CONDITION_TRUE = new Condition() { 059 @Override 060 public boolean satisfy(short replication, DatanodeInfo[] existings, 061 int nExistings, boolean isAppend, boolean isHflushed) { 062 return true; 063 } 064 }; 065 066 /** The replacement policies */ 067 public enum Policy { 068 /** The feature is disabled in the entire site. */ 069 DISABLE(CONDITION_FALSE), 070 /** Never add a new datanode. */ 071 NEVER(CONDITION_FALSE), 072 /** @see ReplaceDatanodeOnFailure#CONDITION_DEFAULT */ 073 DEFAULT(CONDITION_DEFAULT), 074 /** Always add a new datanode when an existing datanode is removed. */ 075 ALWAYS(CONDITION_TRUE); 076 077 private final Condition condition; 078 079 Policy(Condition condition) { 080 this.condition = condition; 081 } 082 083 Condition getCondition() { 084 return condition; 085 } 086 } 087 088 /** Datanode replacement condition */ 089 private interface Condition { 090 091 /** Is the condition satisfied? */ 092 boolean satisfy(short replication, DatanodeInfo[] existings, int nExistings, 093 boolean isAppend, boolean isHflushed); 094 } 095 096 private final Policy policy; 097 private final boolean bestEffort; 098 099 public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) { 100 this.policy = policy; 101 this.bestEffort = bestEffort; 102 } 103 104 /** Check if the feature is enabled. */ 105 public void checkEnabled() { 106 if (policy == Policy.DISABLE) { 107 throw new UnsupportedOperationException( 108 "This feature is disabled. Please refer to " 109 + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY 110 + " configuration property."); 111 } 112 } 113 114 /** 115 * Best effort means that the client will try to replace the failed datanode 116 * (provided that the policy is satisfied), however, it will continue the 117 * write operation in case that the datanode replacement also fails. 118 * 119 * @return Suppose the datanode replacement fails. 120 * false: An exception should be thrown so that the write will fail. 121 * true : The write should be resumed with the remaining datandoes. 122 */ 123 public boolean isBestEffort() { 124 return bestEffort; 125 } 126 127 /** Does it need a replacement according to the policy? */ 128 public boolean satisfy( 129 final short replication, final DatanodeInfo[] existings, 130 final boolean isAppend, final boolean isHflushed) { 131 final int n = existings == null ? 0 : existings.length; 132 //don't need to add datanode for any policy. 133 return !(n == 0 || n >= replication) && 134 policy.getCondition().satisfy(replication, existings, n, isAppend, 135 isHflushed); 136 } 137 138 @Override 139 public String toString() { 140 return policy.toString(); 141 } 142 143 /** Get the setting from configuration. */ 144 public static ReplaceDatanodeOnFailure get(final Configuration conf) { 145 final Policy policy = getPolicy(conf); 146 final boolean bestEffort = conf.getBoolean( 147 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, 148 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT); 149 150 return new ReplaceDatanodeOnFailure(policy, bestEffort); 151 } 152 153 private static Policy getPolicy(final Configuration conf) { 154 final boolean enabled = conf.getBoolean( 155 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, 156 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_DEFAULT); 157 if (!enabled) { 158 return Policy.DISABLE; 159 } 160 161 final String policy = conf.get( 162 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, 163 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_DEFAULT); 164 for(int i = 1; i < Policy.values().length; i++) { 165 final Policy p = Policy.values()[i]; 166 if (p.name().equalsIgnoreCase(policy)) { 167 return p; 168 } 169 } 170 throw new HadoopIllegalArgumentException("Illegal configuration value for " 171 + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY 172 + ": " + policy); 173 } 174 175 /** Write the setting to configuration. */ 176 public static void write(final Policy policy, 177 final boolean bestEffort, final Configuration conf) { 178 conf.setBoolean( 179 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, 180 policy != Policy.DISABLE); 181 conf.set( 182 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, 183 policy.name()); 184 conf.setBoolean( 185 HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, 186 bestEffort); 187 } 188}