001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.server.namenode;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT;
021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT;
022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
024import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
025import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
026import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
027import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
028
029import java.io.DataInput;
030import java.io.DataOutputStream;
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.Date;
036import java.util.EnumSet;
037import java.util.Iterator;
038import java.util.LinkedList;
039import java.util.List;
040import java.util.Map.Entry;
041import java.util.SortedMap;
042import java.util.TreeMap;
043import java.util.concurrent.locks.ReentrantLock;
044
045import org.apache.commons.io.IOUtils;
046import org.apache.hadoop.classification.InterfaceAudience;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
049import org.apache.hadoop.fs.CacheFlag;
050import org.apache.hadoop.fs.InvalidRequestException;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.fs.permission.FsAction;
053import org.apache.hadoop.fs.permission.FsPermission;
054import org.apache.hadoop.hdfs.DFSUtil;
055import org.apache.hadoop.hdfs.protocol.CacheDirective;
056import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
057import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
058import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
059import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
060import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
061import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
062import org.apache.hadoop.hdfs.protocol.DatanodeID;
063import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
064import org.apache.hadoop.hdfs.protocol.LocatedBlock;
065import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
066import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
067import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
068import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
069import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
070import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
071import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
072import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
073import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
074import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
075import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
076import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
077import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
078import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
079import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
080import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
081import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
082import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
083import org.apache.hadoop.hdfs.util.ReadOnlyList;
084import org.apache.hadoop.security.AccessControlException;
085import org.apache.hadoop.util.GSet;
086import org.apache.hadoop.util.LightWeightGSet;
087import org.apache.hadoop.util.Time;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090
091import com.google.common.annotations.VisibleForTesting;
092import com.google.common.collect.Lists;
093
094/**
095 * The Cache Manager handles caching on DataNodes.
096 *
097 * This class is instantiated by the FSNamesystem.
098 * It maintains the mapping of cached blocks to datanodes via processing
099 * datanode cache reports. Based on these reports and addition and removal of
100 * caching directives, we will schedule caching and uncaching work.
101 */
102@InterfaceAudience.LimitedPrivate({"HDFS"})
103public final class CacheManager {
104  public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
105
106  private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;
107
108  // TODO: add pending / underCached / schedule cached blocks stats.
109
110  /**
111   * The FSNamesystem that contains this CacheManager.
112   */
113  private final FSNamesystem namesystem;
114
115  /**
116   * The BlockManager associated with the FSN that owns this CacheManager.
117   */
118  private final BlockManager blockManager;
119
120  /**
121   * Cache directives, sorted by ID.
122   *
123   * listCacheDirectives relies on the ordering of elements in this map
124   * to track what has already been listed by the client.
125   */
126  private final TreeMap<Long, CacheDirective> directivesById =
127      new TreeMap<Long, CacheDirective>();
128
129  /**
130   * The directive ID to use for a new directive.  IDs always increase, and are
131   * never reused.
132   */
133  private long nextDirectiveId;
134
135  /**
136   * Cache directives, sorted by path
137   */
138  private final TreeMap<String, List<CacheDirective>> directivesByPath =
139      new TreeMap<String, List<CacheDirective>>();
140
141  /**
142   * Cache pools, sorted by name.
143   */
144  private final TreeMap<String, CachePool> cachePools =
145      new TreeMap<String, CachePool>();
146
147  /**
148   * Maximum number of cache pools to list in one operation.
149   */
150  private final int maxListCachePoolsResponses;
151
152  /**
153   * Maximum number of cache pool directives to list in one operation.
154   */
155  private final int maxListCacheDirectivesNumResponses;
156
157  /**
158   * Interval between scans in milliseconds.
159   */
160  private final long scanIntervalMs;
161
162  /**
163   * All cached blocks.
164   */
165  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
166
167  /**
168   * Lock which protects the CacheReplicationMonitor.
169   */
170  private final ReentrantLock crmLock = new ReentrantLock();
171
172  private final SerializerCompat serializerCompat = new SerializerCompat();
173
174  /**
175   * The CacheReplicationMonitor.
176   */
177  private CacheReplicationMonitor monitor;
178
179  public static final class PersistState {
180    public final CacheManagerSection section;
181    public final List<CachePoolInfoProto> pools;
182    public final List<CacheDirectiveInfoProto> directives;
183
184    public PersistState(CacheManagerSection section,
185        List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives) {
186      this.section = section;
187      this.pools = pools;
188      this.directives = directives;
189    }
190  }
191
192  CacheManager(FSNamesystem namesystem, Configuration conf,
193      BlockManager blockManager) {
194    this.namesystem = namesystem;
195    this.blockManager = blockManager;
196    this.nextDirectiveId = 1;
197    this.maxListCachePoolsResponses = conf.getInt(
198        DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
199        DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
200    this.maxListCacheDirectivesNumResponses = conf.getInt(
201        DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
202        DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT);
203    scanIntervalMs = conf.getLong(
204        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
205        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
206    float cachedBlocksPercent = conf.getFloat(
207          DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
208          DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT);
209    if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) {
210      LOG.info("Using minimum value {} for {}", MIN_CACHED_BLOCKS_PERCENT,
211        DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
212      cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
213    }
214    this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
215          LightWeightGSet.computeCapacity(cachedBlocksPercent,
216              "cachedBlocks"));
217
218  }
219
220  /**
221   * Resets all tracked directives and pools. Called during 2NN checkpointing to
222   * reset FSNamesystem state. See {@link FSNamesystem#clear()}.
223   */
224  void clear() {
225    directivesById.clear();
226    directivesByPath.clear();
227    cachePools.clear();
228    nextDirectiveId = 1;
229  }
230
231  public void startMonitorThread() {
232    crmLock.lock();
233    try {
234      if (this.monitor == null) {
235        this.monitor = new CacheReplicationMonitor(namesystem, this,
236            scanIntervalMs, crmLock);
237        this.monitor.start();
238      }
239    } finally {
240      crmLock.unlock();
241    }
242  }
243
244  public void stopMonitorThread() {
245    crmLock.lock();
246    try {
247      if (this.monitor != null) {
248        CacheReplicationMonitor prevMonitor = this.monitor;
249        this.monitor = null;
250        IOUtils.closeQuietly(prevMonitor);
251      }
252    } finally {
253      crmLock.unlock();
254    }
255  }
256
257  public void clearDirectiveStats() {
258    assert namesystem.hasWriteLock();
259    for (CacheDirective directive : directivesById.values()) {
260      directive.resetStatistics();
261    }
262  }
263
264  /**
265   * @return Unmodifiable view of the collection of CachePools.
266   */
267  public Collection<CachePool> getCachePools() {
268    assert namesystem.hasReadLock();
269    return Collections.unmodifiableCollection(cachePools.values());
270  }
271
272  /**
273   * @return Unmodifiable view of the collection of CacheDirectives.
274   */
275  public Collection<CacheDirective> getCacheDirectives() {
276    assert namesystem.hasReadLock();
277    return Collections.unmodifiableCollection(directivesById.values());
278  }
279  
280  @VisibleForTesting
281  public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
282    assert namesystem.hasReadLock();
283    return cachedBlocks;
284  }
285
286  private long getNextDirectiveId() throws IOException {
287    assert namesystem.hasWriteLock();
288    if (nextDirectiveId >= Long.MAX_VALUE - 1) {
289      throw new IOException("No more available IDs.");
290    }
291    return nextDirectiveId++;
292  }
293
294  // Helper getter / validation methods
295
296  private static void checkWritePermission(FSPermissionChecker pc,
297      CachePool pool) throws AccessControlException {
298    if ((pc != null)) {
299      pc.checkPermission(pool, FsAction.WRITE);
300    }
301  }
302
303  private static String validatePoolName(CacheDirectiveInfo directive)
304      throws InvalidRequestException {
305    String pool = directive.getPool();
306    if (pool == null) {
307      throw new InvalidRequestException("No pool specified.");
308    }
309    if (pool.isEmpty()) {
310      throw new InvalidRequestException("Invalid empty pool name.");
311    }
312    return pool;
313  }
314
315  private static String validatePath(CacheDirectiveInfo directive)
316      throws InvalidRequestException {
317    if (directive.getPath() == null) {
318      throw new InvalidRequestException("No path specified.");
319    }
320    String path = directive.getPath().toUri().getPath();
321    if (!DFSUtil.isValidName(path)) {
322      throw new InvalidRequestException("Invalid path '" + path + "'.");
323    }
324    return path;
325  }
326
327  private static short validateReplication(CacheDirectiveInfo directive,
328      short defaultValue) throws InvalidRequestException {
329    short repl = (directive.getReplication() != null)
330        ? directive.getReplication() : defaultValue;
331    if (repl <= 0) {
332      throw new InvalidRequestException("Invalid replication factor " + repl
333          + " <= 0");
334    }
335    return repl;
336  }
337
338  /**
339   * Calculates the absolute expiry time of the directive from the
340   * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
341   * into an absolute time based on the local clock.
342   * 
343   * @param info to validate.
344   * @param maxRelativeExpiryTime of the info's pool.
345   * @return the expiration time, or the pool's max absolute expiration if the
346   *         info's expiration was not set.
347   * @throws InvalidRequestException if the info's Expiration is invalid.
348   */
349  private static long validateExpiryTime(CacheDirectiveInfo info,
350      long maxRelativeExpiryTime) throws InvalidRequestException {
351    LOG.trace("Validating directive {} pool maxRelativeExpiryTime {}", info,
352        maxRelativeExpiryTime);
353    final long now = new Date().getTime();
354    final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime;
355    if (info == null || info.getExpiration() == null) {
356      return maxAbsoluteExpiryTime;
357    }
358    Expiration expiry = info.getExpiration();
359    if (expiry.getMillis() < 0l) {
360      throw new InvalidRequestException("Cannot set a negative expiration: "
361          + expiry.getMillis());
362    }
363    long relExpiryTime, absExpiryTime;
364    if (expiry.isRelative()) {
365      relExpiryTime = expiry.getMillis();
366      absExpiryTime = now + relExpiryTime;
367    } else {
368      absExpiryTime = expiry.getMillis();
369      relExpiryTime = absExpiryTime - now;
370    }
371    // Need to cap the expiry so we don't overflow a long when doing math
372    if (relExpiryTime > Expiration.MAX_RELATIVE_EXPIRY_MS) {
373      throw new InvalidRequestException("Expiration "
374          + expiry.toString() + " is too far in the future!");
375    }
376    // Fail if the requested expiry is greater than the max
377    if (relExpiryTime > maxRelativeExpiryTime) {
378      throw new InvalidRequestException("Expiration " + expiry.toString()
379          + " exceeds the max relative expiration time of "
380          + maxRelativeExpiryTime + " ms.");
381    }
382    return absExpiryTime;
383  }
384
385  /**
386   * Throws an exception if the CachePool does not have enough capacity to
387   * cache the given path at the replication factor.
388   *
389   * @param pool CachePool where the path is being cached
390   * @param path Path that is being cached
391   * @param replication Replication factor of the path
392   * @throws InvalidRequestException if the pool does not have enough capacity
393   */
394  private void checkLimit(CachePool pool, String path,
395      short replication) throws InvalidRequestException {
396    CacheDirectiveStats stats = computeNeeded(path, replication);
397    if (pool.getLimit() == CachePoolInfo.LIMIT_UNLIMITED) {
398      return;
399    }
400    if (pool.getBytesNeeded() + stats.getBytesNeeded() > pool.getLimit()) {
401      throw new InvalidRequestException("Caching path " + path + " of size "
402          + stats.getBytesNeeded() / replication + " bytes at replication "
403          + replication + " would exceed pool " + pool.getPoolName()
404          + "'s remaining capacity of "
405          + (pool.getLimit() - pool.getBytesNeeded()) + " bytes.");
406    }
407  }
408
409  /**
410   * Computes the needed number of bytes and files for a path.
411   * @return CacheDirectiveStats describing the needed stats for this path
412   */
413  private CacheDirectiveStats computeNeeded(String path, short replication) {
414    FSDirectory fsDir = namesystem.getFSDirectory();
415    INode node;
416    long requestedBytes = 0;
417    long requestedFiles = 0;
418    CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
419    try {
420      node = fsDir.getINode(path, DirOp.READ);
421    } catch (IOException e) {
422      // We don't cache through invalid paths
423      return builder.build();
424    }
425    if (node == null) {
426      return builder.build();
427    }
428    if (node.isFile()) {
429      requestedFiles = 1;
430      INodeFile file = node.asFile();
431      requestedBytes = file.computeFileSize();
432    } else if (node.isDirectory()) {
433      INodeDirectory dir = node.asDirectory();
434      ReadOnlyList<INode> children = dir
435          .getChildrenList(Snapshot.CURRENT_STATE_ID);
436      requestedFiles = children.size();
437      for (INode child : children) {
438        if (child.isFile()) {
439          requestedBytes += child.asFile().computeFileSize();
440        }
441      }
442    }
443    return new CacheDirectiveStats.Builder()
444        .setBytesNeeded(requestedBytes * replication)
445        .setFilesCached(requestedFiles)
446        .build();
447  }
448
449  /**
450   * Get a CacheDirective by ID, validating the ID and that the directive
451   * exists.
452   */
453  private CacheDirective getById(long id) throws InvalidRequestException {
454    // Check for invalid IDs.
455    if (id <= 0) {
456      throw new InvalidRequestException("Invalid negative ID.");
457    }
458    // Find the directive.
459    CacheDirective directive = directivesById.get(id);
460    if (directive == null) {
461      throw new InvalidRequestException("No directive with ID " + id
462          + " found.");
463    }
464    return directive;
465  }
466
467  /**
468   * Get a CachePool by name, validating that it exists.
469   */
470  private CachePool getCachePool(String poolName)
471      throws InvalidRequestException {
472    CachePool pool = cachePools.get(poolName);
473    if (pool == null) {
474      throw new InvalidRequestException("Unknown pool " + poolName);
475    }
476    return pool;
477  }
478
479  // RPC handlers
480
481  private void addInternal(CacheDirective directive, CachePool pool) {
482    boolean addedDirective = pool.getDirectiveList().add(directive);
483    assert addedDirective;
484    directivesById.put(directive.getId(), directive);
485    String path = directive.getPath();
486    List<CacheDirective> directives = directivesByPath.get(path);
487    if (directives == null) {
488      directives = new ArrayList<CacheDirective>(1);
489      directivesByPath.put(path, directives);
490    }
491    directives.add(directive);
492    // Fix up pool stats
493    CacheDirectiveStats stats =
494        computeNeeded(directive.getPath(), directive.getReplication());
495    directive.addBytesNeeded(stats.getBytesNeeded());
496    directive.addFilesNeeded(directive.getFilesNeeded());
497
498    setNeedsRescan();
499  }
500
501  /**
502   * Adds a directive, skipping most error checking. This should only be called
503   * internally in special scenarios like edit log replay.
504   */
505  CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
506      throws InvalidRequestException {
507    long id = directive.getId();
508    CacheDirective entry = new CacheDirective(directive);
509    CachePool pool = cachePools.get(directive.getPool());
510    addInternal(entry, pool);
511    if (nextDirectiveId <= id) {
512      nextDirectiveId = id + 1;
513    }
514    return entry.toInfo();
515  }
516
517  public CacheDirectiveInfo addDirective(
518      CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
519      throws IOException {
520    assert namesystem.hasWriteLock();
521    CacheDirective directive;
522    try {
523      CachePool pool = getCachePool(validatePoolName(info));
524      checkWritePermission(pc, pool);
525      String path = validatePath(info);
526      short replication = validateReplication(info, (short)1);
527      long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs());
528      // Do quota validation if required
529      if (!flags.contains(CacheFlag.FORCE)) {
530        checkLimit(pool, path, replication);
531      }
532      // All validation passed
533      // Add a new entry with the next available ID.
534      long id = getNextDirectiveId();
535      directive = new CacheDirective(id, path, replication, expiryTime);
536      addInternal(directive, pool);
537    } catch (IOException e) {
538      LOG.warn("addDirective of " + info + " failed: ", e);
539      throw e;
540    }
541    LOG.info("addDirective of {} successful.", info);
542    return directive.toInfo();
543  }
544
545  /**
546   * Factory method that makes a new CacheDirectiveInfo by applying fields in a
547   * CacheDirectiveInfo to an existing CacheDirective.
548   * 
549   * @param info with some or all fields set.
550   * @param defaults directive providing default values for unset fields in
551   *          info.
552   * 
553   * @return new CacheDirectiveInfo of the info applied to the defaults.
554   */
555  private static CacheDirectiveInfo createFromInfoAndDefaults(
556      CacheDirectiveInfo info, CacheDirective defaults) {
557    // Initialize the builder with the default values
558    CacheDirectiveInfo.Builder builder =
559        new CacheDirectiveInfo.Builder(defaults.toInfo());
560    // Replace default with new value if present
561    if (info.getPath() != null) {
562      builder.setPath(info.getPath());
563    }
564    if (info.getReplication() != null) {
565      builder.setReplication(info.getReplication());
566    }
567    if (info.getPool() != null) {
568      builder.setPool(info.getPool());
569    }
570    if (info.getExpiration() != null) {
571      builder.setExpiration(info.getExpiration());
572    }
573    return builder.build();
574  }
575
576  /**
577   * Modifies a directive, skipping most error checking. This is for careful
578   * internal use only. modifyDirective can be non-deterministic since its error
579   * checking depends on current system time, which poses a problem for edit log
580   * replay.
581   */
582  void modifyDirectiveFromEditLog(CacheDirectiveInfo info)
583      throws InvalidRequestException {
584    // Check for invalid IDs.
585    Long id = info.getId();
586    if (id == null) {
587      throw new InvalidRequestException("Must supply an ID.");
588    }
589    CacheDirective prevEntry = getById(id);
590    CacheDirectiveInfo newInfo = createFromInfoAndDefaults(info, prevEntry);
591    removeInternal(prevEntry);
592    addInternal(new CacheDirective(newInfo), getCachePool(newInfo.getPool()));
593  }
594
595  public void modifyDirective(CacheDirectiveInfo info,
596      FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
597    assert namesystem.hasWriteLock();
598    String idString =
599        (info.getId() == null) ?
600            "(null)" : info.getId().toString();
601    try {
602      // Check for invalid IDs.
603      Long id = info.getId();
604      if (id == null) {
605        throw new InvalidRequestException("Must supply an ID.");
606      }
607      CacheDirective prevEntry = getById(id);
608      checkWritePermission(pc, prevEntry.getPool());
609
610      // Fill in defaults
611      CacheDirectiveInfo infoWithDefaults =
612          createFromInfoAndDefaults(info, prevEntry);
613      CacheDirectiveInfo.Builder builder =
614          new CacheDirectiveInfo.Builder(infoWithDefaults);
615
616      // Do validation
617      validatePath(infoWithDefaults);
618      validateReplication(infoWithDefaults, (short)-1);
619      // Need to test the pool being set here to avoid rejecting a modify for a
620      // directive that's already been forced into a pool
621      CachePool srcPool = prevEntry.getPool();
622      CachePool destPool = getCachePool(validatePoolName(infoWithDefaults));
623      if (!srcPool.getPoolName().equals(destPool.getPoolName())) {
624        checkWritePermission(pc, destPool);
625        if (!flags.contains(CacheFlag.FORCE)) {
626          checkLimit(destPool, infoWithDefaults.getPath().toUri().getPath(),
627              infoWithDefaults.getReplication());
628        }
629      }
630      // Verify the expiration against the destination pool
631      validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs());
632
633      // Indicate changes to the CRM
634      setNeedsRescan();
635
636      // Validation passed
637      removeInternal(prevEntry);
638      addInternal(new CacheDirective(builder.build()), destPool);
639    } catch (IOException e) {
640      LOG.warn("modifyDirective of " + idString + " failed: ", e);
641      throw e;
642    }
643    LOG.info("modifyDirective of {} successfully applied {}.", idString, info);
644  }
645
646  private void removeInternal(CacheDirective directive)
647      throws InvalidRequestException {
648    assert namesystem.hasWriteLock();
649    // Remove the corresponding entry in directivesByPath.
650    String path = directive.getPath();
651    List<CacheDirective> directives = directivesByPath.get(path);
652    if (directives == null || !directives.remove(directive)) {
653      throw new InvalidRequestException("Failed to locate entry " +
654          directive.getId() + " by path " + directive.getPath());
655    }
656    if (directives.size() == 0) {
657      directivesByPath.remove(path);
658    }
659    // Fix up the stats from removing the pool
660    final CachePool pool = directive.getPool();
661    directive.addBytesNeeded(-directive.getBytesNeeded());
662    directive.addFilesNeeded(-directive.getFilesNeeded());
663
664    directivesById.remove(directive.getId());
665    pool.getDirectiveList().remove(directive);
666    assert directive.getPool() == null;
667
668    setNeedsRescan();
669  }
670
671  public void removeDirective(long id, FSPermissionChecker pc)
672      throws IOException {
673    assert namesystem.hasWriteLock();
674    try {
675      CacheDirective directive = getById(id);
676      checkWritePermission(pc, directive.getPool());
677      removeInternal(directive);
678    } catch (IOException e) {
679      LOG.warn("removeDirective of " + id + " failed: ", e);
680      throw e;
681    }
682    LOG.info("removeDirective of " + id + " successful.");
683  }
684
685  public BatchedListEntries<CacheDirectiveEntry> 
686        listCacheDirectives(long prevId,
687            CacheDirectiveInfo filter,
688            FSPermissionChecker pc) throws IOException {
689    assert namesystem.hasReadLock();
690    final int NUM_PRE_ALLOCATED_ENTRIES = 16;
691    String filterPath = null;
692    if (filter.getPath() != null) {
693      filterPath = validatePath(filter);
694    }
695    if (filter.getReplication() != null) {
696      throw new InvalidRequestException(
697          "Filtering by replication is unsupported.");
698    }
699
700    // Querying for a single ID
701    final Long id = filter.getId();
702    if (id != null) {
703      if (!directivesById.containsKey(id)) {
704        throw new InvalidRequestException("Did not find requested id " + id);
705      }
706      // Since we use a tailMap on directivesById, setting prev to id-1 gets
707      // us the directive with the id (if present)
708      prevId = id - 1;
709    }
710
711    ArrayList<CacheDirectiveEntry> replies =
712        new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
713    int numReplies = 0;
714    SortedMap<Long, CacheDirective> tailMap =
715      directivesById.tailMap(prevId + 1);
716    for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) {
717      if (numReplies >= maxListCacheDirectivesNumResponses) {
718        return new BatchedListEntries<CacheDirectiveEntry>(replies, true);
719      }
720      CacheDirective curDirective = cur.getValue();
721      CacheDirectiveInfo info = cur.getValue().toInfo();
722
723      // If the requested ID is present, it should be the first item.
724      // Hitting this case means the ID is not present, or we're on the second
725      // item and should break out.
726      if (id != null &&
727          !(info.getId().equals(id))) {
728        break;
729      }
730      if (filter.getPool() != null && 
731          !info.getPool().equals(filter.getPool())) {
732        continue;
733      }
734      if (filterPath != null &&
735          !info.getPath().toUri().getPath().equals(filterPath)) {
736        continue;
737      }
738      boolean hasPermission = true;
739      if (pc != null) {
740        try {
741          pc.checkPermission(curDirective.getPool(), FsAction.READ);
742        } catch (AccessControlException e) {
743          hasPermission = false;
744        }
745      }
746      if (hasPermission) {
747        replies.add(new CacheDirectiveEntry(info, cur.getValue().toStats()));
748        numReplies++;
749      }
750    }
751    return new BatchedListEntries<CacheDirectiveEntry>(replies, false);
752  }
753
754  /**
755   * Create a cache pool.
756   * 
757   * Only the superuser should be able to call this function.
758   *
759   * @param info    The info for the cache pool to create.
760   * @return        Information about the cache pool we created.
761   */
762  public CachePoolInfo addCachePool(CachePoolInfo info)
763      throws IOException {
764    assert namesystem.hasWriteLock();
765    CachePool pool;
766    try {
767      CachePoolInfo.validate(info);
768      String poolName = info.getPoolName();
769      pool = cachePools.get(poolName);
770      if (pool != null) {
771        throw new InvalidRequestException("Cache pool " + poolName
772            + " already exists.");
773      }
774      pool = CachePool.createFromInfoAndDefaults(info);
775      cachePools.put(pool.getPoolName(), pool);
776    } catch (IOException e) {
777      LOG.info("addCachePool of " + info + " failed: ", e);
778      throw e;
779    }
780    LOG.info("addCachePool of {} successful.", info);
781    return pool.getInfo(true);
782  }
783
784  /**
785   * Modify a cache pool.
786   * 
787   * Only the superuser should be able to call this function.
788   *
789   * @param info
790   *          The info for the cache pool to modify.
791   */
792  public void modifyCachePool(CachePoolInfo info)
793      throws IOException {
794    assert namesystem.hasWriteLock();
795    StringBuilder bld = new StringBuilder();
796    try {
797      CachePoolInfo.validate(info);
798      String poolName = info.getPoolName();
799      CachePool pool = cachePools.get(poolName);
800      if (pool == null) {
801        throw new InvalidRequestException("Cache pool " + poolName
802            + " does not exist.");
803      }
804      String prefix = "";
805      if (info.getOwnerName() != null) {
806        pool.setOwnerName(info.getOwnerName());
807        bld.append(prefix).
808          append("set owner to ").append(info.getOwnerName());
809        prefix = "; ";
810      }
811      if (info.getGroupName() != null) {
812        pool.setGroupName(info.getGroupName());
813        bld.append(prefix).
814          append("set group to ").append(info.getGroupName());
815        prefix = "; ";
816      }
817      if (info.getMode() != null) {
818        pool.setMode(info.getMode());
819        bld.append(prefix).append("set mode to " + info.getMode());
820        prefix = "; ";
821      }
822      if (info.getLimit() != null) {
823        pool.setLimit(info.getLimit());
824        bld.append(prefix).append("set limit to " + info.getLimit());
825        prefix = "; ";
826        // New limit changes stats, need to set needs refresh
827        setNeedsRescan();
828      }
829      if (info.getMaxRelativeExpiryMs() != null) {
830        final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
831        pool.setMaxRelativeExpiryMs(maxRelativeExpiry);
832        bld.append(prefix).append("set maxRelativeExpiry to "
833            + maxRelativeExpiry);
834        prefix = "; ";
835      }
836      if (prefix.isEmpty()) {
837        bld.append("no changes.");
838      }
839    } catch (IOException e) {
840      LOG.info("modifyCachePool of " + info + " failed: ", e);
841      throw e;
842    }
843    LOG.info("modifyCachePool of {} successful; {}", info.getPoolName(), 
844        bld.toString());
845  }
846
847  /**
848   * Remove a cache pool.
849   * 
850   * Only the superuser should be able to call this function.
851   *
852   * @param poolName
853   *          The name for the cache pool to remove.
854   */
855  public void removeCachePool(String poolName)
856      throws IOException {
857    assert namesystem.hasWriteLock();
858    try {
859      CachePoolInfo.validateName(poolName);
860      CachePool pool = cachePools.remove(poolName);
861      if (pool == null) {
862        throw new InvalidRequestException(
863            "Cannot remove non-existent cache pool " + poolName);
864      }
865      // Remove all directives in this pool.
866      Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
867      while (iter.hasNext()) {
868        CacheDirective directive = iter.next();
869        directivesByPath.remove(directive.getPath());
870        directivesById.remove(directive.getId());
871        iter.remove();
872      }
873      setNeedsRescan();
874    } catch (IOException e) {
875      LOG.info("removeCachePool of " + poolName + " failed: ", e);
876      throw e;
877    }
878    LOG.info("removeCachePool of " + poolName + " successful.");
879  }
880
881  public BatchedListEntries<CachePoolEntry>
882      listCachePools(FSPermissionChecker pc, String prevKey) {
883    assert namesystem.hasReadLock();
884    final int NUM_PRE_ALLOCATED_ENTRIES = 16;
885    ArrayList<CachePoolEntry> results = 
886        new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
887    SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
888    int numListed = 0;
889    for (Entry<String, CachePool> cur : tailMap.entrySet()) {
890      if (numListed++ >= maxListCachePoolsResponses) {
891        return new BatchedListEntries<CachePoolEntry>(results, true);
892      }
893      results.add(cur.getValue().getEntry(pc));
894    }
895    return new BatchedListEntries<CachePoolEntry>(results, false);
896  }
897
898  public void setCachedLocations(LocatedBlocks locations) {
899    // don't attempt lookups if there are no cached blocks
900    if (cachedBlocks.size() > 0) {
901      for (LocatedBlock lb : locations.getLocatedBlocks()) {
902        setCachedLocations(lb);
903      }
904    }
905  }
906
907  private void setCachedLocations(LocatedBlock block) {
908    CachedBlock cachedBlock =
909        new CachedBlock(block.getBlock().getBlockId(),
910            (short)0, false);
911    cachedBlock = cachedBlocks.get(cachedBlock);
912    if (cachedBlock == null) {
913      return;
914    }
915    List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED);
916    for (DatanodeDescriptor datanode : cachedDNs) {
917      // Filter out cached blocks that do not have a backing replica.
918      //
919      // This should not happen since it means the CacheManager thinks
920      // something is cached that does not exist, but it's a safety
921      // measure.
922      boolean found = false;
923      for (DatanodeInfo loc : block.getLocations()) {
924        if (loc.equals(datanode)) {
925          block.addCachedLoc(loc);
926          found = true;
927          break;
928        }
929      }
930      if (!found) {
931        LOG.warn("Datanode {} is not a valid cache location for block {} "
932            + "because that node does not have a backing replica!",
933            datanode, block.getBlock().getBlockName());
934      }
935    }
936  }
937
938  public final void processCacheReport(final DatanodeID datanodeID,
939      final List<Long> blockIds) throws IOException {
940    namesystem.writeLock();
941    final long startTime = Time.monotonicNow();
942    final long endTime;
943    try {
944      final DatanodeDescriptor datanode = 
945          blockManager.getDatanodeManager().getDatanode(datanodeID);
946      if (datanode == null || !datanode.isRegistered()) {
947        throw new IOException(
948            "processCacheReport from dead or unregistered datanode: " +
949            datanode);
950      }
951      processCacheReportImpl(datanode, blockIds);
952    } finally {
953      endTime = Time.monotonicNow();
954      namesystem.writeUnlock("processCacheReport");
955    }
956
957    // Log the block report processing stats from Namenode perspective
958    final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
959    if (metrics != null) {
960      metrics.addCacheBlockReport((int) (endTime - startTime));
961    }
962    LOG.debug("Processed cache report from {}, blocks: {}, " +
963        "processing time: {} msecs", datanodeID, blockIds.size(), 
964        (endTime - startTime));
965  }
966
967  private void processCacheReportImpl(final DatanodeDescriptor datanode,
968      final List<Long> blockIds) {
969    CachedBlocksList cached = datanode.getCached();
970    cached.clear();
971    CachedBlocksList cachedList = datanode.getCached();
972    CachedBlocksList pendingCachedList = datanode.getPendingCached();
973    for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
974      long blockId = iter.next();
975      LOG.trace("Cache report from datanode {} has block {}", datanode,
976          blockId);
977      CachedBlock cachedBlock =
978          new CachedBlock(blockId, (short)0, false);
979      CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
980      // Add the block ID from the cache report to the cachedBlocks map
981      // if it's not already there.
982      if (prevCachedBlock != null) {
983        cachedBlock = prevCachedBlock;
984      } else {
985        cachedBlocks.put(cachedBlock);
986        LOG.trace("Added block {}  to cachedBlocks", cachedBlock);
987      }
988      // Add the block to the datanode's implicit cached block list
989      // if it's not already there.  Similarly, remove it from the pending
990      // cached block list if it exists there.
991      if (!cachedBlock.isPresent(cachedList)) {
992        cachedList.add(cachedBlock);
993        LOG.trace("Added block {} to CACHED list.", cachedBlock);
994      }
995      if (cachedBlock.isPresent(pendingCachedList)) {
996        pendingCachedList.remove(cachedBlock);
997        LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock);
998      }
999    }
1000  }
1001
1002  /**
1003   * Saves the current state of the CacheManager to the DataOutput. Used
1004   * to persist CacheManager state in the FSImage.
1005   * @param out DataOutput to persist state
1006   * @param sdPath path of the storage directory
1007   * @throws IOException
1008   */
1009  public void saveStateCompat(DataOutputStream out, String sdPath)
1010      throws IOException {
1011    serializerCompat.save(out, sdPath);
1012  }
1013
1014  public PersistState saveState() throws IOException {
1015    ArrayList<CachePoolInfoProto> pools = Lists
1016        .newArrayListWithCapacity(cachePools.size());
1017    ArrayList<CacheDirectiveInfoProto> directives = Lists
1018        .newArrayListWithCapacity(directivesById.size());
1019
1020    for (CachePool pool : cachePools.values()) {
1021      CachePoolInfo p = pool.getInfo(true);
1022      CachePoolInfoProto.Builder b = CachePoolInfoProto.newBuilder()
1023          .setPoolName(p.getPoolName());
1024
1025      if (p.getOwnerName() != null)
1026        b.setOwnerName(p.getOwnerName());
1027
1028      if (p.getGroupName() != null)
1029        b.setGroupName(p.getGroupName());
1030
1031      if (p.getMode() != null)
1032        b.setMode(p.getMode().toShort());
1033
1034      if (p.getLimit() != null)
1035        b.setLimit(p.getLimit());
1036
1037      pools.add(b.build());
1038    }
1039
1040    for (CacheDirective directive : directivesById.values()) {
1041      CacheDirectiveInfo info = directive.toInfo();
1042      CacheDirectiveInfoProto.Builder b = CacheDirectiveInfoProto.newBuilder()
1043          .setId(info.getId());
1044
1045      if (info.getPath() != null) {
1046        b.setPath(info.getPath().toUri().getPath());
1047      }
1048
1049      if (info.getReplication() != null) {
1050        b.setReplication(info.getReplication());
1051      }
1052
1053      if (info.getPool() != null) {
1054        b.setPool(info.getPool());
1055      }
1056
1057      Expiration expiry = info.getExpiration();
1058      if (expiry != null) {
1059        assert (!expiry.isRelative());
1060        b.setExpiration(PBHelperClient.convert(expiry));
1061      }
1062
1063      directives.add(b.build());
1064    }
1065    CacheManagerSection s = CacheManagerSection.newBuilder()
1066        .setNextDirectiveId(nextDirectiveId).setNumPools(pools.size())
1067        .setNumDirectives(directives.size()).build();
1068
1069    return new PersistState(s, pools, directives);
1070  }
1071
1072  /**
1073   * Reloads CacheManager state from the passed DataInput. Used during namenode
1074   * startup to restore CacheManager state from an FSImage.
1075   * @param in DataInput from which to restore state
1076   * @throws IOException
1077   */
1078  public void loadStateCompat(DataInput in) throws IOException {
1079    serializerCompat.load(in);
1080  }
1081
1082  public void loadState(PersistState s) throws IOException {
1083    nextDirectiveId = s.section.getNextDirectiveId();
1084    for (CachePoolInfoProto p : s.pools) {
1085      CachePoolInfo info = new CachePoolInfo(p.getPoolName());
1086      if (p.hasOwnerName())
1087        info.setOwnerName(p.getOwnerName());
1088
1089      if (p.hasGroupName())
1090        info.setGroupName(p.getGroupName());
1091
1092      if (p.hasMode())
1093        info.setMode(new FsPermission((short) p.getMode()));
1094
1095      if (p.hasLimit())
1096        info.setLimit(p.getLimit());
1097
1098      addCachePool(info);
1099    }
1100
1101    for (CacheDirectiveInfoProto p : s.directives) {
1102      // Get pool reference by looking it up in the map
1103      final String poolName = p.getPool();
1104      CacheDirective directive = new CacheDirective(p.getId(), new Path(
1105          p.getPath()).toUri().getPath(), (short) p.getReplication(), p
1106          .getExpiration().getMillis());
1107      addCacheDirective(poolName, directive);
1108    }
1109  }
1110
1111  private void addCacheDirective(final String poolName,
1112      final CacheDirective directive) throws IOException {
1113    CachePool pool = cachePools.get(poolName);
1114    if (pool == null) {
1115      throw new IOException("Directive refers to pool " + poolName
1116          + ", which does not exist.");
1117    }
1118    boolean addedDirective = pool.getDirectiveList().add(directive);
1119    assert addedDirective;
1120    if (directivesById.put(directive.getId(), directive) != null) {
1121      throw new IOException("A directive with ID " + directive.getId()
1122          + " already exists");
1123    }
1124    List<CacheDirective> directives = directivesByPath.get(directive.getPath());
1125    if (directives == null) {
1126      directives = new LinkedList<CacheDirective>();
1127      directivesByPath.put(directive.getPath(), directives);
1128    }
1129    directives.add(directive);
1130  }
1131
1132  private final class SerializerCompat {
1133    private void save(DataOutputStream out, String sdPath) throws IOException {
1134      out.writeLong(nextDirectiveId);
1135      savePools(out, sdPath);
1136      saveDirectives(out, sdPath);
1137    }
1138
1139    private void load(DataInput in) throws IOException {
1140      nextDirectiveId = in.readLong();
1141      // pools need to be loaded first since directives point to their parent pool
1142      loadPools(in);
1143      loadDirectives(in);
1144    }
1145
1146    /**
1147     * Save cache pools to fsimage
1148     */
1149    private void savePools(DataOutputStream out,
1150        String sdPath) throws IOException {
1151      StartupProgress prog = NameNode.getStartupProgress();
1152      Step step = new Step(StepType.CACHE_POOLS, sdPath);
1153      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
1154      prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
1155      Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
1156      out.writeInt(cachePools.size());
1157      for (CachePool pool: cachePools.values()) {
1158        FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
1159        counter.increment();
1160      }
1161      prog.endStep(Phase.SAVING_CHECKPOINT, step);
1162    }
1163
1164    /*
1165     * Save cache entries to fsimage
1166     */
1167    private void saveDirectives(DataOutputStream out, String sdPath)
1168        throws IOException {
1169      StartupProgress prog = NameNode.getStartupProgress();
1170      Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
1171      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
1172      prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
1173      Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
1174      out.writeInt(directivesById.size());
1175      for (CacheDirective directive : directivesById.values()) {
1176        FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
1177        counter.increment();
1178      }
1179      prog.endStep(Phase.SAVING_CHECKPOINT, step);
1180    }
1181
1182    /**
1183     * Load cache pools from fsimage
1184     */
1185    private void loadPools(DataInput in)
1186        throws IOException {
1187      StartupProgress prog = NameNode.getStartupProgress();
1188      Step step = new Step(StepType.CACHE_POOLS);
1189      prog.beginStep(Phase.LOADING_FSIMAGE, step);
1190      int numberOfPools = in.readInt();
1191      prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
1192      Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
1193      for (int i = 0; i < numberOfPools; i++) {
1194        addCachePool(FSImageSerialization.readCachePoolInfo(in));
1195        counter.increment();
1196      }
1197      prog.endStep(Phase.LOADING_FSIMAGE, step);
1198    }
1199
1200    /**
1201     * Load cache directives from the fsimage
1202     */
1203    private void loadDirectives(DataInput in) throws IOException {
1204      StartupProgress prog = NameNode.getStartupProgress();
1205      Step step = new Step(StepType.CACHE_ENTRIES);
1206      prog.beginStep(Phase.LOADING_FSIMAGE, step);
1207      int numDirectives = in.readInt();
1208      prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
1209      Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
1210      for (int i = 0; i < numDirectives; i++) {
1211        CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
1212        // Get pool reference by looking it up in the map
1213        final String poolName = info.getPool();
1214        CacheDirective directive =
1215            new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
1216                info.getReplication(), info.getExpiration().getAbsoluteMillis());
1217        addCacheDirective(poolName, directive);
1218        counter.increment();
1219      }
1220      prog.endStep(Phase.LOADING_FSIMAGE, step);
1221    }
1222  }
1223
1224  public void waitForRescanIfNeeded() {
1225    crmLock.lock();
1226    try {
1227      if (monitor != null) {
1228        monitor.waitForRescanIfNeeded();
1229      }
1230    } finally {
1231      crmLock.unlock();
1232    }
1233  }
1234
1235  private void setNeedsRescan() {
1236    crmLock.lock();
1237    try {
1238      if (monitor != null) {
1239        monitor.setNeedsRescan();
1240      }
1241    } finally {
1242      crmLock.unlock();
1243    }
1244  }
1245
1246  @VisibleForTesting
1247  public Thread getCacheReplicationMonitor() {
1248    crmLock.lock();
1249    try {
1250      return monitor;
1251    } finally {
1252      crmLock.unlock();
1253    }
1254  }
1255}