/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CreateSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.util.ChunkedArrayList;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLogLoader {
  static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
  static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1 sec

  private final FSNamesystem fsNamesys;
  private long lastAppliedTxId;
  /** Total number of edit log ops loaded so far. */
  private int totalEdits = 0;

  public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
    this.fsNamesys = fsNamesys;
    this.lastAppliedTxId = lastAppliedTxId;
  }

  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
      throws IOException {
    return loadFSEdits(edits, expectedStartingTxId, null, null);
  }

  /**
   * Load an edit log and apply the changes to the in-memory structure.
   * This is where we apply edits that we've been writing to disk all
   * along.
   */
  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
      StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
    StartupProgress prog = NameNode.getStartupProgress();
    Step step = createStartupProgressStep(edits);
    prog.beginStep(Phase.LOADING_EDITS, step);
    fsNamesys.writeLock();
    try {
      long startTime = monotonicNow();
      FSImage.LOG.info("Start loading edits file " + edits.getName());
      long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
          startOpt, recovery);
      FSImage.LOG.info("Edits file " + edits.getName()
          + " of size " + edits.length() + " edits # " + numEdits
          + " loaded in " + (monotonicNow() - startTime) / 1000 + " seconds");
      return numEdits;
    } finally {
      edits.close();
      fsNamesys.writeUnlock("loadFSEdits");
      prog.endStep(Phase.LOADING_EDITS, step);
    }
  }
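
  // Illustrative usage (a sketch, not part of the original class): FSImage
  // typically constructs one loader per replay and feeds it edit segments in
  // order. The names namesystem, lastTxId, and editStreams below are
  // hypothetical; the call shape is what matters:
  //
  //   FSEditLogLoader loader = new FSEditLogLoader(namesystem, lastTxId);
  //   for (EditLogInputStream s : editStreams) {
  //     loader.loadFSEdits(s, loader.getLastAppliedTxId() + 1);
  //   }
  //
  // loadFSEdits() closes each stream itself, so the caller only iterates.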

  long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
      long expectedStartingTxId, StartupOption startOpt,
      MetaRecoveryContext recovery) throws IOException {
    FSDirectory fsDir = fsNamesys.dir;

    EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
        new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);

    if (LOG.isTraceEnabled()) {
      LOG.trace("Acquiring write lock to replay edit log");
    }

    fsNamesys.writeLock();
    fsDir.writeLock();

    long[] recentOpcodeOffsets = new long[4];
    Arrays.fill(recentOpcodeOffsets, -1);

    long expectedTxId = expectedStartingTxId;
    long numEdits = 0;
    long lastTxId = in.getLastTxId();
    long numTxns = (lastTxId - expectedStartingTxId) + 1;
    StartupProgress prog = NameNode.getStartupProgress();
    Step step = createStartupProgressStep(in);
    prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
    Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
    long lastLogTime = monotonicNow();
    long lastInodeId = fsNamesys.dir.getLastInodeId();

    try {
      while (true) {
        try {
          FSEditLogOp op;
          try {
            op = in.readOp();
            if (op == null) {
              break;
            }
          } catch (Throwable e) {
            // Handle a problem with our input
            check203UpgradeFailure(in.getVersion(true), e);
            String errorMessage =
                formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
            FSImage.LOG.error(errorMessage, e);
            if (recovery == null) {
              // We will only try to skip over problematic opcodes when in
              // recovery mode.
              throw new EditLogInputException(errorMessage, e, numEdits);
            }
            MetaRecoveryContext.editLogLoaderPrompt(
                "We failed to read txId " + expectedTxId,
                recovery, "skipping the bad section in the log");
            in.resync();
            continue;
          }
          recentOpcodeOffsets[(int) (numEdits % recentOpcodeOffsets.length)] =
              in.getPosition();
          if (op.hasTransactionId()) {
            if (op.getTransactionId() > expectedTxId) {
              MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                  "to be a gap in the edit log. We expected txid " +
                  expectedTxId + ", but got txid " +
                  op.getTransactionId() + ".", recovery,
                  "ignoring missing transaction IDs");
            } else if (op.getTransactionId() < expectedTxId) {
              MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                  "to be an out-of-order edit in the edit log. We " +
                  "expected txid " + expectedTxId + ", but got txid " +
                  op.getTransactionId() + ".", recovery,
                  "skipping the out-of-order edit");
              continue;
            }
          }
          try {
            if (LOG.isTraceEnabled()) {
              LOG.trace("op=" + op + ", startOpt=" + startOpt
                  + ", numEdits=" + numEdits + ", totalEdits=" + totalEdits);
            }
            long inodeId = applyEditLogOp(op, fsDir, startOpt,
                in.getVersion(true), lastInodeId);
            if (lastInodeId < inodeId) {
              lastInodeId = inodeId;
            }
          } catch (RollingUpgradeOp.RollbackException e) {
            throw e;
          } catch (Throwable e) {
            LOG.error("Encountered exception on operation " + op, e);
            if (recovery == null) {
              throw e instanceof IOException ? (IOException) e :
                  new IOException(e);
            }

            MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
                "apply edit log operation " + op + ": error " +
                e.getMessage(), recovery, "applying edits");
          }
          // Now that the operation has been successfully decoded and
          // applied, update our bookkeeping.
          incrOpCount(op.opCode, opCounts, step, counter);
          if (op.hasTransactionId()) {
            lastAppliedTxId = op.getTransactionId();
            expectedTxId = lastAppliedTxId + 1;
          } else {
            expectedTxId = lastAppliedTxId = expectedStartingTxId;
          }
          // log progress
          if (op.hasTransactionId()) {
            long now = monotonicNow();
            if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
              long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
              int percent = Math.round((float) deltaTxId / numTxns * 100);
              LOG.info("replaying edit log: " + deltaTxId + "/" + numTxns
                  + " transactions completed. (" + percent + "%)");
              lastLogTime = now;
            }
          }
          numEdits++;
          totalEdits++;
        } catch (RollingUpgradeOp.RollbackException e) {
          LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
          break;
        } catch (MetaRecoveryContext.RequestStopException e) {
          MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
              in.getPosition() + "/" + in.length());
          break;
        }
      }
    } finally {
      fsNamesys.dir.resetLastInodeId(lastInodeId);
      if (closeOnExit) {
        in.close();
      }
      fsDir.writeUnlock();
      fsNamesys.writeUnlock("loadEditRecords");

      if (LOG.isTraceEnabled()) {
        LOG.trace("replaying edit log finished");
      }

      if (FSImage.LOG.isDebugEnabled()) {
        dumpOpCounts(opCounts);
      }
    }
    return numEdits;
  }

  // allocate and update last allocated inode id
  private long getAndUpdateLastInodeId(long inodeIdFromOp, int logVersion,
      long lastInodeId) throws IOException {
    long inodeId = inodeIdFromOp;

    if (inodeId == HdfsConstants.GRANDFATHER_INODE_ID) {
      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
        throw new IOException("The layout version " + logVersion
            + " supports inodeId but gave bogus inodeId");
      }
      inodeId = fsNamesys.dir.allocateNewInodeId();
    } else {
      // Need to reset lastInodeId: FSNamesystem initially gets lastInodeId
      // from the fsimage, but the edit log captures more recent inodeId
      // allocations.
      if (inodeId > lastInodeId) {
        fsNamesys.dir.resetLastInodeId(inodeId);
      }
    }
    return inodeId;
  }
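
  // Illustrative note (an addition, not original source): a pre-ADD_INODE_ID
  // edit carries GRANDFATHER_INODE_ID, so a fresh id is minted above; newer
  // logs record the id chosen at write time, which keeps inode ids stable
  // across a replay.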

  @SuppressWarnings("deprecation")
  private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
      StartupOption startOpt, int logVersion, long lastInodeId)
      throws IOException {
    long inodeId = HdfsConstants.GRANDFATHER_INODE_ID;
    if (LOG.isTraceEnabled()) {
      LOG.trace("replaying edit log: " + op);
    }
    final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();

    switch (op.opCode) {
    case OP_ADD: {
      AddCloseOp addCloseOp = (AddCloseOp) op;
      final String path =
          renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + addCloseOp.blocks.length +
            " clientHolder " + addCloseOp.clientName +
            " clientMachine " + addCloseOp.clientMachine);
      }
      // There are 3 cases here:
      // 1. OP_ADD to create a new file
      // 2. OP_ADD to update file blocks
      // 3. OP_ADD to open file for append (old append)

      // See if the file already exists (persistBlocks call)
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
      if (oldFile != null && addCloseOp.overwrite) {
        // This is OP_ADD with overwrite
        FSDirDeleteOp.deleteForEditLog(fsDir, iip, addCloseOp.mtime);
        iip = INodesInPath.replace(iip, iip.length() - 1, null);
        oldFile = null;
      }
      INodeFile newFile = oldFile;
      if (oldFile == null) { // this is OP_ADD on a new file (case 1)
        // versions > 0 support per file replication
        // get name and replication
        final short replication = fsNamesys.getBlockManager()
            .adjustReplication(addCloseOp.replication);
        assert addCloseOp.blocks.length == 0;

        // add to the file tree
        inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
            lastInodeId);
        newFile = FSDirWriteFileOp.addFileForEditLog(fsDir, inodeId,
            iip.getExistingINodes(), iip.getLastLocalName(),
            addCloseOp.permissions, addCloseOp.aclEntries,
            addCloseOp.xAttrs, replication, addCloseOp.mtime,
            addCloseOp.atime, addCloseOp.blockSize, true,
            addCloseOp.clientName, addCloseOp.clientMachine,
            addCloseOp.storagePolicyId);
        assert newFile != null;
        iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
        fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());

        // add the op into retry cache if necessary
        if (toAddRetryCache) {
          HdfsFileStatus stat =
              FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
          fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
              addCloseOp.rpcCallId, stat);
        }
      } else { // This is OP_ADD on an existing file (old append)
        if (!oldFile.isUnderConstruction()) {
          // This is case 3: a call to append() on an already-closed file.
          if (FSNamesystem.LOG.isDebugEnabled()) {
            FSNamesystem.LOG.debug("Reopening an already-closed file " +
                "for append");
          }
          LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
              addCloseOp.clientName, addCloseOp.clientMachine, false, false,
              false);
          // add the op into retry cache if necessary
          if (toAddRetryCache) {
            HdfsFileStatus stat =
                FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
            fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
                addCloseOp.rpcCallId, new LastBlockWithStatus(lb, stat));
          }
        }
      }
      // Fall-through for case 2.
      // Regardless of whether it's a new file or an updated file,
      // update the block list.

      // Update the salient file attributes.
      newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
      newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
      updateBlocks(fsDir, addCloseOp, iip, newFile);
      break;
    }
    case OP_CLOSE: {
      AddCloseOp addCloseOp = (AddCloseOp) op;
      final String path =
          renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + addCloseOp.blocks.length +
            " clientHolder " + addCloseOp.clientName +
            " clientMachine " + addCloseOp.clientMachine);
      }

      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
      final INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);

      // Update the salient file attributes.
      file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
      file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
      updateBlocks(fsDir, addCloseOp, iip, file);

      // Now close the file
      if (!file.isUnderConstruction() &&
          logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) {
        // There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE
        // could show up twice in a row. But after that version, this
        // should be fixed, so we should treat it as an error.
        throw new IOException(
            "File is not under construction: " + path);
      }
      // One might expect that you could use removeLease(holder, path) here,
      // but OP_CLOSE doesn't serialize the holder. So, remove the inode.
      if (file.isUnderConstruction()) {
        fsNamesys.getLeaseManager().removeLease(file.getId());
        file.toCompleteFile(file.getModificationTime(), 0,
            fsNamesys.getBlockManager().getMinReplication());
      }
      break;
    }
    case OP_APPEND: {
      AppendOp appendOp = (AppendOp) op;
      final String path = renameReservedPathsOnUpgrade(appendOp.path,
          logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " clientName " + appendOp.clientName +
            " clientMachine " + appendOp.clientMachine +
            " newBlock " + appendOp.newBlock);
      }
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
      if (!file.isUnderConstruction()) {
        LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
            appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
            false, false);
        // add the op into retry cache if necessary
        if (toAddRetryCache) {
          HdfsFileStatus stat =
              FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
          fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,
              appendOp.rpcCallId, new LastBlockWithStatus(lb, stat));
        }
      }
      break;
    }
    case OP_UPDATE_BLOCKS: {
      UpdateBlocksOp updateOp = (UpdateBlocksOp) op;
      final String path =
          renameReservedPathsOnUpgrade(updateOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + updateOp.blocks.length);
      }
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
      // Update in-memory data structures
      updateBlocks(fsDir, updateOp, iip, oldFile);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
      }
      break;
    }
    case OP_ADD_BLOCK: {
      AddBlockOp addBlockOp = (AddBlockOp) op;
      String path = renameReservedPathsOnUpgrade(addBlockOp.getPath(),
          logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " new block id : " + addBlockOp.getLastBlock().getBlockId());
      }
      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
      // add the new block to the INodeFile
      addNewBlock(addBlockOp, oldFile);
      break;
    }
    case OP_SET_REPLICATION: {
      SetReplicationOp setReplicationOp = (SetReplicationOp) op;
      String src = renameReservedPathsOnUpgrade(
          setReplicationOp.path, logVersion);
      INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      short replication = fsNamesys.getBlockManager().adjustReplication(
          setReplicationOp.replication);
      FSDirAttrOp.unprotectedSetReplication(fsDir, iip, replication);
      break;
    }
    case OP_CONCAT_DELETE: {
      ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp) op;
      String trg = renameReservedPathsOnUpgrade(concatDeleteOp.trg,
          logVersion);
      String[] srcs = new String[concatDeleteOp.srcs.length];
      for (int i = 0; i < srcs.length; i++) {
        srcs[i] =
            renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
      }
      INodesInPath targetIIP = fsDir.getINodesInPath(trg, DirOp.WRITE);
      INodeFile[] srcFiles = new INodeFile[srcs.length];
      for (int i = 0; i < srcs.length; i++) {
        INodesInPath srcIIP = fsDir.getINodesInPath(srcs[i], DirOp.WRITE);
        srcFiles[i] = srcIIP.getLastINode().asFile();
      }
      FSDirConcatOp.unprotectedConcat(fsDir, targetIIP, srcFiles,
          concatDeleteOp.timestamp);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
            concatDeleteOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME_OLD: {
      RenameOldOp renameOp = (RenameOldOp) op;
      final String src = renameReservedPathsOnUpgrade(renameOp.src,
          logVersion);
      final String dst = renameReservedPathsOnUpgrade(renameOp.dst,
          logVersion);
      FSDirRenameOp.renameForEditLog(fsDir, src, dst, renameOp.timestamp);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
      }
      break;
    }
    case OP_DELETE: {
      DeleteOp deleteOp = (DeleteOp) op;
      final String src = renameReservedPathsOnUpgrade(
          deleteOp.path, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE_LINK);
      FSDirDeleteOp.deleteForEditLog(fsDir, iip, deleteOp.timestamp);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
      }
      break;
    }
    case OP_MKDIR: {
      MkdirOp mkdirOp = (MkdirOp) op;
      inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
          lastInodeId);
      FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
          renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
          mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
      break;
    }
    case OP_SET_GENSTAMP_V1: {
      SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op) op;
      fsNamesys.getBlockIdManager().setGenerationStampV1(
          setGenstampV1Op.genStampV1);
      break;
    }
    case OP_SET_PERMISSIONS: {
      SetPermissionsOp setPermissionsOp = (SetPermissionsOp) op;
      final String src =
          renameReservedPathsOnUpgrade(setPermissionsOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetPermission(fsDir, iip,
          setPermissionsOp.permissions);
      break;
    }
    case OP_SET_OWNER: {
      SetOwnerOp setOwnerOp = (SetOwnerOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setOwnerOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetOwner(fsDir, iip,
          setOwnerOp.username, setOwnerOp.groupname);
      break;
    }
    case OP_SET_NS_QUOTA: {
      SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setNSQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET, null);
      break;
    }
    case OP_CLEAR_NS_QUOTA: {
      ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp) op;
      final String src = renameReservedPathsOnUpgrade(
          clearNSQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET, null);
      break;
    }
    case OP_SET_QUOTA: {
      SetQuotaOp setQuotaOp = (SetQuotaOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          setQuotaOp.nsQuota, setQuotaOp.dsQuota, null);
      break;
    }
    case OP_SET_QUOTA_BY_STORAGETYPE: {
      FSEditLogOp.SetQuotaByStorageTypeOp setQuotaByStorageTypeOp =
          (FSEditLogOp.SetQuotaByStorageTypeOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setQuotaByStorageTypeOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          HdfsConstants.QUOTA_DONT_SET, setQuotaByStorageTypeOp.dsQuota,
          setQuotaByStorageTypeOp.type);
      break;
    }
    case OP_TIMES: {
      TimesOp timesOp = (TimesOp) op;
      final String src = renameReservedPathsOnUpgrade(
          timesOp.path, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetTimes(fsDir, iip,
          timesOp.mtime, timesOp.atime, true);
      break;
    }
    case OP_SYMLINK: {
      if (!FileSystem.areSymlinksEnabled()) {
        throw new IOException("Symlinks not supported - please remove "
            + "symlink before upgrading to this version of HDFS");
      }
      SymlinkOp symlinkOp = (SymlinkOp) op;
      inodeId = getAndUpdateLastInodeId(symlinkOp.inodeId, logVersion,
          lastInodeId);
      final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
          logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE_LINK);
      FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip.getExistingINodes(),
          iip.getLastLocalName(), inodeId, symlinkOp.value, symlinkOp.mtime,
          symlinkOp.atime, symlinkOp.permissionStatus);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME: {
      RenameOp renameOp = (RenameOp) op;
      FSDirRenameOp.renameForEditLog(fsDir,
          renameReservedPathsOnUpgrade(renameOp.src, logVersion),
          renameReservedPathsOnUpgrade(renameOp.dst, logVersion),
          renameOp.timestamp, renameOp.options);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
      }
      break;
    }
    case OP_GET_DELEGATION_TOKEN: {
      GetDelegationTokenOp getDelegationTokenOp
          = (GetDelegationTokenOp) op;

      fsNamesys.getDelegationTokenSecretManager()
          .addPersistedDelegationToken(getDelegationTokenOp.token,
              getDelegationTokenOp.expiryTime);
      break;
    }
    case OP_RENEW_DELEGATION_TOKEN: {
      RenewDelegationTokenOp renewDelegationTokenOp
          = (RenewDelegationTokenOp) op;
      fsNamesys.getDelegationTokenSecretManager()
          .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
              renewDelegationTokenOp.expiryTime);
      break;
    }
    case OP_CANCEL_DELEGATION_TOKEN: {
      CancelDelegationTokenOp cancelDelegationTokenOp
          = (CancelDelegationTokenOp) op;
      fsNamesys.getDelegationTokenSecretManager()
          .updatePersistedTokenCancellation(
              cancelDelegationTokenOp.token);
      break;
    }
    case OP_UPDATE_MASTER_KEY: {
      UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp) op;
      fsNamesys.getDelegationTokenSecretManager()
          .updatePersistedMasterKey(updateMasterKeyOp.key);
      break;
    }
    case OP_REASSIGN_LEASE: {
      ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp) op;

      Lease lease = fsNamesys.leaseManager.getLease(
          reassignLeaseOp.leaseHolder);
      final String path =
          renameReservedPathsOnUpgrade(reassignLeaseOp.path, logVersion);
      INodeFile pendingFile = fsDir.getINode(path, DirOp.READ).asFile();
      Preconditions.checkState(pendingFile.isUnderConstruction());
      fsNamesys.reassignLeaseInternal(lease, reassignLeaseOp.newHolder,
          pendingFile);
      break;
    }
    case OP_START_LOG_SEGMENT:
    case OP_END_LOG_SEGMENT: {
      // no data in here currently.
      break;
    }
    case OP_CREATE_SNAPSHOT: {
      CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
      String path = fsNamesys.getSnapshotManager().createSnapshot(iip,
          snapshotRoot, createSnapshotOp.snapshotName);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
            createSnapshotOp.rpcCallId, path);
      }
      break;
    }
    case OP_DELETE_SNAPSHOT: {
      DeleteSnapshotOp deleteSnapshotOp = (DeleteSnapshotOp) op;
      BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
      List<INode> removedINodes = new ChunkedArrayList<INode>();
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(deleteSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
      fsNamesys.getSnapshotManager().deleteSnapshot(iip,
          deleteSnapshotOp.snapshotName,
          new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
              collectedBlocks, removedINodes, null));
      fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
      collectedBlocks.clear();
      fsNamesys.dir.removeFromInodeMap(removedINodes);
      removedINodes.clear();

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
            deleteSnapshotOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME_SNAPSHOT: {
      RenameSnapshotOp renameSnapshotOp = (RenameSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(renameSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
      fsNamesys.getSnapshotManager().renameSnapshot(iip,
          snapshotRoot, renameSnapshotOp.snapshotOldName,
          renameSnapshotOp.snapshotNewName);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId,
            renameSnapshotOp.rpcCallId);
      }
      break;
    }
    case OP_ALLOW_SNAPSHOT: {
      AllowSnapshotOp allowSnapshotOp = (AllowSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(allowSnapshotOp.snapshotRoot,
              logVersion);
      fsNamesys.getSnapshotManager().setSnapshottable(
          snapshotRoot, false);
      break;
    }
    case OP_DISALLOW_SNAPSHOT: {
      DisallowSnapshotOp disallowSnapshotOp = (DisallowSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(disallowSnapshotOp.snapshotRoot,
              logVersion);
      fsNamesys.getSnapshotManager().resetSnapshottable(
          snapshotRoot);
      break;
    }
    case OP_SET_GENSTAMP_V2: {
      SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
      fsNamesys.getBlockIdManager().setGenerationStampV2(
          setGenstampV2Op.genStampV2);
      break;
    }
    case OP_ALLOCATE_BLOCK_ID: {
      AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
      fsNamesys.getBlockIdManager().setLastAllocatedBlockId(
          allocateBlockIdOp.blockId);
      break;
    }
    case OP_ROLLING_UPGRADE_START: {
      if (startOpt == StartupOption.ROLLINGUPGRADE) {
        final RollingUpgradeStartupOption rollingUpgradeOpt
            = startOpt.getRollingUpgradeStartupOption();
        if (rollingUpgradeOpt == RollingUpgradeStartupOption.ROLLBACK) {
          throw new RollingUpgradeOp.RollbackException();
        } else if (rollingUpgradeOpt == RollingUpgradeStartupOption.DOWNGRADE) {
          // ignore upgrade marker
          break;
        }
      }
      // start rolling upgrade
      final long startTime = ((RollingUpgradeOp) op).getTime();
      fsNamesys.startRollingUpgradeInternal(startTime);
      fsNamesys.triggerRollbackCheckpoint();
      break;
    }
    case OP_ROLLING_UPGRADE_FINALIZE: {
      final long finalizeTime = ((RollingUpgradeOp) op).getTime();
      if (fsNamesys.isRollingUpgrade()) {
        // Only do it when NN is actually doing rolling upgrade.
        // We can get FINALIZE without corresponding START, if NN is restarted
        // before this op is consumed and a new checkpoint is created.
        fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
      }
      fsNamesys.getFSImage().updateStorageVersion();
      fsNamesys.getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
          NameNodeFile.IMAGE);
      break;
    }
    case OP_ADD_CACHE_DIRECTIVE: {
      AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
      CacheDirectiveInfo result = fsNamesys
          .getCacheManager().addDirectiveFromEditLog(addOp.directive);
      if (toAddRetryCache) {
        Long id = result.getId();
        fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id);
      }
      break;
    }
    case OP_MODIFY_CACHE_DIRECTIVE: {
      ModifyCacheDirectiveInfoOp modifyOp =
          (ModifyCacheDirectiveInfoOp) op;
      fsNamesys.getCacheManager().modifyDirectiveFromEditLog(
          modifyOp.directive);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_CACHE_DIRECTIVE: {
      RemoveCacheDirectiveInfoOp removeOp =
          (RemoveCacheDirectiveInfoOp) op;
      fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_ADD_CACHE_POOL: {
      AddCachePoolOp addOp = (AddCachePoolOp) op;
      fsNamesys.getCacheManager().addCachePool(addOp.info);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_MODIFY_CACHE_POOL: {
      ModifyCachePoolOp modifyOp = (ModifyCachePoolOp) op;
      fsNamesys.getCacheManager().modifyCachePool(modifyOp.info);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_CACHE_POOL: {
      RemoveCachePoolOp removeOp = (RemoveCachePoolOp) op;
      fsNamesys.getCacheManager().removeCachePool(removeOp.poolName);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_SET_ACL: {
      SetAclOp setAclOp = (SetAclOp) op;
      INodesInPath iip = fsDir.getINodesInPath(setAclOp.src, DirOp.WRITE);
      FSDirAclOp.unprotectedSetAcl(fsDir, iip, setAclOp.aclEntries, true);
      break;
    }
    case OP_SET_XATTR: {
      SetXAttrOp setXAttrOp = (SetXAttrOp) op;
      INodesInPath iip = fsDir.getINodesInPath(setXAttrOp.src, DirOp.WRITE);
      FSDirXAttrOp.unprotectedSetXAttrs(fsDir, iip,
          setXAttrOp.xAttrs,
          EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(setXAttrOp.rpcClientId, setXAttrOp.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_XATTR: {
      RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
      FSDirXAttrOp.unprotectedRemoveXAttrs(fsDir, removeXAttrOp.src,
          removeXAttrOp.xAttrs);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
            removeXAttrOp.rpcCallId);
      }
      break;
    }
    case OP_TRUNCATE: {
      TruncateOp truncateOp = (TruncateOp) op;
      INodesInPath iip = fsDir.getINodesInPath(truncateOp.src, DirOp.WRITE);
      FSDirTruncateOp.unprotectedTruncate(fsNamesys, iip,
          truncateOp.clientName, truncateOp.clientMachine,
          truncateOp.newLength, truncateOp.timestamp, truncateOp.truncateBlock);
      break;
    }
    case OP_SET_STORAGE_POLICY: {
      SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
      final String path = renameReservedPathsOnUpgrade(
          setStoragePolicyOp.path, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetStoragePolicy(
          fsDir, fsNamesys.getBlockManager(), iip,
          setStoragePolicyOp.policyId);
      break;
    }
    default:
      throw new IOException("Invalid operation read " + op.opCode);
    }
    return inodeId;
  }
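
  // Illustrative note (an addition, not original source): applyEditLogOp()
  // returns GRANDFATHER_INODE_ID unless the op allocated an inode (OP_ADD,
  // OP_MKDIR, OP_SYMLINK). loadEditRecords() folds the returned ids into its
  // running lastInodeId and resets the allocator once, after replay.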

  private static String formatEditLogReplayError(EditLogInputStream in,
      long[] recentOpcodeOffsets, long txid) {
    StringBuilder sb = new StringBuilder();
    sb.append("Error replaying edit log at offset ").append(in.getPosition());
    sb.append(". Expected transaction ID was ").append(txid);
    if (recentOpcodeOffsets[0] != -1) {
      Arrays.sort(recentOpcodeOffsets);
      sb.append("\nRecent opcode offsets:");
      for (long offset : recentOpcodeOffsets) {
        if (offset != -1) {
          sb.append(' ').append(offset);
        }
      }
    }
    return sb.toString();
  }

  /**
   * Add a new block into the given INodeFile.
   */
  private void addNewBlock(AddBlockOp op, INodeFile file)
      throws IOException {
    BlockInfo[] oldBlocks = file.getBlocks();
    Block pBlock = op.getPenultimateBlock();
    Block newBlock = op.getLastBlock();

    if (pBlock != null) { // the penultimate block is not null
      Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0);
      // compare pBlock with the last block of oldBlocks
      BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1];
      if (oldLastBlock.getBlockId() != pBlock.getBlockId()
          || oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) {
        throw new IOException(
            "Mismatched block IDs or generation stamps for the old last block "
            + "of file " + op.getPath() + ", the old last block is "
            + oldLastBlock + ", and the block read from editlog is " + pBlock);
      }

      oldLastBlock.setNumBytes(pBlock.getNumBytes());
      if (!oldLastBlock.isComplete()) {
        fsNamesys.getBlockManager().forceCompleteBlock(oldLastBlock);
        fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
      }
    } else { // the penultimate block is null
      Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
    }
    // add the new block
    BlockInfo newBI = new BlockInfoContiguous(newBlock,
        file.getPreferredBlockReplication());
    newBI.convertToBlockUnderConstruction(
        HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
    fsNamesys.getBlockManager().addBlockCollection(newBI, file);
    file.addBlock(newBI);
    fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
  }
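
  // Illustrative replay of addNewBlock(), with hypothetical blocks b1 and b2:
  //
  //   file blocks before: [b1 (under construction)]
  //   op:                 ADD_BLOCK(penultimate=b1, last=b2)
  //   file blocks after:  [b1 (complete), b2 (under construction)]
  //
  // The penultimate block is forced complete and the new last block starts
  // life under construction.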

  /**
   * Update in-memory data structures with new block information.
   * @throws IOException
   */
  private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
      INodesInPath iip, INodeFile file) throws IOException {
    // Update its block list
    BlockInfo[] oldBlocks = file.getBlocks();
    Block[] newBlocks = op.getBlocks();
    String path = op.getPath();

    // Are we only updating the last block's gen stamp?
    boolean isGenStampUpdate = oldBlocks.length == newBlocks.length;

    // First, update blocks in common
    for (int i = 0; i < oldBlocks.length && i < newBlocks.length; i++) {
      BlockInfo oldBlock = oldBlocks[i];
      Block newBlock = newBlocks[i];

      boolean isLastBlock = i == newBlocks.length - 1;
      if (oldBlock.getBlockId() != newBlock.getBlockId() ||
          (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
              !(isGenStampUpdate && isLastBlock))) {
        throw new IOException("Mismatched block IDs or generation stamps, " +
            "attempting to replace block " + oldBlock + " with " + newBlock +
            " as block # " + i + "/" + newBlocks.length + " of " +
            path);
      }

      oldBlock.setNumBytes(newBlock.getNumBytes());
      boolean changeMade =
          oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
      oldBlock.setGenerationStamp(newBlock.getGenerationStamp());

      if (!oldBlock.isComplete() &&
          (!isLastBlock || op.shouldCompleteLastBlock())) {
        changeMade = true;
        fsNamesys.getBlockManager().forceCompleteBlock(oldBlock);
      }
      if (changeMade) {
        // The state or gen-stamp of the block has changed. So, we may be
        // able to process some messages from datanodes that we previously
        // were unable to process.
        fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
      }
    }

    if (newBlocks.length < oldBlocks.length) {
      // We're removing a block from the file, e.g. abandonBlock(...)
      if (!file.isUnderConstruction()) {
        throw new IOException("Trying to remove a block from file " +
            path + " which is not under construction.");
      }
      if (newBlocks.length != oldBlocks.length - 1) {
        throw new IOException("Trying to remove more than one block from file "
            + path);
      }
      Block oldBlock = oldBlocks[oldBlocks.length - 1];
      boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
          fsDir, path, iip, file, oldBlock);
      if (!removed && !(op instanceof UpdateBlocksOp)) {
        throw new IOException("Trying to delete non-existent block "
            + oldBlock);
      }
    } else if (newBlocks.length > oldBlocks.length) {
      // We're adding blocks
      for (int i = oldBlocks.length; i < newBlocks.length; i++) {
        Block newBlock = newBlocks[i];
        BlockInfo newBI;
        if (!op.shouldCompleteLastBlock()) {
          // TODO: shouldn't this only be true for the last block?
          // what about an old-version fsync() where fsync isn't called
          // until several blocks in?
          newBI = new BlockInfoContiguous(newBlock,
              file.getPreferredBlockReplication());
          newBI.convertToBlockUnderConstruction(
              HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
        } else {
          // OP_CLOSE should add finalized blocks. This code path
          // is only executed when loading edits written by prior
          // versions of Hadoop. Current versions always log
          // OP_ADD operations as each block is allocated.
          newBI = new BlockInfoContiguous(newBlock,
              file.getFileReplication());
        }
        fsNamesys.getBlockManager().addBlockCollection(newBI, file);
        file.addBlock(newBI);
        fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
      }
    }
  }
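
  // Illustrative cases for updateBlocks(), with hypothetical block lists
  // (UC = under construction):
  //
  //   old [b1, b2(UC)] -> new [b1, b2'(UC)] : gen-stamp bump on last block
  //   old [b1, b2(UC)] -> new [b1]          : block abandoned, e.g. abandonBlock()
  //   old [b1]         -> new [b1, b2]      : legacy OP_CLOSE adding blocks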

  private static void dumpOpCounts(
      EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts) {
    StringBuilder sb = new StringBuilder();
    sb.append("Summary of operations loaded from edit log:\n ");
    Joiner.on("\n ").withKeyValueSeparator("=").appendTo(sb, opCounts);
    FSImage.LOG.debug(sb.toString());
  }

  private void incrOpCount(FSEditLogOpCodes opCode,
      EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts, Step step,
      Counter counter) {
    Holder<Integer> holder = opCounts.get(opCode);
    if (holder == null) {
      holder = new Holder<Integer>(1);
      opCounts.put(opCode, holder);
    } else {
      holder.held++;
    }
    counter.increment();
  }

  /**
   * Throw appropriate exception during upgrade from 203, when editlog loading
   * could fail due to opcode conflicts.
   */
  private void check203UpgradeFailure(int logVersion, Throwable e)
      throws IOException {
    // The 0.20.203 release has conflicting opcodes with later releases.
    // The editlog must be emptied by restarting the namenode, before
    // proceeding with the upgrade.
    if (Storage.is203LayoutVersion(logVersion)
        && logVersion != HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
      String msg = "During upgrade failed to load the editlog version "
          + logVersion + " from release 0.20.203. Please go back to the old "
          + "release and restart the namenode. This empties the editlog "
          + "and saves the namespace. Resume the upgrade after this step.";
      throw new IOException(msg, e);
    }
  }

  /**
   * Find the last valid transaction ID in the stream.
   * If there are invalid or corrupt transactions in the middle of the stream,
   * scanEditLog will skip over them.
   * This reads through the stream but does not close it.
   *
   * @param maxTxIdToScan Maximum Tx ID to try to scan.
   *                      The scan returns after reading this or a higher ID.
   *                      The file portion beyond this ID is potentially being
   *                      updated.
   */
  static EditLogValidation scanEditLog(EditLogInputStream in,
      long maxTxIdToScan) {
    long lastPos;
    long lastTxId = HdfsServerConstants.INVALID_TXID;
    long numValid = 0;
    while (true) {
      long txid;
      lastPos = in.getPosition();
      try {
        if ((txid = in.scanNextOp()) == HdfsServerConstants.INVALID_TXID) {
          break;
        }
      } catch (Throwable t) {
        FSImage.LOG.warn("Caught exception after scanning through "
            + numValid + " ops from " + in
            + " while determining its valid length. Position was "
            + lastPos, t);
        in.resync();
        FSImage.LOG.warn("After resync, position is " + in.getPosition());
        continue;
      }
      if (lastTxId == HdfsServerConstants.INVALID_TXID || txid > lastTxId) {
        lastTxId = txid;
      }
      if (lastTxId >= maxTxIdToScan) {
        break;
      }
      numValid++;
    }
    return new EditLogValidation(lastPos, lastTxId, false);
  }
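
  // Illustrative usage (a sketch, not part of the original class): recovery
  // code that wants to know where a possibly-truncated segment ends might
  // scan like this (stream is a hypothetical EditLogInputStream):
  //
  //   EditLogValidation v = scanEditLog(stream, Long.MAX_VALUE);
  //   long lastGoodTxId = v.getEndTxId();
  //   long truncateAt = v.getValidLength();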

  static class EditLogValidation {
    private final long validLength;
    private final long endTxId;
    private final boolean hasCorruptHeader;

    EditLogValidation(long validLength, long endTxId,
        boolean hasCorruptHeader) {
      this.validLength = validLength;
      this.endTxId = endTxId;
      this.hasCorruptHeader = hasCorruptHeader;
    }

    long getValidLength() { return validLength; }

    long getEndTxId() { return endTxId; }

    boolean hasCorruptHeader() { return hasCorruptHeader; }
  }

  /**
   * Stream wrapper that keeps track of the current stream position.
   *
   * This stream also allows us to set a limit on how many bytes we can read
   * without getting an exception.
   */
  public static class PositionTrackingInputStream extends FilterInputStream
      implements StreamLimiter {
    private long curPos = 0;
    private long markPos = -1;
    private long limitPos = Long.MAX_VALUE;

    public PositionTrackingInputStream(InputStream is) {
      super(is);
    }

    private void checkLimit(long amt) throws IOException {
      long extra = (curPos + amt) - limitPos;
      if (extra > 0) {
        throw new IOException("Tried to read " + amt + " byte(s) past " +
            "the limit at offset " + limitPos);
      }
    }

    @Override
    public int read() throws IOException {
      checkLimit(1);
      int ret = super.read();
      if (ret != -1) curPos++;
      return ret;
    }

    @Override
    public int read(byte[] data) throws IOException {
      checkLimit(data.length);
      int ret = super.read(data);
      if (ret > 0) curPos += ret;
      return ret;
    }

    @Override
    public int read(byte[] data, int offset, int length) throws IOException {
      checkLimit(length);
      int ret = super.read(data, offset, length);
      if (ret > 0) curPos += ret;
      return ret;
    }

    @Override
    public void setLimit(long limit) {
      limitPos = curPos + limit;
    }

    @Override
    public void clearLimit() {
      limitPos = Long.MAX_VALUE;
    }

    @Override
    public void mark(int limit) {
      super.mark(limit);
      markPos = curPos;
    }

    @Override
    public void reset() throws IOException {
      if (markPos == -1) {
        throw new IOException("Not marked!");
      }
      super.reset();
      curPos = markPos;
      markPos = -1;
    }

    public long getPos() {
      return curPos;
    }

    @Override
    public long skip(long amt) throws IOException {
      long extra = (curPos + amt) - limitPos;
      if (extra > 0) {
        throw new IOException("Tried to skip " + extra + " bytes past " +
            "the limit at offset " + limitPos);
      }
      long ret = super.skip(amt);
      curPos += ret;
      return ret;
    }
  }
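
  // Illustrative usage (a sketch, not part of the original class): op
  // decoders wrap their raw input and bound each op's read, e.g.
  //
  //   PositionTrackingInputStream in =
  //       new PositionTrackingInputStream(rawStream); // rawStream: hypothetical
  //   in.setLimit(opLength); // at most opLength more bytes may be read
  //   // ... decode one op ...
  //   in.clearLimit();
  //
  // Reading past the limit raises an IOException instead of silently
  // consuming the next op's bytes.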

  public long getLastAppliedTxId() {
    return lastAppliedTxId;
  }

  /**
   * Creates a Step used for updating startup progress, populated with
   * information from the given edits. The step always includes the log's
   * name. If the log has a known length, then the length is included in the
   * step too.
   *
   * @param edits EditLogInputStream to use for populating step
   * @return Step populated with information from edits
   * @throws IOException thrown if there is an I/O error
   */
  private static Step createStartupProgressStep(EditLogInputStream edits)
      throws IOException {
    long length = edits.length();
    String name = edits.getCurrentStreamName();
    return length != -1 ? new Step(name, length) : new Step(name);
  }
}