/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CreateSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.util.ChunkedArrayList;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;

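/**
 * Replays edit log records from an {@link EditLogInputStream} and applies
 * them to the namesystem's in-memory namespace. A rough usage sketch
 * (hypothetical driver code; in practice FSImage drives the loader):
 *
 * <pre>{@code
 * FSEditLogLoader loader = new FSEditLogLoader(fsNamesys, lastAppliedTxId);
 * long numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
 * long newLastTxId = loader.getLastAppliedTxId();
 * }</pre>
 */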
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLogLoader {
  static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
  static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec

  private final FSNamesystem fsNamesys;
  private long lastAppliedTxId;
  /** Total number of edit log transactions loaded so far. */
  private int totalEdits = 0;

  public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
    this.fsNamesys = fsNamesys;
    this.lastAppliedTxId = lastAppliedTxId;
  }

  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
      throws IOException {
    return loadFSEdits(edits, expectedStartingTxId, null, null);
  }

  /**
   * Load an edit log and apply the changes to the in-memory structure.
   * This is where we apply edits that we've been writing to disk all
   * along.
   */
  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
      StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
    StartupProgress prog = NameNode.getStartupProgress();
    Step step = createStartupProgressStep(edits);
    prog.beginStep(Phase.LOADING_EDITS, step);
    fsNamesys.writeLock();
    try {
      long startTime = monotonicNow();
      FSImage.LOG.info("Start loading edits file " + edits.getName());
      long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
          startOpt, recovery);
      FSImage.LOG.info("Edits file " + edits.getName()
          + " of size " + edits.length() + " edits # " + numEdits
          + " loaded in " + (monotonicNow() - startTime) / 1000 + " seconds");
      return numEdits;
    } finally {
      edits.close();
      fsNamesys.writeUnlock("loadFSEdits");
      prog.endStep(Phase.LOADING_EDITS, step);
    }
  }

  long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
      long expectedStartingTxId, StartupOption startOpt,
      MetaRecoveryContext recovery) throws IOException {
    FSDirectory fsDir = fsNamesys.dir;

    EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
      new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);

    if (LOG.isTraceEnabled()) {
      LOG.trace("Acquiring write lock to replay edit log");
    }

    fsNamesys.writeLock();
    fsDir.writeLock();

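    // Ring buffer of stream offsets for the most recently applied ops,
    // surfaced in the error message if replay fails partway through.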
    long[] recentOpcodeOffsets = new long[4];
    Arrays.fill(recentOpcodeOffsets, -1);

    long expectedTxId = expectedStartingTxId;
    long numEdits = 0;
    long lastTxId = in.getLastTxId();
    long numTxns = (lastTxId - expectedStartingTxId) + 1;
    StartupProgress prog = NameNode.getStartupProgress();
    Step step = createStartupProgressStep(in);
    prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
    Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
    long lastLogTime = monotonicNow();
    long lastInodeId = fsNamesys.dir.getLastInodeId();

    try {
      while (true) {
        try {
          FSEditLogOp op;
          try {
            op = in.readOp();
            if (op == null) {
              break;
            }
          } catch (Throwable e) {
            // Handle a problem with our input
            check203UpgradeFailure(in.getVersion(true), e);
            String errorMessage =
              formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
            FSImage.LOG.error(errorMessage, e);
            if (recovery == null) {
              // We will only try to skip over problematic opcodes when in
              // recovery mode.
              throw new EditLogInputException(errorMessage, e, numEdits);
            }
            MetaRecoveryContext.editLogLoaderPrompt(
                "We failed to read txId " + expectedTxId,
                recovery, "skipping the bad section in the log");
            in.resync();
            continue;
          }
          recentOpcodeOffsets[(int) (numEdits % recentOpcodeOffsets.length)] =
            in.getPosition();
          if (op.hasTransactionId()) {
            if (op.getTransactionId() > expectedTxId) {
              MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                  "to be a gap in the edit log.  We expected txid " +
                  expectedTxId + ", but got txid " +
                  op.getTransactionId() + ".", recovery,
                  "ignoring missing transaction IDs");
            } else if (op.getTransactionId() < expectedTxId) {
              MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                  "to be an out-of-order edit in the edit log.  We " +
                  "expected txid " + expectedTxId + ", but got txid " +
                  op.getTransactionId() + ".", recovery,
                  "skipping the out-of-order edit");
              continue;
            }
          }
          try {
            if (LOG.isTraceEnabled()) {
              LOG.trace("op=" + op + ", startOpt=" + startOpt
                  + ", numEdits=" + numEdits + ", totalEdits=" + totalEdits);
            }
            long inodeId = applyEditLogOp(op, fsDir, startOpt,
                in.getVersion(true), lastInodeId);
            if (lastInodeId < inodeId) {
              lastInodeId = inodeId;
            }
          } catch (RollingUpgradeOp.RollbackException e) {
            throw e;
          } catch (Throwable e) {
            LOG.error("Encountered exception on operation " + op, e);
            if (recovery == null) {
              throw e instanceof IOException ? (IOException) e : new IOException(e);
            }

            MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
                "apply edit log operation " + op + ": error " +
                e.getMessage(), recovery, "applying edits");
          }
          // Now that the operation has been successfully decoded and
          // applied, update our bookkeeping.
          incrOpCount(op.opCode, opCounts, step, counter);
          if (op.hasTransactionId()) {
            lastAppliedTxId = op.getTransactionId();
            expectedTxId = lastAppliedTxId + 1;
          } else {
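            // Ops from pre-transaction-ID layouts carry no txid; keep the
            // counters pinned to the expected starting id for this stream.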
            expectedTxId = lastAppliedTxId = expectedStartingTxId;
          }
          // log progress
          if (op.hasTransactionId()) {
            long now = monotonicNow();
            if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
              long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
              int percent = Math.round((float) deltaTxId / numTxns * 100);
              LOG.info("replaying edit log: " + deltaTxId + "/" + numTxns
                  + " transactions completed. (" + percent + "%)");
              lastLogTime = now;
            }
          }
          numEdits++;
          totalEdits++;
        } catch (RollingUpgradeOp.RollbackException e) {
          LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
          break;
        } catch (MetaRecoveryContext.RequestStopException e) {
          MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
              in.getPosition() + "/" + in.length());
          break;
        }
      }
    } finally {
      fsNamesys.dir.resetLastInodeId(lastInodeId);
      if (closeOnExit) {
        in.close();
      }
      fsDir.writeUnlock();
      fsNamesys.writeUnlock("loadEditRecords");

      if (LOG.isTraceEnabled()) {
        LOG.trace("replaying edit log finished");
      }

      if (FSImage.LOG.isDebugEnabled()) {
        dumpOpCounts(opCounts);
      }
    }
    return numEdits;
  }

  /**
   * Allocate or validate the inode ID carried by an op, advancing the
   * namesystem's last allocated inode ID when the edit log has a newer one.
   */
  private long getAndUpdateLastInodeId(long inodeIdFromOp, int logVersion,
      long lastInodeId) throws IOException {
    long inodeId = inodeIdFromOp;

    if (inodeId == HdfsConstants.GRANDFATHER_INODE_ID) {
      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
        throw new IOException("The layout version " + logVersion
            + " supports inodeId but gave bogus inodeId");
      }
      inodeId = fsNamesys.dir.allocateNewInodeId();
    } else {
      // fsnamesys seeds lastInodeId from the fsimage, but the edit log may
      // record more recent inode ID allocations; advance it when needed.
      if (inodeId > lastInodeId) {
        fsNamesys.dir.resetLastInodeId(inodeId);
      }
    }
    return inodeId;
  }

  @SuppressWarnings("deprecation")
  private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
      StartupOption startOpt, int logVersion, long lastInodeId) throws IOException {
    long inodeId = HdfsConstants.GRANDFATHER_INODE_ID;
    if (LOG.isTraceEnabled()) {
      LOG.trace("replaying edit log: " + op);
    }
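    // Replayed ops repopulate the retry cache only when the cache is enabled
    // and the op recorded its originating RPC client/call ids.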
    final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();

    switch (op.opCode) {
    case OP_ADD: {
      AddCloseOp addCloseOp = (AddCloseOp) op;
      final String path =
          renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + addCloseOp.blocks.length +
            " clientHolder " + addCloseOp.clientName +
            " clientMachine " + addCloseOp.clientMachine);
      }
      // There are 3 cases here:
      // 1. OP_ADD to create a new file
      // 2. OP_ADD to update file blocks
      // 3. OP_ADD to open file for append (old append)

      // See if the file already exists (persistBlocks call)
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
      if (oldFile != null && addCloseOp.overwrite) {
        // This is OP_ADD with overwrite
        FSDirDeleteOp.deleteForEditLog(fsDir, iip, addCloseOp.mtime);
        iip = INodesInPath.replace(iip, iip.length() - 1, null);
        oldFile = null;
      }
      INodeFile newFile = oldFile;
      if (oldFile == null) { // this is OP_ADD on a new file (case 1)
        // Layouts with per-file replication record it in the op; clamp it
        // to the configured replication bounds before applying.
        final short replication = fsNamesys.getBlockManager()
            .adjustReplication(addCloseOp.replication);
        assert addCloseOp.blocks.length == 0;

        // add to the file tree
        inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, lastInodeId);
        newFile = FSDirWriteFileOp.addFileForEditLog(fsDir, inodeId,
            iip.getExistingINodes(), iip.getLastLocalName(),
            addCloseOp.permissions, addCloseOp.aclEntries,
            addCloseOp.xAttrs, replication, addCloseOp.mtime,
            addCloseOp.atime, addCloseOp.blockSize, true,
            addCloseOp.clientName, addCloseOp.clientMachine,
            addCloseOp.storagePolicyId);
        assert newFile != null;
        iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
        fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());

        // add the op into retry cache if necessary
        if (toAddRetryCache) {
          HdfsFileStatus stat =
              FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
          fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
              addCloseOp.rpcCallId, stat);
        }
      } else { // This is OP_ADD on an existing file (old append)
        if (!oldFile.isUnderConstruction()) {
          // This is case 3: a call to append() on an already-closed file.
          if (FSNamesystem.LOG.isDebugEnabled()) {
            FSNamesystem.LOG.debug("Reopening an already-closed file " +
                "for append");
          }
          LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
              addCloseOp.clientName, addCloseOp.clientMachine, false, false,
              false);
          // add the op into retry cache if necessary
          if (toAddRetryCache) {
            HdfsFileStatus stat =
                FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
            fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
                addCloseOp.rpcCallId, new LastBlockWithStatus(lb, stat));
          }
        }
      }
      // Fall-through for case 2.
      // Regardless of whether it's a new file or an updated file,
      // update the block list.

      // Update the salient file attributes.
      newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
      newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
      updateBlocks(fsDir, addCloseOp, iip, newFile);
      break;
    }
    case OP_CLOSE: {
      AddCloseOp addCloseOp = (AddCloseOp) op;
      final String path =
          renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + addCloseOp.blocks.length +
            " clientHolder " + addCloseOp.clientName +
            " clientMachine " + addCloseOp.clientMachine);
      }

      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
      final INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);

      // Update the salient file attributes.
      file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
      file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
      updateBlocks(fsDir, addCloseOp, iip, file);

      // Now close the file
      if (!file.isUnderConstruction() &&
          logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) {
        // There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE
        // could show up twice in a row. But after that version, this
        // should be fixed, so we should treat it as an error.
        throw new IOException(
            "File is not under construction: " + path);
      }
      // One might expect that you could use removeLease(holder, path) here,
      // but OP_CLOSE doesn't serialize the holder. So, remove the inode.
      if (file.isUnderConstruction()) {
        fsNamesys.getLeaseManager().removeLease(file.getId());
        file.toCompleteFile(file.getModificationTime(), 0,
            fsNamesys.getBlockManager().getMinReplication());
      }
      break;
    }
    case OP_APPEND: {
      AppendOp appendOp = (AppendOp) op;
      final String path = renameReservedPathsOnUpgrade(appendOp.path,
          logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " clientName " + appendOp.clientName +
            " clientMachine " + appendOp.clientMachine +
            " newBlock " + appendOp.newBlock);
      }
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
      if (!file.isUnderConstruction()) {
        LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
            appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
            false, false);
        // add the op into retry cache if necessary
        if (toAddRetryCache) {
          HdfsFileStatus stat =
              FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
          fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,
              appendOp.rpcCallId, new LastBlockWithStatus(lb, stat));
        }
      }
      break;
    }
    case OP_UPDATE_BLOCKS: {
      UpdateBlocksOp updateOp = (UpdateBlocksOp) op;
      final String path =
          renameReservedPathsOnUpgrade(updateOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + updateOp.blocks.length);
      }
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
      // Update in-memory data structures
      updateBlocks(fsDir, updateOp, iip, oldFile);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
      }
      break;
    }
    case OP_ADD_BLOCK: {
      AddBlockOp addBlockOp = (AddBlockOp) op;
      String path = renameReservedPathsOnUpgrade(addBlockOp.getPath(), logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " new block id : " + addBlockOp.getLastBlock().getBlockId());
      }
      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
      // add the new block to the INodeFile
      addNewBlock(addBlockOp, oldFile);
      break;
    }
    case OP_SET_REPLICATION: {
      SetReplicationOp setReplicationOp = (SetReplicationOp) op;
      String src = renameReservedPathsOnUpgrade(
          setReplicationOp.path, logVersion);
      INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      short replication = fsNamesys.getBlockManager().adjustReplication(
          setReplicationOp.replication);
      FSDirAttrOp.unprotectedSetReplication(fsDir, iip, replication);
      break;
    }
    case OP_CONCAT_DELETE: {
      ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp) op;
      String trg = renameReservedPathsOnUpgrade(concatDeleteOp.trg, logVersion);
      String[] srcs = new String[concatDeleteOp.srcs.length];
      for (int i = 0; i < srcs.length; i++) {
        srcs[i] =
            renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
      }
      INodesInPath targetIIP = fsDir.getINodesInPath(trg, DirOp.WRITE);
      INodeFile[] srcFiles = new INodeFile[srcs.length];
      for (int i = 0; i < srcs.length; i++) {
        INodesInPath srcIIP = fsDir.getINodesInPath(srcs[i], DirOp.WRITE);
        srcFiles[i] = srcIIP.getLastINode().asFile();
      }
      FSDirConcatOp.unprotectedConcat(fsDir, targetIIP, srcFiles,
          concatDeleteOp.timestamp);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
            concatDeleteOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME_OLD: {
      RenameOldOp renameOp = (RenameOldOp) op;
      final String src = renameReservedPathsOnUpgrade(renameOp.src, logVersion);
      final String dst = renameReservedPathsOnUpgrade(renameOp.dst, logVersion);
      FSDirRenameOp.renameForEditLog(fsDir, src, dst, renameOp.timestamp);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
      }
      break;
    }
    case OP_DELETE: {
      DeleteOp deleteOp = (DeleteOp) op;
      final String src = renameReservedPathsOnUpgrade(
          deleteOp.path, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE_LINK);
      FSDirDeleteOp.deleteForEditLog(fsDir, iip, deleteOp.timestamp);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
      }
      break;
    }
    case OP_MKDIR: {
      MkdirOp mkdirOp = (MkdirOp) op;
      inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
          lastInodeId);
      FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
          renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
          mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
      break;
    }
    case OP_SET_GENSTAMP_V1: {
      SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op) op;
      fsNamesys.getBlockIdManager().setGenerationStampV1(
          setGenstampV1Op.genStampV1);
      break;
    }
    case OP_SET_PERMISSIONS: {
      SetPermissionsOp setPermissionsOp = (SetPermissionsOp) op;
      final String src =
          renameReservedPathsOnUpgrade(setPermissionsOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetPermission(fsDir, iip,
          setPermissionsOp.permissions);
      break;
    }
    case OP_SET_OWNER: {
      SetOwnerOp setOwnerOp = (SetOwnerOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setOwnerOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetOwner(fsDir, iip,
          setOwnerOp.username, setOwnerOp.groupname);
      break;
    }
    case OP_SET_NS_QUOTA: {
      SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setNSQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET, null);
      break;
    }
    case OP_CLEAR_NS_QUOTA: {
      ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp) op;
      final String src = renameReservedPathsOnUpgrade(
          clearNSQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET, null);
      break;
    }
    case OP_SET_QUOTA: {
      SetQuotaOp setQuotaOp = (SetQuotaOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          setQuotaOp.nsQuota, setQuotaOp.dsQuota, null);
      break;
    }
    case OP_SET_QUOTA_BY_STORAGETYPE: {
      FSEditLogOp.SetQuotaByStorageTypeOp setQuotaByStorageTypeOp =
          (FSEditLogOp.SetQuotaByStorageTypeOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setQuotaByStorageTypeOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          HdfsConstants.QUOTA_DONT_SET, setQuotaByStorageTypeOp.dsQuota,
          setQuotaByStorageTypeOp.type);
      break;
    }
    case OP_TIMES: {
      TimesOp timesOp = (TimesOp) op;
      final String src = renameReservedPathsOnUpgrade(
          timesOp.path, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetTimes(fsDir, iip,
          timesOp.mtime, timesOp.atime, true);
      break;
    }
    case OP_SYMLINK: {
      if (!FileSystem.areSymlinksEnabled()) {
        throw new IOException("Symlinks not supported - please remove "
            + "symlink before upgrading to this version of HDFS");
      }
      SymlinkOp symlinkOp = (SymlinkOp) op;
      inodeId = getAndUpdateLastInodeId(symlinkOp.inodeId, logVersion,
          lastInodeId);
      final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
          logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE_LINK);
      FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip.getExistingINodes(),
          iip.getLastLocalName(), inodeId, symlinkOp.value, symlinkOp.mtime,
          symlinkOp.atime, symlinkOp.permissionStatus);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME: {
      RenameOp renameOp = (RenameOp) op;
      FSDirRenameOp.renameForEditLog(fsDir,
          renameReservedPathsOnUpgrade(renameOp.src, logVersion),
          renameReservedPathsOnUpgrade(renameOp.dst, logVersion),
          renameOp.timestamp, renameOp.options);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
      }
      break;
    }
    case OP_GET_DELEGATION_TOKEN: {
      GetDelegationTokenOp getDelegationTokenOp
        = (GetDelegationTokenOp) op;

      fsNamesys.getDelegationTokenSecretManager()
        .addPersistedDelegationToken(getDelegationTokenOp.token,
                                     getDelegationTokenOp.expiryTime);
      break;
    }
    case OP_RENEW_DELEGATION_TOKEN: {
      RenewDelegationTokenOp renewDelegationTokenOp
        = (RenewDelegationTokenOp) op;
      fsNamesys.getDelegationTokenSecretManager()
        .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
                                     renewDelegationTokenOp.expiryTime);
      break;
    }
    case OP_CANCEL_DELEGATION_TOKEN: {
      CancelDelegationTokenOp cancelDelegationTokenOp
        = (CancelDelegationTokenOp) op;
      fsNamesys.getDelegationTokenSecretManager()
          .updatePersistedTokenCancellation(
              cancelDelegationTokenOp.token);
      break;
    }
    case OP_UPDATE_MASTER_KEY: {
      UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp) op;
      fsNamesys.getDelegationTokenSecretManager()
        .updatePersistedMasterKey(updateMasterKeyOp.key);
      break;
    }
    case OP_REASSIGN_LEASE: {
      ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp) op;

      Lease lease = fsNamesys.leaseManager.getLease(
          reassignLeaseOp.leaseHolder);
      final String path =
          renameReservedPathsOnUpgrade(reassignLeaseOp.path, logVersion);
      INodeFile pendingFile = fsDir.getINode(path, DirOp.READ).asFile();
      Preconditions.checkState(pendingFile.isUnderConstruction());
      fsNamesys.reassignLeaseInternal(lease, reassignLeaseOp.newHolder,
          pendingFile);
      break;
    }
    case OP_START_LOG_SEGMENT:
    case OP_END_LOG_SEGMENT: {
      // no data in here currently.
      break;
    }
    case OP_CREATE_SNAPSHOT: {
      CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
      String path = fsNamesys.getSnapshotManager().createSnapshot(iip,
          snapshotRoot, createSnapshotOp.snapshotName);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
            createSnapshotOp.rpcCallId, path);
      }
      break;
    }
    case OP_DELETE_SNAPSHOT: {
      DeleteSnapshotOp deleteSnapshotOp = (DeleteSnapshotOp) op;
      BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
      List<INode> removedINodes = new ChunkedArrayList<INode>();
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(deleteSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
      fsNamesys.getSnapshotManager().deleteSnapshot(iip,
          deleteSnapshotOp.snapshotName,
          new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
              collectedBlocks, removedINodes, null));
      fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
      collectedBlocks.clear();
      fsNamesys.dir.removeFromInodeMap(removedINodes);
      removedINodes.clear();

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
            deleteSnapshotOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME_SNAPSHOT: {
      RenameSnapshotOp renameSnapshotOp = (RenameSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(renameSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
      fsNamesys.getSnapshotManager().renameSnapshot(iip,
          snapshotRoot, renameSnapshotOp.snapshotOldName,
          renameSnapshotOp.snapshotNewName);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId,
            renameSnapshotOp.rpcCallId);
      }
      break;
    }
    case OP_ALLOW_SNAPSHOT: {
      AllowSnapshotOp allowSnapshotOp = (AllowSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(allowSnapshotOp.snapshotRoot, logVersion);
      fsNamesys.getSnapshotManager().setSnapshottable(
          snapshotRoot, false);
      break;
    }
    case OP_DISALLOW_SNAPSHOT: {
      DisallowSnapshotOp disallowSnapshotOp = (DisallowSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(disallowSnapshotOp.snapshotRoot,
              logVersion);
      fsNamesys.getSnapshotManager().resetSnapshottable(
          snapshotRoot);
      break;
    }
    case OP_SET_GENSTAMP_V2: {
      SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
      fsNamesys.getBlockIdManager().setGenerationStampV2(
          setGenstampV2Op.genStampV2);
      break;
    }
    case OP_ALLOCATE_BLOCK_ID: {
      AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
      fsNamesys.getBlockIdManager().setLastAllocatedBlockId(
          allocateBlockIdOp.blockId);
      break;
    }
    case OP_ROLLING_UPGRADE_START: {
      if (startOpt == StartupOption.ROLLINGUPGRADE) {
        final RollingUpgradeStartupOption rollingUpgradeOpt
            = startOpt.getRollingUpgradeStartupOption();
        if (rollingUpgradeOpt == RollingUpgradeStartupOption.ROLLBACK) {
          throw new RollingUpgradeOp.RollbackException();
        } else if (rollingUpgradeOpt == RollingUpgradeStartupOption.DOWNGRADE) {
          // ignore the upgrade marker on downgrade
          break;
        }
      }
      // start rolling upgrade
      final long startTime = ((RollingUpgradeOp) op).getTime();
      fsNamesys.startRollingUpgradeInternal(startTime);
      fsNamesys.triggerRollbackCheckpoint();
      break;
    }
    case OP_ROLLING_UPGRADE_FINALIZE: {
      final long finalizeTime = ((RollingUpgradeOp) op).getTime();
      if (fsNamesys.isRollingUpgrade()) {
        // Only do it when NN is actually doing a rolling upgrade.
        // We can get FINALIZE without a corresponding START if NN is restarted
        // before this op is consumed and a new checkpoint is created.
        fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
      }
      fsNamesys.getFSImage().updateStorageVersion();
      fsNamesys.getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
          NameNodeFile.IMAGE);
      break;
    }
    case OP_ADD_CACHE_DIRECTIVE: {
      AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
      CacheDirectiveInfo result = fsNamesys.getCacheManager()
          .addDirectiveFromEditLog(addOp.directive);
      if (toAddRetryCache) {
        Long id = result.getId();
        fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id);
      }
      break;
    }
    case OP_MODIFY_CACHE_DIRECTIVE: {
      ModifyCacheDirectiveInfoOp modifyOp =
          (ModifyCacheDirectiveInfoOp) op;
      fsNamesys.getCacheManager().modifyDirectiveFromEditLog(
          modifyOp.directive);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_CACHE_DIRECTIVE: {
      RemoveCacheDirectiveInfoOp removeOp =
          (RemoveCacheDirectiveInfoOp) op;
      fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_ADD_CACHE_POOL: {
      AddCachePoolOp addOp = (AddCachePoolOp) op;
      fsNamesys.getCacheManager().addCachePool(addOp.info);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_MODIFY_CACHE_POOL: {
      ModifyCachePoolOp modifyOp = (ModifyCachePoolOp) op;
      fsNamesys.getCacheManager().modifyCachePool(modifyOp.info);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_CACHE_POOL: {
      RemoveCachePoolOp removeOp = (RemoveCachePoolOp) op;
      fsNamesys.getCacheManager().removeCachePool(removeOp.poolName);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_SET_ACL: {
      SetAclOp setAclOp = (SetAclOp) op;
      INodesInPath iip = fsDir.getINodesInPath(setAclOp.src, DirOp.WRITE);
      FSDirAclOp.unprotectedSetAcl(fsDir, iip, setAclOp.aclEntries, true);
      break;
    }
    case OP_SET_XATTR: {
      SetXAttrOp setXAttrOp = (SetXAttrOp) op;
      INodesInPath iip = fsDir.getINodesInPath(setXAttrOp.src, DirOp.WRITE);
      FSDirXAttrOp.unprotectedSetXAttrs(fsDir, iip,
                                        setXAttrOp.xAttrs,
                                        EnumSet.of(XAttrSetFlag.CREATE,
                                                   XAttrSetFlag.REPLACE));
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(setXAttrOp.rpcClientId, setXAttrOp.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_XATTR: {
      RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
      FSDirXAttrOp.unprotectedRemoveXAttrs(fsDir, removeXAttrOp.src,
                                           removeXAttrOp.xAttrs);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
            removeXAttrOp.rpcCallId);
      }
      break;
    }
    case OP_TRUNCATE: {
      TruncateOp truncateOp = (TruncateOp) op;
      INodesInPath iip = fsDir.getINodesInPath(truncateOp.src, DirOp.WRITE);
      FSDirTruncateOp.unprotectedTruncate(fsNamesys, iip,
          truncateOp.clientName, truncateOp.clientMachine,
          truncateOp.newLength, truncateOp.timestamp, truncateOp.truncateBlock);
      break;
    }
    case OP_SET_STORAGE_POLICY: {
      SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
      final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
          logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetStoragePolicy(
          fsDir, fsNamesys.getBlockManager(), iip,
          setStoragePolicyOp.policyId);
      break;
    }
    default:
      throw new IOException("Invalid operation read " + op.opCode);
    }
    return inodeId;
  }

  private static String formatEditLogReplayError(EditLogInputStream in,
      long[] recentOpcodeOffsets, long txid) {
    StringBuilder sb = new StringBuilder();
    sb.append("Error replaying edit log at offset ").append(in.getPosition());
    sb.append(".  Expected transaction ID was ").append(txid);
    if (recentOpcodeOffsets[0] != -1) {
      Arrays.sort(recentOpcodeOffsets);
      sb.append("\nRecent opcode offsets:");
      for (long offset : recentOpcodeOffsets) {
        if (offset != -1) {
          sb.append(' ').append(offset);
        }
      }
    }
    return sb.toString();
  }

  /**
   * Add a new block into the given INodeFile
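   * while replaying an OP_ADD_BLOCK. The op's penultimate block, if present,
   * must match the file's current last block, whose length and state are
   * updated before the new last block is appended under construction.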
   */
  private void addNewBlock(AddBlockOp op, INodeFile file)
      throws IOException {
    BlockInfo[] oldBlocks = file.getBlocks();
    Block pBlock = op.getPenultimateBlock();
    Block newBlock = op.getLastBlock();

    if (pBlock != null) { // the penultimate block is not null
      Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0);
      // compare pBlock with the last block of oldBlocks
      BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1];
      if (oldLastBlock.getBlockId() != pBlock.getBlockId()
          || oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) {
        throw new IOException(
            "Mismatched block IDs or generation stamps for the old last block of file "
                + op.getPath() + ", the old last block is " + oldLastBlock
                + ", and the block read from editlog is " + pBlock);
      }

      oldLastBlock.setNumBytes(pBlock.getNumBytes());
      if (!oldLastBlock.isComplete()) {
        fsNamesys.getBlockManager().forceCompleteBlock(oldLastBlock);
        fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
      }
    } else { // the penultimate block is null
      Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
    }
    // add the new block
    BlockInfo newBI = new BlockInfoContiguous(newBlock,
        file.getPreferredBlockReplication());
    newBI.convertToBlockUnderConstruction(
        HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
    fsNamesys.getBlockManager().addBlockCollection(newBI, file);
    file.addBlock(newBI);
    fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
  }

  /**
   * Update in-memory data structures with new block information.
   * @throws IOException
   */
  private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
      INodesInPath iip, INodeFile file) throws IOException {
    // Update its block list
    BlockInfo[] oldBlocks = file.getBlocks();
    Block[] newBlocks = op.getBlocks();
    String path = op.getPath();

    // Are we only updating the last block's gen stamp?
    boolean isGenStampUpdate = oldBlocks.length == newBlocks.length;
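    // (e.g. an OP_UPDATE_BLOCKS logged for pipeline recovery typically changes
    // only the last block's generation stamp and length)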

    // First, update blocks in common
    for (int i = 0; i < oldBlocks.length && i < newBlocks.length; i++) {
      BlockInfo oldBlock = oldBlocks[i];
      Block newBlock = newBlocks[i];

      boolean isLastBlock = i == newBlocks.length - 1;
      if (oldBlock.getBlockId() != newBlock.getBlockId() ||
          (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
              !(isGenStampUpdate && isLastBlock))) {
        throw new IOException("Mismatched block IDs or generation stamps, " +
            "attempting to replace block " + oldBlock + " with " + newBlock +
            " as block # " + i + "/" + newBlocks.length + " of " +
            path);
      }

      oldBlock.setNumBytes(newBlock.getNumBytes());
      boolean changeMade =
        oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
      oldBlock.setGenerationStamp(newBlock.getGenerationStamp());

      if (!oldBlock.isComplete() &&
          (!isLastBlock || op.shouldCompleteLastBlock())) {
        changeMade = true;
        fsNamesys.getBlockManager().forceCompleteBlock(oldBlock);
      }
      if (changeMade) {
        // The state or gen-stamp of the block has changed. So, we may be
        // able to process some messages from datanodes that we previously
        // were unable to process.
        fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
      }
    }

    if (newBlocks.length < oldBlocks.length) {
      // We're removing a block from the file, e.g. abandonBlock(...)
      if (!file.isUnderConstruction()) {
        throw new IOException("Trying to remove a block from file " +
            path + " which is not under construction.");
      }
      if (newBlocks.length != oldBlocks.length - 1) {
        throw new IOException("Trying to remove more than one block from file "
            + path);
      }
      Block oldBlock = oldBlocks[oldBlocks.length - 1];
      boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
          fsDir, path, iip, file, oldBlock);
      if (!removed && !(op instanceof UpdateBlocksOp)) {
        throw new IOException("Trying to delete non-existent block " + oldBlock);
      }
    } else if (newBlocks.length > oldBlocks.length) {
      // We're adding blocks
      for (int i = oldBlocks.length; i < newBlocks.length; i++) {
        Block newBlock = newBlocks[i];
        BlockInfo newBI;
        if (!op.shouldCompleteLastBlock()) {
          // TODO: shouldn't this only be true for the last block?
          // what about an old-version fsync() where fsync isn't called
          // until several blocks in?
          newBI = new BlockInfoContiguous(newBlock,
              file.getPreferredBlockReplication());
          newBI.convertToBlockUnderConstruction(
              HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
        } else {
          // OP_CLOSE should add finalized blocks. This code path
          // is only executed when loading edits written by prior
          // versions of Hadoop. Current versions always log
          // OP_ADD operations as each block is allocated.
          newBI = new BlockInfoContiguous(newBlock,
              file.getFileReplication());
        }
        fsNamesys.getBlockManager().addBlockCollection(newBI, file);
        file.addBlock(newBI);
        fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
      }
    }
  }

  private static void dumpOpCounts(
      EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts) {
    StringBuilder sb = new StringBuilder();
    sb.append("Summary of operations loaded from edit log:\n  ");
    Joiner.on("\n  ").withKeyValueSeparator("=").appendTo(sb, opCounts);
    FSImage.LOG.debug(sb.toString());
  }

  private void incrOpCount(FSEditLogOpCodes opCode,
      EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts, Step step,
      Counter counter) {
    Holder<Integer> holder = opCounts.get(opCode);
    if (holder == null) {
      holder = new Holder<Integer>(1);
      opCounts.put(opCode, holder);
    } else {
      holder.held++;
    }
    counter.increment();
  }

  /**
   * Throw an appropriate exception during upgrade from 0.20.203, when edit log
   * loading could fail due to opcode conflicts.
   */
  private void check203UpgradeFailure(int logVersion, Throwable e)
      throws IOException {
    // Release 0.20.203 has opcodes that conflict with later releases.
    // The editlog must be emptied by restarting the namenode, before proceeding
    // with the upgrade.
    if (Storage.is203LayoutVersion(logVersion)
        && logVersion != HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
      String msg = "During upgrade failed to load the editlog version "
          + logVersion + " from release 0.20.203. Please go back to the old"
          + " release and restart the namenode. This empties the editlog"
          + " and saves the namespace. Resume the upgrade after this step.";
      throw new IOException(msg, e);
    }
  }

  /**
   * Find the last valid transaction ID in the stream.
   * If there are invalid or corrupt transactions in the middle of the stream,
   * scanEditLog will skip over them.
   * This reads through the stream but does not close it.
   *
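   * A minimal usage sketch (hypothetical caller; real callers live in the
   * journal-management code):
   * <pre>{@code
   * EditLogValidation val = FSEditLogLoader.scanEditLog(in, Long.MAX_VALUE);
   * long endTxId = val.getEndTxId();          // last txid seen, or INVALID_TXID
   * long validLength = val.getValidLength();  // offset just past the last op
   * }</pre>
   *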
   * @param maxTxIdToScan Maximum Tx ID to try to scan.
   *                      The scan returns after reading this or a higher ID.
   *                      The file portion beyond this ID is potentially being
   *                      updated.
   */
  static EditLogValidation scanEditLog(EditLogInputStream in,
      long maxTxIdToScan) {
    long lastPos;
    long lastTxId = HdfsServerConstants.INVALID_TXID;
    long numValid = 0;
    while (true) {
      long txid;
      lastPos = in.getPosition();
      try {
        if ((txid = in.scanNextOp()) == HdfsServerConstants.INVALID_TXID) {
          break;
        }
      } catch (Throwable t) {
        FSImage.LOG.warn("Caught exception after scanning through "
            + numValid + " ops from " + in
            + " while determining its valid length. Position was "
            + lastPos, t);
        in.resync();
        FSImage.LOG.warn("After resync, position is " + in.getPosition());
        continue;
      }
      if (lastTxId == HdfsServerConstants.INVALID_TXID || txid > lastTxId) {
        lastTxId = txid;
      }
      if (lastTxId >= maxTxIdToScan) {
        break;
      }
      numValid++;
    }
    return new EditLogValidation(lastPos, lastTxId, false);
  }

  static class EditLogValidation {
    private final long validLength;
    private final long endTxId;
    private final boolean hasCorruptHeader;

    EditLogValidation(long validLength, long endTxId,
        boolean hasCorruptHeader) {
      this.validLength = validLength;
      this.endTxId = endTxId;
      this.hasCorruptHeader = hasCorruptHeader;
    }

    long getValidLength() { return validLength; }

    long getEndTxId() { return endTxId; }

    boolean hasCorruptHeader() { return hasCorruptHeader; }
  }

  /**
   * Stream wrapper that keeps track of the current stream position.
   *
   * This stream also allows us to set a limit on how many bytes we can read
   * without getting an exception.
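   *
   * A minimal usage sketch (hypothetical; editsFile is an assumed local file):
   * <pre>{@code
   * PositionTrackingInputStream in =
   *     new PositionTrackingInputStream(new FileInputStream(editsFile));
   * in.setLimit(opLength);  // reads past this many more bytes now throw
   * int b = in.read();
   * long pos = in.getPos(); // bytes consumed so far
   * in.clearLimit();
   * }</pre>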
   */
  public static class PositionTrackingInputStream extends FilterInputStream
      implements StreamLimiter {
    private long curPos = 0;
    private long markPos = -1;
    private long limitPos = Long.MAX_VALUE;

    public PositionTrackingInputStream(InputStream is) {
      super(is);
    }

    private void checkLimit(long amt) throws IOException {
      long extra = (curPos + amt) - limitPos;
      if (extra > 0) {
        throw new IOException("Tried to read " + amt + " byte(s) past " +
            "the limit at offset " + limitPos);
      }
    }

    @Override
    public int read() throws IOException {
      checkLimit(1);
      int ret = super.read();
      if (ret != -1) curPos++;
      return ret;
    }

    @Override
    public int read(byte[] data) throws IOException {
      checkLimit(data.length);
      int ret = super.read(data);
      if (ret > 0) curPos += ret;
      return ret;
    }

    @Override
    public int read(byte[] data, int offset, int length) throws IOException {
      checkLimit(length);
      int ret = super.read(data, offset, length);
      if (ret > 0) curPos += ret;
      return ret;
    }

    @Override
    public void setLimit(long limit) {
      limitPos = curPos + limit;
    }

    @Override
    public void clearLimit() {
      limitPos = Long.MAX_VALUE;
    }

    @Override
    public void mark(int limit) {
      super.mark(limit);
      markPos = curPos;
    }

    @Override
    public void reset() throws IOException {
      if (markPos == -1) {
        throw new IOException("Not marked!");
      }
      super.reset();
      curPos = markPos;
      markPos = -1;
    }

    public long getPos() {
      return curPos;
    }

    @Override
    public long skip(long amt) throws IOException {
      long extra = (curPos + amt) - limitPos;
      if (extra > 0) {
        throw new IOException("Tried to skip " + extra + " bytes past " +
            "the limit at offset " + limitPos);
      }
      long ret = super.skip(amt);
      curPos += ret;
      return ret;
    }
  }

  public long getLastAppliedTxId() {
    return lastAppliedTxId;
  }

  /**
   * Creates a Step used for updating startup progress, populated with
   * information from the given edits.  The step always includes the log's name.
   * If the log has a known length, then the length is included in the step too.
   *
   * @param edits EditLogInputStream to use for populating step
   * @return Step populated with information from edits
   * @throws IOException thrown if there is an I/O error
   */
  private static Step createStartupProgressStep(EditLogInputStream edits)
      throws IOException {
    long length = edits.length();
    String name = edits.getCurrentStreamName();
    return length != -1 ? new Step(name, length) : new Step(name);
  }
}