/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

/** The class is responsible for choosing the desired number of targets
 * for placing block replicas in an environment with a node-group layer.
 * The replica placement strategy is adjusted as follows:
 * If the writer is on a datanode, the 1st replica is placed on the local
 *     node (or the local node group, or the local rack); otherwise it is
 *     placed on a random datanode.
 * The 2nd replica is placed on a datanode that is on a different rack from
 *     the 1st replica's node.
 * The 3rd replica is placed on a datanode that is in a different node group
 *     but on the same rack as the 2nd replica's node.
 */
public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {

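  /*
   * Illustrative sketch (not part of this class): the policy is normally
   * enabled together with the node-group-aware topology. The configuration
   * key names below are assumptions for this sketch; check hdfs-default.xml
   * and core-default.xml for the authoritative names.
   *
   *   Configuration conf = new HdfsConfiguration();
   *   conf.setClass("dfs.block.replicator.classname",
   *       BlockPlacementPolicyWithNodeGroup.class, BlockPlacementPolicy.class);
   *   conf.setClass("net.topology.impl",
   *       NetworkTopologyWithNodeGroup.class, NetworkTopology.class);
   *   conf.setBoolean("net.topology.nodegroup.aware", true);
   */
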
  protected BlockPlacementPolicyWithNodeGroup() {
  }

  public void initialize(Configuration conf, FSClusterStats stats,
          NetworkTopology clusterMap,
          Host2NodesMap host2datanodeMap) {
    super.initialize(conf, stats, clusterMap, host2datanodeMap);
  }

  /**
   * Choose all good favored nodes as targets.
   * If there are not enough targets, choose one replica from
   * each unselected favored node's node group.
   * @throws NotEnoughReplicasException
   */
  @Override
  protected void chooseFavouredNodes(String src, int numOfReplicas,
      List<DatanodeDescriptor> favoredNodes,
      Set<Node> favoriteAndExcludedNodes, long blocksize,
      int maxNodesPerRack, List<DatanodeStorageInfo> results,
      boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes)
      throws NotEnoughReplicasException {
    super.chooseFavouredNodes(src, numOfReplicas, favoredNodes,
        favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results,
        avoidStaleNodes, storageTypes);
    if (results.size() < numOfReplicas) {
      // Not enough replicas; choose from the node groups of
      // the unselected favored nodes.
      for (int i = 0;
          i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
        DatanodeDescriptor favoredNode = favoredNodes.get(i);
        boolean chosenNode =
            isNodeChosen(results, favoredNode);
        if (chosenNode) {
          continue;
        }
        NetworkTopologyWithNodeGroup clusterMapNodeGroup =
            (NetworkTopologyWithNodeGroup) clusterMap;
        // try a node in the favored node's node group
        DatanodeStorageInfo target = null;
        String scope =
            clusterMapNodeGroup.getNodeGroup(favoredNode.getNetworkLocation());
        try {
          target =
              chooseRandom(scope, favoriteAndExcludedNodes, blocksize,
                maxNodesPerRack, results, avoidStaleNodes, storageTypes);
        } catch (NotEnoughReplicasException e) {
          // catch the exception and continue with the other favored nodes
          continue;
        }
        if (target == null) {
          LOG.warn("Could not find a target for file "
              + src + " within the node group of favored node " + favoredNode);
          continue;
        }
        favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
      }
    }
  }
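
  /*
   * Worked example for the fallback above (assumed topology layout): if an
   * unavailable favored node reports network location "/d1/r1/n1", the scope
   * handed to chooseRandom is that node's node group, roughly:
   *
   *   String scope = clusterMapNodeGroup.getNodeGroup("/d1/r1/n1");
   *   // a random storage under "/d1/r1/n1" stands in for the favored node
   */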

  private boolean isNodeChosen(
      List<DatanodeStorageInfo> results, DatanodeDescriptor favoredNode) {
    boolean chosenNode = false;
    for (int j = 0; j < results.size(); j++) {
      if (results.get(j).getDatanodeDescriptor().equals(favoredNode)) {
        chosenNode = true;
        break;
      }
    }
    return chosenNode;
  }

  /** Choose the local node of <i>localMachine</i> as the target.
   * If localMachine is not available, fall back to the local node group or
   * the local rack when the flag <i>fallbackToNodeGroupAndLocalRack</i> is
   * set.
   * @return the chosen storage
   */
  @Override
  protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
      EnumMap<StorageType, Integer> storageTypes,
      boolean fallbackToNodeGroupAndLocalRack)
      throws NotEnoughReplicasException {
    DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
        excludedNodes, blocksize, maxNodesPerRack, results,
        avoidStaleNodes, storageTypes);
    if (localStorage != null) {
      return localStorage;
    }

    if (!fallbackToNodeGroupAndLocalRack) {
      return null;
    }
    // try a node on local node group
    DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
        (NetworkTopologyWithNodeGroup) clusterMap, localMachine, excludedNodes,
        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
    if (chosenStorage != null) {
      return chosenStorage;
    }
    // try a node on local rack
    return chooseLocalRack(localMachine, excludedNodes,
        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  }
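
  /*
   * Sketch of the fallback order implemented above, for an assumed writer
   * located at "/d1/r1/n1/writerHost":
   *   1. a storage on the writer's own datanode;
   *   2. if fallbackToNodeGroupAndLocalRack is set, a storage in the writer's
   *      node group "/d1/r1/n1";
   *   3. finally a storage elsewhere in the writer's rack "/d1/r1".
   */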

  /** @return the node of the second replica */
  private static DatanodeDescriptor secondNode(Node localMachine,
      List<DatanodeStorageInfo> results) {
    // find the second replica
    for (DatanodeStorageInfo nextStorage : results) {
      DatanodeDescriptor nextNode = nextStorage.getDatanodeDescriptor();
      if (nextNode != localMachine) {
        return nextNode;
      }
    }
    return null;
  }

  @Override
  protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
      EnumMap<StorageType, Integer> storageTypes) throws
      NotEnoughReplicasException {
    // no local machine, so choose a random machine
    if (localMachine == null) {
      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
    }

    // choose one from the local rack, but off-nodegroup
    try {
      final String scope =
          NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
      return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } catch (NotEnoughReplicasException e1) {
      // find the second replica
      final DatanodeDescriptor newLocal = secondNode(localMachine, results);
      if (newLocal != null) {
        try {
          return chooseRandom(
              clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
              blocksize, maxNodesPerRack, results, avoidStaleNodes,
              storageTypes);
        } catch (NotEnoughReplicasException e2) {
          // otherwise randomly choose one from the network
          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
        }
      } else {
        // otherwise randomly choose one from the network
        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
      }
    }
  }
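
  /*
   * Worked example (assumed locations): for a writer at "/d1/r1/n1/host1" the
   * rack scope passed to chooseRandom above is
   *
   *   NetworkTopology.getFirstHalf("/d1/r1/n1")  // -> "/d1/r1"
   *
   * If that rack cannot supply a node, the rack of the second replica is
   * tried next, and finally NodeBase.ROOT (anywhere in the cluster).
   */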

  /**
   * {@inheritDoc}
   */
  @Override
  protected void chooseRemoteRack(int numOfReplicas,
      DatanodeDescriptor localMachine, Set<Node> excludedNodes,
      long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
      boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes)
      throws NotEnoughReplicasException {
    int oldNumOfReplicas = results.size();

    final String rackLocation = NetworkTopology.getFirstHalf(
        localMachine.getNetworkLocation());
    try {
      // randomly choose from remote racks
      chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
    } catch (NotEnoughReplicasException e) {
      // fall back to the local rack
      chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
          rackLocation, excludedNodes, blocksize,
          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
    }
  }
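
  /*
   * Worked example (assumed locations): for a local machine at
   * "/d1/r1/n1/host1" the scope above becomes "~" + "/d1/r1", i.e.
   * chooseRandom is asked for nodes outside rack "/d1/r1"; only if that
   * fails are the remaining replicas placed back on "/d1/r1" itself.
   */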

  /**
   * Choose one node from the node group that <i>localMachine</i> is on.
   * If no such node is available, choose one node from the node group where
   * a second replica is located.
   * If still no such node is available, return null.
   * @return the chosen storage
   */
  private DatanodeStorageInfo chooseLocalNodeGroup(
      NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
      EnumMap<StorageType, Integer> storageTypes) throws
      NotEnoughReplicasException {
    // no local machine, so choose a random machine
    if (localMachine == null) {
      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
    }

    // choose one from the local node group
    try {
      return chooseRandom(
          clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
          storageTypes);
    } catch (NotEnoughReplicasException e1) {
      final DatanodeDescriptor newLocal = secondNode(localMachine, results);
      if (newLocal != null) {
        try {
          return chooseRandom(
              clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
              excludedNodes, blocksize, maxNodesPerRack, results,
              avoidStaleNodes, storageTypes);
        } catch (NotEnoughReplicasException e2) {
          // otherwise return null
          return null;
        }
      } else {
        // otherwise return null
        return null;
      }
    }
  }

  @Override
  protected String getRack(final DatanodeInfo cur) {
    String nodeGroupString = cur.getNetworkLocation();
    return NetworkTopology.getFirstHalf(nodeGroupString);
  }
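
  /*
   * Illustration of the rack / node-group split used throughout this class,
   * for an assumed network location "/d1/r1/n1":
   *
   *   NetworkTopology.getFirstHalf("/d1/r1/n1");  // "/d1/r1" (rack)
   *   NetworkTopology.getLastHalf("/d1/r1/n1");   // "/n1" (node group)
   */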

  /**
   * Find other nodes in the same node group as <i>chosenNode</i> and add them
   * to <i>excludedNodes</i>, since replicas should not be duplicated across
   * nodes within the same node group.
   * @return number of new excluded nodes
   */
  @Override
  protected int addToExcludedNodes(DatanodeDescriptor chosenNode,
      Set<Node> excludedNodes) {
    int countOfExcludedNodes = 0;
    String nodeGroupScope = chosenNode.getNetworkLocation();
    List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
    for (Node leafNode : leafNodes) {
      if (excludedNodes.add(leafNode)) {
        // not an existing node in excludedNodes
        countOfExcludedNodes++;
      }
    }

    countOfExcludedNodes += addDependentNodesToExcludedNodes(
        chosenNode, excludedNodes);
    return countOfExcludedNodes;
  }

  /**
   * Add all nodes from a dependent nodes list to excludedNodes.
   * @return number of new excluded nodes
   */
  private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode,
      Set<Node> excludedNodes) {
    if (this.host2datanodeMap == null) {
      return 0;
    }
    int countOfExcludedNodes = 0;
    for (String hostname : chosenNode.getDependentHostNames()) {
      DatanodeDescriptor node =
          this.host2datanodeMap.getDataNodeByHostName(hostname);
      if (node != null) {
        if (excludedNodes.add(node)) {
          countOfExcludedNodes++;
        }
      } else {
        LOG.warn("Unable to find datanode " + hostname
            + ", which is listed as a dependent host of datanode "
            + chosenNode.getHostName());
      }
    }

    return countOfExcludedNodes;
  }
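
  /*
   * Worked example (assumed layout): if the chosen node sits under node group
   * "/d1/r1/n1", every leaf returned by clusterMap.getLeaves("/d1/r1/n1") is
   * excluded, along with any datanodes resolved from its dependent host
   * names, so no further replica of this block lands in that node group.
   */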

  /**
   * Pick the replica set from which a replica of an over-replicated block
   * will be deleted.
   * The first set contains replica nodes on racks holding more than one
   * replica, while the second set contains the remaining replica nodes.
   * If the first set is not empty, split it into two subsets:
   *   moreThanOne contains nodes in node groups with more than one replica;
   *   exactlyOne contains the remaining nodes of the first set.
   * Then return moreThanOne if it is not empty, otherwise exactlyOne.
   * If the first set is empty, return the second set.
   */
  @Override
  public Collection<DatanodeStorageInfo> pickupReplicaSet(
      Collection<DatanodeStorageInfo> first,
      Collection<DatanodeStorageInfo> second,
      Map<String, List<DatanodeStorageInfo>> rackMap) {
    // If no replica within same rack, return directly.
    if (first.isEmpty()) {
      return second;
    }
    // Split data nodes in the first set into two sets,
    // moreThanOne contains nodes on nodegroup with more than one replica
    // exactlyOne contains the remaining nodes
    Map<String, List<DatanodeStorageInfo>> nodeGroupMap = new HashMap<>();

    for (DatanodeStorageInfo storage : first) {
      final String nodeGroupName = NetworkTopology.getLastHalf(
          storage.getDatanodeDescriptor().getNetworkLocation());
      List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
      if (storageList == null) {
        storageList = new ArrayList<>();
        nodeGroupMap.put(nodeGroupName, storageList);
      }
      storageList.add(storage);
    }

    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
    // split nodes into two sets
    for (List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
      if (datanodeList.size() == 1) {
        // exactlyOne contains nodes on nodegroup with exactly one replica
        exactlyOne.add(datanodeList.get(0));
      } else {
        // moreThanOne contains nodes on nodegroup with more than one replica
        moreThanOne.addAll(datanodeList);
      }
    }

    return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
  }
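
  /*
   * Worked example (assumed locations): if the over-replicated rack holds
   * storages in node groups "/n1", "/n1" and "/n2", then moreThanOne gets the
   * two "/n1" storages and exactlyOne gets the "/n2" storage, so the deletion
   * candidate is taken from "/n1".
   */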

  /**
   * Check if there is any replica (other than source) on the same node group
   * as target. If there is, then target is not a good candidate for placing
   * this replica, since we do not want two replicas under the same node group.
   *
   * @return false if any replica (other than source) is on the same node
   *         group as target; true otherwise
   */
  @Override
  public boolean isMovable(Collection<DatanodeInfo> locs,
      DatanodeInfo source, DatanodeInfo target) {
    for (DatanodeInfo dn : locs) {
      if (dn != source && dn != target &&
          clusterMap.isOnSameNodeGroup(dn, target)) {
        return false;
      }
    }
    return true;
  }
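
  /*
   * Worked example (assumed layout): a proposed move of a replica onto a
   * target in node group "/d1/r1/n1" is rejected (false) if any replica other
   * than source already sits in "/d1/r1/n1"; otherwise it is allowed.
   */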

  @Override
  public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
      int numberOfReplicas) {
    if (locs == null) {
      locs = DatanodeDescriptor.EMPTY_ARRAY;
    }

    List<String> locList = new ArrayList<>();
    /*
     * Remove the node-group part so that BlockPlacementPolicyDefault counts
     * distinct racks, e.g. "/d1/r1/n1" --> "/d1/r1".
     */
    for (int i = 0; i < locs.length; i++) {
      locList.add(locs[i].getNetworkLocation());
      locs[i].setNetworkLocation(NetworkTopology.getFirstHalf(locs[i]
          .getNetworkLocation()));
    }

    BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(locs,
        numberOfReplicas);

    // restore the node-group part
    for (int i = 0; i < locs.length; i++) {
      locs[i].setNetworkLocation(locList.get(i));
    }

    int minNodeGroups = numberOfReplicas;
    BlockPlacementStatusWithNodeGroup nodeGroupStatus =
        new BlockPlacementStatusWithNodeGroup(
            defaultStatus, getNodeGroupsFromNode(locs), minNodeGroups);
    return nodeGroupStatus;
  }
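
  /*
   * Worked example (assumed locations): for replicas at "/d1/r1/n1",
   * "/d1/r1/n2" and "/d1/r2/n3", the locations are temporarily rewritten to
   * "/d1/r1", "/d1/r1" and "/d1/r2" so the default policy counts racks, while
   * getNodeGroupsFromNode collects {"/n1", "/n2", "/n3"} to compare against
   * minNodeGroups.
   */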

  private Set<String> getNodeGroupsFromNode(DatanodeInfo[] nodes) {
    Set<String> nodeGroups = new HashSet<>();
    if (nodes == null) {
      return nodeGroups;
    }

    for (DatanodeInfo node : nodes) {
      nodeGroups.add(NetworkTopology.getLastHalf(node.getNetworkLocation()));
    }
    return nodeGroups;
  }
}