001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.server.blockmanagement; 019 020import java.util.*; 021 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.StorageType; 024import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 025import org.apache.hadoop.net.NetworkTopology; 026import org.apache.hadoop.net.NetworkTopologyWithNodeGroup; 027import org.apache.hadoop.net.Node; 028import org.apache.hadoop.net.NodeBase; 029 030/** The class is responsible for choosing the desired number of targets 031 * for placing block replicas on environment with node-group layer. 032 * The replica placement strategy is adjusted to: 033 * If the writer is on a datanode, the 1st replica is placed on the local 034 * node(or local node-group or on local rack), otherwise a random datanode. 035 * The 2nd replica is placed on a datanode that is on a different rack with 1st 036 * replica node. 037 * The 3rd replica is placed on a datanode which is on a different node-group 038 * but the same rack as the second replica node. 039 */ 040public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault { 041 042 protected BlockPlacementPolicyWithNodeGroup() { 043 } 044 045 public void initialize(Configuration conf, FSClusterStats stats, 046 NetworkTopology clusterMap, 047 Host2NodesMap host2datanodeMap) { 048 super.initialize(conf, stats, clusterMap, host2datanodeMap); 049 } 050 051 /** 052 * choose all good favored nodes as target. 053 * If no enough targets, then choose one replica from 054 * each bad favored node's node group. 055 * @throws NotEnoughReplicasException 056 */ 057 @Override 058 protected void chooseFavouredNodes(String src, int numOfReplicas, 059 List<DatanodeDescriptor> favoredNodes, 060 Set<Node> favoriteAndExcludedNodes, long blocksize, 061 int maxNodesPerRack, List<DatanodeStorageInfo> results, 062 boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes) 063 throws NotEnoughReplicasException { 064 super.chooseFavouredNodes(src, numOfReplicas, favoredNodes, 065 favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results, 066 avoidStaleNodes, storageTypes); 067 if (results.size() < numOfReplicas) { 068 // Not enough replicas, choose from unselected Favorednode's Nodegroup 069 for (int i = 0; 070 i < favoredNodes.size() && results.size() < numOfReplicas; i++) { 071 DatanodeDescriptor favoredNode = favoredNodes.get(i); 072 boolean chosenNode = 073 isNodeChosen(results, favoredNode); 074 if (chosenNode) { 075 continue; 076 } 077 NetworkTopologyWithNodeGroup clusterMapNodeGroup = 078 (NetworkTopologyWithNodeGroup) clusterMap; 079 // try a node on FavouredNode's node group 080 DatanodeStorageInfo target = null; 081 String scope = 082 clusterMapNodeGroup.getNodeGroup(favoredNode.getNetworkLocation()); 083 try { 084 target = 085 chooseRandom(scope, favoriteAndExcludedNodes, blocksize, 086 maxNodesPerRack, results, avoidStaleNodes, storageTypes); 087 } catch (NotEnoughReplicasException e) { 088 // catch Exception and continue with other favored nodes 089 continue; 090 } 091 if (target == null) { 092 LOG.warn("Could not find a target for file " 093 + src + " within nodegroup of favored node " + favoredNode); 094 continue; 095 } 096 favoriteAndExcludedNodes.add(target.getDatanodeDescriptor()); 097 } 098 } 099 } 100 101 private boolean isNodeChosen( 102 List<DatanodeStorageInfo> results, DatanodeDescriptor favoredNode) { 103 boolean chosenNode = false; 104 for (int j = 0; j < results.size(); j++) { 105 if (results.get(j).getDatanodeDescriptor().equals(favoredNode)) { 106 chosenNode = true; 107 break; 108 } 109 } 110 return chosenNode; 111 } 112 113 /** choose local node of <i>localMachine</i> as the target. 114 * If localMachine is not available, will fallback to nodegroup/rack 115 * when flag <i>fallbackToNodeGroupAndLocalRack</i> is set. 116 * @return the chosen node 117 */ 118 @Override 119 protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, 120 Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, 121 List<DatanodeStorageInfo> results, boolean avoidStaleNodes, 122 EnumMap<StorageType, Integer> storageTypes, 123 boolean fallbackToNodeGroupAndLocalRack) 124 throws NotEnoughReplicasException { 125 DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine, 126 excludedNodes, blocksize, maxNodesPerRack, results, 127 avoidStaleNodes, storageTypes); 128 if (localStorage != null) { 129 return localStorage; 130 } 131 132 if (!fallbackToNodeGroupAndLocalRack) { 133 return null; 134 } 135 // try a node on local node group 136 DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup( 137 (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 138 blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); 139 if (chosenStorage != null) { 140 return chosenStorage; 141 } 142 // try a node on local rack 143 return chooseLocalRack(localMachine, excludedNodes, 144 blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); 145 } 146 147 /** @return the node of the second replica */ 148 private static DatanodeDescriptor secondNode(Node localMachine, 149 List<DatanodeStorageInfo> results) { 150 // find the second replica 151 for(DatanodeStorageInfo nextStorage : results) { 152 DatanodeDescriptor nextNode = nextStorage.getDatanodeDescriptor(); 153 if (nextNode != localMachine) { 154 return nextNode; 155 } 156 } 157 return null; 158 } 159 160 @Override 161 protected DatanodeStorageInfo chooseLocalRack(Node localMachine, 162 Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, 163 List<DatanodeStorageInfo> results, boolean avoidStaleNodes, 164 EnumMap<StorageType, Integer> storageTypes) throws 165 NotEnoughReplicasException { 166 // no local machine, so choose a random machine 167 if (localMachine == null) { 168 return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, 169 maxNodesPerRack, results, avoidStaleNodes, storageTypes); 170 } 171 172 // choose one from the local rack, but off-nodegroup 173 try { 174 final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()); 175 return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack, 176 results, avoidStaleNodes, storageTypes); 177 } catch (NotEnoughReplicasException e1) { 178 // find the second replica 179 final DatanodeDescriptor newLocal = secondNode(localMachine, results); 180 if (newLocal != null) { 181 try { 182 return chooseRandom( 183 clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes, 184 blocksize, maxNodesPerRack, results, avoidStaleNodes, 185 storageTypes); 186 } catch(NotEnoughReplicasException e2) { 187 //otherwise randomly choose one from the network 188 return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, 189 maxNodesPerRack, results, avoidStaleNodes, storageTypes); 190 } 191 } else { 192 //otherwise randomly choose one from the network 193 return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, 194 maxNodesPerRack, results, avoidStaleNodes, storageTypes); 195 } 196 } 197 } 198 199 /** 200 * {@inheritDoc} 201 */ 202 @Override 203 protected void chooseRemoteRack(int numOfReplicas, 204 DatanodeDescriptor localMachine, Set<Node> excludedNodes, 205 long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results, 206 boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes) 207 throws NotEnoughReplicasException { 208 int oldNumOfReplicas = results.size(); 209 210 final String rackLocation = NetworkTopology.getFirstHalf( 211 localMachine.getNetworkLocation()); 212 try { 213 // randomly choose from remote racks 214 chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize, 215 maxReplicasPerRack, results, avoidStaleNodes, storageTypes); 216 } catch (NotEnoughReplicasException e) { 217 // fall back to the local rack 218 chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas), 219 rackLocation, excludedNodes, blocksize, 220 maxReplicasPerRack, results, avoidStaleNodes, storageTypes); 221 } 222 } 223 224 /* choose one node from the nodegroup that <i>localMachine</i> is on. 225 * if no such node is available, choose one node from the nodegroup where 226 * a second replica is on. 227 * if still no such node is available, return null. 228 * @return the chosen node 229 */ 230 private DatanodeStorageInfo chooseLocalNodeGroup( 231 NetworkTopologyWithNodeGroup clusterMap, Node localMachine, 232 Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, 233 List<DatanodeStorageInfo> results, boolean avoidStaleNodes, 234 EnumMap<StorageType, Integer> storageTypes) throws 235 NotEnoughReplicasException { 236 // no local machine, so choose a random machine 237 if (localMachine == null) { 238 return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, 239 maxNodesPerRack, results, avoidStaleNodes, storageTypes); 240 } 241 242 // choose one from the local node group 243 try { 244 return chooseRandom( 245 clusterMap.getNodeGroup(localMachine.getNetworkLocation()), 246 excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, 247 storageTypes); 248 } catch (NotEnoughReplicasException e1) { 249 final DatanodeDescriptor newLocal = secondNode(localMachine, results); 250 if (newLocal != null) { 251 try { 252 return chooseRandom( 253 clusterMap.getNodeGroup(newLocal.getNetworkLocation()), 254 excludedNodes, blocksize, maxNodesPerRack, results, 255 avoidStaleNodes, storageTypes); 256 } catch(NotEnoughReplicasException e2) { 257 //otherwise return null 258 return null; 259 } 260 } else { 261 //otherwise return null 262 return null; 263 } 264 } 265 } 266 267 @Override 268 protected String getRack(final DatanodeInfo cur) { 269 String nodeGroupString = cur.getNetworkLocation(); 270 return NetworkTopology.getFirstHalf(nodeGroupString); 271 } 272 273 /** 274 * Find other nodes in the same nodegroup of <i>localMachine</i> and add them 275 * into <i>excludeNodes</i> as replica should not be duplicated for nodes 276 * within the same nodegroup 277 * @return number of new excluded nodes 278 */ 279 @Override 280 protected int addToExcludedNodes(DatanodeDescriptor chosenNode, 281 Set<Node> excludedNodes) { 282 int countOfExcludedNodes = 0; 283 String nodeGroupScope = chosenNode.getNetworkLocation(); 284 List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope); 285 for (Node leafNode : leafNodes) { 286 if (excludedNodes.add(leafNode)) { 287 // not a existing node in excludedNodes 288 countOfExcludedNodes++; 289 } 290 } 291 292 countOfExcludedNodes += addDependentNodesToExcludedNodes( 293 chosenNode, excludedNodes); 294 return countOfExcludedNodes; 295 } 296 297 /** 298 * Add all nodes from a dependent nodes list to excludedNodes. 299 * @return number of new excluded nodes 300 */ 301 private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode, 302 Set<Node> excludedNodes) { 303 if (this.host2datanodeMap == null) { 304 return 0; 305 } 306 int countOfExcludedNodes = 0; 307 for(String hostname : chosenNode.getDependentHostNames()) { 308 DatanodeDescriptor node = 309 this.host2datanodeMap.getDataNodeByHostName(hostname); 310 if(node!=null) { 311 if (excludedNodes.add(node)) { 312 countOfExcludedNodes++; 313 } 314 } else { 315 LOG.warn("Not able to find datanode " + hostname 316 + " which has dependency with datanode " 317 + chosenNode.getHostName()); 318 } 319 } 320 321 return countOfExcludedNodes; 322 } 323 324 /** 325 * Pick up replica node set for deleting replica as over-replicated. 326 * First set contains replica nodes on rack with more than one 327 * replica while second set contains remaining replica nodes. 328 * If first is not empty, divide first set into two subsets: 329 * moreThanOne contains nodes on nodegroup with more than one replica 330 * exactlyOne contains the remaining nodes in first set 331 * then pickup priSet if not empty. 332 * If first is empty, then pick second. 333 */ 334 @Override 335 public Collection<DatanodeStorageInfo> pickupReplicaSet( 336 Collection<DatanodeStorageInfo> first, 337 Collection<DatanodeStorageInfo> second, 338 Map<String, List<DatanodeStorageInfo>> rackMap) { 339 // If no replica within same rack, return directly. 340 if (first.isEmpty()) { 341 return second; 342 } 343 // Split data nodes in the first set into two sets, 344 // moreThanOne contains nodes on nodegroup with more than one replica 345 // exactlyOne contains the remaining nodes 346 Map<String, List<DatanodeStorageInfo>> nodeGroupMap = new HashMap<>(); 347 348 for(DatanodeStorageInfo storage : first) { 349 final String nodeGroupName = NetworkTopology.getLastHalf( 350 storage.getDatanodeDescriptor().getNetworkLocation()); 351 List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName); 352 if (storageList == null) { 353 storageList = new ArrayList<>(); 354 nodeGroupMap.put(nodeGroupName, storageList); 355 } 356 storageList.add(storage); 357 } 358 359 final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); 360 final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); 361 // split nodes into two sets 362 for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) { 363 if (datanodeList.size() == 1 ) { 364 // exactlyOne contains nodes on nodegroup with exactly one replica 365 exactlyOne.add(datanodeList.get(0)); 366 } else { 367 // moreThanOne contains nodes on nodegroup with more than one replica 368 moreThanOne.addAll(datanodeList); 369 } 370 } 371 372 return moreThanOne.isEmpty()? exactlyOne : moreThanOne; 373 } 374 375 /** 376 * Check if there are any replica (other than source) on the same node group 377 * with target. If true, then target is not a good candidate for placing 378 * specific replica as we don't want 2 replicas under the same nodegroup. 379 * 380 * @return true if there are any replica (other than source) on the same node 381 * group with target 382 */ 383 @Override 384 public boolean isMovable(Collection<DatanodeInfo> locs, 385 DatanodeInfo source, DatanodeInfo target) { 386 for (DatanodeInfo dn : locs) { 387 if (dn != source && dn != target && 388 clusterMap.isOnSameNodeGroup(dn, target)) { 389 return false; 390 } 391 } 392 return true; 393 } 394 395 396 @Override 397 public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, 398 int numberOfReplicas) { 399 if (locs == null) { 400 locs = DatanodeDescriptor.EMPTY_ARRAY; 401 } 402 403 List<String> locList = new ArrayList<String>(); 404 /* 405 * remove the part of node group for BlockPlacementPolicyDefault to count 406 * distinct racks, e.g. "/d1/r1/n1" --> "/d1/r1" 407 */ 408 for (int i = 0; i < locs.length; i++) { 409 locList.add(locs[i].getNetworkLocation()); 410 locs[i].setNetworkLocation(NetworkTopology.getFirstHalf(locs[i] 411 .getNetworkLocation())); 412 } 413 414 BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(locs, 415 numberOfReplicas); 416 417 // restore the part of node group back 418 for (int i = 0; i < locs.length; i++) { 419 locs[i].setNetworkLocation(locList.get(i)); 420 } 421 422 int minNodeGroups = numberOfReplicas; 423 BlockPlacementStatusWithNodeGroup nodeGroupStatus = 424 new BlockPlacementStatusWithNodeGroup( 425 defaultStatus, getNodeGroupsFromNode(locs), minNodeGroups); 426 return nodeGroupStatus; 427 } 428 429 private Set<String> getNodeGroupsFromNode(DatanodeInfo[] nodes) { 430 Set<String> nodeGroups = new HashSet<>(); 431 if (nodes == null) { 432 return nodeGroups; 433 } 434 435 for (DatanodeInfo node : nodes) { 436 nodeGroups.add(NetworkTopology.getLastHalf(node.getNetworkLocation())); 437 } 438 return nodeGroups; 439 } 440}