001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.server.balancer;
019
020import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
021
022import java.io.BufferedInputStream;
023import java.io.BufferedOutputStream;
024import java.io.DataInputStream;
025import java.io.DataOutputStream;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.net.Socket;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.EnumMap;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ExecutorService;
041import java.util.concurrent.Executors;
042import java.util.concurrent.Future;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.apache.hadoop.classification.InterfaceAudience;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.CommonConfigurationKeys;
049import org.apache.hadoop.fs.StorageType;
050import org.apache.hadoop.hdfs.DFSConfigKeys;
051import org.apache.hadoop.hdfs.DFSUtil;
052import org.apache.hadoop.hdfs.DFSUtilClient;
053import org.apache.hadoop.hdfs.DistributedFileSystem;
054import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
055import org.apache.hadoop.hdfs.protocol.Block;
056import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
057import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
058import org.apache.hadoop.hdfs.protocol.HdfsConstants;
059import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
060import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
061import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
062import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
063import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
064import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
065import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
066import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
067import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
068import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
069import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
070import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
071import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
072import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
073import org.apache.hadoop.io.IOUtils;
074import org.apache.hadoop.net.NetUtils;
075import org.apache.hadoop.net.NetworkTopology;
076import org.apache.hadoop.security.token.Token;
077import org.apache.hadoop.util.StringUtils;
078import org.apache.hadoop.util.Time;
079
080import com.google.common.annotations.VisibleForTesting;
081import com.google.common.base.Preconditions;
082
083/** Dispatching block replica moves between datanodes. */
084@InterfaceAudience.Private
085public class Dispatcher {
  /** Logger shared by the dispatcher and all of its nested classes. */
  static final Log LOG = LogFactory.getLog(Dispatcher.class);

  // NOTE(review): not referenced in the visible part of this file; presumably
  // used further down — confirm before removing.
  private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
  /**
   * The period of time, in milliseconds, to delay the usage of a DataNode
   * after hitting errors when using it for migrating data. It is applied to
   * both the proxy and the target of a failed move.
   * Not final; presumably adjustable (e.g. by tests) elsewhere in this file.
   */
  private static long delayAfterErrors = 10 * 1000;

  /** Connection to the NameNode: block lists, access tokens, byte counters. */
  private final NameNodeConnector nnc;
  /** Wraps datanode sockets with SASL before data transfer requests. */
  private final SaslDataTransferClient saslClient;

  /** Set of datanodes to be excluded. */
  private final Set<String> excludedNodes;
  /** Restrict to the following nodes. */
  private final Set<String> includedNodes;

  /** Storage groups that blocks are moved out of. */
  private final Collection<Source> sources = new HashSet<Source>();
  /** Storage groups that blocks are moved into. */
  private final Collection<StorageGroup> targets = new HashSet<StorageGroup>();

  /** One shared DBlock per Block, so all sources see the same locations. */
  private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
  /** Recently moved blocks, kept for a window of movedWinWidth. */
  private final MovedBlocks<StorageGroup> movedBlocks;

  /** Map (datanodeUuid,storageType -> StorageGroup) */
  private final StorageGroupMap<StorageGroup> storageGroupMap
      = new StorageGroupMap<StorageGroup>();

  /** Topology used to prefer proxies on the target's node group or rack. */
  private NetworkTopology cluster;

  /** Runs per-source dispatch work; null when dispatcherThreads == 0. */
  private final ExecutorService dispatchExecutor;

  /** Budget of mover threads shared across datanodes. */
  private final Allocator moverThreadAllocator;

  /** The maximum number of concurrent blocks moves at a datanode */
  private final int maxConcurrentMovesPerNode;

  /** Upper bound, in bytes, requested from the NameNode per getBlocks call. */
  private final long getBlocksSize;
  /** Blocks smaller than this many bytes are skipped when fetching. */
  private final long getBlocksMinBlockSize;
  /**
   * Milliseconds to wait for a single block move to finish; a value <= 0
   * disables the timeout.
   */
  private final long blockMoveTimeout;
  /**
   * If no block can be moved out of a {@link Source} after this configured
   * amount of time (in milliseconds), the Source should give up choosing the
   * next possible move.
   */
  private final int maxNoMoveInterval;

  /** Buffer size for the buffered streams opened to datanodes. */
  private final int ioFileBufferSize;

  /** Whether datanode transfer addresses are resolved by hostname. */
  private final boolean connectToDnViaHostname;
  // NOTE(review): not referenced in the visible part of this file.
  private BlockPlacementPolicy placementPolicy;
135
136  static class Allocator {
137    private final int max;
138    private int count = 0;
139
140    Allocator(int max) {
141      this.max = max;
142    }
143
144    synchronized int allocate(int n) {
145      final int remaining = max - count;
146      if (remaining <= 0) {
147        return 0;
148      } else {
149        final int allocated = remaining < n? remaining: n;
150        count += allocated;
151        return allocated;
152      }
153    }
154
155    synchronized void reset() {
156      count = 0;
157    }
158  }
159
160  private static class GlobalBlockMap {
161    private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
162
163    /**
164     * Get the block from the map;
165     * if the block is not found, create a new block and put it in the map.
166     */
167    private DBlock get(Block b) {
168      DBlock block = map.get(b);
169      if (block == null) {
170        block = new DBlock(b);
171        map.put(b, block);
172      }
173      return block;
174    }
175    
176    /** Remove all blocks except for the moved blocks. */
177    private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
178      for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
179        if (!movedBlocks.contains(i.next())) {
180          i.remove();
181        }
182      }
183    }
184  }
185
186  public static class StorageGroupMap<G extends StorageGroup> {
187    private static String toKey(String datanodeUuid, StorageType storageType) {
188      return datanodeUuid + ":" + storageType;
189    }
190
191    private final Map<String, G> map = new HashMap<String, G>();
192
193    public G get(String datanodeUuid, StorageType storageType) {
194      return map.get(toKey(datanodeUuid, storageType));
195    }
196
197    public void put(G g) {
198      final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType);
199      final StorageGroup existing = map.put(key, g);
200      Preconditions.checkState(existing == null);
201    }
202
203    int size() {
204      return map.size();
205    }
206
207    void clear() {
208      map.clear();
209    }
210
211    public Collection<G> values() {
212      return map.values();
213    }
214  }
215
216  /** This class keeps track of a scheduled block move */
  /**
   * This class keeps track of a scheduled block move from {@code source} to
   * {@code target}, relayed through a proxy datanode that holds a replica.
   * The block and proxy are filled in by the choose/mark methods; all four
   * fields are cleared by {@link #reset()} at the end of {@link #dispatch()}.
   */
  public class PendingMove {
    /** The block chosen for this move; null until a candidate is picked. */
    private DBlock block;
    /** The storage group the block is moved away from. */
    private Source source;
    /** The datanode that streams the replica to the target. */
    private DDatanode proxySource;
    /** The storage group that should receive the block. */
    private StorageGroup target;

    private PendingMove(Source source, StorageGroup target) {
      this.source = source;
      this.target = target;
    }

    /**
     * Describe the move for logging.
     * NOTE(review): source and target are dereferenced unconditionally, so
     * calling this after reset() has nulled them would throw an NPE.
     */
    @Override
    public String toString() {
      final Block b = block != null ? block.getBlock() : null;
      String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ")
          : " ";
      return bStr + "from " + source.getDisplayName() + " to " + target
          .getDisplayName() + " through " + (proxySource != null ? proxySource
          .datanode : "");
    }

    /**
     * Choose a block & a proxy source for this pendingMove whose source &
     * target have already been chosen. The first acceptable block is removed
     * from the source's block list.
     *
     * @return true if a block and its proxy are chosen; false otherwise
     */
    private boolean chooseBlockAndProxy() {
      // source and target must have the same storage type
      final StorageType t = source.getStorageType();
      // iterate all source's blocks until find a good one
      for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
        if (markMovedIfGoodBlock(i.next(), t)) {
          i.remove();
          return true;
        }
      }
      return false;
    }

    /**
     * Try to claim {@code block} for this move. The block must pass
     * isGoodBlockCandidate for this source/target pair, and a proxy source
     * must be found. Only then is the block recorded in movedBlocks.
     * Locks are taken in the order: block, then movedBlocks.
     * Note that {@code this.block} is assigned before the proxy search, so it
     * still holds the last candidate when this method returns false.
     *
     * @param block the candidate block
     * @param targetStorageType storage type passed to the candidate check
     * @return true if the given block is good for the tentative move.
     */
    private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
      synchronized (block) {
        synchronized (movedBlocks) {
          if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
            this.block = block;
            if (chooseProxySource()) {
              movedBlocks.put(block);
              if (LOG.isDebugEnabled()) {
                LOG.debug("Decided to move " + this);
              }
              return true;
            }
          }
        }
      }
      return false;
    }

    /**
     * Choose a proxy source. Preference order:
     * <ol>
     *   <li>the source itself, when source and target are the same node;</li>
     *   <li>a replica on the target's node group (node-group-aware topologies
     *       only);</li>
     *   <li>a replica on the target's rack;</li>
     *   <li>any replica whose datanode accepts another pending move.</li>
     * </ol>
     * A candidate is taken only if {@link #addTo} succeeds.
     *
     * @return true if a proxy is found; otherwise false
     */
    private boolean chooseProxySource() {
      final DatanodeInfo targetDN = target.getDatanodeInfo();
      // if source and target are same nodes then no need of proxy
      if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
        return true;
      }
      // if node group is supported, first try add nodes in the same node group
      if (cluster.isNodeGroupAware()) {
        for (StorageGroup loc : block.getLocations()) {
          if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
              && addTo(loc)) {
            return true;
          }
        }
      }
      // check if there is replica which is on the same rack with the target
      for (StorageGroup loc : block.getLocations()) {
        if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
          return true;
        }
      }
      // find out a non-busy replica
      for (StorageGroup loc : block.getLocations()) {
        if (addTo(loc)) {
          return true;
        }
      }
      return false;
    }

    /**
     * Add to a proxy source for specific block movement: register this move
     * as pending on g's datanode and, if accepted, use it as the proxy.
     *
     * @return false if the datanode refused the move (e.g. while its error
     *         delay is active)
     */
    private boolean addTo(StorageGroup g) {
      final DDatanode dn = g.getDDatanode();
      if (dn.addPendingBlock(this)) {
        proxySource = dn;
        return true;
      }
      return false;
    }

    /**
     * Dispatch the move to the proxy source & wait for the response.
     *
     * Connects to the target's transfer address, wraps the socket with SASL,
     * sends a replaceBlock request naming the proxy, and blocks until the
     * target reports the outcome. On success the block size is added to the
     * NameNodeConnector's bytes-moved counter. On IOException both the proxy
     * and the target get an error delay of delayAfterErrors. Other exception
     * types are not caught here, so they skip that delay. In every case the
     * finally block closes the streams, removes this move from the proxy's
     * and target's pending lists, resets this object, and wakes threads
     * waiting on the Dispatcher monitor.
     */
    private void dispatch() {
      LOG.info("Start moving " + this);

      Socket sock = new Socket();
      DataOutputStream out = null;
      DataInputStream in = null;
      try {
        sock.connect(
            NetUtils.createSocketAddr(target.getDatanodeInfo().
                getXferAddr(Dispatcher.this.connectToDnViaHostname)),
                HdfsConstants.READ_TIMEOUT);

        // Set read timeout so that it doesn't hang forever against
        // unresponsive nodes. Datanode normally sends IN_PROGRESS response
        // twice within the client read timeout period (every 30 seconds by
        // default). Here, we make it give up after 5 minutes of no response.
        sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
        sock.setKeepAlive(true);

        OutputStream unbufOut = sock.getOutputStream();
        InputStream unbufIn = sock.getInputStream();
        ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
            block.getBlock());
        final KeyManager km = nnc.getKeyManager(); 
        Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
        // The SASL client may replace the raw streams with wrapped ones.
        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
            unbufIn, km, accessToken, target.getDatanodeInfo());
        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;
        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            ioFileBufferSize));
        in = new DataInputStream(new BufferedInputStream(unbufIn,
            ioFileBufferSize));

        sendRequest(out, eb, accessToken);
        receiveResponse(in);
        nnc.getBytesMoved().addAndGet(block.getNumBytes());
        target.getDDatanode().setHasSuccess();
        LOG.info("Successfully moved " + this);
      } catch (IOException e) {
        LOG.warn("Failed to move " + this, e);
        target.getDDatanode().setHasFailure();
        // Proxy or target may have some issues, delay before using these nodes
        // further in order to avoid a potential storm of "threads quota
        // exceeded" warnings when the dispatcher gets out of sync with work
        // going on in datanodes.
        proxySource.activateDelay(delayAfterErrors);
        target.getDDatanode().activateDelay(delayAfterErrors);
      } finally {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        IOUtils.closeSocket(sock);

        proxySource.removePendingBlock(this);
        target.getDDatanode().removePendingBlock(this);

        synchronized (this) {
          reset();
        }
        // Source.dispatchBlocks() waits on this monitor when it cannot
        // schedule a move; wake it so it can retry now that slots are free.
        synchronized (Dispatcher.this) {
          Dispatcher.this.notifyAll();
        }
      }
    }

    /**
     * Send a block replace request to the output stream. The request carries
     * the target storage type, the access token, the source datanode UUID
     * (presumably the hint for which replica to delete — confirm against
     * Sender#replaceBlock) and the proxy datanode.
     */
    private void sendRequest(DataOutputStream out, ExtendedBlock eb,
        Token<BlockTokenIdentifier> accessToken) throws IOException {
      new Sender(out).replaceBlock(eb, target.storageType, accessToken,
          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
    }

    /**
     * Check whether to stop waiting for the response. This is true once the
     * source's iteration is over, or once more than blockMoveTimeout
     * milliseconds have passed since startTime (only if blockMoveTimeout > 0).
     */
    private boolean stopWaitingForResponse(long startTime) {
      return source.isIterationOver() ||
          (blockMoveTimeout > 0 &&
          (Time.monotonicNow() - startTime > blockMoveTimeout));
    }

    /**
     * Receive a reportedBlock copy response from the input stream.
     * IN_PROGRESS responses are consumed until a final status arrives. The
     * timeout is checked only after each intermediate response is read;
     * while the stream is silent, the socket read timeout set in dispatch()
     * bounds the wait.
     *
     * @throws IOException on I/O errors, when the move times out, or
     *         (presumably) when checkBlockOpStatus rejects the final status
     */
    private void receiveResponse(DataInputStream in) throws IOException {
      long startTime = Time.monotonicNow();
      BlockOpResponseProto response =
          BlockOpResponseProto.parseFrom(vintPrefixed(in));
      while (response.getStatus() == Status.IN_PROGRESS) {
        // read intermediate responses
        response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
        // Stop waiting for slow block moves. Even if it stops waiting,
        // the actual move may continue.
        if (stopWaitingForResponse(startTime)) {
          throw new IOException("Block move timed out");
        }
      }
      String logInfo = "block move is failed";
      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
    }

    /** Reset the object: clear the block, source, proxy and target. */
    private void reset() {
      block = null;
      source = null;
      proxySource = null;
      target = null;
    }
  }
429
430  /** A class for keeping track of block locations in the dispatcher. */
431  public static class DBlock extends MovedBlocks.Locations<StorageGroup> {
432    public DBlock(Block block) {
433      super(block);
434    }
435  }
436
437  /** The class represents a desired move. */
438  static class Task {
439    private final StorageGroup target;
440    private long size; // bytes scheduled to move
441
442    Task(StorageGroup target, long size) {
443      this.target = target;
444      this.size = size;
445    }
446
447    long getSize() {
448      return size;
449    }
450  }
451
452  /** A class that keeps track of a datanode. */
453  public static class DDatanode {
454
455    /** A group of storages in a datanode with the same storage type. */
456    public class StorageGroup {
457      final StorageType storageType;
458      final long maxSize2Move;
459      private long scheduledSize = 0L;
460
461      private StorageGroup(StorageType storageType, long maxSize2Move) {
462        this.storageType = storageType;
463        this.maxSize2Move = maxSize2Move;
464      }
465      
466      public StorageType getStorageType() {
467        return storageType;
468      }
469
470      private DDatanode getDDatanode() {
471        return DDatanode.this;
472      }
473
474      public DatanodeInfo getDatanodeInfo() {
475        return DDatanode.this.datanode;
476      }
477
478      /** Decide if still need to move more bytes */
479      boolean hasSpaceForScheduling() {
480        return hasSpaceForScheduling(0L);
481      }
482
483      synchronized boolean hasSpaceForScheduling(long size) {
484        return availableSizeToMove() > size;
485      }
486
487      /** @return the total number of bytes that need to be moved */
488      synchronized long availableSizeToMove() {
489        return maxSize2Move - scheduledSize;
490      }
491
492      /** increment scheduled size */
493      public synchronized void incScheduledSize(long size) {
494        scheduledSize += size;
495      }
496
497      /** @return scheduled size */
498      synchronized long getScheduledSize() {
499        return scheduledSize;
500      }
501
502      /** Reset scheduled size to zero. */
503      synchronized void resetScheduledSize() {
504        scheduledSize = 0L;
505      }
506
507      private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
508        if (getDDatanode().addPendingBlock(pm)) {
509          if (pm.markMovedIfGoodBlock(block, getStorageType())) {
510            incScheduledSize(pm.block.getNumBytes());
511            return pm;
512          } else {
513            getDDatanode().removePendingBlock(pm);
514          }
515        }
516        return null;
517      }
518
519      /** @return the name for display */
520      String getDisplayName() {
521        return datanode + ":" + storageType;
522      }
523
524      @Override
525      public String toString() {
526        return getDisplayName();
527      }
528
529      @Override
530      public int hashCode() {
531        return getStorageType().hashCode() ^ getDatanodeInfo().hashCode();
532      }
533
534      @Override
535      public boolean equals(Object obj) {
536        if (this == obj) {
537          return true;
538        } else if (obj == null || !(obj instanceof StorageGroup)) {
539          return false;
540        } else {
541          final StorageGroup that = (StorageGroup) obj;
542          return this.getStorageType() == that.getStorageType()
543              && this.getDatanodeInfo().equals(that.getDatanodeInfo());
544        }
545      }
546
547    }
548
549    final DatanodeInfo datanode;
550    private final EnumMap<StorageType, Source> sourceMap
551        = new EnumMap<StorageType, Source>(StorageType.class);
552    private final EnumMap<StorageType, StorageGroup> targetMap
553        = new EnumMap<StorageType, StorageGroup>(StorageType.class);
554    protected long delayUntil = 0L;
555    /** blocks being moved but not confirmed yet */
556    private final List<PendingMove> pendings;
557    private volatile boolean hasFailure = false;
558    private volatile boolean hasSuccess = false;
559    private ExecutorService moveExecutor;
560
561    @Override
562    public String toString() {
563      return getClass().getSimpleName() + ":" + datanode;
564    }
565
566    private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
567      this.datanode = datanode;
568      this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
569    }
570
571    public DatanodeInfo getDatanodeInfo() {
572      return datanode;
573    }
574
575    synchronized ExecutorService initMoveExecutor(int poolSize) {
576      return moveExecutor = Executors.newFixedThreadPool(poolSize);
577    }
578
579    synchronized ExecutorService getMoveExecutor() {
580      return moveExecutor;
581    }
582
583    synchronized void shutdownMoveExecutor() {
584      if (moveExecutor != null) {
585        moveExecutor.shutdown();
586        moveExecutor = null;
587      }
588    }
589
590    private static <G extends StorageGroup> void put(StorageType storageType,
591        G g, EnumMap<StorageType, G> map) {
592      final StorageGroup existing = map.put(storageType, g);
593      Preconditions.checkState(existing == null);
594    }
595
596    public StorageGroup addTarget(StorageType storageType, long maxSize2Move) {
597      final StorageGroup g = new StorageGroup(storageType, maxSize2Move);
598      put(storageType, g, targetMap);
599      return g;
600    }
601
602    public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
603      final Source s = d.new Source(storageType, maxSize2Move, this);
604      put(storageType, s, sourceMap);
605      return s;
606    }
607
608    synchronized private void activateDelay(long delta) {
609      delayUntil = Time.monotonicNow() + delta;
610      LOG.info(this + " activateDelay " + delta/1000.0 + " seconds");
611    }
612
613    synchronized private boolean isDelayActive() {
614      if (delayUntil == 0 || Time.monotonicNow() > delayUntil) {
615        delayUntil = 0;
616        return false;
617      }
618      return true;
619    }
620
621    /** Check if all the dispatched moves are done */
622    synchronized boolean isPendingQEmpty() {
623      return pendings.isEmpty();
624    }
625
626    /** Add a scheduled block move to the node */
627    synchronized boolean addPendingBlock(PendingMove pendingBlock) {
628      if (!isDelayActive()) {
629        return pendings.add(pendingBlock);
630      }
631      return false;
632    }
633
634    /** Remove a scheduled block move from the node */
635    synchronized boolean removePendingBlock(PendingMove pendingBlock) {
636      return pendings.remove(pendingBlock);
637    }
638
639    void setHasFailure() {
640      this.hasFailure = true;
641    }
642
643    void setHasSuccess() {
644      this.hasSuccess = true;
645    }
646  }
647
648  /** A node that can be the sources of a block move */
649  public class Source extends DDatanode.StorageGroup {
650
651    private final List<Task> tasks = new ArrayList<Task>(2);
652    private long blocksToReceive = 0L;
653    private final long startTime = Time.monotonicNow();
654    /**
655     * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
656     * because we want to keep one copy of a block and be aware that the
657     * locations are changing over time.
658     */
659    private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
660
661    private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
662      dn.super(storageType, maxSize2Move);
663    }
664
665    /**
666     * Check if the iteration is over
667     */
668    public boolean isIterationOver() {
669      return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
670    }
671
672    /** Add a task */
673    void addTask(Task task) {
674      Preconditions.checkState(task.target != this,
675          "Source and target are the same storage group " + getDisplayName());
676      incScheduledSize(task.size);
677      tasks.add(task);
678    }
679
680    /** @return an iterator to this source's blocks */
681    Iterator<DBlock> getBlockIterator() {
682      return srcBlocks.iterator();
683    }
684
685    /**
686     * Fetch new blocks of this source from namenode and update this source's
687     * block list & {@link Dispatcher#globalBlocks}.
688     * 
689     * @return the total size of the received blocks in the number of bytes.
690     */
691    private long getBlockList() throws IOException {
692      final long size = Math.min(getBlocksSize, blocksToReceive);
693      final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
694
695      if (LOG.isTraceEnabled()) {
696        LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
697            + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
698            + ") returns " + newBlocks.getBlocks().length + " blocks.");
699      }
700
701      long bytesReceived = 0;
702      for (BlockWithLocations blk : newBlocks.getBlocks()) {
703        // Skip small blocks.
704        if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) {
705          continue;
706        }
707
708        bytesReceived += blk.getBlock().getNumBytes();
709        synchronized (globalBlocks) {
710          final DBlock block = globalBlocks.get(blk.getBlock());
711          synchronized (block) {
712            block.clearLocations();
713
714            // update locations
715            final String[] datanodeUuids = blk.getDatanodeUuids();
716            final StorageType[] storageTypes = blk.getStorageTypes();
717            for (int i = 0; i < datanodeUuids.length; i++) {
718              final StorageGroup g = storageGroupMap.get(
719                  datanodeUuids[i], storageTypes[i]);
720              if (g != null) { // not unknown
721                block.addLocation(g);
722              }
723            }
724          }
725          if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
726            if (LOG.isTraceEnabled()) {
727              LOG.trace("Add " + block + " to " + this);
728            }
729            srcBlocks.add(block);
730          }
731        }
732      }
733      return bytesReceived;
734    }
735
736    /** Decide if the given block is a good candidate to move or not */
737    private boolean isGoodBlockCandidate(DBlock block) {
738      // source and target must have the same storage type
739      final StorageType sourceStorageType = getStorageType();
740      for (Task t : tasks) {
741        if (Dispatcher.this.isGoodBlockCandidate(this, t.target,
742            sourceStorageType, block)) {
743          return true;
744        }
745      }
746      return false;
747    }
748
749    /**
750     * Choose a move for the source. The block's source, target, and proxy
751     * are determined too. When choosing proxy and target, source &
752     * target throttling has been considered. They are chosen only when they
753     * have the capacity to support this block move. The block should be
754     * dispatched immediately after this method is returned.
755     * 
756     * @return a move that's good for the source to dispatch immediately.
757     */
758    private PendingMove chooseNextMove() {
759      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
760        final Task task = i.next();
761        final DDatanode target = task.target.getDDatanode();
762        final PendingMove pendingBlock = new PendingMove(this, task.target);
763        if (target.addPendingBlock(pendingBlock)) {
764          // target is not busy, so do a tentative block allocation
765          if (pendingBlock.chooseBlockAndProxy()) {
766            long blockSize = pendingBlock.block.getNumBytes();
767            incScheduledSize(-blockSize);
768            task.size -= blockSize;
769            if (task.size <= 0) {
770              i.remove();
771            }
772            return pendingBlock;
773          } else {
774            // cancel the tentative move
775            target.removePendingBlock(pendingBlock);
776          }
777        }
778      }
779      return null;
780    }
781    
782    /** Add a pending move */
783    public PendingMove addPendingMove(DBlock block, StorageGroup target) {
784      return target.addPendingMove(block, new PendingMove(this, target));
785    }
786
787    /** Iterate all source's blocks to remove moved ones */
788    private void removeMovedBlocks() {
789      for (Iterator<DBlock> i = getBlockIterator(); i.hasNext();) {
790        if (movedBlocks.contains(i.next().getBlock())) {
791          i.remove();
792        }
793      }
794    }
795
796    /** @return if should fetch more blocks from namenode */
797    private boolean shouldFetchMoreBlocks() {
798      return blocksToReceive > 0;
799    }
800
801    private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
802
803    /**
804     * This method iteratively does the following: it first selects a block to
805     * move, then sends a request to the proxy source to start the block move
806     * when the source's block list falls below a threshold, it asks the
807     * namenode for more blocks. It terminates when it has dispatch enough block
808     * move tasks or it has received enough blocks from the namenode, or the
809     * elapsed time of the iteration has exceeded the max time limit.
810     */
811    private void dispatchBlocks() {
812      this.blocksToReceive = 2 * getScheduledSize();
813      long previousMoveTimestamp = Time.monotonicNow();
814      while (getScheduledSize() > 0 && !isIterationOver()
815          && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
816        if (LOG.isTraceEnabled()) {
817          LOG.trace(this + " blocksToReceive=" + blocksToReceive
818              + ", scheduledSize=" + getScheduledSize()
819              + ", srcBlocks#=" + srcBlocks.size());
820        }
821        final PendingMove p = chooseNextMove();
822        if (p != null) {
823          // Reset previous move timestamp
824          previousMoveTimestamp = Time.monotonicNow();
825          executePendingMove(p);
826          continue;
827        }
828
829        // Since we cannot schedule any block to move,
830        // remove any moved blocks from the source block list and
831        removeMovedBlocks(); // filter already moved blocks
832        // check if we should fetch more blocks from the namenode
833        if (shouldFetchMoreBlocks()) {
834          // fetch new blocks
835          try {
836            final long received = getBlockList();
837            if (received == 0) {
838              return;
839            }
840            blocksToReceive -= received;
841            continue;
842          } catch (IOException e) {
843            LOG.warn("Exception while getting block list", e);
844            return;
845          }
846        } else {
847          // jump out of while-loop after the configured timeout.
848          long noMoveInterval = Time.monotonicNow() - previousMoveTimestamp;
849          if (noMoveInterval > maxNoMoveInterval) {
850            LOG.info("Failed to find a pending move for "  + noMoveInterval
851                + " ms.  Skipping " + this);
852            resetScheduledSize();
853          }
854        }
855
856        // Now we can not schedule any block to move and there are
857        // no new blocks added to the source block list, so we wait.
858        try {
859          synchronized (Dispatcher.this) {
860            Dispatcher.this.wait(1000); // wait for targets/sources to be idle
861          }
862          // Didn't find a possible move in this iteration of the while loop,
863          // adding a small delay before choosing next move again.
864          Thread.sleep(100);
865        } catch (InterruptedException ignored) {
866        }
867      }
868
869      if (isIterationOver()) {
870        LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
871            + " seconds) has been reached. Stopping " + this);
872      }
873    }
874
875    @Override
876    public int hashCode() {
877      return super.hashCode();
878    }
879
880    @Override
881    public boolean equals(Object obj) {
882      return super.equals(obj);
883    }
884  }
885
886  /** Constructor called by Mover. */
887  public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
888      Set<String> excludedNodes, long movedWinWidth, int moverThreads,
889      int dispatcherThreads, int maxConcurrentMovesPerNode,
890      int maxNoMoveInterval, Configuration conf) {
891    this(nnc, includedNodes, excludedNodes, movedWinWidth,
892        moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
893        0L, 0L, 0, maxNoMoveInterval, conf);
894  }
895
896  Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
897      Set<String> excludedNodes, long movedWinWidth, int moverThreads,
898      int dispatcherThreads, int maxConcurrentMovesPerNode,
899      long getBlocksSize, long getBlocksMinBlockSize,
900      int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
901    this.nnc = nnc;
902    this.excludedNodes = excludedNodes;
903    this.includedNodes = includedNodes;
904    this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);
905
906    this.cluster = NetworkTopology.getInstance(conf);
907
908    this.dispatchExecutor = dispatcherThreads == 0? null
909        : Executors.newFixedThreadPool(dispatcherThreads);
910    this.moverThreadAllocator = new Allocator(moverThreads);
911    this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
912
913    this.getBlocksSize = getBlocksSize;
914    this.getBlocksMinBlockSize = getBlocksMinBlockSize;
915    this.blockMoveTimeout = blockMoveTimeout;
916    this.maxNoMoveInterval = maxNoMoveInterval;
917
918    this.saslClient = new SaslDataTransferClient(conf,
919        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
920        TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
921    this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
922    this.connectToDnViaHostname = conf.getBoolean(
923        HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
924        HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
925    this.placementPolicy =
926        BlockPlacementPolicy.getInstance(conf, null, cluster, null);
927  }
928
929  public DistributedFileSystem getDistributedFileSystem() {
930    return nnc.getDistributedFileSystem();
931  }
932
933  public StorageGroupMap<StorageGroup> getStorageGroupMap() {
934    return storageGroupMap;
935  }
936
937  public NetworkTopology getCluster() {
938    return cluster;
939  }
940  
941  long getBytesMoved() {
942    return nnc.getBytesMoved().get();
943  }
944
945  long bytesToMove() {
946    Preconditions.checkState(
947        storageGroupMap.size() >= sources.size() + targets.size(),
948        "Mismatched number of storage groups (" + storageGroupMap.size()
949            + " < " + sources.size() + " sources + " + targets.size()
950            + " targets)");
951
952    long b = 0L;
953    for (Source src : sources) {
954      b += src.getScheduledSize();
955    }
956    return b;
957  }
958
959  void add(Source source, StorageGroup target) {
960    sources.add(source);
961    targets.add(target);
962  }
963
964  private boolean shouldIgnore(DatanodeInfo dn) {
965    // ignore decommissioned nodes
966    final boolean decommissioned = dn.isDecommissioned();
967    // ignore decommissioning nodes
968    final boolean decommissioning = dn.isDecommissionInProgress();
969    // ignore nodes in exclude list
970    final boolean excluded = Util.isExcluded(excludedNodes, dn);
971    // ignore nodes not in the include list (if include list is not empty)
972    final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
973
974    if (decommissioned || decommissioning || excluded || notIncluded) {
975      if (LOG.isTraceEnabled()) {
976        LOG.trace("Excluding datanode " + dn
977            + ": decommissioned=" + decommissioned
978            + ", decommissioning=" + decommissioning
979            + ", excluded=" + excluded
980            + ", notIncluded=" + notIncluded);
981      }
982      return true;
983    }
984    return false;
985  }
986
987  /** Get live datanode storage reports and then build the network topology. */
988  public List<DatanodeStorageReport> init() throws IOException {
989    final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
990    final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>(); 
991    // create network topology and classify utilization collections:
992    // over-utilized, above-average, below-average and under-utilized.
993    for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
994      final DatanodeInfo datanode = r.getDatanodeInfo();
995      if (shouldIgnore(datanode)) {
996        continue;
997      }
998      trimmed.add(r);
999      cluster.add(datanode);
1000    }
1001    return trimmed;
1002  }
1003
1004  public DDatanode newDatanode(DatanodeInfo datanode) {
1005    return new DDatanode(datanode, maxConcurrentMovesPerNode);
1006  }
1007
1008
1009  public void executePendingMove(final PendingMove p) {
1010    // move the block
1011    final DDatanode targetDn = p.target.getDDatanode();
1012    ExecutorService moveExecutor = targetDn.getMoveExecutor();
1013    if (moveExecutor == null) {
1014      final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
1015      if (nThreads > 0) {
1016        moveExecutor = targetDn.initMoveExecutor(nThreads);
1017      }
1018    }
1019    if (moveExecutor == null) {
1020      LOG.warn("No mover threads available: skip moving " + p);
1021      return;
1022    }
1023
1024    moveExecutor.execute(new Runnable() {
1025      @Override
1026      public void run() {
1027        p.dispatch();
1028      }
1029    });
1030  }
1031
1032  public boolean dispatchAndCheckContinue() throws InterruptedException {
1033    return nnc.shouldContinue(dispatchBlockMoves());
1034  }
1035
1036  /**
1037   * Dispatch block moves for each source. The thread selects blocks to move &
1038   * sends request to proxy source to initiate block move. The process is flow
1039   * controlled. Block selection is blocked if there are too many un-confirmed
1040   * block moves.
1041   * 
1042   * @return the total number of bytes successfully moved in this iteration.
1043   */
1044  private long dispatchBlockMoves() throws InterruptedException {
1045    final long bytesLastMoved = getBytesMoved();
1046    final Future<?>[] futures = new Future<?>[sources.size()];
1047
1048    final Iterator<Source> i = sources.iterator();
1049    for (int j = 0; j < futures.length; j++) {
1050      final Source s = i.next();
1051      futures[j] = dispatchExecutor.submit(new Runnable() {
1052        @Override
1053        public void run() {
1054          s.dispatchBlocks();
1055        }
1056      });
1057    }
1058
1059    // wait for all dispatcher threads to finish
1060    for (Future<?> future : futures) {
1061      try {
1062        future.get();
1063      } catch (ExecutionException e) {
1064        LOG.warn("Dispatcher thread failed", e.getCause());
1065      }
1066    }
1067
1068    // wait for all block moving to be done
1069    waitForMoveCompletion(targets);
1070
1071    return getBytesMoved() - bytesLastMoved;
1072  }
1073
1074  /**
1075   * Wait for all block move confirmations.
1076   * @return true if there is failed move execution
1077   */
1078  public static boolean waitForMoveCompletion(
1079      Iterable<? extends StorageGroup> targets) {
1080    boolean hasFailure = false;
1081    for(;;) {
1082      boolean empty = true;
1083      for (StorageGroup t : targets) {
1084        if (!t.getDDatanode().isPendingQEmpty()) {
1085          empty = false;
1086          break;
1087        } else {
1088          hasFailure |= t.getDDatanode().hasFailure;
1089        }
1090      }
1091      if (empty) {
1092        return hasFailure; // all pending queues are empty
1093      }
1094      try {
1095        Thread.sleep(1000);
1096      } catch (InterruptedException ignored) {
1097      }
1098    }
1099  }
1100
1101  /**
1102   * @return true if some moves are success.
1103   */
1104  public static boolean checkForSuccess(
1105      Iterable<? extends StorageGroup> targets) {
1106    boolean hasSuccess = false;
1107    for (StorageGroup t : targets) {
1108      hasSuccess |= t.getDDatanode().hasSuccess;
1109    }
1110    return hasSuccess;
1111  }
1112
1113  /**
1114   * Decide if the block is a good candidate to be moved from source to target.
1115   * A block is a good candidate if
1116   * 1. the block is not in the process of being moved/has not been moved;
1117   * 2. the block does not have a replica on the target;
1118   * 3. doing the move does not reduce the number of racks that the block has
1119   */
1120  private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
1121      StorageType targetStorageType, DBlock block) {
1122    if (source.equals(target)) {
1123      return false;
1124    }
1125    if (target.storageType != targetStorageType) {
1126      return false;
1127    }
1128    // check if the block is moved or not
1129    if (movedBlocks.contains(block.getBlock())) {
1130      return false;
1131    }
1132    final DatanodeInfo targetDatanode = target.getDatanodeInfo();
1133    if (source.getDatanodeInfo().equals(targetDatanode)) {
1134      // the block is moved inside same DN
1135      return true;
1136    }
1137
1138    // check if block has replica in target node
1139    for (StorageGroup blockLocation : block.getLocations()) {
1140      if (blockLocation.getDatanodeInfo().equals(targetDatanode)) {
1141        return false;
1142      }
1143    }
1144
1145    if (!isGoodBlockCandidateForPlacementPolicy(source, target, block)) {
1146      return false;
1147    }
1148    return true;
1149  }
1150
1151  // Check if the move will violate the block placement policy.
1152  private boolean isGoodBlockCandidateForPlacementPolicy(StorageGroup source,
1153     StorageGroup target, DBlock block) {
1154    List<DatanodeInfo> datanodeInfos = new ArrayList<>();
1155    synchronized (block) {
1156      for (StorageGroup loc : block.locations) {
1157        datanodeInfos.add(loc.getDatanodeInfo());
1158      }
1159      datanodeInfos.add(target.getDatanodeInfo());
1160    }
1161    return placementPolicy.isMovable(
1162        datanodeInfos, source.getDatanodeInfo(), target.getDatanodeInfo());
1163  }
1164
1165  /** Reset all fields in order to prepare for the next iteration */
1166  void reset(Configuration conf) {
1167    cluster = NetworkTopology.getInstance(conf);
1168    storageGroupMap.clear();
1169    sources.clear();
1170
1171    moverThreadAllocator.reset();
1172    for(StorageGroup t : targets) {
1173      t.getDDatanode().shutdownMoveExecutor();
1174    }
1175    targets.clear();
1176    globalBlocks.removeAllButRetain(movedBlocks);
1177    movedBlocks.cleanup();
1178  }
1179
1180  @VisibleForTesting
1181  public static void setDelayAfterErrors(long time) {
1182    delayAfterErrors = time;
1183  }
1184
1185  /** shutdown thread pools */
1186  public void shutdownNow() {
1187    if (dispatchExecutor != null) {
1188      dispatchExecutor.shutdownNow();
1189    }
1190  }
1191
1192  static class Util {
1193    /** @return true if data node is part of the excludedNodes. */
1194    static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
1195      return isIn(excludedNodes, dn);
1196    }
1197
1198    /**
1199     * @return true if includedNodes is empty or data node is part of the
1200     *         includedNodes.
1201     */
1202    static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) {
1203      return (includedNodes.isEmpty() || isIn(includedNodes, dn));
1204    }
1205
1206    /**
1207     * Match is checked using host name , ip address with and without port
1208     * number.
1209     * 
1210     * @return true if the datanode's transfer address matches the set of nodes.
1211     */
1212    private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) {
1213      return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort())
1214          || isIn(datanodes, dn.getIpAddr(), dn.getXferPort())
1215          || isIn(datanodes, dn.getHostName(), dn.getXferPort());
1216    }
1217
1218    /** @return true if nodes contains host or host:port */
1219    private static boolean isIn(Set<String> nodes, String host, int port) {
1220      if (host == null) {
1221        return false;
1222      }
1223      return (nodes.contains(host) || nodes.contains(host + ":" + port));
1224    }
1225  }
1226}