001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.server.balancer; 019 020import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; 021 022import java.io.BufferedInputStream; 023import java.io.BufferedOutputStream; 024import java.io.DataInputStream; 025import java.io.DataOutputStream; 026import java.io.IOException; 027import java.io.InputStream; 028import java.io.OutputStream; 029import java.net.Socket; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.EnumMap; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.ExecutorService; 041import java.util.concurrent.Executors; 042import java.util.concurrent.Future; 043 044import org.apache.commons.logging.Log; 045import org.apache.commons.logging.LogFactory; 046import org.apache.hadoop.classification.InterfaceAudience; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.CommonConfigurationKeys; 049import 
org.apache.hadoop.fs.StorageType; 050import org.apache.hadoop.hdfs.DFSConfigKeys; 051import org.apache.hadoop.hdfs.DFSUtil; 052import org.apache.hadoop.hdfs.DFSUtilClient; 053import org.apache.hadoop.hdfs.DistributedFileSystem; 054import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; 055import org.apache.hadoop.hdfs.protocol.Block; 056import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 057import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 058import org.apache.hadoop.hdfs.protocol.HdfsConstants; 059import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; 060import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; 061import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; 062import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 063import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; 064import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 065import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 066import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; 067import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 068import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; 069import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; 070import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; 071import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; 072import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; 073import org.apache.hadoop.io.IOUtils; 074import org.apache.hadoop.net.NetUtils; 075import org.apache.hadoop.net.NetworkTopology; 076import org.apache.hadoop.security.token.Token; 077import org.apache.hadoop.util.StringUtils; 078import org.apache.hadoop.util.Time; 079 080import com.google.common.annotations.VisibleForTesting; 081import com.google.common.base.Preconditions; 082 083/** Dispatching 
block replica moves between datanodes. */
@InterfaceAudience.Private
public class Dispatcher {
  static final Log LOG = LogFactory.getLog(Dispatcher.class);

  private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
  /**
   * The period of time, in milliseconds, to delay the usage of a DataNode
   * after hitting errors when using it for migrating data.
   */
  private static long delayAfterErrors = 10 * 1000;

  /** Connector used to talk to the namenode (block lists, keys, counters). */
  private final NameNodeConnector nnc;
  /** SASL client used to wrap the data-transfer streams of a block move. */
  private final SaslDataTransferClient saslClient;

  /** Set of datanodes to be excluded. */
  private final Set<String> excludedNodes;
  /** Restrict to the following nodes. */
  private final Set<String> includedNodes;

  /** Sources registered through {@link #add(Source, StorageGroup)}. */
  private final Collection<Source> sources = new HashSet<Source>();
  /** Targets registered through {@link #add(Source, StorageGroup)}. */
  private final Collection<StorageGroup> targets = new HashSet<StorageGroup>();

  /** One shared {@link DBlock} per block, looked up by {@link Block}. */
  private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
  /** Blocks that have already been chosen for a move. */
  private final MovedBlocks<StorageGroup> movedBlocks;

  /** Map (datanodeUuid,storageType -> StorageGroup) */
  private final StorageGroupMap<StorageGroup> storageGroupMap
      = new StorageGroupMap<StorageGroup>();

  /** Network topology populated by {@link #init()}. */
  private NetworkTopology cluster;

  /** Executor for dispatcher threads; null when zero threads are requested. */
  private final ExecutorService dispatchExecutor;

  /** Hands out the global budget of mover threads to per-datanode pools. */
  private final Allocator moverThreadAllocator;

  /** The maximum number of concurrent blocks moves at a datanode */
  private final int maxConcurrentMovesPerNode;

  /** Upper bound on the size requested from the namenode per getBlocks call. */
  private final long getBlocksSize;
  /** Blocks smaller than this many bytes are skipped when fetching blocks. */
  private final long getBlocksMinBlockSize;
  /** Time limit for waiting on a block move; not enforced unless positive. */
  private final long blockMoveTimeout;
  /**
   * If no block can be moved out of a {@link Source} after this configured
   * amount of time, the Source should give up choosing the next possible move.
128 */ 129 private final int maxNoMoveInterval; 130 131 private final int ioFileBufferSize; 132 133 private final boolean connectToDnViaHostname; 134 private BlockPlacementPolicy placementPolicy; 135 136 static class Allocator { 137 private final int max; 138 private int count = 0; 139 140 Allocator(int max) { 141 this.max = max; 142 } 143 144 synchronized int allocate(int n) { 145 final int remaining = max - count; 146 if (remaining <= 0) { 147 return 0; 148 } else { 149 final int allocated = remaining < n? remaining: n; 150 count += allocated; 151 return allocated; 152 } 153 } 154 155 synchronized void reset() { 156 count = 0; 157 } 158 } 159 160 private static class GlobalBlockMap { 161 private final Map<Block, DBlock> map = new HashMap<Block, DBlock>(); 162 163 /** 164 * Get the block from the map; 165 * if the block is not found, create a new block and put it in the map. 166 */ 167 private DBlock get(Block b) { 168 DBlock block = map.get(b); 169 if (block == null) { 170 block = new DBlock(b); 171 map.put(b, block); 172 } 173 return block; 174 } 175 176 /** Remove all blocks except for the moved blocks. 
*/ 177 private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) { 178 for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) { 179 if (!movedBlocks.contains(i.next())) { 180 i.remove(); 181 } 182 } 183 } 184 } 185 186 public static class StorageGroupMap<G extends StorageGroup> { 187 private static String toKey(String datanodeUuid, StorageType storageType) { 188 return datanodeUuid + ":" + storageType; 189 } 190 191 private final Map<String, G> map = new HashMap<String, G>(); 192 193 public G get(String datanodeUuid, StorageType storageType) { 194 return map.get(toKey(datanodeUuid, storageType)); 195 } 196 197 public void put(G g) { 198 final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType); 199 final StorageGroup existing = map.put(key, g); 200 Preconditions.checkState(existing == null); 201 } 202 203 int size() { 204 return map.size(); 205 } 206 207 void clear() { 208 map.clear(); 209 } 210 211 public Collection<G> values() { 212 return map.values(); 213 } 214 } 215 216 /** This class keeps track of a scheduled block move */ 217 public class PendingMove { 218 private DBlock block; 219 private Source source; 220 private DDatanode proxySource; 221 private StorageGroup target; 222 223 private PendingMove(Source source, StorageGroup target) { 224 this.source = source; 225 this.target = target; 226 } 227 228 @Override 229 public String toString() { 230 final Block b = block != null ? block.getBlock() : null; 231 String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ") 232 : " "; 233 return bStr + "from " + source.getDisplayName() + " to " + target 234 .getDisplayName() + " through " + (proxySource != null ? proxySource 235 .datanode : ""); 236 } 237 238 /** 239 * Choose a block & a proxy source for this pendingMove whose source & 240 * target have already been chosen. 
241 * 242 * @return true if a block and its proxy are chosen; false otherwise 243 */ 244 private boolean chooseBlockAndProxy() { 245 // source and target must have the same storage type 246 final StorageType t = source.getStorageType(); 247 // iterate all source's blocks until find a good one 248 for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) { 249 if (markMovedIfGoodBlock(i.next(), t)) { 250 i.remove(); 251 return true; 252 } 253 } 254 return false; 255 } 256 257 /** 258 * @return true if the given block is good for the tentative move. 259 */ 260 private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) { 261 synchronized (block) { 262 synchronized (movedBlocks) { 263 if (isGoodBlockCandidate(source, target, targetStorageType, block)) { 264 this.block = block; 265 if (chooseProxySource()) { 266 movedBlocks.put(block); 267 if (LOG.isDebugEnabled()) { 268 LOG.debug("Decided to move " + this); 269 } 270 return true; 271 } 272 } 273 } 274 } 275 return false; 276 } 277 278 /** 279 * Choose a proxy source. 
280 * 281 * @return true if a proxy is found; otherwise false 282 */ 283 private boolean chooseProxySource() { 284 final DatanodeInfo targetDN = target.getDatanodeInfo(); 285 // if source and target are same nodes then no need of proxy 286 if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) { 287 return true; 288 } 289 // if node group is supported, first try add nodes in the same node group 290 if (cluster.isNodeGroupAware()) { 291 for (StorageGroup loc : block.getLocations()) { 292 if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) 293 && addTo(loc)) { 294 return true; 295 } 296 } 297 } 298 // check if there is replica which is on the same rack with the target 299 for (StorageGroup loc : block.getLocations()) { 300 if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { 301 return true; 302 } 303 } 304 // find out a non-busy replica 305 for (StorageGroup loc : block.getLocations()) { 306 if (addTo(loc)) { 307 return true; 308 } 309 } 310 return false; 311 } 312 313 /** add to a proxy source for specific block movement */ 314 private boolean addTo(StorageGroup g) { 315 final DDatanode dn = g.getDDatanode(); 316 if (dn.addPendingBlock(this)) { 317 proxySource = dn; 318 return true; 319 } 320 return false; 321 } 322 323 /** Dispatch the move to the proxy source & wait for the response. */ 324 private void dispatch() { 325 LOG.info("Start moving " + this); 326 327 Socket sock = new Socket(); 328 DataOutputStream out = null; 329 DataInputStream in = null; 330 try { 331 sock.connect( 332 NetUtils.createSocketAddr(target.getDatanodeInfo(). 333 getXferAddr(Dispatcher.this.connectToDnViaHostname)), 334 HdfsConstants.READ_TIMEOUT); 335 336 // Set read timeout so that it doesn't hang forever against 337 // unresponsive nodes. Datanode normally sends IN_PROGRESS response 338 // twice within the client read timeout period (every 30 seconds by 339 // default). Here, we make it give up after 5 minutes of no response. 
340 sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5); 341 sock.setKeepAlive(true); 342 343 OutputStream unbufOut = sock.getOutputStream(); 344 InputStream unbufIn = sock.getInputStream(); 345 ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), 346 block.getBlock()); 347 final KeyManager km = nnc.getKeyManager(); 348 Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb); 349 IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, 350 unbufIn, km, accessToken, target.getDatanodeInfo()); 351 unbufOut = saslStreams.out; 352 unbufIn = saslStreams.in; 353 out = new DataOutputStream(new BufferedOutputStream(unbufOut, 354 ioFileBufferSize)); 355 in = new DataInputStream(new BufferedInputStream(unbufIn, 356 ioFileBufferSize)); 357 358 sendRequest(out, eb, accessToken); 359 receiveResponse(in); 360 nnc.getBytesMoved().addAndGet(block.getNumBytes()); 361 target.getDDatanode().setHasSuccess(); 362 LOG.info("Successfully moved " + this); 363 } catch (IOException e) { 364 LOG.warn("Failed to move " + this, e); 365 target.getDDatanode().setHasFailure(); 366 // Proxy or target may have some issues, delay before using these nodes 367 // further in order to avoid a potential storm of "threads quota 368 // exceeded" warnings when the dispatcher gets out of sync with work 369 // going on in datanodes. 
370 proxySource.activateDelay(delayAfterErrors); 371 target.getDDatanode().activateDelay(delayAfterErrors); 372 } finally { 373 IOUtils.closeStream(out); 374 IOUtils.closeStream(in); 375 IOUtils.closeSocket(sock); 376 377 proxySource.removePendingBlock(this); 378 target.getDDatanode().removePendingBlock(this); 379 380 synchronized (this) { 381 reset(); 382 } 383 synchronized (Dispatcher.this) { 384 Dispatcher.this.notifyAll(); 385 } 386 } 387 } 388 389 /** Send a block replace request to the output stream */ 390 private void sendRequest(DataOutputStream out, ExtendedBlock eb, 391 Token<BlockTokenIdentifier> accessToken) throws IOException { 392 new Sender(out).replaceBlock(eb, target.storageType, accessToken, 393 source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); 394 } 395 396 /** Check whether to continue waiting for response */ 397 private boolean stopWaitingForResponse(long startTime) { 398 return source.isIterationOver() || 399 (blockMoveTimeout > 0 && 400 (Time.monotonicNow() - startTime > blockMoveTimeout)); 401 } 402 403 /** Receive a reportedBlock copy response from the input stream */ 404 private void receiveResponse(DataInputStream in) throws IOException { 405 long startTime = Time.monotonicNow(); 406 BlockOpResponseProto response = 407 BlockOpResponseProto.parseFrom(vintPrefixed(in)); 408 while (response.getStatus() == Status.IN_PROGRESS) { 409 // read intermediate responses 410 response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); 411 // Stop waiting for slow block moves. Even if it stops waiting, 412 // the actual move may continue. 
413 if (stopWaitingForResponse(startTime)) { 414 throw new IOException("Block move timed out"); 415 } 416 } 417 String logInfo = "block move is failed"; 418 DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); 419 } 420 421 /** reset the object */ 422 private void reset() { 423 block = null; 424 source = null; 425 proxySource = null; 426 target = null; 427 } 428 } 429 430 /** A class for keeping track of block locations in the dispatcher. */ 431 public static class DBlock extends MovedBlocks.Locations<StorageGroup> { 432 public DBlock(Block block) { 433 super(block); 434 } 435 } 436 437 /** The class represents a desired move. */ 438 static class Task { 439 private final StorageGroup target; 440 private long size; // bytes scheduled to move 441 442 Task(StorageGroup target, long size) { 443 this.target = target; 444 this.size = size; 445 } 446 447 long getSize() { 448 return size; 449 } 450 } 451 452 /** A class that keeps track of a datanode. */ 453 public static class DDatanode { 454 455 /** A group of storages in a datanode with the same storage type. 
*/ 456 public class StorageGroup { 457 final StorageType storageType; 458 final long maxSize2Move; 459 private long scheduledSize = 0L; 460 461 private StorageGroup(StorageType storageType, long maxSize2Move) { 462 this.storageType = storageType; 463 this.maxSize2Move = maxSize2Move; 464 } 465 466 public StorageType getStorageType() { 467 return storageType; 468 } 469 470 private DDatanode getDDatanode() { 471 return DDatanode.this; 472 } 473 474 public DatanodeInfo getDatanodeInfo() { 475 return DDatanode.this.datanode; 476 } 477 478 /** Decide if still need to move more bytes */ 479 boolean hasSpaceForScheduling() { 480 return hasSpaceForScheduling(0L); 481 } 482 483 synchronized boolean hasSpaceForScheduling(long size) { 484 return availableSizeToMove() > size; 485 } 486 487 /** @return the total number of bytes that need to be moved */ 488 synchronized long availableSizeToMove() { 489 return maxSize2Move - scheduledSize; 490 } 491 492 /** increment scheduled size */ 493 public synchronized void incScheduledSize(long size) { 494 scheduledSize += size; 495 } 496 497 /** @return scheduled size */ 498 synchronized long getScheduledSize() { 499 return scheduledSize; 500 } 501 502 /** Reset scheduled size to zero. 
*/ 503 synchronized void resetScheduledSize() { 504 scheduledSize = 0L; 505 } 506 507 private PendingMove addPendingMove(DBlock block, final PendingMove pm) { 508 if (getDDatanode().addPendingBlock(pm)) { 509 if (pm.markMovedIfGoodBlock(block, getStorageType())) { 510 incScheduledSize(pm.block.getNumBytes()); 511 return pm; 512 } else { 513 getDDatanode().removePendingBlock(pm); 514 } 515 } 516 return null; 517 } 518 519 /** @return the name for display */ 520 String getDisplayName() { 521 return datanode + ":" + storageType; 522 } 523 524 @Override 525 public String toString() { 526 return getDisplayName(); 527 } 528 529 @Override 530 public int hashCode() { 531 return getStorageType().hashCode() ^ getDatanodeInfo().hashCode(); 532 } 533 534 @Override 535 public boolean equals(Object obj) { 536 if (this == obj) { 537 return true; 538 } else if (obj == null || !(obj instanceof StorageGroup)) { 539 return false; 540 } else { 541 final StorageGroup that = (StorageGroup) obj; 542 return this.getStorageType() == that.getStorageType() 543 && this.getDatanodeInfo().equals(that.getDatanodeInfo()); 544 } 545 } 546 547 } 548 549 final DatanodeInfo datanode; 550 private final EnumMap<StorageType, Source> sourceMap 551 = new EnumMap<StorageType, Source>(StorageType.class); 552 private final EnumMap<StorageType, StorageGroup> targetMap 553 = new EnumMap<StorageType, StorageGroup>(StorageType.class); 554 protected long delayUntil = 0L; 555 /** blocks being moved but not confirmed yet */ 556 private final List<PendingMove> pendings; 557 private volatile boolean hasFailure = false; 558 private volatile boolean hasSuccess = false; 559 private ExecutorService moveExecutor; 560 561 @Override 562 public String toString() { 563 return getClass().getSimpleName() + ":" + datanode; 564 } 565 566 private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) { 567 this.datanode = datanode; 568 this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves); 569 } 570 571 public 
DatanodeInfo getDatanodeInfo() { 572 return datanode; 573 } 574 575 synchronized ExecutorService initMoveExecutor(int poolSize) { 576 return moveExecutor = Executors.newFixedThreadPool(poolSize); 577 } 578 579 synchronized ExecutorService getMoveExecutor() { 580 return moveExecutor; 581 } 582 583 synchronized void shutdownMoveExecutor() { 584 if (moveExecutor != null) { 585 moveExecutor.shutdown(); 586 moveExecutor = null; 587 } 588 } 589 590 private static <G extends StorageGroup> void put(StorageType storageType, 591 G g, EnumMap<StorageType, G> map) { 592 final StorageGroup existing = map.put(storageType, g); 593 Preconditions.checkState(existing == null); 594 } 595 596 public StorageGroup addTarget(StorageType storageType, long maxSize2Move) { 597 final StorageGroup g = new StorageGroup(storageType, maxSize2Move); 598 put(storageType, g, targetMap); 599 return g; 600 } 601 602 public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) { 603 final Source s = d.new Source(storageType, maxSize2Move, this); 604 put(storageType, s, sourceMap); 605 return s; 606 } 607 608 synchronized private void activateDelay(long delta) { 609 delayUntil = Time.monotonicNow() + delta; 610 LOG.info(this + " activateDelay " + delta/1000.0 + " seconds"); 611 } 612 613 synchronized private boolean isDelayActive() { 614 if (delayUntil == 0 || Time.monotonicNow() > delayUntil) { 615 delayUntil = 0; 616 return false; 617 } 618 return true; 619 } 620 621 /** Check if all the dispatched moves are done */ 622 synchronized boolean isPendingQEmpty() { 623 return pendings.isEmpty(); 624 } 625 626 /** Add a scheduled block move to the node */ 627 synchronized boolean addPendingBlock(PendingMove pendingBlock) { 628 if (!isDelayActive()) { 629 return pendings.add(pendingBlock); 630 } 631 return false; 632 } 633 634 /** Remove a scheduled block move from the node */ 635 synchronized boolean removePendingBlock(PendingMove pendingBlock) { 636 return 
pendings.remove(pendingBlock); 637 } 638 639 void setHasFailure() { 640 this.hasFailure = true; 641 } 642 643 void setHasSuccess() { 644 this.hasSuccess = true; 645 } 646 } 647 648 /** A node that can be the sources of a block move */ 649 public class Source extends DDatanode.StorageGroup { 650 651 private final List<Task> tasks = new ArrayList<Task>(2); 652 private long blocksToReceive = 0L; 653 private final long startTime = Time.monotonicNow(); 654 /** 655 * Source blocks point to the objects in {@link Dispatcher#globalBlocks} 656 * because we want to keep one copy of a block and be aware that the 657 * locations are changing over time. 658 */ 659 private final List<DBlock> srcBlocks = new ArrayList<DBlock>(); 660 661 private Source(StorageType storageType, long maxSize2Move, DDatanode dn) { 662 dn.super(storageType, maxSize2Move); 663 } 664 665 /** 666 * Check if the iteration is over 667 */ 668 public boolean isIterationOver() { 669 return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME); 670 } 671 672 /** Add a task */ 673 void addTask(Task task) { 674 Preconditions.checkState(task.target != this, 675 "Source and target are the same storage group " + getDisplayName()); 676 incScheduledSize(task.size); 677 tasks.add(task); 678 } 679 680 /** @return an iterator to this source's blocks */ 681 Iterator<DBlock> getBlockIterator() { 682 return srcBlocks.iterator(); 683 } 684 685 /** 686 * Fetch new blocks of this source from namenode and update this source's 687 * block list & {@link Dispatcher#globalBlocks}. 688 * 689 * @return the total size of the received blocks in the number of bytes. 
690 */ 691 private long getBlockList() throws IOException { 692 final long size = Math.min(getBlocksSize, blocksToReceive); 693 final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); 694 695 if (LOG.isTraceEnabled()) { 696 LOG.trace("getBlocks(" + getDatanodeInfo() + ", " 697 + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2) 698 + ") returns " + newBlocks.getBlocks().length + " blocks."); 699 } 700 701 long bytesReceived = 0; 702 for (BlockWithLocations blk : newBlocks.getBlocks()) { 703 // Skip small blocks. 704 if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) { 705 continue; 706 } 707 708 bytesReceived += blk.getBlock().getNumBytes(); 709 synchronized (globalBlocks) { 710 final DBlock block = globalBlocks.get(blk.getBlock()); 711 synchronized (block) { 712 block.clearLocations(); 713 714 // update locations 715 final String[] datanodeUuids = blk.getDatanodeUuids(); 716 final StorageType[] storageTypes = blk.getStorageTypes(); 717 for (int i = 0; i < datanodeUuids.length; i++) { 718 final StorageGroup g = storageGroupMap.get( 719 datanodeUuids[i], storageTypes[i]); 720 if (g != null) { // not unknown 721 block.addLocation(g); 722 } 723 } 724 } 725 if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) { 726 if (LOG.isTraceEnabled()) { 727 LOG.trace("Add " + block + " to " + this); 728 } 729 srcBlocks.add(block); 730 } 731 } 732 } 733 return bytesReceived; 734 } 735 736 /** Decide if the given block is a good candidate to move or not */ 737 private boolean isGoodBlockCandidate(DBlock block) { 738 // source and target must have the same storage type 739 final StorageType sourceStorageType = getStorageType(); 740 for (Task t : tasks) { 741 if (Dispatcher.this.isGoodBlockCandidate(this, t.target, 742 sourceStorageType, block)) { 743 return true; 744 } 745 } 746 return false; 747 } 748 749 /** 750 * Choose a move for the source. The block's source, target, and proxy 751 * are determined too. 
When choosing proxy and target, source & 752 * target throttling has been considered. They are chosen only when they 753 * have the capacity to support this block move. The block should be 754 * dispatched immediately after this method is returned. 755 * 756 * @return a move that's good for the source to dispatch immediately. 757 */ 758 private PendingMove chooseNextMove() { 759 for (Iterator<Task> i = tasks.iterator(); i.hasNext();) { 760 final Task task = i.next(); 761 final DDatanode target = task.target.getDDatanode(); 762 final PendingMove pendingBlock = new PendingMove(this, task.target); 763 if (target.addPendingBlock(pendingBlock)) { 764 // target is not busy, so do a tentative block allocation 765 if (pendingBlock.chooseBlockAndProxy()) { 766 long blockSize = pendingBlock.block.getNumBytes(); 767 incScheduledSize(-blockSize); 768 task.size -= blockSize; 769 if (task.size <= 0) { 770 i.remove(); 771 } 772 return pendingBlock; 773 } else { 774 // cancel the tentative move 775 target.removePendingBlock(pendingBlock); 776 } 777 } 778 } 779 return null; 780 } 781 782 /** Add a pending move */ 783 public PendingMove addPendingMove(DBlock block, StorageGroup target) { 784 return target.addPendingMove(block, new PendingMove(this, target)); 785 } 786 787 /** Iterate all source's blocks to remove moved ones */ 788 private void removeMovedBlocks() { 789 for (Iterator<DBlock> i = getBlockIterator(); i.hasNext();) { 790 if (movedBlocks.contains(i.next().getBlock())) { 791 i.remove(); 792 } 793 } 794 } 795 796 /** @return if should fetch more blocks from namenode */ 797 private boolean shouldFetchMoreBlocks() { 798 return blocksToReceive > 0; 799 } 800 801 private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins 802 803 /** 804 * This method iteratively does the following: it first selects a block to 805 * move, then sends a request to the proxy source to start the block move 806 * when the source's block list falls below a threshold, it asks the 807 * 
namenode for more blocks. It terminates when it has dispatch enough block 808 * move tasks or it has received enough blocks from the namenode, or the 809 * elapsed time of the iteration has exceeded the max time limit. 810 */ 811 private void dispatchBlocks() { 812 this.blocksToReceive = 2 * getScheduledSize(); 813 long previousMoveTimestamp = Time.monotonicNow(); 814 while (getScheduledSize() > 0 && !isIterationOver() 815 && (!srcBlocks.isEmpty() || blocksToReceive > 0)) { 816 if (LOG.isTraceEnabled()) { 817 LOG.trace(this + " blocksToReceive=" + blocksToReceive 818 + ", scheduledSize=" + getScheduledSize() 819 + ", srcBlocks#=" + srcBlocks.size()); 820 } 821 final PendingMove p = chooseNextMove(); 822 if (p != null) { 823 // Reset previous move timestamp 824 previousMoveTimestamp = Time.monotonicNow(); 825 executePendingMove(p); 826 continue; 827 } 828 829 // Since we cannot schedule any block to move, 830 // remove any moved blocks from the source block list and 831 removeMovedBlocks(); // filter already moved blocks 832 // check if we should fetch more blocks from the namenode 833 if (shouldFetchMoreBlocks()) { 834 // fetch new blocks 835 try { 836 final long received = getBlockList(); 837 if (received == 0) { 838 return; 839 } 840 blocksToReceive -= received; 841 continue; 842 } catch (IOException e) { 843 LOG.warn("Exception while getting block list", e); 844 return; 845 } 846 } else { 847 // jump out of while-loop after the configured timeout. 848 long noMoveInterval = Time.monotonicNow() - previousMoveTimestamp; 849 if (noMoveInterval > maxNoMoveInterval) { 850 LOG.info("Failed to find a pending move for " + noMoveInterval 851 + " ms. Skipping " + this); 852 resetScheduledSize(); 853 } 854 } 855 856 // Now we can not schedule any block to move and there are 857 // no new blocks added to the source block list, so we wait. 
858 try { 859 synchronized (Dispatcher.this) { 860 Dispatcher.this.wait(1000); // wait for targets/sources to be idle 861 } 862 // Didn't find a possible move in this iteration of the while loop, 863 // adding a small delay before choosing next move again. 864 Thread.sleep(100); 865 } catch (InterruptedException ignored) { 866 } 867 } 868 869 if (isIterationOver()) { 870 LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000 871 + " seconds) has been reached. Stopping " + this); 872 } 873 } 874 875 @Override 876 public int hashCode() { 877 return super.hashCode(); 878 } 879 880 @Override 881 public boolean equals(Object obj) { 882 return super.equals(obj); 883 } 884 } 885 886 /** Constructor called by Mover. */ 887 public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, 888 Set<String> excludedNodes, long movedWinWidth, int moverThreads, 889 int dispatcherThreads, int maxConcurrentMovesPerNode, 890 int maxNoMoveInterval, Configuration conf) { 891 this(nnc, includedNodes, excludedNodes, movedWinWidth, 892 moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, 893 0L, 0L, 0, maxNoMoveInterval, conf); 894 } 895 896 Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, 897 Set<String> excludedNodes, long movedWinWidth, int moverThreads, 898 int dispatcherThreads, int maxConcurrentMovesPerNode, 899 long getBlocksSize, long getBlocksMinBlockSize, 900 int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) { 901 this.nnc = nnc; 902 this.excludedNodes = excludedNodes; 903 this.includedNodes = includedNodes; 904 this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth); 905 906 this.cluster = NetworkTopology.getInstance(conf); 907 908 this.dispatchExecutor = dispatcherThreads == 0? 
null 909 : Executors.newFixedThreadPool(dispatcherThreads); 910 this.moverThreadAllocator = new Allocator(moverThreads); 911 this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; 912 913 this.getBlocksSize = getBlocksSize; 914 this.getBlocksMinBlockSize = getBlocksMinBlockSize; 915 this.blockMoveTimeout = blockMoveTimeout; 916 this.maxNoMoveInterval = maxNoMoveInterval; 917 918 this.saslClient = new SaslDataTransferClient(conf, 919 DataTransferSaslUtil.getSaslPropertiesResolver(conf), 920 TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); 921 this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); 922 this.connectToDnViaHostname = conf.getBoolean( 923 HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, 924 HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); 925 this.placementPolicy = 926 BlockPlacementPolicy.getInstance(conf, null, cluster, null); 927 } 928 929 public DistributedFileSystem getDistributedFileSystem() { 930 return nnc.getDistributedFileSystem(); 931 } 932 933 public StorageGroupMap<StorageGroup> getStorageGroupMap() { 934 return storageGroupMap; 935 } 936 937 public NetworkTopology getCluster() { 938 return cluster; 939 } 940 941 long getBytesMoved() { 942 return nnc.getBytesMoved().get(); 943 } 944 945 long bytesToMove() { 946 Preconditions.checkState( 947 storageGroupMap.size() >= sources.size() + targets.size(), 948 "Mismatched number of storage groups (" + storageGroupMap.size() 949 + " < " + sources.size() + " sources + " + targets.size() 950 + " targets)"); 951 952 long b = 0L; 953 for (Source src : sources) { 954 b += src.getScheduledSize(); 955 } 956 return b; 957 } 958 959 void add(Source source, StorageGroup target) { 960 sources.add(source); 961 targets.add(target); 962 } 963 964 private boolean shouldIgnore(DatanodeInfo dn) { 965 // ignore decommissioned nodes 966 final boolean decommissioned = dn.isDecommissioned(); 967 // ignore decommissioning nodes 968 final boolean decommissioning = 
dn.isDecommissionInProgress(); 969 // ignore nodes in exclude list 970 final boolean excluded = Util.isExcluded(excludedNodes, dn); 971 // ignore nodes not in the include list (if include list is not empty) 972 final boolean notIncluded = !Util.isIncluded(includedNodes, dn); 973 974 if (decommissioned || decommissioning || excluded || notIncluded) { 975 if (LOG.isTraceEnabled()) { 976 LOG.trace("Excluding datanode " + dn 977 + ": decommissioned=" + decommissioned 978 + ", decommissioning=" + decommissioning 979 + ", excluded=" + excluded 980 + ", notIncluded=" + notIncluded); 981 } 982 return true; 983 } 984 return false; 985 } 986 987 /** Get live datanode storage reports and then build the network topology. */ 988 public List<DatanodeStorageReport> init() throws IOException { 989 final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport(); 990 final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>(); 991 // create network topology and classify utilization collections: 992 // over-utilized, above-average, below-average and under-utilized. 
993 for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) { 994 final DatanodeInfo datanode = r.getDatanodeInfo(); 995 if (shouldIgnore(datanode)) { 996 continue; 997 } 998 trimmed.add(r); 999 cluster.add(datanode); 1000 } 1001 return trimmed; 1002 } 1003 1004 public DDatanode newDatanode(DatanodeInfo datanode) { 1005 return new DDatanode(datanode, maxConcurrentMovesPerNode); 1006 } 1007 1008 1009 public void executePendingMove(final PendingMove p) { 1010 // move the block 1011 final DDatanode targetDn = p.target.getDDatanode(); 1012 ExecutorService moveExecutor = targetDn.getMoveExecutor(); 1013 if (moveExecutor == null) { 1014 final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); 1015 if (nThreads > 0) { 1016 moveExecutor = targetDn.initMoveExecutor(nThreads); 1017 } 1018 } 1019 if (moveExecutor == null) { 1020 LOG.warn("No mover threads available: skip moving " + p); 1021 return; 1022 } 1023 1024 moveExecutor.execute(new Runnable() { 1025 @Override 1026 public void run() { 1027 p.dispatch(); 1028 } 1029 }); 1030 } 1031 1032 public boolean dispatchAndCheckContinue() throws InterruptedException { 1033 return nnc.shouldContinue(dispatchBlockMoves()); 1034 } 1035 1036 /** 1037 * Dispatch block moves for each source. The thread selects blocks to move & 1038 * sends request to proxy source to initiate block move. The process is flow 1039 * controlled. Block selection is blocked if there are too many un-confirmed 1040 * block moves. 1041 * 1042 * @return the total number of bytes successfully moved in this iteration. 
1043 */ 1044 private long dispatchBlockMoves() throws InterruptedException { 1045 final long bytesLastMoved = getBytesMoved(); 1046 final Future<?>[] futures = new Future<?>[sources.size()]; 1047 1048 final Iterator<Source> i = sources.iterator(); 1049 for (int j = 0; j < futures.length; j++) { 1050 final Source s = i.next(); 1051 futures[j] = dispatchExecutor.submit(new Runnable() { 1052 @Override 1053 public void run() { 1054 s.dispatchBlocks(); 1055 } 1056 }); 1057 } 1058 1059 // wait for all dispatcher threads to finish 1060 for (Future<?> future : futures) { 1061 try { 1062 future.get(); 1063 } catch (ExecutionException e) { 1064 LOG.warn("Dispatcher thread failed", e.getCause()); 1065 } 1066 } 1067 1068 // wait for all block moving to be done 1069 waitForMoveCompletion(targets); 1070 1071 return getBytesMoved() - bytesLastMoved; 1072 } 1073 1074 /** 1075 * Wait for all block move confirmations. 1076 * @return true if there is failed move execution 1077 */ 1078 public static boolean waitForMoveCompletion( 1079 Iterable<? extends StorageGroup> targets) { 1080 boolean hasFailure = false; 1081 for(;;) { 1082 boolean empty = true; 1083 for (StorageGroup t : targets) { 1084 if (!t.getDDatanode().isPendingQEmpty()) { 1085 empty = false; 1086 break; 1087 } else { 1088 hasFailure |= t.getDDatanode().hasFailure; 1089 } 1090 } 1091 if (empty) { 1092 return hasFailure; // all pending queues are empty 1093 } 1094 try { 1095 Thread.sleep(1000); 1096 } catch (InterruptedException ignored) { 1097 } 1098 } 1099 } 1100 1101 /** 1102 * @return true if some moves are success. 1103 */ 1104 public static boolean checkForSuccess( 1105 Iterable<? extends StorageGroup> targets) { 1106 boolean hasSuccess = false; 1107 for (StorageGroup t : targets) { 1108 hasSuccess |= t.getDDatanode().hasSuccess; 1109 } 1110 return hasSuccess; 1111 } 1112 1113 /** 1114 * Decide if the block is a good candidate to be moved from source to target. 1115 * A block is a good candidate if 1116 * 1. 
the block is not in the process of being moved/has not been moved; 1117 * 2. the block does not have a replica on the target; 1118 * 3. doing the move does not reduce the number of racks that the block has 1119 */ 1120 private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, 1121 StorageType targetStorageType, DBlock block) { 1122 if (source.equals(target)) { 1123 return false; 1124 } 1125 if (target.storageType != targetStorageType) { 1126 return false; 1127 } 1128 // check if the block is moved or not 1129 if (movedBlocks.contains(block.getBlock())) { 1130 return false; 1131 } 1132 final DatanodeInfo targetDatanode = target.getDatanodeInfo(); 1133 if (source.getDatanodeInfo().equals(targetDatanode)) { 1134 // the block is moved inside same DN 1135 return true; 1136 } 1137 1138 // check if block has replica in target node 1139 for (StorageGroup blockLocation : block.getLocations()) { 1140 if (blockLocation.getDatanodeInfo().equals(targetDatanode)) { 1141 return false; 1142 } 1143 } 1144 1145 if (!isGoodBlockCandidateForPlacementPolicy(source, target, block)) { 1146 return false; 1147 } 1148 return true; 1149 } 1150 1151 // Check if the move will violate the block placement policy. 
1152 private boolean isGoodBlockCandidateForPlacementPolicy(StorageGroup source, 1153 StorageGroup target, DBlock block) { 1154 List<DatanodeInfo> datanodeInfos = new ArrayList<>(); 1155 synchronized (block) { 1156 for (StorageGroup loc : block.locations) { 1157 datanodeInfos.add(loc.getDatanodeInfo()); 1158 } 1159 datanodeInfos.add(target.getDatanodeInfo()); 1160 } 1161 return placementPolicy.isMovable( 1162 datanodeInfos, source.getDatanodeInfo(), target.getDatanodeInfo()); 1163 } 1164 1165 /** Reset all fields in order to prepare for the next iteration */ 1166 void reset(Configuration conf) { 1167 cluster = NetworkTopology.getInstance(conf); 1168 storageGroupMap.clear(); 1169 sources.clear(); 1170 1171 moverThreadAllocator.reset(); 1172 for(StorageGroup t : targets) { 1173 t.getDDatanode().shutdownMoveExecutor(); 1174 } 1175 targets.clear(); 1176 globalBlocks.removeAllButRetain(movedBlocks); 1177 movedBlocks.cleanup(); 1178 } 1179 1180 @VisibleForTesting 1181 public static void setDelayAfterErrors(long time) { 1182 delayAfterErrors = time; 1183 } 1184 1185 /** shutdown thread pools */ 1186 public void shutdownNow() { 1187 if (dispatchExecutor != null) { 1188 dispatchExecutor.shutdownNow(); 1189 } 1190 } 1191 1192 static class Util { 1193 /** @return true if data node is part of the excludedNodes. */ 1194 static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) { 1195 return isIn(excludedNodes, dn); 1196 } 1197 1198 /** 1199 * @return true if includedNodes is empty or data node is part of the 1200 * includedNodes. 1201 */ 1202 static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) { 1203 return (includedNodes.isEmpty() || isIn(includedNodes, dn)); 1204 } 1205 1206 /** 1207 * Match is checked using host name , ip address with and without port 1208 * number. 1209 * 1210 * @return true if the datanode's transfer address matches the set of nodes. 
1211 */ 1212 private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) { 1213 return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort()) 1214 || isIn(datanodes, dn.getIpAddr(), dn.getXferPort()) 1215 || isIn(datanodes, dn.getHostName(), dn.getXferPort()); 1216 } 1217 1218 /** @return true if nodes contains host or host:port */ 1219 private static boolean isIn(Set<String> nodes, String host, int port) { 1220 if (host == null) { 1221 return false; 1222 } 1223 return (nodes.contains(host) || nodes.contains(host + ":" + port)); 1224 } 1225 } 1226}