001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.server.namenode; 019 020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT; 021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT; 022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES; 023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT; 024import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES; 025import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT; 026import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; 027import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT; 028 029import java.io.DataInput; 030import java.io.DataOutputStream; 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.Date; 036import java.util.EnumSet; 037import java.util.Iterator; 038import java.util.LinkedList; 039import java.util.List; 040import java.util.Map.Entry; 041import java.util.SortedMap; 042import java.util.TreeMap; 043import java.util.concurrent.locks.ReentrantLock; 044 045import org.apache.commons.io.IOUtils; 046import org.apache.hadoop.classification.InterfaceAudience; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; 049import org.apache.hadoop.fs.CacheFlag; 050import org.apache.hadoop.fs.InvalidRequestException; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.fs.permission.FsAction; 053import org.apache.hadoop.fs.permission.FsPermission; 054import org.apache.hadoop.hdfs.DFSUtil; 055import org.apache.hadoop.hdfs.protocol.CacheDirective; 056import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; 057import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; 058import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration; 059import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; 060import org.apache.hadoop.hdfs.protocol.CachePoolEntry; 061import org.apache.hadoop.hdfs.protocol.CachePoolInfo; 062import org.apache.hadoop.hdfs.protocol.DatanodeID; 063import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 064import org.apache.hadoop.hdfs.protocol.LocatedBlock; 065import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 066import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; 067import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; 068import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 069import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; 070import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; 071import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; 072import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; 073import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; 074import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; 075import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection; 076import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; 077import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; 078import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; 079import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; 080import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; 081import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; 082import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; 083import org.apache.hadoop.hdfs.util.ReadOnlyList; 084import org.apache.hadoop.security.AccessControlException; 085import org.apache.hadoop.util.GSet; 086import org.apache.hadoop.util.LightWeightGSet; 087import org.apache.hadoop.util.Time; 088import org.slf4j.Logger; 089import org.slf4j.LoggerFactory; 090 091import com.google.common.annotations.VisibleForTesting; 092import com.google.common.collect.Lists; 093 094/** 095 * The Cache Manager handles caching on DataNodes. 096 * 097 * This class is instantiated by the FSNamesystem. 098 * It maintains the mapping of cached blocks to datanodes via processing 099 * datanode cache reports. Based on these reports and addition and removal of 100 * caching directives, we will schedule caching and uncaching work. 101 */ 102@InterfaceAudience.LimitedPrivate({"HDFS"}) 103public final class CacheManager { 104 public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class); 105 106 private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f; 107 108 // TODO: add pending / underCached / schedule cached blocks stats. 109 110 /** 111 * The FSNamesystem that contains this CacheManager. 112 */ 113 private final FSNamesystem namesystem; 114 115 /** 116 * The BlockManager associated with the FSN that owns this CacheManager. 117 */ 118 private final BlockManager blockManager; 119 120 /** 121 * Cache directives, sorted by ID. 122 * 123 * listCacheDirectives relies on the ordering of elements in this map 124 * to track what has already been listed by the client. 125 */ 126 private final TreeMap<Long, CacheDirective> directivesById = 127 new TreeMap<Long, CacheDirective>(); 128 129 /** 130 * The directive ID to use for a new directive. IDs always increase, and are 131 * never reused. 132 */ 133 private long nextDirectiveId; 134 135 /** 136 * Cache directives, sorted by path 137 */ 138 private final TreeMap<String, List<CacheDirective>> directivesByPath = 139 new TreeMap<String, List<CacheDirective>>(); 140 141 /** 142 * Cache pools, sorted by name. 143 */ 144 private final TreeMap<String, CachePool> cachePools = 145 new TreeMap<String, CachePool>(); 146 147 /** 148 * Maximum number of cache pools to list in one operation. 149 */ 150 private final int maxListCachePoolsResponses; 151 152 /** 153 * Maximum number of cache pool directives to list in one operation. 154 */ 155 private final int maxListCacheDirectivesNumResponses; 156 157 /** 158 * Interval between scans in milliseconds. 159 */ 160 private final long scanIntervalMs; 161 162 /** 163 * All cached blocks. 164 */ 165 private final GSet<CachedBlock, CachedBlock> cachedBlocks; 166 167 /** 168 * Lock which protects the CacheReplicationMonitor. 169 */ 170 private final ReentrantLock crmLock = new ReentrantLock(); 171 172 private final SerializerCompat serializerCompat = new SerializerCompat(); 173 174 /** 175 * The CacheReplicationMonitor. 176 */ 177 private CacheReplicationMonitor monitor; 178 179 public static final class PersistState { 180 public final CacheManagerSection section; 181 public final List<CachePoolInfoProto> pools; 182 public final List<CacheDirectiveInfoProto> directives; 183 184 public PersistState(CacheManagerSection section, 185 List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives) { 186 this.section = section; 187 this.pools = pools; 188 this.directives = directives; 189 } 190 } 191 192 CacheManager(FSNamesystem namesystem, Configuration conf, 193 BlockManager blockManager) { 194 this.namesystem = namesystem; 195 this.blockManager = blockManager; 196 this.nextDirectiveId = 1; 197 this.maxListCachePoolsResponses = conf.getInt( 198 DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 199 DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT); 200 this.maxListCacheDirectivesNumResponses = conf.getInt( 201 DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 202 DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT); 203 scanIntervalMs = conf.getLong( 204 DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 205 DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT); 206 float cachedBlocksPercent = conf.getFloat( 207 DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT, 208 DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT); 209 if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) { 210 LOG.info("Using minimum value {} for {}", MIN_CACHED_BLOCKS_PERCENT, 211 DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT); 212 cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT; 213 } 214 this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>( 215 LightWeightGSet.computeCapacity(cachedBlocksPercent, 216 "cachedBlocks")); 217 218 } 219 220 /** 221 * Resets all tracked directives and pools. Called during 2NN checkpointing to 222 * reset FSNamesystem state. See {@link FSNamesystem#clear()}. 223 */ 224 void clear() { 225 directivesById.clear(); 226 directivesByPath.clear(); 227 cachePools.clear(); 228 nextDirectiveId = 1; 229 } 230 231 public void startMonitorThread() { 232 crmLock.lock(); 233 try { 234 if (this.monitor == null) { 235 this.monitor = new CacheReplicationMonitor(namesystem, this, 236 scanIntervalMs, crmLock); 237 this.monitor.start(); 238 } 239 } finally { 240 crmLock.unlock(); 241 } 242 } 243 244 public void stopMonitorThread() { 245 crmLock.lock(); 246 try { 247 if (this.monitor != null) { 248 CacheReplicationMonitor prevMonitor = this.monitor; 249 this.monitor = null; 250 IOUtils.closeQuietly(prevMonitor); 251 } 252 } finally { 253 crmLock.unlock(); 254 } 255 } 256 257 public void clearDirectiveStats() { 258 assert namesystem.hasWriteLock(); 259 for (CacheDirective directive : directivesById.values()) { 260 directive.resetStatistics(); 261 } 262 } 263 264 /** 265 * @return Unmodifiable view of the collection of CachePools. 266 */ 267 public Collection<CachePool> getCachePools() { 268 assert namesystem.hasReadLock(); 269 return Collections.unmodifiableCollection(cachePools.values()); 270 } 271 272 /** 273 * @return Unmodifiable view of the collection of CacheDirectives. 274 */ 275 public Collection<CacheDirective> getCacheDirectives() { 276 assert namesystem.hasReadLock(); 277 return Collections.unmodifiableCollection(directivesById.values()); 278 } 279 280 @VisibleForTesting 281 public GSet<CachedBlock, CachedBlock> getCachedBlocks() { 282 assert namesystem.hasReadLock(); 283 return cachedBlocks; 284 } 285 286 private long getNextDirectiveId() throws IOException { 287 assert namesystem.hasWriteLock(); 288 if (nextDirectiveId >= Long.MAX_VALUE - 1) { 289 throw new IOException("No more available IDs."); 290 } 291 return nextDirectiveId++; 292 } 293 294 // Helper getter / validation methods 295 296 private static void checkWritePermission(FSPermissionChecker pc, 297 CachePool pool) throws AccessControlException { 298 if ((pc != null)) { 299 pc.checkPermission(pool, FsAction.WRITE); 300 } 301 } 302 303 private static String validatePoolName(CacheDirectiveInfo directive) 304 throws InvalidRequestException { 305 String pool = directive.getPool(); 306 if (pool == null) { 307 throw new InvalidRequestException("No pool specified."); 308 } 309 if (pool.isEmpty()) { 310 throw new InvalidRequestException("Invalid empty pool name."); 311 } 312 return pool; 313 } 314 315 private static String validatePath(CacheDirectiveInfo directive) 316 throws InvalidRequestException { 317 if (directive.getPath() == null) { 318 throw new InvalidRequestException("No path specified."); 319 } 320 String path = directive.getPath().toUri().getPath(); 321 if (!DFSUtil.isValidName(path)) { 322 throw new InvalidRequestException("Invalid path '" + path + "'."); 323 } 324 return path; 325 } 326 327 private static short validateReplication(CacheDirectiveInfo directive, 328 short defaultValue) throws InvalidRequestException { 329 short repl = (directive.getReplication() != null) 330 ? directive.getReplication() : defaultValue; 331 if (repl <= 0) { 332 throw new InvalidRequestException("Invalid replication factor " + repl 333 + " <= 0"); 334 } 335 return repl; 336 } 337 338 /** 339 * Calculates the absolute expiry time of the directive from the 340 * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration 341 * into an absolute time based on the local clock. 342 * 343 * @param info to validate. 344 * @param maxRelativeExpiryTime of the info's pool. 345 * @return the expiration time, or the pool's max absolute expiration if the 346 * info's expiration was not set. 347 * @throws InvalidRequestException if the info's Expiration is invalid. 348 */ 349 private static long validateExpiryTime(CacheDirectiveInfo info, 350 long maxRelativeExpiryTime) throws InvalidRequestException { 351 LOG.trace("Validating directive {} pool maxRelativeExpiryTime {}", info, 352 maxRelativeExpiryTime); 353 final long now = new Date().getTime(); 354 final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime; 355 if (info == null || info.getExpiration() == null) { 356 return maxAbsoluteExpiryTime; 357 } 358 Expiration expiry = info.getExpiration(); 359 if (expiry.getMillis() < 0l) { 360 throw new InvalidRequestException("Cannot set a negative expiration: " 361 + expiry.getMillis()); 362 } 363 long relExpiryTime, absExpiryTime; 364 if (expiry.isRelative()) { 365 relExpiryTime = expiry.getMillis(); 366 absExpiryTime = now + relExpiryTime; 367 } else { 368 absExpiryTime = expiry.getMillis(); 369 relExpiryTime = absExpiryTime - now; 370 } 371 // Need to cap the expiry so we don't overflow a long when doing math 372 if (relExpiryTime > Expiration.MAX_RELATIVE_EXPIRY_MS) { 373 throw new InvalidRequestException("Expiration " 374 + expiry.toString() + " is too far in the future!"); 375 } 376 // Fail if the requested expiry is greater than the max 377 if (relExpiryTime > maxRelativeExpiryTime) { 378 throw new InvalidRequestException("Expiration " + expiry.toString() 379 + " exceeds the max relative expiration time of " 380 + maxRelativeExpiryTime + " ms."); 381 } 382 return absExpiryTime; 383 } 384 385 /** 386 * Throws an exception if the CachePool does not have enough capacity to 387 * cache the given path at the replication factor. 388 * 389 * @param pool CachePool where the path is being cached 390 * @param path Path that is being cached 391 * @param replication Replication factor of the path 392 * @throws InvalidRequestException if the pool does not have enough capacity 393 */ 394 private void checkLimit(CachePool pool, String path, 395 short replication) throws InvalidRequestException { 396 CacheDirectiveStats stats = computeNeeded(path, replication); 397 if (pool.getLimit() == CachePoolInfo.LIMIT_UNLIMITED) { 398 return; 399 } 400 if (pool.getBytesNeeded() + stats.getBytesNeeded() > pool.getLimit()) { 401 throw new InvalidRequestException("Caching path " + path + " of size " 402 + stats.getBytesNeeded() / replication + " bytes at replication " 403 + replication + " would exceed pool " + pool.getPoolName() 404 + "'s remaining capacity of " 405 + (pool.getLimit() - pool.getBytesNeeded()) + " bytes."); 406 } 407 } 408 409 /** 410 * Computes the needed number of bytes and files for a path. 411 * @return CacheDirectiveStats describing the needed stats for this path 412 */ 413 private CacheDirectiveStats computeNeeded(String path, short replication) { 414 FSDirectory fsDir = namesystem.getFSDirectory(); 415 INode node; 416 long requestedBytes = 0; 417 long requestedFiles = 0; 418 CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder(); 419 try { 420 node = fsDir.getINode(path, DirOp.READ); 421 } catch (IOException e) { 422 // We don't cache through invalid paths 423 return builder.build(); 424 } 425 if (node == null) { 426 return builder.build(); 427 } 428 if (node.isFile()) { 429 requestedFiles = 1; 430 INodeFile file = node.asFile(); 431 requestedBytes = file.computeFileSize(); 432 } else if (node.isDirectory()) { 433 INodeDirectory dir = node.asDirectory(); 434 ReadOnlyList<INode> children = dir 435 .getChildrenList(Snapshot.CURRENT_STATE_ID); 436 requestedFiles = children.size(); 437 for (INode child : children) { 438 if (child.isFile()) { 439 requestedBytes += child.asFile().computeFileSize(); 440 } 441 } 442 } 443 return new CacheDirectiveStats.Builder() 444 .setBytesNeeded(requestedBytes * replication) 445 .setFilesCached(requestedFiles) 446 .build(); 447 } 448 449 /** 450 * Get a CacheDirective by ID, validating the ID and that the directive 451 * exists. 452 */ 453 private CacheDirective getById(long id) throws InvalidRequestException { 454 // Check for invalid IDs. 455 if (id <= 0) { 456 throw new InvalidRequestException("Invalid negative ID."); 457 } 458 // Find the directive. 459 CacheDirective directive = directivesById.get(id); 460 if (directive == null) { 461 throw new InvalidRequestException("No directive with ID " + id 462 + " found."); 463 } 464 return directive; 465 } 466 467 /** 468 * Get a CachePool by name, validating that it exists. 469 */ 470 private CachePool getCachePool(String poolName) 471 throws InvalidRequestException { 472 CachePool pool = cachePools.get(poolName); 473 if (pool == null) { 474 throw new InvalidRequestException("Unknown pool " + poolName); 475 } 476 return pool; 477 } 478 479 // RPC handlers 480 481 private void addInternal(CacheDirective directive, CachePool pool) { 482 boolean addedDirective = pool.getDirectiveList().add(directive); 483 assert addedDirective; 484 directivesById.put(directive.getId(), directive); 485 String path = directive.getPath(); 486 List<CacheDirective> directives = directivesByPath.get(path); 487 if (directives == null) { 488 directives = new ArrayList<CacheDirective>(1); 489 directivesByPath.put(path, directives); 490 } 491 directives.add(directive); 492 // Fix up pool stats 493 CacheDirectiveStats stats = 494 computeNeeded(directive.getPath(), directive.getReplication()); 495 directive.addBytesNeeded(stats.getBytesNeeded()); 496 directive.addFilesNeeded(directive.getFilesNeeded()); 497 498 setNeedsRescan(); 499 } 500 501 /** 502 * Adds a directive, skipping most error checking. This should only be called 503 * internally in special scenarios like edit log replay. 504 */ 505 CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive) 506 throws InvalidRequestException { 507 long id = directive.getId(); 508 CacheDirective entry = new CacheDirective(directive); 509 CachePool pool = cachePools.get(directive.getPool()); 510 addInternal(entry, pool); 511 if (nextDirectiveId <= id) { 512 nextDirectiveId = id + 1; 513 } 514 return entry.toInfo(); 515 } 516 517 public CacheDirectiveInfo addDirective( 518 CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags) 519 throws IOException { 520 assert namesystem.hasWriteLock(); 521 CacheDirective directive; 522 try { 523 CachePool pool = getCachePool(validatePoolName(info)); 524 checkWritePermission(pc, pool); 525 String path = validatePath(info); 526 short replication = validateReplication(info, (short)1); 527 long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs()); 528 // Do quota validation if required 529 if (!flags.contains(CacheFlag.FORCE)) { 530 checkLimit(pool, path, replication); 531 } 532 // All validation passed 533 // Add a new entry with the next available ID. 534 long id = getNextDirectiveId(); 535 directive = new CacheDirective(id, path, replication, expiryTime); 536 addInternal(directive, pool); 537 } catch (IOException e) { 538 LOG.warn("addDirective of " + info + " failed: ", e); 539 throw e; 540 } 541 LOG.info("addDirective of {} successful.", info); 542 return directive.toInfo(); 543 } 544 545 /** 546 * Factory method that makes a new CacheDirectiveInfo by applying fields in a 547 * CacheDirectiveInfo to an existing CacheDirective. 548 * 549 * @param info with some or all fields set. 550 * @param defaults directive providing default values for unset fields in 551 * info. 552 * 553 * @return new CacheDirectiveInfo of the info applied to the defaults. 554 */ 555 private static CacheDirectiveInfo createFromInfoAndDefaults( 556 CacheDirectiveInfo info, CacheDirective defaults) { 557 // Initialize the builder with the default values 558 CacheDirectiveInfo.Builder builder = 559 new CacheDirectiveInfo.Builder(defaults.toInfo()); 560 // Replace default with new value if present 561 if (info.getPath() != null) { 562 builder.setPath(info.getPath()); 563 } 564 if (info.getReplication() != null) { 565 builder.setReplication(info.getReplication()); 566 } 567 if (info.getPool() != null) { 568 builder.setPool(info.getPool()); 569 } 570 if (info.getExpiration() != null) { 571 builder.setExpiration(info.getExpiration()); 572 } 573 return builder.build(); 574 } 575 576 /** 577 * Modifies a directive, skipping most error checking. This is for careful 578 * internal use only. modifyDirective can be non-deterministic since its error 579 * checking depends on current system time, which poses a problem for edit log 580 * replay. 581 */ 582 void modifyDirectiveFromEditLog(CacheDirectiveInfo info) 583 throws InvalidRequestException { 584 // Check for invalid IDs. 585 Long id = info.getId(); 586 if (id == null) { 587 throw new InvalidRequestException("Must supply an ID."); 588 } 589 CacheDirective prevEntry = getById(id); 590 CacheDirectiveInfo newInfo = createFromInfoAndDefaults(info, prevEntry); 591 removeInternal(prevEntry); 592 addInternal(new CacheDirective(newInfo), getCachePool(newInfo.getPool())); 593 } 594 595 public void modifyDirective(CacheDirectiveInfo info, 596 FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException { 597 assert namesystem.hasWriteLock(); 598 String idString = 599 (info.getId() == null) ? 600 "(null)" : info.getId().toString(); 601 try { 602 // Check for invalid IDs. 603 Long id = info.getId(); 604 if (id == null) { 605 throw new InvalidRequestException("Must supply an ID."); 606 } 607 CacheDirective prevEntry = getById(id); 608 checkWritePermission(pc, prevEntry.getPool()); 609 610 // Fill in defaults 611 CacheDirectiveInfo infoWithDefaults = 612 createFromInfoAndDefaults(info, prevEntry); 613 CacheDirectiveInfo.Builder builder = 614 new CacheDirectiveInfo.Builder(infoWithDefaults); 615 616 // Do validation 617 validatePath(infoWithDefaults); 618 validateReplication(infoWithDefaults, (short)-1); 619 // Need to test the pool being set here to avoid rejecting a modify for a 620 // directive that's already been forced into a pool 621 CachePool srcPool = prevEntry.getPool(); 622 CachePool destPool = getCachePool(validatePoolName(infoWithDefaults)); 623 if (!srcPool.getPoolName().equals(destPool.getPoolName())) { 624 checkWritePermission(pc, destPool); 625 if (!flags.contains(CacheFlag.FORCE)) { 626 checkLimit(destPool, infoWithDefaults.getPath().toUri().getPath(), 627 infoWithDefaults.getReplication()); 628 } 629 } 630 // Verify the expiration against the destination pool 631 validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs()); 632 633 // Indicate changes to the CRM 634 setNeedsRescan(); 635 636 // Validation passed 637 removeInternal(prevEntry); 638 addInternal(new CacheDirective(builder.build()), destPool); 639 } catch (IOException e) { 640 LOG.warn("modifyDirective of " + idString + " failed: ", e); 641 throw e; 642 } 643 LOG.info("modifyDirective of {} successfully applied {}.", idString, info); 644 } 645 646 private void removeInternal(CacheDirective directive) 647 throws InvalidRequestException { 648 assert namesystem.hasWriteLock(); 649 // Remove the corresponding entry in directivesByPath. 650 String path = directive.getPath(); 651 List<CacheDirective> directives = directivesByPath.get(path); 652 if (directives == null || !directives.remove(directive)) { 653 throw new InvalidRequestException("Failed to locate entry " + 654 directive.getId() + " by path " + directive.getPath()); 655 } 656 if (directives.size() == 0) { 657 directivesByPath.remove(path); 658 } 659 // Fix up the stats from removing the pool 660 final CachePool pool = directive.getPool(); 661 directive.addBytesNeeded(-directive.getBytesNeeded()); 662 directive.addFilesNeeded(-directive.getFilesNeeded()); 663 664 directivesById.remove(directive.getId()); 665 pool.getDirectiveList().remove(directive); 666 assert directive.getPool() == null; 667 668 setNeedsRescan(); 669 } 670 671 public void removeDirective(long id, FSPermissionChecker pc) 672 throws IOException { 673 assert namesystem.hasWriteLock(); 674 try { 675 CacheDirective directive = getById(id); 676 checkWritePermission(pc, directive.getPool()); 677 removeInternal(directive); 678 } catch (IOException e) { 679 LOG.warn("removeDirective of " + id + " failed: ", e); 680 throw e; 681 } 682 LOG.info("removeDirective of " + id + " successful."); 683 } 684 685 public BatchedListEntries<CacheDirectiveEntry> 686 listCacheDirectives(long prevId, 687 CacheDirectiveInfo filter, 688 FSPermissionChecker pc) throws IOException { 689 assert namesystem.hasReadLock(); 690 final int NUM_PRE_ALLOCATED_ENTRIES = 16; 691 String filterPath = null; 692 if (filter.getPath() != null) { 693 filterPath = validatePath(filter); 694 } 695 if (filter.getReplication() != null) { 696 throw new InvalidRequestException( 697 "Filtering by replication is unsupported."); 698 } 699 700 // Querying for a single ID 701 final Long id = filter.getId(); 702 if (id != null) { 703 if (!directivesById.containsKey(id)) { 704 throw new InvalidRequestException("Did not find requested id " + id); 705 } 706 // Since we use a tailMap on directivesById, setting prev to id-1 gets 707 // us the directive with the id (if present) 708 prevId = id - 1; 709 } 710 711 ArrayList<CacheDirectiveEntry> replies = 712 new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES); 713 int numReplies = 0; 714 SortedMap<Long, CacheDirective> tailMap = 715 directivesById.tailMap(prevId + 1); 716 for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) { 717 if (numReplies >= maxListCacheDirectivesNumResponses) { 718 return new BatchedListEntries<CacheDirectiveEntry>(replies, true); 719 } 720 CacheDirective curDirective = cur.getValue(); 721 CacheDirectiveInfo info = cur.getValue().toInfo(); 722 723 // If the requested ID is present, it should be the first item. 724 // Hitting this case means the ID is not present, or we're on the second 725 // item and should break out. 726 if (id != null && 727 !(info.getId().equals(id))) { 728 break; 729 } 730 if (filter.getPool() != null && 731 !info.getPool().equals(filter.getPool())) { 732 continue; 733 } 734 if (filterPath != null && 735 !info.getPath().toUri().getPath().equals(filterPath)) { 736 continue; 737 } 738 boolean hasPermission = true; 739 if (pc != null) { 740 try { 741 pc.checkPermission(curDirective.getPool(), FsAction.READ); 742 } catch (AccessControlException e) { 743 hasPermission = false; 744 } 745 } 746 if (hasPermission) { 747 replies.add(new CacheDirectiveEntry(info, cur.getValue().toStats())); 748 numReplies++; 749 } 750 } 751 return new BatchedListEntries<CacheDirectiveEntry>(replies, false); 752 } 753 754 /** 755 * Create a cache pool. 756 * 757 * Only the superuser should be able to call this function. 758 * 759 * @param info The info for the cache pool to create. 760 * @return Information about the cache pool we created. 761 */ 762 public CachePoolInfo addCachePool(CachePoolInfo info) 763 throws IOException { 764 assert namesystem.hasWriteLock(); 765 CachePool pool; 766 try { 767 CachePoolInfo.validate(info); 768 String poolName = info.getPoolName(); 769 pool = cachePools.get(poolName); 770 if (pool != null) { 771 throw new InvalidRequestException("Cache pool " + poolName 772 + " already exists."); 773 } 774 pool = CachePool.createFromInfoAndDefaults(info); 775 cachePools.put(pool.getPoolName(), pool); 776 } catch (IOException e) { 777 LOG.info("addCachePool of " + info + " failed: ", e); 778 throw e; 779 } 780 LOG.info("addCachePool of {} successful.", info); 781 return pool.getInfo(true); 782 } 783 784 /** 785 * Modify a cache pool. 786 * 787 * Only the superuser should be able to call this function. 788 * 789 * @param info 790 * The info for the cache pool to modify. 791 */ 792 public void modifyCachePool(CachePoolInfo info) 793 throws IOException { 794 assert namesystem.hasWriteLock(); 795 StringBuilder bld = new StringBuilder(); 796 try { 797 CachePoolInfo.validate(info); 798 String poolName = info.getPoolName(); 799 CachePool pool = cachePools.get(poolName); 800 if (pool == null) { 801 throw new InvalidRequestException("Cache pool " + poolName 802 + " does not exist."); 803 } 804 String prefix = ""; 805 if (info.getOwnerName() != null) { 806 pool.setOwnerName(info.getOwnerName()); 807 bld.append(prefix). 808 append("set owner to ").append(info.getOwnerName()); 809 prefix = "; "; 810 } 811 if (info.getGroupName() != null) { 812 pool.setGroupName(info.getGroupName()); 813 bld.append(prefix). 814 append("set group to ").append(info.getGroupName()); 815 prefix = "; "; 816 } 817 if (info.getMode() != null) { 818 pool.setMode(info.getMode()); 819 bld.append(prefix).append("set mode to " + info.getMode()); 820 prefix = "; "; 821 } 822 if (info.getLimit() != null) { 823 pool.setLimit(info.getLimit()); 824 bld.append(prefix).append("set limit to " + info.getLimit()); 825 prefix = "; "; 826 // New limit changes stats, need to set needs refresh 827 setNeedsRescan(); 828 } 829 if (info.getMaxRelativeExpiryMs() != null) { 830 final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs(); 831 pool.setMaxRelativeExpiryMs(maxRelativeExpiry); 832 bld.append(prefix).append("set maxRelativeExpiry to " 833 + maxRelativeExpiry); 834 prefix = "; "; 835 } 836 if (prefix.isEmpty()) { 837 bld.append("no changes."); 838 } 839 } catch (IOException e) { 840 LOG.info("modifyCachePool of " + info + " failed: ", e); 841 throw e; 842 } 843 LOG.info("modifyCachePool of {} successful; {}", info.getPoolName(), 844 bld.toString()); 845 } 846 847 /** 848 * Remove a cache pool. 849 * 850 * Only the superuser should be able to call this function. 851 * 852 * @param poolName 853 * The name for the cache pool to remove. 854 */ 855 public void removeCachePool(String poolName) 856 throws IOException { 857 assert namesystem.hasWriteLock(); 858 try { 859 CachePoolInfo.validateName(poolName); 860 CachePool pool = cachePools.remove(poolName); 861 if (pool == null) { 862 throw new InvalidRequestException( 863 "Cannot remove non-existent cache pool " + poolName); 864 } 865 // Remove all directives in this pool. 866 Iterator<CacheDirective> iter = pool.getDirectiveList().iterator(); 867 while (iter.hasNext()) { 868 CacheDirective directive = iter.next(); 869 directivesByPath.remove(directive.getPath()); 870 directivesById.remove(directive.getId()); 871 iter.remove(); 872 } 873 setNeedsRescan(); 874 } catch (IOException e) { 875 LOG.info("removeCachePool of " + poolName + " failed: ", e); 876 throw e; 877 } 878 LOG.info("removeCachePool of " + poolName + " successful."); 879 } 880 881 public BatchedListEntries<CachePoolEntry> 882 listCachePools(FSPermissionChecker pc, String prevKey) { 883 assert namesystem.hasReadLock(); 884 final int NUM_PRE_ALLOCATED_ENTRIES = 16; 885 ArrayList<CachePoolEntry> results = 886 new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES); 887 SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false); 888 int numListed = 0; 889 for (Entry<String, CachePool> cur : tailMap.entrySet()) { 890 if (numListed++ >= maxListCachePoolsResponses) { 891 return new BatchedListEntries<CachePoolEntry>(results, true); 892 } 893 results.add(cur.getValue().getEntry(pc)); 894 } 895 return new BatchedListEntries<CachePoolEntry>(results, false); 896 } 897 898 public void setCachedLocations(LocatedBlocks locations) { 899 // don't attempt lookups if there are no cached blocks 900 if (cachedBlocks.size() > 0) { 901 for (LocatedBlock lb : locations.getLocatedBlocks()) { 902 setCachedLocations(lb); 903 } 904 } 905 } 906 907 private void setCachedLocations(LocatedBlock block) { 908 CachedBlock cachedBlock = 909 new CachedBlock(block.getBlock().getBlockId(), 910 (short)0, false); 911 cachedBlock = cachedBlocks.get(cachedBlock); 912 if (cachedBlock == null) { 913 return; 914 } 915 List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED); 916 for (DatanodeDescriptor datanode : cachedDNs) { 917 // Filter out cached blocks that do not have a backing replica. 918 // 919 // This should not happen since it means the CacheManager thinks 920 // something is cached that does not exist, but it's a safety 921 // measure. 922 boolean found = false; 923 for (DatanodeInfo loc : block.getLocations()) { 924 if (loc.equals(datanode)) { 925 block.addCachedLoc(loc); 926 found = true; 927 break; 928 } 929 } 930 if (!found) { 931 LOG.warn("Datanode {} is not a valid cache location for block {} " 932 + "because that node does not have a backing replica!", 933 datanode, block.getBlock().getBlockName()); 934 } 935 } 936 } 937 938 public final void processCacheReport(final DatanodeID datanodeID, 939 final List<Long> blockIds) throws IOException { 940 namesystem.writeLock(); 941 final long startTime = Time.monotonicNow(); 942 final long endTime; 943 try { 944 final DatanodeDescriptor datanode = 945 blockManager.getDatanodeManager().getDatanode(datanodeID); 946 if (datanode == null || !datanode.isRegistered()) { 947 throw new IOException( 948 "processCacheReport from dead or unregistered datanode: " + 949 datanode); 950 } 951 processCacheReportImpl(datanode, blockIds); 952 } finally { 953 endTime = Time.monotonicNow(); 954 namesystem.writeUnlock("processCacheReport"); 955 } 956 957 // Log the block report processing stats from Namenode perspective 958 final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); 959 if (metrics != null) { 960 metrics.addCacheBlockReport((int) (endTime - startTime)); 961 } 962 LOG.debug("Processed cache report from {}, blocks: {}, " + 963 "processing time: {} msecs", datanodeID, blockIds.size(), 964 (endTime - startTime)); 965 } 966 967 private void processCacheReportImpl(final DatanodeDescriptor datanode, 968 final List<Long> blockIds) { 969 CachedBlocksList cached = datanode.getCached(); 970 cached.clear(); 971 CachedBlocksList cachedList = datanode.getCached(); 972 CachedBlocksList pendingCachedList = datanode.getPendingCached(); 973 for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) { 974 long blockId = iter.next(); 975 LOG.trace("Cache report from datanode {} has block {}", datanode, 976 blockId); 977 CachedBlock cachedBlock = 978 new CachedBlock(blockId, (short)0, false); 979 CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock); 980 // Add the block ID from the cache report to the cachedBlocks map 981 // if it's not already there. 982 if (prevCachedBlock != null) { 983 cachedBlock = prevCachedBlock; 984 } else { 985 cachedBlocks.put(cachedBlock); 986 LOG.trace("Added block {} to cachedBlocks", cachedBlock); 987 } 988 // Add the block to the datanode's implicit cached block list 989 // if it's not already there. Similarly, remove it from the pending 990 // cached block list if it exists there. 991 if (!cachedBlock.isPresent(cachedList)) { 992 cachedList.add(cachedBlock); 993 LOG.trace("Added block {} to CACHED list.", cachedBlock); 994 } 995 if (cachedBlock.isPresent(pendingCachedList)) { 996 pendingCachedList.remove(cachedBlock); 997 LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock); 998 } 999 } 1000 } 1001 1002 /** 1003 * Saves the current state of the CacheManager to the DataOutput. Used 1004 * to persist CacheManager state in the FSImage. 1005 * @param out DataOutput to persist state 1006 * @param sdPath path of the storage directory 1007 * @throws IOException 1008 */ 1009 public void saveStateCompat(DataOutputStream out, String sdPath) 1010 throws IOException { 1011 serializerCompat.save(out, sdPath); 1012 } 1013 1014 public PersistState saveState() throws IOException { 1015 ArrayList<CachePoolInfoProto> pools = Lists 1016 .newArrayListWithCapacity(cachePools.size()); 1017 ArrayList<CacheDirectiveInfoProto> directives = Lists 1018 .newArrayListWithCapacity(directivesById.size()); 1019 1020 for (CachePool pool : cachePools.values()) { 1021 CachePoolInfo p = pool.getInfo(true); 1022 CachePoolInfoProto.Builder b = CachePoolInfoProto.newBuilder() 1023 .setPoolName(p.getPoolName()); 1024 1025 if (p.getOwnerName() != null) 1026 b.setOwnerName(p.getOwnerName()); 1027 1028 if (p.getGroupName() != null) 1029 b.setGroupName(p.getGroupName()); 1030 1031 if (p.getMode() != null) 1032 b.setMode(p.getMode().toShort()); 1033 1034 if (p.getLimit() != null) 1035 b.setLimit(p.getLimit()); 1036 1037 pools.add(b.build()); 1038 } 1039 1040 for (CacheDirective directive : directivesById.values()) { 1041 CacheDirectiveInfo info = directive.toInfo(); 1042 CacheDirectiveInfoProto.Builder b = CacheDirectiveInfoProto.newBuilder() 1043 .setId(info.getId()); 1044 1045 if (info.getPath() != null) { 1046 b.setPath(info.getPath().toUri().getPath()); 1047 } 1048 1049 if (info.getReplication() != null) { 1050 b.setReplication(info.getReplication()); 1051 } 1052 1053 if (info.getPool() != null) { 1054 b.setPool(info.getPool()); 1055 } 1056 1057 Expiration expiry = info.getExpiration(); 1058 if (expiry != null) { 1059 assert (!expiry.isRelative()); 1060 b.setExpiration(PBHelperClient.convert(expiry)); 1061 } 1062 1063 directives.add(b.build()); 1064 } 1065 CacheManagerSection s = CacheManagerSection.newBuilder() 1066 .setNextDirectiveId(nextDirectiveId).setNumPools(pools.size()) 1067 .setNumDirectives(directives.size()).build(); 1068 1069 return new PersistState(s, pools, directives); 1070 } 1071 1072 /** 1073 * Reloads CacheManager state from the passed DataInput. Used during namenode 1074 * startup to restore CacheManager state from an FSImage. 1075 * @param in DataInput from which to restore state 1076 * @throws IOException 1077 */ 1078 public void loadStateCompat(DataInput in) throws IOException { 1079 serializerCompat.load(in); 1080 } 1081 1082 public void loadState(PersistState s) throws IOException { 1083 nextDirectiveId = s.section.getNextDirectiveId(); 1084 for (CachePoolInfoProto p : s.pools) { 1085 CachePoolInfo info = new CachePoolInfo(p.getPoolName()); 1086 if (p.hasOwnerName()) 1087 info.setOwnerName(p.getOwnerName()); 1088 1089 if (p.hasGroupName()) 1090 info.setGroupName(p.getGroupName()); 1091 1092 if (p.hasMode()) 1093 info.setMode(new FsPermission((short) p.getMode())); 1094 1095 if (p.hasLimit()) 1096 info.setLimit(p.getLimit()); 1097 1098 addCachePool(info); 1099 } 1100 1101 for (CacheDirectiveInfoProto p : s.directives) { 1102 // Get pool reference by looking it up in the map 1103 final String poolName = p.getPool(); 1104 CacheDirective directive = new CacheDirective(p.getId(), new Path( 1105 p.getPath()).toUri().getPath(), (short) p.getReplication(), p 1106 .getExpiration().getMillis()); 1107 addCacheDirective(poolName, directive); 1108 } 1109 } 1110 1111 private void addCacheDirective(final String poolName, 1112 final CacheDirective directive) throws IOException { 1113 CachePool pool = cachePools.get(poolName); 1114 if (pool == null) { 1115 throw new IOException("Directive refers to pool " + poolName 1116 + ", which does not exist."); 1117 } 1118 boolean addedDirective = pool.getDirectiveList().add(directive); 1119 assert addedDirective; 1120 if (directivesById.put(directive.getId(), directive) != null) { 1121 throw new IOException("A directive with ID " + directive.getId() 1122 + " already exists"); 1123 } 1124 List<CacheDirective> directives = directivesByPath.get(directive.getPath()); 1125 if (directives == null) { 1126 directives = new LinkedList<CacheDirective>(); 1127 directivesByPath.put(directive.getPath(), directives); 1128 } 1129 directives.add(directive); 1130 } 1131 1132 private final class SerializerCompat { 1133 private void save(DataOutputStream out, String sdPath) throws IOException { 1134 out.writeLong(nextDirectiveId); 1135 savePools(out, sdPath); 1136 saveDirectives(out, sdPath); 1137 } 1138 1139 private void load(DataInput in) throws IOException { 1140 nextDirectiveId = in.readLong(); 1141 // pools need to be loaded first since directives point to their parent pool 1142 loadPools(in); 1143 loadDirectives(in); 1144 } 1145 1146 /** 1147 * Save cache pools to fsimage 1148 */ 1149 private void savePools(DataOutputStream out, 1150 String sdPath) throws IOException { 1151 StartupProgress prog = NameNode.getStartupProgress(); 1152 Step step = new Step(StepType.CACHE_POOLS, sdPath); 1153 prog.beginStep(Phase.SAVING_CHECKPOINT, step); 1154 prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size()); 1155 Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); 1156 out.writeInt(cachePools.size()); 1157 for (CachePool pool: cachePools.values()) { 1158 FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true)); 1159 counter.increment(); 1160 } 1161 prog.endStep(Phase.SAVING_CHECKPOINT, step); 1162 } 1163 1164 /* 1165 * Save cache entries to fsimage 1166 */ 1167 private void saveDirectives(DataOutputStream out, String sdPath) 1168 throws IOException { 1169 StartupProgress prog = NameNode.getStartupProgress(); 1170 Step step = new Step(StepType.CACHE_ENTRIES, sdPath); 1171 prog.beginStep(Phase.SAVING_CHECKPOINT, step); 1172 prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size()); 1173 Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); 1174 out.writeInt(directivesById.size()); 1175 for (CacheDirective directive : directivesById.values()) { 1176 FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo()); 1177 counter.increment(); 1178 } 1179 prog.endStep(Phase.SAVING_CHECKPOINT, step); 1180 } 1181 1182 /** 1183 * Load cache pools from fsimage 1184 */ 1185 private void loadPools(DataInput in) 1186 throws IOException { 1187 StartupProgress prog = NameNode.getStartupProgress(); 1188 Step step = new Step(StepType.CACHE_POOLS); 1189 prog.beginStep(Phase.LOADING_FSIMAGE, step); 1190 int numberOfPools = in.readInt(); 1191 prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools); 1192 Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); 1193 for (int i = 0; i < numberOfPools; i++) { 1194 addCachePool(FSImageSerialization.readCachePoolInfo(in)); 1195 counter.increment(); 1196 } 1197 prog.endStep(Phase.LOADING_FSIMAGE, step); 1198 } 1199 1200 /** 1201 * Load cache directives from the fsimage 1202 */ 1203 private void loadDirectives(DataInput in) throws IOException { 1204 StartupProgress prog = NameNode.getStartupProgress(); 1205 Step step = new Step(StepType.CACHE_ENTRIES); 1206 prog.beginStep(Phase.LOADING_FSIMAGE, step); 1207 int numDirectives = in.readInt(); 1208 prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives); 1209 Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); 1210 for (int i = 0; i < numDirectives; i++) { 1211 CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in); 1212 // Get pool reference by looking it up in the map 1213 final String poolName = info.getPool(); 1214 CacheDirective directive = 1215 new CacheDirective(info.getId(), info.getPath().toUri().getPath(), 1216 info.getReplication(), info.getExpiration().getAbsoluteMillis()); 1217 addCacheDirective(poolName, directive); 1218 counter.increment(); 1219 } 1220 prog.endStep(Phase.LOADING_FSIMAGE, step); 1221 } 1222 } 1223 1224 public void waitForRescanIfNeeded() { 1225 crmLock.lock(); 1226 try { 1227 if (monitor != null) { 1228 monitor.waitForRescanIfNeeded(); 1229 } 1230 } finally { 1231 crmLock.unlock(); 1232 } 1233 } 1234 1235 private void setNeedsRescan() { 1236 crmLock.lock(); 1237 try { 1238 if (monitor != null) { 1239 monitor.setNeedsRescan(); 1240 } 1241 } finally { 1242 crmLock.unlock(); 1243 } 1244 } 1245 1246 @VisibleForTesting 1247 public Thread getCacheReplicationMonitor() { 1248 crmLock.lock(); 1249 try { 1250 return monitor; 1251 } finally { 1252 crmLock.unlock(); 1253 } 1254 } 1255}