001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.net;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026import java.util.Random;
027import java.util.TreeMap;
028import java.util.concurrent.locks.ReadWriteLock;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import com.google.common.annotations.VisibleForTesting;
032import org.apache.hadoop.classification.InterfaceAudience;
033import org.apache.hadoop.classification.InterfaceStability;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
036import org.apache.hadoop.util.ReflectionUtils;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import com.google.common.base.Preconditions;
041import com.google.common.collect.Lists;
042
/** The class represents a cluster of computers with a tree hierarchical
 * network topology.
 * For example, a cluster may consist of many data centers filled
 * with racks of computers.
 * In a network topology, leaves represent data nodes (computers) and inner
 * nodes represent switches/routers that manage traffic in/out of data centers
 * or racks.
 */
052@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
053@InterfaceStability.Unstable
054public class NetworkTopology {
  /**
   * Default rack location string. NOTE(review): not referenced in this file;
   * presumably assigned by callers to nodes whose rack is unknown — confirm.
   */
  public final static String DEFAULT_RACK = "/default-rack";
  /**
   * Default tree level of a host. NOTE(review): not referenced in this file;
   * presumably the level of a leaf in a two-level /rack/host topology
   * (root = 0, rack = 1, host = 2) — confirm with callers.
   */
  public final static int DEFAULT_HOST_LEVEL = 2;
  /** Logger for topology mutations and random node selection. */
  public static final Logger LOG =
      LoggerFactory.getLogger(NetworkTopology.class);
059
060  public static class InvalidTopologyException extends RuntimeException {
061    private static final long serialVersionUID = 1L;
062    public InvalidTopologyException(String msg) {
063      super(msg);
064    }
065  }
066  
067  /**
068   * Get an instance of NetworkTopology based on the value of the configuration
069   * parameter net.topology.impl.
070   * 
071   * @param conf the configuration to be used
072   * @return an instance of NetworkTopology
073   */
074  public static NetworkTopology getInstance(Configuration conf){
075    return ReflectionUtils.newInstance(
076        conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
077        NetworkTopology.class, NetworkTopology.class), conf);
078  }
079
080  /** InnerNode represents a switch/router of a data center or rack.
081   * Different from a leaf node, it has non-null children.
082   */
083  static class InnerNode extends NodeBase {
084    protected List<Node> children=new ArrayList<Node>();
085    private Map<String, Node> childrenMap = new HashMap<String, Node>();
086    private int numOfLeaves;
087        
088    /** Construct an InnerNode from a path-like string */
089    InnerNode(String path) {
090      super(path);
091    }
092        
093    /** Construct an InnerNode from its name and its network location */
094    InnerNode(String name, String location) {
095      super(name, location);
096    }
097        
098    /** Construct an InnerNode
099     * from its name, its network location, its parent, and its level */
100    InnerNode(String name, String location, InnerNode parent, int level) {
101      super(name, location, parent, level);
102    }
103        
104    /** @return its children */
105    List<Node> getChildren() {return children;}
106        
107    /** @return the number of children this node has */
108    int getNumOfChildren() {
109      return children.size();
110    }
111        
112    /** Judge if this node represents a rack 
113     * @return true if it has no child or its children are not InnerNodes
114     */ 
115    boolean isRack() {
116      if (children.isEmpty()) {
117        return true;
118      }
119            
120      Node firstChild = children.get(0);
121      if (firstChild instanceof InnerNode) {
122        return false;
123      }
124            
125      return true;
126    }
127        
128    /** Judge if this node is an ancestor of node <i>n</i>
129     * 
130     * @param n a node
131     * @return true if this node is an ancestor of <i>n</i>
132     */
133    boolean isAncestor(Node n) {
134      return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR) ||
135        (n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR).
136        startsWith(getPath(this)+NodeBase.PATH_SEPARATOR_STR);
137    }
138        
139    /** Judge if this node is the parent of node <i>n</i>
140     * 
141     * @param n a node
142     * @return true if this node is the parent of <i>n</i>
143     */
144    boolean isParent(Node n) {
145      return n.getNetworkLocation().equals(getPath(this));
146    }
147        
148    /* Return a child name of this node who is an ancestor of node <i>n</i> */
149    private String getNextAncestorName(Node n) {
150      if (!isAncestor(n)) {
151        throw new IllegalArgumentException(
152                                           this + "is not an ancestor of " + n);
153      }
154      String name = n.getNetworkLocation().substring(getPath(this).length());
155      if (name.charAt(0) == PATH_SEPARATOR) {
156        name = name.substring(1);
157      }
158      int index=name.indexOf(PATH_SEPARATOR);
159      if (index !=-1)
160        name = name.substring(0, index);
161      return name;
162    }
163        
164    /** Add node <i>n</i> to the subtree of this node 
165     * @param n node to be added
166     * @return true if the node is added; false otherwise
167     */
168    boolean add(Node n) {
169      if (!isAncestor(n)) {
170        throw new IllegalArgumentException(n.getName()
171            + ", which is located at " + n.getNetworkLocation()
172            + ", is not a descendant of " + getPath(this));
173      }
174      if (isParent(n)) {
175        // this node is the parent of n; add n directly
176        n.setParent(this);
177        n.setLevel(this.level+1);
178        Node prev = childrenMap.put(n.getName(), n);
179        if (prev != null) {
180          for(int i=0; i<children.size(); i++) {
181            if (children.get(i).getName().equals(n.getName())) {
182              children.set(i, n);
183              return false;
184            }
185          }
186        }
187        children.add(n);
188        numOfLeaves++;
189        return true;
190      } else {
191        // find the next ancestor node
192        String parentName = getNextAncestorName(n);
193        InnerNode parentNode = (InnerNode)childrenMap.get(parentName);
194        if (parentNode == null) {
195          // create a new InnerNode
196          parentNode = createParentNode(parentName);
197          children.add(parentNode);
198          childrenMap.put(parentNode.getName(), parentNode);
199        }
200        // add n to the subtree of the next ancestor node
201        if (parentNode.add(n)) {
202          numOfLeaves++;
203          return true;
204        } else {
205          return false;
206        }
207      }
208    }
209
210    /**
211     * Creates a parent node to be added to the list of children.  
212     * Creates a node using the InnerNode four argument constructor specifying 
213     * the name, location, parent, and level of this node.
214     * 
215     * <p>To be overridden in subclasses for specific InnerNode implementations,
216     * as alternative to overriding the full {@link #add(Node)} method.
217     * 
218     * @param parentName The name of the parent node
219     * @return A new inner node
220     * @see InnerNode#InnerNode(String, String, InnerNode, int)
221     */
222    protected InnerNode createParentNode(String parentName) {
223      return new InnerNode(parentName, getPath(this), this, this.getLevel()+1);
224    }
225
226    /** Remove node <i>n</i> from the subtree of this node
227     * @param n node to be deleted 
228     * @return true if the node is deleted; false otherwise
229     */
230    boolean remove(Node n) {
231      if (!isAncestor(n)) {
232        throw new IllegalArgumentException(n.getName()
233            + ", which is located at " + n.getNetworkLocation()
234            + ", is not a descendant of " + getPath(this));
235      }
236      if (isParent(n)) {
237        // this node is the parent of n; remove n directly
238        if (childrenMap.containsKey(n.getName())) {
239          for (int i=0; i<children.size(); i++) {
240            if (children.get(i).getName().equals(n.getName())) {
241              children.remove(i);
242              childrenMap.remove(n.getName());
243              numOfLeaves--;
244              n.setParent(null);
245              return true;
246            }
247          }
248        }
249        return false;
250      } else {
251        // find the next ancestor node: the parent node
252        String parentName = getNextAncestorName(n);
253        InnerNode parentNode = (InnerNode)childrenMap.get(parentName);
254        if (parentNode == null) {
255          return false;
256        }
257        // remove n from the parent node
258        boolean isRemoved = parentNode.remove(n);
259        // if the parent node has no children, remove the parent node too
260        if (isRemoved) {
261          if (parentNode.getNumOfChildren() == 0) {
262            for(int i=0; i < children.size(); i++) {
263              if (children.get(i).getName().equals(parentName)) {
264                children.remove(i);
265                childrenMap.remove(parentName);
266                break;
267              }
268            }
269          }
270          numOfLeaves--;
271        }
272        return isRemoved;
273      }
274    } // end of remove
275        
276    /** Given a node's string representation, return a reference to the node
277     * @param loc string location of the form /rack/node
278     * @return null if the node is not found or the childnode is there but
279     * not an instance of {@link InnerNode}
280     */
281    private Node getLoc(String loc) {
282      if (loc == null || loc.length() == 0) return this;
283            
284      String[] path = loc.split(PATH_SEPARATOR_STR, 2);
285      Node childnode = childrenMap.get(path[0]);
286      if (childnode == null) return null; // non-existing node
287      if (path.length == 1) return childnode;
288      if (childnode instanceof InnerNode) {
289        return ((InnerNode)childnode).getLoc(path[1]);
290      } else {
291        return null;
292      }
293    }
294        
295    /** get <i>leafIndex</i> leaf of this subtree 
296     * if it is not in the <i>excludedNode</i>
297     *
298     * @param leafIndex an indexed leaf of the node
299     * @param excludedNode an excluded node (can be null)
300     * @return
301     */
302    Node getLeaf(int leafIndex, Node excludedNode) {
303      int count=0;
304      // check if the excluded node a leaf
305      boolean isLeaf =
306        excludedNode == null || !(excludedNode instanceof InnerNode);
307      // calculate the total number of excluded leaf nodes
308      int numOfExcludedLeaves =
309        isLeaf ? 1 : ((InnerNode)excludedNode).getNumOfLeaves();
310      if (isLeafParent()) { // children are leaves
311        if (isLeaf) { // excluded node is a leaf node
312          if (excludedNode != null &&
313              childrenMap.containsKey(excludedNode.getName())) {
314            int excludedIndex = children.indexOf(excludedNode);
315            if (excludedIndex != -1 && leafIndex >= 0) {
316              // excluded node is one of the children so adjust the leaf index
317              leafIndex = leafIndex>=excludedIndex ? leafIndex+1 : leafIndex;
318            }
319          }
320        }
321        // range check
322        if (leafIndex<0 || leafIndex>=this.getNumOfChildren()) {
323          return null;
324        }
325        return children.get(leafIndex);
326      } else {
327        for(int i=0; i<children.size(); i++) {
328          InnerNode child = (InnerNode)children.get(i);
329          if (excludedNode == null || excludedNode != child) {
330            // not the excludedNode
331            int numOfLeaves = child.getNumOfLeaves();
332            if (excludedNode != null && child.isAncestor(excludedNode)) {
333              numOfLeaves -= numOfExcludedLeaves;
334            }
335            if (count+numOfLeaves > leafIndex) {
336              // the leaf is in the child subtree
337              return child.getLeaf(leafIndex-count, excludedNode);
338            } else {
339              // go to the next child
340              count = count+numOfLeaves;
341            }
342          } else { // it is the excluededNode
343            // skip it and set the excludedNode to be null
344            excludedNode = null;
345          }
346        }
347        return null;
348      }
349    }
350    
351    protected boolean isLeafParent() {
352      return isRack();
353    }
354
355    /**
356      * Determine if children a leaves, default implementation calls {@link #isRack()}
357      * <p>To be overridden in subclasses for specific InnerNode implementations,
358      * as alternative to overriding the full {@link #getLeaf(int, Node)} method.
359      * 
360      * @return true if children are leaves, false otherwise
361      */
362    protected boolean areChildrenLeaves() {
363      return isRack();
364    }
365
366    /**
367     * Get number of leaves.
368     */
369    int getNumOfLeaves() {
370      return numOfLeaves;
371    }
372  } // end of InnerNode
373
  /**
   * the root cluster map: the root InnerNode of the topology tree
   */
  InnerNode clusterMap;
  /**
   * Depth of all leaf nodes; -1 until the first leaf is added. add() rejects
   * leaves whose depth differs from this value.
   */
  private int depthOfAllLeaves = -1;
  /** rack counter, updated by add()/remove() while holding the write lock */
  protected int numOfRacks = 0;

  /**
   * Whether or not this cluster has ever consisted of more than 1 rack,
   * according to the NetworkTopology. Set by incrementRacks() and never
   * cleared in this class.
   */
  private boolean clusterEverBeenMultiRack = false;

  /** the lock used to manage access to clusterMap and the counters above */
  protected ReadWriteLock netlock = new ReentrantReadWriteLock();
391
392  public NetworkTopology() {
393    clusterMap = new InnerNode(InnerNode.ROOT);
394  }
395
396  /** Add a leaf node
397   * Update node counter & rack counter if necessary
398   * @param node node to be added; can be null
399   * @exception IllegalArgumentException if add a node to a leave 
400                                         or node to be added is not a leaf
401   */
402  public void add(Node node) {
403    if (node==null) return;
404    int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
405    netlock.writeLock().lock();
406    try {
407      if( node instanceof InnerNode ) {
408        throw new IllegalArgumentException(
409          "Not allow to add an inner node: "+NodeBase.getPath(node));
410      }
411      if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
412        LOG.error("Error: can't add leaf node " + NodeBase.getPath(node) +
413            " at depth " + newDepth + " to topology:\n" + this.toString());
414        throw new InvalidTopologyException("Failed to add " + NodeBase.getPath(node) +
415            ": You cannot have a rack and a non-rack node at the same " +
416            "level of the network topology.");
417      }
418      Node rack = getNodeForNetworkLocation(node);
419      if (rack != null && !(rack instanceof InnerNode)) {
420        throw new IllegalArgumentException("Unexpected data node " 
421                                           + node.toString() 
422                                           + " at an illegal network location");
423      }
424      if (clusterMap.add(node)) {
425        LOG.info("Adding a new node: "+NodeBase.getPath(node));
426        if (rack == null) {
427          incrementRacks();
428        }
429        if (!(node instanceof InnerNode)) {
430          if (depthOfAllLeaves == -1) {
431            depthOfAllLeaves = node.getLevel();
432          }
433        }
434      }
435      LOG.debug("NetworkTopology became:\n{}", this.toString());
436    } finally {
437      netlock.writeLock().unlock();
438    }
439  }
440
441  protected void incrementRacks() {
442    numOfRacks++;
443    if (!clusterEverBeenMultiRack && numOfRacks > 1) {
444      clusterEverBeenMultiRack = true;
445    }
446  }
447
448  /**
449   * Return a reference to the node given its string representation.
450   * Default implementation delegates to {@link #getNode(String)}.
451   * 
452   * <p>To be overridden in subclasses for specific NetworkTopology 
453   * implementations, as alternative to overriding the full {@link #add(Node)}
454   *  method.
455   * 
456   * @param node The string representation of this node's network location is
457   * used to retrieve a Node object. 
458   * @return a reference to the node; null if the node is not in the tree
459   * 
460   * @see #add(Node)
461   * @see #getNode(String)
462   */
463  protected Node getNodeForNetworkLocation(Node node) {
464    return getNode(node.getNetworkLocation());
465  }
466  
467  /**
468   * Given a string representation of a rack, return its children
469   * @param loc a path-like string representation of a rack
470   * @return a newly allocated list with all the node's children
471   */
472  public List<Node> getDatanodesInRack(String loc) {
473    netlock.readLock().lock();
474    try {
475      loc = NodeBase.normalize(loc);
476      if (!NodeBase.ROOT.equals(loc)) {
477        loc = loc.substring(1);
478      }
479      InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
480      if (rack == null) {
481        return null;
482      }
483      return new ArrayList<Node>(rack.getChildren());
484    } finally {
485      netlock.readLock().unlock();
486    }
487  }
488
489  /** Remove a node
490   * Update node counter and rack counter if necessary
491   * @param node node to be removed; can be null
492   */ 
493  public void remove(Node node) {
494    if (node==null) return;
495    if( node instanceof InnerNode ) {
496      throw new IllegalArgumentException(
497        "Not allow to remove an inner node: "+NodeBase.getPath(node));
498    }
499    LOG.info("Removing a node: "+NodeBase.getPath(node));
500    netlock.writeLock().lock();
501    try {
502      if (clusterMap.remove(node)) {
503        InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
504        if (rack == null) {
505          numOfRacks--;
506        }
507      }
508      LOG.debug("NetworkTopology became:\n{}", this.toString());
509    } finally {
510      netlock.writeLock().unlock();
511    }
512  }
513
514  /** Check if the tree contains node <i>node</i>
515   * 
516   * @param node a node
517   * @return true if <i>node</i> is already in the tree; false otherwise
518   */
519  public boolean contains(Node node) {
520    if (node == null) return false;
521    netlock.readLock().lock();
522    try {
523      Node parent = node.getParent();
524      for (int level = node.getLevel(); parent != null && level > 0;
525           parent = parent.getParent(), level--) {
526        if (parent == clusterMap) {
527          return true;
528        }
529      }
530    } finally {
531      netlock.readLock().unlock();
532    }
533    return false; 
534  }
535    
536  /** Given a string representation of a node, return its reference
537   * 
538   * @param loc
539   *          a path-like string representation of a node
540   * @return a reference to the node; null if the node is not in the tree
541   */
542  public Node getNode(String loc) {
543    netlock.readLock().lock();
544    try {
545      loc = NodeBase.normalize(loc);
546      if (!NodeBase.ROOT.equals(loc))
547        loc = loc.substring(1);
548      return clusterMap.getLoc(loc);
549    } finally {
550      netlock.readLock().unlock();
551    }
552  }
553
554  /**
555   * @return true if this cluster has ever consisted of multiple racks, even if
556   *         it is not now a multi-rack cluster.
557   */
558  public boolean hasClusterEverBeenMultiRack() {
559    return clusterEverBeenMultiRack;
560  }
561
  /**
   * Return the rack for a network location. This default implementation
   * treats the network location itself as the rack and returns it unchanged.
   *
   * <p>To be overridden in subclasses for specific NetworkTopology
   * implementations whose network locations do not map one-to-one to racks.
   * @param loc
   *          a path-like string representation of a network location
   * @return a rack string
   */
  public String getRack(String loc) {
    return loc;
  }
575  
576  /** @return the total number of racks */
577  public int getNumOfRacks() {
578    netlock.readLock().lock();
579    try {
580      return numOfRacks;
581    } finally {
582      netlock.readLock().unlock();
583    }
584  }
585
586  /** @return the total number of leaf nodes */
587  public int getNumOfLeaves() {
588    netlock.readLock().lock();
589    try {
590      return clusterMap.getNumOfLeaves();
591    } finally {
592      netlock.readLock().unlock();
593    }
594  }
595
596  /** Return the distance between two nodes
597   * It is assumed that the distance from one node to its parent is 1
598   * The distance between two nodes is calculated by summing up their distances
599   * to their closest common ancestor.
600   * @param node1 one node
601   * @param node2 another node
602   * @return the distance between node1 and node2 which is zero if they are the same
603   *  or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
604   */
605  public int getDistance(Node node1, Node node2) {
606    if (node1 == node2) {
607      return 0;
608    }
609    Node n1=node1, n2=node2;
610    int dis = 0;
611    netlock.readLock().lock();
612    try {
613      int level1=node1.getLevel(), level2=node2.getLevel();
614      while(n1!=null && level1>level2) {
615        n1 = n1.getParent();
616        level1--;
617        dis++;
618      }
619      while(n2!=null && level2>level1) {
620        n2 = n2.getParent();
621        level2--;
622        dis++;
623      }
624      while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
625        n1=n1.getParent();
626        n2=n2.getParent();
627        dis+=2;
628      }
629    } finally {
630      netlock.readLock().unlock();
631    }
632    if (n1==null) {
633      LOG.warn("The cluster does not contain node: "+NodeBase.getPath(node1));
634      return Integer.MAX_VALUE;
635    }
636    if (n2==null) {
637      LOG.warn("The cluster does not contain node: "+NodeBase.getPath(node2));
638      return Integer.MAX_VALUE;
639    }
640    return dis+2;
641  }
642
643  /** Check if two nodes are on the same rack
644   * @param node1 one node (can be null)
645   * @param node2 another node (can be null)
646   * @return true if node1 and node2 are on the same rack; false otherwise
647   * @exception IllegalArgumentException when either node1 or node2 is null, or
648   * node1 or node2 do not belong to the cluster
649   */
650  public boolean isOnSameRack( Node node1,  Node node2) {
651    if (node1 == null || node2 == null) {
652      return false;
653    }
654      
655    netlock.readLock().lock();
656    try {
657      return isSameParents(node1, node2);
658    } finally {
659      netlock.readLock().unlock();
660    }
661  }
662  
663  /**
664   * Check if network topology is aware of NodeGroup
665   */
666  public boolean isNodeGroupAware() {
667    return false;
668  }
669  
670  /** 
671   * Return false directly as not aware of NodeGroup, to be override in sub-class
672   */
673  public boolean isOnSameNodeGroup(Node node1, Node node2) {
674    return false;
675  }
676
677  /**
678   * Compare the parents of each node for equality
679   * 
680   * <p>To be overridden in subclasses for specific NetworkTopology 
681   * implementations, as alternative to overriding the full 
682   * {@link #isOnSameRack(Node, Node)} method.
683   * 
684   * @param node1 the first node to compare
685   * @param node2 the second node to compare
686   * @return true if their parents are equal, false otherwise
687   * 
688   * @see #isOnSameRack(Node, Node)
689   */
690  protected boolean isSameParents(Node node1, Node node2) {
691    return node1.getParent()==node2.getParent();
692  }
693
  /**
   * Random source for {@code chooseRandom} and for the per-distance shuffle
   * in {@code sortByDistance}. NOTE(review): the field is static, so it is
   * shared by every NetworkTopology instance in the JVM.
   */
  private static final Random r = new Random();

  /**
   * Reseed the shared random source, e.g. for reproducible tests. Because
   * {@code r} is static, this affects all NetworkTopology instances.
   *
   * @param seed the new seed
   */
  @VisibleForTesting
  void setRandomSeed(long seed) {
    r.setSeed(seed);
  }
700
701  /**
702   * Randomly choose a node.
703   *
704   * @param scope range of nodes from which a node will be chosen
705   * @return the chosen node
706   *
707   * @see #chooseRandom(String, Collection)
708   */
709  public Node chooseRandom(final String scope) {
710    return chooseRandom(scope, null);
711  }
712
713  /**
714   * Randomly choose one node from <i>scope</i>.
715   *
716   * If scope starts with ~, choose one from the all nodes except for the
717   * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
718   * If excludedNodes is given, choose a node that's not in excludedNodes.
719   *
720   * @param scope range of nodes from which a node will be chosen
721   * @param excludedNodes nodes to be excluded from
722   * @return the chosen node
723   */
724  public Node chooseRandom(final String scope,
725      final Collection<Node> excludedNodes) {
726    netlock.readLock().lock();
727    try {
728      if (scope.startsWith("~")) {
729        return chooseRandom(NodeBase.ROOT, scope.substring(1), excludedNodes);
730      } else {
731        return chooseRandom(scope, null, excludedNodes);
732      }
733    } finally {
734      netlock.readLock().unlock();
735    }
736  }
737
738  private Node chooseRandom(final String scope, String excludedScope,
739      final Collection<Node> excludedNodes) {
740    if (excludedScope != null) {
741      if (scope.startsWith(excludedScope)) {
742        return null;
743      }
744      if (!excludedScope.startsWith(scope)) {
745        excludedScope = null;
746      }
747    }
748    Node node = getNode(scope);
749    if (!(node instanceof InnerNode)) {
750      return excludedNodes != null && excludedNodes.contains(node) ?
751          null : node;
752    }
753    InnerNode innerNode = (InnerNode)node;
754    int numOfDatanodes = innerNode.getNumOfLeaves();
755    if (excludedScope == null) {
756      node = null;
757    } else {
758      node = getNode(excludedScope);
759      if (!(node instanceof InnerNode)) {
760        numOfDatanodes -= 1;
761      } else {
762        numOfDatanodes -= ((InnerNode)node).getNumOfLeaves();
763      }
764    }
765    if (numOfDatanodes == 0) {
766      LOG.debug("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\").",
767          String.valueOf(scope), String.valueOf(excludedScope));
768      return null;
769    }
770    Node ret = null;
771    final int availableNodes;
772    if (excludedScope == null) {
773      availableNodes = countNumOfAvailableNodes(scope, excludedNodes);
774    } else {
775      availableNodes =
776          countNumOfAvailableNodes("~" + excludedScope, excludedNodes);
777    }
778    LOG.debug("Choosing random from {} available nodes on node {},"
779        + " scope={}, excludedScope={}, excludeNodes={}", availableNodes,
780        innerNode.toString(), scope, excludedScope, excludedNodes);
781    if (availableNodes > 0) {
782      do {
783        int leaveIndex = r.nextInt(numOfDatanodes);
784        ret = innerNode.getLeaf(leaveIndex, node);
785        if (excludedNodes == null || !excludedNodes.contains(ret)) {
786          break;
787        } else {
788          LOG.debug("Node {} is excluded, continuing.", ret);
789        }
790        // We've counted numOfAvailableNodes inside the lock, so there must be
791        // at least 1 satisfying node. Keep trying until we found it.
792      } while (true);
793    }
794    LOG.debug("chooseRandom returning {}", ret);
795    return ret;
796  }
797
798  /** return leaves in <i>scope</i>
799   * @param scope a path string
800   * @return leaves nodes under specific scope
801   */
802  public List<Node> getLeaves(String scope) {
803    Node node = getNode(scope);
804    List<Node> leafNodes = new ArrayList<Node>();
805    if (!(node instanceof InnerNode)) {
806      leafNodes.add(node);
807    } else {
808      InnerNode innerNode = (InnerNode) node;
809      for (int i=0;i<innerNode.getNumOfLeaves();i++) {
810        leafNodes.add(innerNode.getLeaf(i, null));
811      }
812    }
813    return leafNodes;
814  }
815
816  /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>
817   * if scope starts with ~, return the number of nodes that are not
818   * in <i>scope</i> and <i>excludedNodes</i>; 
819   * @param scope a path string that may start with ~
820   * @param excludedNodes a list of nodes
821   * @return number of available nodes
822   */
823  @VisibleForTesting
824  public int countNumOfAvailableNodes(String scope,
825                                      Collection<Node> excludedNodes) {
826    boolean isExcluded=false;
827    if (scope.startsWith("~")) {
828      isExcluded=true;
829      scope=scope.substring(1);
830    }
831    scope = NodeBase.normalize(scope);
832    int excludedCountInScope = 0; // the number of nodes in both scope & excludedNodes
833    int excludedCountOffScope = 0; // the number of nodes outside scope & excludedNodes
834    netlock.readLock().lock();
835    try {
836      if (excludedNodes != null) {
837        for (Node node : excludedNodes) {
838          node = getNode(NodeBase.getPath(node));
839          if (node == null) {
840            continue;
841          }
842          if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR)
843              .startsWith(scope + NodeBase.PATH_SEPARATOR_STR)) {
844            excludedCountInScope++;
845          } else {
846            excludedCountOffScope++;
847          }
848        }
849      }
850      Node n = getNode(scope);
851      int scopeNodeCount = 0;
852      if (n != null) {
853        scopeNodeCount++;
854      }
855      if (n instanceof InnerNode) {
856        scopeNodeCount=((InnerNode)n).getNumOfLeaves();
857      }
858      if (isExcluded) {
859        return clusterMap.getNumOfLeaves() - scopeNodeCount
860            - excludedCountOffScope;
861      } else {
862        return scopeNodeCount - excludedCountInScope;
863      }
864    } finally {
865      netlock.readLock().unlock();
866    }
867  }
868
869  /** convert a network tree to a string */
870  @Override
871  public String toString() {
872    // print the number of racks
873    StringBuilder tree = new StringBuilder();
874    tree.append("Number of racks: ");
875    tree.append(numOfRacks);
876    tree.append("\n");
877    // print the number of leaves
878    int numOfLeaves = getNumOfLeaves();
879    tree.append("Expected number of leaves:");
880    tree.append(numOfLeaves);
881    tree.append("\n");
882    // print nodes
883    for(int i=0; i<numOfLeaves; i++) {
884      tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)));
885      tree.append("\n");
886    }
887    return tree.toString();
888  }
889  
890  /**
891   * Divide networklocation string into two parts by last separator, and get 
892   * the first part here.
893   * 
894   * @param networkLocation
895   * @return
896   */
897  public static String getFirstHalf(String networkLocation) {
898    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
899    return networkLocation.substring(0, index);
900  }
901
902  /**
903   * Divide networklocation string into two parts by last separator, and get 
904   * the second part here.
905   * 
906   * @param networkLocation
907   * @return
908   */
909  public static String getLastHalf(String networkLocation) {
910    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
911    return networkLocation.substring(index);
912  }
913
914  /**
915   * Returns an integer weight which specifies how far away {node} is away from
916   * {reader}. A lower value signifies that a node is closer.
917   * 
918   * @param reader Node where data will be read
919   * @param node Replica of data
920   * @return weight
921   */
922  protected int getWeight(Node reader, Node node) {
923    // 0 is local, 1 is same rack, 2 is off rack
924    // Start off by initializing to off rack
925    int weight = 2;
926    if (reader != null) {
927      if (reader.equals(node)) {
928        weight = 0;
929      } else if (isOnSameRack(reader, node)) {
930        weight = 1;
931      }
932    }
933    return weight;
934  }
935
936  /**
937   * Sort nodes array by network distance to <i>reader</i>.
938   * <p/>
939   * In a three-level topology, a node can be either local, on the same rack,
940   * or on a different rack from the reader. Sorting the nodes based on network
941   * distance from the reader reduces network traffic and improves
942   * performance.
943   * <p/>
944   * As an additional twist, we also randomize the nodes at each network
945   * distance. This helps with load balancing when there is data skew.
946   *
947   * @param reader    Node where data will be read
948   * @param nodes     Available replicas with the requested data
949   * @param activeLen Number of active nodes at the front of the array
950   */
951  public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
952    /** Sort weights for the nodes array */
953    int[] weights = new int[activeLen];
954    for (int i=0; i<activeLen; i++) {
955      weights[i] = getWeight(reader, nodes[i]);
956    }
957    // Add weight/node pairs to a TreeMap to sort
958    TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
959    for (int i=0; i<activeLen; i++) {
960      int weight = weights[i];
961      Node node = nodes[i];
962      List<Node> list = tree.get(weight);
963      if (list == null) {
964        list = Lists.newArrayListWithExpectedSize(1);
965        tree.put(weight, list);
966      }
967      list.add(node);
968    }
969
970    int idx = 0;
971    for (List<Node> list: tree.values()) {
972      if (list != null) {
973        Collections.shuffle(list, r);
974        for (Node n: list) {
975          nodes[idx] = n;
976          idx++;
977        }
978      }
979    }
980    Preconditions.checkState(idx == activeLen,
981        "Sorted the wrong number of nodes!");
982  }
983}