001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.server.namenode;
019
020import java.io.PrintWriter;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.List;
025
026import org.apache.hadoop.fs.permission.FsPermission;
027import org.apache.hadoop.fs.permission.PermissionStatus;
028import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
029import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
030import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
031
032import com.google.common.base.Preconditions;
033
034/**
035 * An anonymous reference to an inode.
036 *
037 * This class and its subclasses are used to support multiple access paths.
038 * A file/directory may have multiple access paths when it is stored in some
039 * snapshots and it is renamed/moved to other locations.
040 * 
041 * For example,
042 * (1) Suppose we have /abc/foo, say the inode of foo is inode(id=1000,name=foo)
043 * (2) create snapshot s0 for /abc
044 * (3) mv /abc/foo /xyz/bar, i.e. inode(id=1000,name=...) is renamed from "foo"
045 *     to "bar" and its parent becomes /xyz.
046 * 
047 * Then, /xyz/bar and /abc/.snapshot/s0/foo are two different access paths to
048 * the same inode, inode(id=1000,name=bar).
049 *
050 * With references, we have the following
051 * - /abc has a child ref(id=1001,name=foo).
052 * - /xyz has a child ref(id=1002) 
053 * - Both ref(id=1001,name=foo) and ref(id=1002) point to another reference,
054 *   ref(id=1003,count=2).
055 * - Finally, ref(id=1003,count=2) points to inode(id=1000,name=bar).
056 * 
057 * Note 1: For a reference without name, e.g. ref(id=1002), it uses the name
058 *         of the referred inode.
059 * Note 2: getParent() always returns the parent in the current state, e.g.
060 *         inode(id=1000,name=bar).getParent() returns /xyz but not /abc.
061 */
062public abstract class INodeReference extends INode {
063  /**
064   * Try to remove the given reference and then return the reference count.
065   * If the given inode is not a reference, return -1;
066   */
067  public static int tryRemoveReference(INode inode) {
068    if (!inode.isReference()) {
069      return -1;
070    }
071    return removeReference(inode.asReference());
072  }
073
074  /**
075   * Remove the given reference and then return the reference count.
076   * If the referred inode is not a WithCount, return -1;
077   */
078  private static int removeReference(INodeReference ref) {
079    final INode referred = ref.getReferredINode();
080    if (!(referred instanceof WithCount)) {
081      return -1;
082    }
083    
084    WithCount wc = (WithCount) referred;
085    wc.removeReference(ref);
086    return wc.getReferenceCount();
087  }
088
089  /**
090   * When destroying a reference node (WithName or DstReference), we call this
091   * method to identify the snapshot which is the latest snapshot before the
092   * reference node's creation. 
093   */
094  static int getPriorSnapshot(INodeReference ref) {
095    WithCount wc = (WithCount) ref.getReferredINode();
096    WithName wn = null;
097    if (ref instanceof DstReference) {
098      wn = wc.getLastWithName();
099    } else if (ref instanceof WithName) {
100      wn = wc.getPriorWithName((WithName) ref);
101    }
102    if (wn != null) {
103      INode referred = wc.getReferredINode();
104      if (referred.isFile() && referred.asFile().isWithSnapshot()) {
105        return referred.asFile().getDiffs().getPrior(wn.lastSnapshotId);
106      } else if (referred.isDirectory()) {
107        DirectoryWithSnapshotFeature sf = referred.asDirectory()
108            .getDirectoryWithSnapshotFeature();
109        if (sf != null) {
110          return sf.getDiffs().getPrior(wn.lastSnapshotId);
111        }
112      }
113    }
114    return Snapshot.NO_SNAPSHOT_ID;
115  }
116  
117  private INode referred;
118  
119  public INodeReference(INode parent, INode referred) {
120    super(parent);
121    this.referred = referred;
122  }
123
124  public final INode getReferredINode() {
125    return referred;
126  }
127
128  public final void setReferredINode(INode referred) {
129    this.referred = referred;
130  }
131  
132  @Override
133  public final boolean isReference() {
134    return true;
135  }
136  
137  @Override
138  public final INodeReference asReference() {
139    return this;
140  }
141
142  @Override
143  public final boolean isFile() {
144    return referred.isFile();
145  }
146  
147  @Override
148  public final INodeFile asFile() {
149    return referred.asFile();
150  }
151  
152  @Override
153  public final boolean isDirectory() {
154    return referred.isDirectory();
155  }
156  
157  @Override
158  public final INodeDirectory asDirectory() {
159    return referred.asDirectory();
160  }
161  
162  @Override
163  public final boolean isSymlink() {
164    return referred.isSymlink();
165  }
166  
167  @Override
168  public final INodeSymlink asSymlink() {
169    return referred.asSymlink();
170  }
171
172  @Override
173  public byte[] getLocalNameBytes() {
174    return referred.getLocalNameBytes();
175  }
176
177  @Override
178  public void setLocalName(byte[] name) {
179    referred.setLocalName(name);
180  }
181
182  @Override
183  public final long getId() {
184    return referred.getId();
185  }
186  
187  @Override
188  public final PermissionStatus getPermissionStatus(int snapshotId) {
189    return referred.getPermissionStatus(snapshotId);
190  }
191  
192  @Override
193  public final String getUserName(int snapshotId) {
194    return referred.getUserName(snapshotId);
195  }
196  
197  @Override
198  final void setUser(String user) {
199    referred.setUser(user);
200  }
201  
202  @Override
203  public final String getGroupName(int snapshotId) {
204    return referred.getGroupName(snapshotId);
205  }
206  
207  @Override
208  final void setGroup(String group) {
209    referred.setGroup(group);
210  }
211  
212  @Override
213  public final FsPermission getFsPermission(int snapshotId) {
214    return referred.getFsPermission(snapshotId);
215  }
216
217  @Override
218  final AclFeature getAclFeature(int snapshotId) {
219    return referred.getAclFeature(snapshotId);
220  }
221
222  @Override
223  final void addAclFeature(AclFeature aclFeature) {
224    referred.addAclFeature(aclFeature);
225  }
226
227  @Override
228  final void removeAclFeature() {
229    referred.removeAclFeature();
230  }
231  
232  @Override
233  final XAttrFeature getXAttrFeature(int snapshotId) {
234    return referred.getXAttrFeature(snapshotId);
235  }
236  
237  @Override
238  final void addXAttrFeature(XAttrFeature xAttrFeature) {
239    referred.addXAttrFeature(xAttrFeature);
240  }
241  
242  @Override
243  final void removeXAttrFeature() {
244    referred.removeXAttrFeature();
245  }
246
247  @Override
248  public final short getFsPermissionShort() {
249    return referred.getFsPermissionShort();
250  }
251  
252  @Override
253  void setPermission(FsPermission permission) {
254    referred.setPermission(permission);
255  }
256
257  @Override
258  public long getPermissionLong() {
259    return referred.getPermissionLong();
260  }
261
262  @Override
263  public final long getModificationTime(int snapshotId) {
264    return referred.getModificationTime(snapshotId);
265  }
266  
267  @Override
268  public final INode updateModificationTime(long mtime, int latestSnapshotId) {
269    return referred.updateModificationTime(mtime, latestSnapshotId);
270  }
271  
272  @Override
273  public final void setModificationTime(long modificationTime) {
274    referred.setModificationTime(modificationTime);
275  }
276  
277  @Override
278  public final long getAccessTime(int snapshotId) {
279    return referred.getAccessTime(snapshotId);
280  }
281  
282  @Override
283  public final void setAccessTime(long accessTime) {
284    referred.setAccessTime(accessTime);
285  }
286
287  @Override
288  public final byte getStoragePolicyID() {
289    return referred.getStoragePolicyID();
290  }
291
292  @Override
293  public final byte getLocalStoragePolicyID() {
294    return referred.getLocalStoragePolicyID();
295  }
296
297  @Override
298  final void recordModification(int latestSnapshotId) {
299    referred.recordModification(latestSnapshotId);
300  }
301
302  @Override // used by WithCount
303  public void cleanSubtree(
304      ReclaimContext reclaimContext, int snapshot, int prior) {
305    referred.cleanSubtree(reclaimContext, snapshot, prior);
306  }
307
308  @Override // used by WithCount
309  public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
310    if (removeReference(this) <= 0) {
311      referred.destroyAndCollectBlocks(reclaimContext);
312    }
313  }
314
315  @Override
316  public ContentSummaryComputationContext computeContentSummary(int snapshotId,
317      ContentSummaryComputationContext summary) {
318    summary.nodeIncluded(this);
319    return referred.computeContentSummary(snapshotId, summary);
320  }
321
322  @Override
323  public QuotaCounts computeQuotaUsage(BlockStoragePolicySuite bsps,
324      byte blockStoragePolicyId, boolean useCache, int lastSnapshotId) {
325    return referred.computeQuotaUsage(bsps, blockStoragePolicyId, useCache,
326        lastSnapshotId);
327  }
328
329  @Override
330  public final INodeAttributes getSnapshotINode(int snapshotId) {
331    return referred.getSnapshotINode(snapshotId);
332  }
333
334  @Override
335  public QuotaCounts getQuotaCounts() {
336    return referred.getQuotaCounts();
337  }
338
339  @Override
340  public final void clear() {
341    super.clear();
342    referred = null;
343  }
344
345  @Override
346  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
347      final int snapshot) {
348    super.dumpTreeRecursively(out, prefix, snapshot);
349    if (this instanceof DstReference) {
350      out.print(", dstSnapshotId=" + ((DstReference) this).dstSnapshotId);
351    }
352    if (this instanceof WithCount) {
353      out.print(", count=" + ((WithCount)this).getReferenceCount());
354    }
355    out.println();
356    
357    final StringBuilder b = new StringBuilder();
358    for(int i = 0; i < prefix.length(); i++) {
359      b.append(' ');
360    }
361    b.append("->");
362    getReferredINode().dumpTreeRecursively(out, b, snapshot);
363  }
364  
365  public int getDstSnapshotId() {
366    return Snapshot.CURRENT_STATE_ID;
367  }
368  
369  /** An anonymous reference with reference count. */
370  public static class WithCount extends INodeReference {
371
372    private final List<WithName> withNameList = new ArrayList<>();
373
374    /**
375     * Compare snapshot with IDs, where null indicates the current status thus
376     * is greater than any non-null snapshot.
377     */
378    public static final Comparator<WithName> WITHNAME_COMPARATOR
379        = new Comparator<WithName>() {
380      @Override
381      public int compare(WithName left, WithName right) {
382        return left.lastSnapshotId - right.lastSnapshotId;
383      }
384    };
385    
386    public WithCount(INodeReference parent, INode referred) {
387      super(parent, referred);
388      Preconditions.checkArgument(!referred.isReference());
389      referred.setParentReference(this);
390    }
391    
392    public int getReferenceCount() {
393      int count = withNameList.size();
394      if (getParentReference() != null) {
395        count++;
396      }
397      return count;
398    }
399
400    /** Increment and then return the reference count. */
401    public void addReference(INodeReference ref) {
402      if (ref instanceof WithName) {
403        WithName refWithName = (WithName) ref;
404        int i = Collections.binarySearch(withNameList, refWithName,
405            WITHNAME_COMPARATOR);
406        Preconditions.checkState(i < 0);
407        withNameList.add(-i - 1, refWithName);
408      } else if (ref instanceof DstReference) {
409        setParentReference(ref);
410      }
411    }
412
413    /** Decrement and then return the reference count. */
414    public void removeReference(INodeReference ref) {
415      if (ref instanceof WithName) {
416        int i = Collections.binarySearch(withNameList, (WithName) ref,
417            WITHNAME_COMPARATOR);
418        if (i >= 0) {
419          withNameList.remove(i);
420        }
421      } else if (ref == getParentReference()) {
422        setParent(null);
423      }
424    }
425
426    /** Return the last WithName reference if there is any, null otherwise. */
427    public WithName getLastWithName() {
428      return withNameList.size() > 0 ? 
429          withNameList.get(withNameList.size() - 1) : null;
430    }
431    
432    WithName getPriorWithName(WithName post) {
433      int i = Collections.binarySearch(withNameList, post, WITHNAME_COMPARATOR);
434      if (i > 0) {
435        return withNameList.get(i - 1);
436      } else if (i == 0 || i == -1) {
437        return null;
438      } else {
439        return withNameList.get(-i - 2);
440      }
441    }
442
443    /**
444     * @return the WithName/DstReference node contained in the given snapshot.
445     */
446    public INodeReference getParentRef(int snapshotId) {
447      int start = 0;
448      int end = withNameList.size() - 1;
449      while (start < end) {
450        int mid = start + (end - start) / 2;
451        int sid = withNameList.get(mid).lastSnapshotId; 
452        if (sid == snapshotId) {
453          return withNameList.get(mid);
454        } else if (sid < snapshotId) {
455          start = mid + 1;
456        } else {
457          end = mid;
458        }
459      }
460      if (start < withNameList.size() &&
461          withNameList.get(start).lastSnapshotId >= snapshotId) {
462        return withNameList.get(start);
463      } else {
464        return this.getParentReference();
465      }
466    }
467  }
468  
469  /** A reference with a fixed name. */
470  public static class WithName extends INodeReference {
471
472    private final byte[] name;
473
474    /**
475     * The id of the last snapshot in the src tree when this WithName node was 
476     * generated. When calculating the quota usage of the referred node, only 
477     * the files/dirs existing when this snapshot was taken will be counted for 
478     * this WithName node and propagated along its ancestor path.
479     */
480    private final int lastSnapshotId;
481    
482    public WithName(INodeDirectory parent, WithCount referred, byte[] name,
483        int lastSnapshotId) {
484      super(parent, referred);
485      this.name = name;
486      this.lastSnapshotId = lastSnapshotId;
487      referred.addReference(this);
488    }
489
490    @Override
491    public final byte[] getLocalNameBytes() {
492      return name;
493    }
494
495    @Override
496    public final void setLocalName(byte[] name) {
497      throw new UnsupportedOperationException("Cannot set name: " + getClass()
498          + " is immutable.");
499    }
500    
501    public int getLastSnapshotId() {
502      return lastSnapshotId;
503    }
504    
505    @Override
506    public final ContentSummaryComputationContext computeContentSummary(
507        int snapshotId, ContentSummaryComputationContext summary) {
508      summary.nodeIncluded(this);
509      final int s = snapshotId < lastSnapshotId ? snapshotId : lastSnapshotId;
510      // only count storagespace for WithName
511      final QuotaCounts q = computeQuotaUsage(
512          summary.getBlockStoragePolicySuite(), getStoragePolicyID(), false, s);
513      summary.getCounts().addContent(Content.DISKSPACE, q.getStorageSpace());
514      summary.getCounts().addTypeSpaces(q.getTypeSpaces());
515      return summary;
516    }
517
518    @Override
519    public final QuotaCounts computeQuotaUsage(BlockStoragePolicySuite bsps,
520        byte blockStoragePolicyId, boolean useCache, int lastSnapshotId) {
521      // if this.lastSnapshotId < lastSnapshotId, the rename of the referred
522      // node happened before the rename of its ancestor. This should be
523      // impossible since for WithName node we only count its children at the
524      // time of the rename.
525      Preconditions.checkState(lastSnapshotId == Snapshot.CURRENT_STATE_ID
526          || this.lastSnapshotId >= lastSnapshotId);
527      final INode referred = this.getReferredINode().asReference()
528          .getReferredINode();
529      // We will continue the quota usage computation using the same snapshot id
530      // as time line (if the given snapshot id is valid). Also, we cannot use 
531      // cache for the referred node since its cached quota may have already 
532      // been updated by changes in the current tree.
533      int id = lastSnapshotId != Snapshot.CURRENT_STATE_ID ? 
534          lastSnapshotId : this.lastSnapshotId;
535      return referred.computeQuotaUsage(bsps, blockStoragePolicyId, false, id);
536    }
537    
538    @Override
539    public void cleanSubtree(ReclaimContext reclaimContext, final int snapshot,
540        int prior) {
541      // since WithName node resides in deleted list acting as a snapshot copy,
542      // the parameter snapshot must be non-null
543      Preconditions.checkArgument(snapshot != Snapshot.CURRENT_STATE_ID);
544      // if prior is NO_SNAPSHOT_ID, we need to check snapshot belonging to the
545      // previous WithName instance
546      if (prior == Snapshot.NO_SNAPSHOT_ID) {
547        prior = getPriorSnapshot(this);
548      }
549      
550      if (prior != Snapshot.NO_SNAPSHOT_ID
551          && Snapshot.ID_INTEGER_COMPARATOR.compare(snapshot, prior) <= 0) {
552        return;
553      }
554
555      // record the old quota delta
556      QuotaCounts old = reclaimContext.quotaDelta().getCountsCopy();
557      getReferredINode().cleanSubtree(reclaimContext, snapshot, prior);
558      INodeReference ref = getReferredINode().getParentReference();
559      if (ref != null) {
560        QuotaCounts current = reclaimContext.quotaDelta().getCountsCopy();
561        current.subtract(old);
562        // we need to update the quota usage along the parent path from ref
563        reclaimContext.quotaDelta().addUpdatePath(ref, current);
564      }
565      
566      if (snapshot < lastSnapshotId) {
567        // for a WithName node, when we compute its quota usage, we only count
568        // in all the nodes existing at the time of the corresponding rename op.
569        // Thus if we are deleting a snapshot before/at the snapshot associated 
570        // with lastSnapshotId, we do not need to update the quota upwards.
571        reclaimContext.quotaDelta().setCounts(old);
572      }
573    }
574    
575    @Override
576    public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
577      int snapshot = getSelfSnapshot();
578      reclaimContext.quotaDelta().add(computeQuotaUsage(reclaimContext.bsps));
579      if (removeReference(this) <= 0) {
580        getReferredINode().destroyAndCollectBlocks(reclaimContext.getCopy());
581      } else {
582        int prior = getPriorSnapshot(this);
583        INode referred = getReferredINode().asReference().getReferredINode();
584
585        if (snapshot != Snapshot.NO_SNAPSHOT_ID) {
586          if (prior != Snapshot.NO_SNAPSHOT_ID && snapshot <= prior) {
587            // the snapshot to be deleted has been deleted while traversing 
588            // the src tree of the previous rename operation. This usually 
589            // happens when rename's src and dst are under the same 
590            // snapshottable directory. E.g., the following operation sequence:
591            // 1. create snapshot s1 on /test
592            // 2. rename /test/foo/bar to /test/foo2/bar
593            // 3. create snapshot s2 on /test
594            // 4. rename foo2 again
595            // 5. delete snapshot s2
596            return;
597          }
598          ReclaimContext newCtx = reclaimContext.getCopy();
599          referred.cleanSubtree(newCtx, snapshot, prior);
600          INodeReference ref = getReferredINode().getParentReference();
601          if (ref != null) {
602            // we need to update the quota usage along the parent path from ref
603            reclaimContext.quotaDelta().addUpdatePath(ref,
604                newCtx.quotaDelta().getCountsCopy());
605          }
606        }
607      }
608    }
609    
610    private int getSelfSnapshot() {
611      INode referred = getReferredINode().asReference().getReferredINode();
612      int snapshot = Snapshot.NO_SNAPSHOT_ID;
613      if (referred.isFile() && referred.asFile().isWithSnapshot()) {
614        snapshot = referred.asFile().getDiffs().getPrior(lastSnapshotId);
615      } else if (referred.isDirectory()) {
616        DirectoryWithSnapshotFeature sf = referred.asDirectory()
617            .getDirectoryWithSnapshotFeature();
618        if (sf != null) {
619          snapshot = sf.getDiffs().getPrior(lastSnapshotId);
620        }
621      }
622      return snapshot;
623    }
624  }
625  
626  public static class DstReference extends INodeReference {
627    /**
628     * Record the latest snapshot of the dst subtree before the rename. For
629     * later operations on the moved/renamed files/directories, if the latest
630     * snapshot is after this dstSnapshot, changes will be recorded to the
631     * latest snapshot. Otherwise changes will be recorded to the snapshot
632     * belonging to the src of the rename.
633     * 
634     * {@link Snapshot#NO_SNAPSHOT_ID} means no dstSnapshot (e.g., src of the
635     * first-time rename).
636     */
637    private final int dstSnapshotId;
638    
639    @Override
640    public final int getDstSnapshotId() {
641      return dstSnapshotId;
642    }
643    
644    public DstReference(INodeDirectory parent, WithCount referred,
645        final int dstSnapshotId) {
646      super(parent, referred);
647      this.dstSnapshotId = dstSnapshotId;
648      referred.addReference(this);
649    }
650    
651    @Override
652    public void cleanSubtree(ReclaimContext reclaimContext, int snapshot,
653        int prior) {
654      if (snapshot == Snapshot.CURRENT_STATE_ID
655          && prior == Snapshot.NO_SNAPSHOT_ID) {
656        destroyAndCollectBlocks(reclaimContext);
657      } else {
658        // if prior is NO_SNAPSHOT_ID, we need to check snapshot belonging to 
659        // the previous WithName instance
660        if (prior == Snapshot.NO_SNAPSHOT_ID) {
661          prior = getPriorSnapshot(this);
662        }
663        // if prior is not NO_SNAPSHOT_ID, and prior is not before the
664        // to-be-deleted snapshot, we can quit here and leave the snapshot
665        // deletion work to the src tree of rename
666        if (snapshot != Snapshot.CURRENT_STATE_ID
667            && prior != Snapshot.NO_SNAPSHOT_ID
668            && Snapshot.ID_INTEGER_COMPARATOR.compare(snapshot, prior) <= 0) {
669          return;
670        }
671        getReferredINode().cleanSubtree(reclaimContext, snapshot, prior);
672      }
673    }
674    
675    /**
676     * {@inheritDoc}
677     * <br/>
678     * To destroy a DstReference node, we first remove its link with the 
679     * referred node. If the reference number of the referred node is <= 0, we 
680     * destroy the subtree of the referred node. Otherwise, we clean the 
681     * referred node's subtree and delete everything created after the last 
682     * rename operation, i.e., everything outside of the scope of the prior 
683     * WithName nodes.
684     * @param reclaimContext
685     */
686    @Override
687    public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
688      // since we count everything of the subtree for the quota usage of a
689      // dst reference node, here we should just simply do a quota computation.
690      // then to avoid double counting, we pass a different QuotaDelta to other
691      // calls
692      reclaimContext.quotaDelta().add(computeQuotaUsage(reclaimContext.bsps));
693      ReclaimContext newCtx = reclaimContext.getCopy();
694
695      if (removeReference(this) <= 0) {
696        getReferredINode().destroyAndCollectBlocks(newCtx);
697      } else {
698        // we will clean everything, including files, directories, and 
699        // snapshots, that were created after this prior snapshot
700        int prior = getPriorSnapshot(this);
701        // prior must be non-null, otherwise we do not have any previous 
702        // WithName nodes, and the reference number will be 0.
703        Preconditions.checkState(prior != Snapshot.NO_SNAPSHOT_ID);
704        // identify the snapshot created after prior
705        int snapshot = getSelfSnapshot(prior);
706        
707        INode referred = getReferredINode().asReference().getReferredINode();
708        if (referred.isFile()) {
709          // if referred is a file, it must be a file with snapshot since we did
710          // recordModification before the rename
711          INodeFile file = referred.asFile();
712          Preconditions.checkState(file.isWithSnapshot());
713          // make sure we mark the file as deleted
714          file.getFileWithSnapshotFeature().deleteCurrentFile();
715          // when calling cleanSubtree of the referred node, since we
716          // compute quota usage updates before calling this destroy
717          // function, we use true for countDiffChange
718          referred.cleanSubtree(newCtx, snapshot, prior);
719        } else if (referred.isDirectory()) {
720          // similarly, if referred is a directory, it must be an
721          // INodeDirectory with snapshot
722          INodeDirectory dir = referred.asDirectory();
723          Preconditions.checkState(dir.isWithSnapshot());
724          DirectoryWithSnapshotFeature.destroyDstSubtree(newCtx, dir,
725              snapshot, prior);
726        }
727      }
728    }
729
730    private int getSelfSnapshot(final int prior) {
731      WithCount wc = (WithCount) getReferredINode().asReference();
732      INode referred = wc.getReferredINode();
733      int lastSnapshot = Snapshot.CURRENT_STATE_ID;
734      if (referred.isFile() && referred.asFile().isWithSnapshot()) {
735        lastSnapshot = referred.asFile().getDiffs().getLastSnapshotId();
736      } else if (referred.isDirectory()) {
737        DirectoryWithSnapshotFeature sf = referred.asDirectory()
738            .getDirectoryWithSnapshotFeature();
739        if (sf != null) {
740          lastSnapshot = sf.getLastSnapshotId();
741        }
742      }
743      if (lastSnapshot != Snapshot.CURRENT_STATE_ID && lastSnapshot != prior) {
744        return lastSnapshot;
745      } else {
746        return Snapshot.CURRENT_STATE_ID;
747      }
748    }
749  }
750}