/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.math.LongRange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
import org.apache.hadoop.hdfs.util.BestEffortLongFile;
import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.TextFormat;

/**
 * A JournalNode can manage journals for several clusters at once.
 * Each such journal is entirely independent despite being hosted by
 * the same JVM.
 */
public class Journal implements Closeable {
  static final Log LOG = LogFactory.getLog(Journal.class);

  // Current writing state
  private EditLogOutputStream curSegment;
  private long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
  private long nextTxId = HdfsServerConstants.INVALID_TXID;
  private long highestWrittenTxId = 0;

  private final String journalId;

  private final JNStorage storage;

  /**
   * When a new writer comes along, it asks each node to promise
   * to ignore requests from any previous writer, as identified
   * by epoch number. In order to make such a promise, the epoch
   * number of that writer is stored persistently on disk.
   */
  private PersistentLongFile lastPromisedEpoch;

  /**
   * Each IPC that comes from a given client contains a serial number
   * which only increases from the client's perspective. Whenever
   * we switch epochs, we reset this back to -1. Whenever an IPC
   * comes from a client, we ensure that it is strictly higher
   * than any previous IPC. This guards against any bugs in the IPC
   * layer that would re-order IPCs or cause a stale retry from an old
   * request to resurface and confuse things.
   */
  private long currentEpochIpcSerial = -1;

  /**
   * The epoch number of the last writer to actually write a transaction.
   * This is used to differentiate log segments after a crash at the very
   * beginning of a segment. See the 'testNewerVersionOfSegmentWins'
   * test case.
   */
  private PersistentLongFile lastWriterEpoch;

  /**
   * Lower-bound on the last committed transaction ID. This is not
   * depended upon for correctness, but acts as a sanity check
   * during the recovery procedures, and as a visibility mark
   * for clients reading in-progress logs.
   */
  private BestEffortLongFile committedTxnId;

  public static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
  public static final String LAST_WRITER_EPOCH = "last-writer-epoch";
  private static final String COMMITTED_TXID_FILENAME = "committed-txid";

  private final FileJournalManager fjm;

  private final JournalMetrics metrics;

  private long lastJournalTimestamp = 0;

  /**
   * Time threshold for sync calls, beyond which a warning should be logged
   * to the console.
   */
  private static final int WARN_SYNC_MILLIS_THRESHOLD = 1000;

  Journal(Configuration conf, File logDir, String journalId,
      StartupOption startOpt, StorageErrorReporter errorReporter)
      throws IOException {
    storage = new JNStorage(conf, logDir, startOpt, errorReporter);
    this.journalId = journalId;

    refreshCachedData();

    this.fjm = storage.getJournalManager();

    this.metrics = JournalMetrics.create(this);

    EditLogFile latest = scanStorageForLatestEdits();
    if (latest != null) {
      updateHighestWrittenTxId(latest.getLastTxId());
    }
  }

  /**
   * Reload any data that may have been cached.
   * This is necessary when we first load the Journal, but also after any
   * formatting operation, since the cached data is no longer relevant.
   */
  private synchronized void refreshCachedData() {
    IOUtils.closeStream(committedTxnId);

    File currentDir = storage.getSingularStorageDir().getCurrentDir();
    this.lastPromisedEpoch = new PersistentLongFile(
        new File(currentDir, LAST_PROMISED_FILENAME), 0);
    this.lastWriterEpoch = new PersistentLongFile(
        new File(currentDir, LAST_WRITER_EPOCH), 0);
    this.committedTxnId = new BestEffortLongFile(
        new File(currentDir, COMMITTED_TXID_FILENAME),
        HdfsServerConstants.INVALID_TXID);
  }

  /**
   * Scan the local storage directory, and return the segment containing
   * the highest transaction.
   * @return the EditLogFile with the highest transactions, or null
   *         if no files exist.
   */
  private synchronized EditLogFile scanStorageForLatestEdits() throws IOException {
    if (!fjm.getStorageDirectory().getCurrentDir().exists()) {
      return null;
    }

    LOG.info("Scanning storage " + fjm);
    List<EditLogFile> files = fjm.getLogFiles(0);

    while (!files.isEmpty()) {
      EditLogFile latestLog = files.remove(files.size() - 1);
      latestLog.scanLog(Long.MAX_VALUE, false);
      LOG.info("Latest log is " + latestLog);
      if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
        // the log contains no transactions
        LOG.warn("Latest log " + latestLog + " has no transactions. " +
            "moving it aside and looking for previous log");
        latestLog.moveAsideEmptyFile();
      } else {
        return latestLog;
      }
    }

    LOG.info("No files in " + fjm);
    return null;
  }

  /**
   * Format the local storage with the given namespace.
   */
  void format(NamespaceInfo nsInfo) throws IOException {
    Preconditions.checkState(nsInfo.getNamespaceID() != 0,
        "can't format with uninitialized namespace info: %s",
        nsInfo);
    LOG.info("Formatting " + this + " with namespace info: " +
        nsInfo);
    storage.format(nsInfo);
    refreshCachedData();
  }

  /**
   * Unlock and release resources.
   */
  @Override // Closeable
  public void close() throws IOException {
    storage.close();
    IOUtils.closeStream(committedTxnId);
    IOUtils.closeStream(curSegment);
  }

  JNStorage getStorage() {
    return storage;
  }

  String getJournalId() {
    return journalId;
  }

  /**
   * @return the last epoch which this node has promised not to accept
   *         any lower epoch, or 0 if no promises have been made.
   */
  synchronized long getLastPromisedEpoch() throws IOException {
    checkFormatted();
    return lastPromisedEpoch.get();
  }

  synchronized public long getLastWriterEpoch() throws IOException {
    checkFormatted();
    return lastWriterEpoch.get();
  }

  synchronized long getCommittedTxnIdForTests() throws IOException {
    return committedTxnId.get();
  }

  synchronized long getLastJournalTimestamp() {
    return lastJournalTimestamp;
  }

  synchronized long getCurrentLagTxns() throws IOException {
    long committed = committedTxnId.get();
    if (committed == 0) {
      return 0;
    }

    return Math.max(committed - highestWrittenTxId, 0L);
  }

  synchronized long getHighestWrittenTxId() {
    return highestWrittenTxId;
  }

  /**
   * Update the highest Tx ID that has been written to the journal.
   * Also update the {@link FileJournalManager#lastReadableTxId} of the
   * underlying fjm.
   * @param val The new value
   */
  private void updateHighestWrittenTxId(long val) {
    highestWrittenTxId = val;
    fjm.setLastReadableTxId(val);
  }

  @VisibleForTesting
  JournalMetrics getMetricsForTests() {
    return metrics;
  }

  /**
   * Try to create a new epoch for this journal.
   * @param nsInfo the namespace, which is verified for consistency or used to
   *        format, if the Journal has not yet been written to.
   * @param epoch the epoch to start
   * @return the status information necessary to begin recovery
   * @throws IOException if the node has already made a promise to another
   *         writer with a higher epoch number, if the namespace is inconsistent,
   *         or if a disk error occurs.
   */
  synchronized NewEpochResponseProto newEpoch(
      NamespaceInfo nsInfo, long epoch) throws IOException {

    checkFormatted();
    storage.checkConsistentNamespace(nsInfo);

    // Check that the new epoch being proposed is in fact newer than
    // any other that we've promised.
    if (epoch <= getLastPromisedEpoch()) {
      throw new IOException("Proposed epoch " + epoch + " <= last promise " +
          getLastPromisedEpoch());
    }

    updateLastPromisedEpoch(epoch);
    abortCurSegment();

    NewEpochResponseProto.Builder builder =
        NewEpochResponseProto.newBuilder();

    EditLogFile latestFile = scanStorageForLatestEdits();

    if (latestFile != null) {
      builder.setLastSegmentTxId(latestFile.getFirstTxId());
    }

    return builder.build();
  }

  private void updateLastPromisedEpoch(long newEpoch) throws IOException {
    LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
        " to " + newEpoch + " for client " + Server.getRemoteIp());
    lastPromisedEpoch.set(newEpoch);

    // Since we have a new writer, reset the IPC serial - it will start
    // counting again from 0 for this writer.
    currentEpochIpcSerial = -1;
  }

  private void abortCurSegment() throws IOException {
    if (curSegment == null) {
      return;
    }

    curSegment.abort();
    curSegment = null;
    curSegmentTxId = HdfsServerConstants.INVALID_TXID;
  }

  /**
   * Write a batch of edits to the journal.
   * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
   */
  synchronized void journal(RequestInfo reqInfo,
      long segmentTxId, long firstTxnId,
      int numTxns, byte[] records) throws IOException {
    checkFormatted();
    checkWriteRequest(reqInfo);

    checkSync(curSegment != null,
        "Can't write, no segment open");

    if (curSegmentTxId != segmentTxId) {
      // Sanity check: it is possible that the writer will fail IPCs
      // on both the finalize() and then the start() of the next segment.
      // This could cause us to continue writing to an old segment
      // instead of rolling to a new one, which breaks one of the
      // invariants in the design. If it happens, abort the segment
      // and throw an exception.
      JournalOutOfSyncException e = new JournalOutOfSyncException(
          "Writer out of sync: it thinks it is writing segment " + segmentTxId
          + " but current segment is " + curSegmentTxId);
      abortCurSegment();
      throw e;
    }

    checkSync(nextTxId == firstTxnId,
        "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);

    long lastTxnId = firstTxnId + numTxns - 1;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
    }

    // If the edit has already been marked as committed, we know
    // it has been fsynced on a quorum of other nodes, and we are
    // "catching up" with the rest. Hence we do not need to fsync.
    boolean isLagging = lastTxnId <= committedTxnId.get();
    boolean shouldFsync = !isLagging;

    curSegment.writeRaw(records, 0, records.length);
    curSegment.setReadyToFlush();
    StopWatch sw = new StopWatch();
    sw.start();
    curSegment.flush(shouldFsync);
    sw.stop();

    long nanoSeconds = sw.now();
    metrics.addSync(
        TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
    long milliSeconds = TimeUnit.MILLISECONDS.convert(
        nanoSeconds, TimeUnit.NANOSECONDS);

    if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
      LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
          " took " + milliSeconds + "ms");
    }

    if (isLagging) {
      // This batch of edits has already been committed on a quorum of other
      // nodes. So, we are in "catch up" mode. This gets its own metric.
      metrics.batchesWrittenWhileLagging.incr(1);
    }

    metrics.batchesWritten.incr(1);
    metrics.bytesWritten.incr(records.length);
    metrics.txnsWritten.incr(numTxns);

    updateHighestWrittenTxId(lastTxnId);
    nextTxId = lastTxnId + 1;
    lastJournalTimestamp = Time.now();
  }

  public void heartbeat(RequestInfo reqInfo) throws IOException {
    checkRequest(reqInfo);
  }

  /**
   * Ensure that the given request is coming from the correct writer and in-order.
   * @param reqInfo the request info
   * @throws IOException if the request is invalid.
   */
  private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
    // Invariant 25 from ZAB paper
    if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
      throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
          " is less than the last promised epoch " +
          lastPromisedEpoch.get());
    } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
      // A newer client has arrived. Fence any previous writers by updating
      // the promise.
      updateLastPromisedEpoch(reqInfo.getEpoch());
    }

    // Ensure that the IPCs are arriving in-order as expected.
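    // For example (illustrative scenario only): if a delayed retry carrying
    // IPC serial 3 arrives after serial 5 has already been processed in this
    // epoch, the check below rejects it rather than re-applying the stale
    // request.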
    checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
        "IPC serial %s from client %s was not higher than prior highest " +
        "IPC serial %s", reqInfo.getIpcSerialNumber(),
        Server.getRemoteIp(),
        currentEpochIpcSerial);
    currentEpochIpcSerial = reqInfo.getIpcSerialNumber();

    if (reqInfo.hasCommittedTxId()) {
      Preconditions.checkArgument(
          reqInfo.getCommittedTxId() >= committedTxnId.get(),
          "Client trying to move committed txid backward from " +
          committedTxnId.get() + " to " + reqInfo.getCommittedTxId());

      committedTxnId.set(reqInfo.getCommittedTxId());
    }
  }

  private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
    checkRequest(reqInfo);

    if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
      throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
          " is not the current writer epoch " +
          lastWriterEpoch.get());
    }
  }

  public synchronized boolean isFormatted() {
    return storage.isFormatted();
  }

  private void checkFormatted() throws JournalNotFormattedException {
    if (!isFormatted()) {
      throw new JournalNotFormattedException("Journal " +
          storage.getSingularStorageDir() + " not formatted");
    }
  }

  /**
   * @throws JournalOutOfSyncException if the given expression is not true.
   * The message of the exception is formatted using the 'msg' and
   * 'formatArgs' parameters.
   */
  private void checkSync(boolean expression, String msg,
      Object... formatArgs) throws JournalOutOfSyncException {
    if (!expression) {
      throw new JournalOutOfSyncException(String.format(msg, formatArgs));
    }
  }

  /**
   * @throws AssertionError if the given expression is not true.
   * The message of the exception is formatted using the 'msg' and
   * 'formatArgs' parameters.
   *
   * This should be used in preference to Java's built-in assert in
   * non-performance-critical paths, where a failure of this invariant
   * might cause the protocol to lose data.
   */
  private void alwaysAssert(boolean expression, String msg,
      Object... formatArgs) {
    if (!expression) {
      throw new AssertionError(String.format(msg, formatArgs));
    }
  }

  /**
   * Start a new segment at the given txid. The previous segment
   * must have already been finalized.
   */
  public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
      int layoutVersion) throws IOException {
    assert fjm != null;
    checkFormatted();
    checkRequest(reqInfo);

    if (curSegment != null) {
      LOG.warn("Client is requesting a new log segment " + txid +
          " though we are already writing " + curSegment + ". " +
          "Aborting the current segment in order to begin the new one.");
      // The writer may have lost a connection to us and is now
      // re-connecting after the connection came back.
      // We should abort our own old segment.
      abortCurSegment();
    }

    // Paranoid sanity check: we should never overwrite a finalized log file.
    // Additionally, if it's in-progress, it should have at most 1 transaction.
    // This can happen if the writer crashes exactly at the start of a segment.
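    // The checks below therefore tolerate an in-progress file that holds only
    // the initial transaction of the segment, but refuse to clobber anything
    // that already contains further transactions.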
    EditLogFile existing = fjm.getLogFile(txid);
    if (existing != null) {
      if (!existing.isInProgress()) {
        throw new IllegalStateException("Already have a finalized segment " +
            existing + " beginning at " + txid);
      }

      // If it's in-progress, it should only contain one transaction,
      // because the "startLogSegment" transaction is written alone at the
      // start of each segment.
      existing.scanLog(Long.MAX_VALUE, false);
      if (existing.getLastTxId() != existing.getFirstTxId()) {
        throw new IllegalStateException("The log file " +
            existing + " seems to contain valid transactions");
      }
    }

    long curLastWriterEpoch = lastWriterEpoch.get();
    if (curLastWriterEpoch != reqInfo.getEpoch()) {
      LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
          " to " + reqInfo.getEpoch() + " for client " +
          Server.getRemoteIp());
      lastWriterEpoch.set(reqInfo.getEpoch());
    }

    // The fact that we are starting a segment at this txid indicates
    // that any previous recovery for this same segment was aborted.
    // Otherwise, no writer would have started writing. So, we can
    // remove the record of the older segment here.
    purgePaxosDecision(txid);

    curSegment = fjm.startLogSegment(txid, layoutVersion);
    curSegmentTxId = txid;
    nextTxId = txid;
  }

  /**
   * Finalize the log segment at the given transaction ID.
   */
  public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
      long endTxId) throws IOException {
    checkFormatted();
    checkRequest(reqInfo);

    boolean needsValidation = true;

    // Finalizing the log that the writer was just writing.
    if (startTxId == curSegmentTxId) {
      if (curSegment != null) {
        curSegment.close();
        curSegment = null;
        curSegmentTxId = HdfsServerConstants.INVALID_TXID;
      }

      checkSync(nextTxId == endTxId + 1,
          "Trying to finalize in-progress log segment %s to end at " +
          "txid %s but only written up to txid %s",
          startTxId, endTxId, nextTxId - 1);
      // No need to validate the edit log if the client is finalizing
      // the log segment that it was just writing to.
      needsValidation = false;
    }

    FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
    if (elf == null) {
      throw new JournalOutOfSyncException("No log file to finalize at " +
          "transaction ID " + startTxId);
    }

    if (elf.isInProgress()) {
      if (needsValidation) {
        LOG.info("Validating log segment " + elf.getFile() + " about to be " +
            "finalized");
        elf.scanLog(Long.MAX_VALUE, false);

        checkSync(elf.getLastTxId() == endTxId,
            "Trying to finalize in-progress log segment %s to end at " +
            "txid %s but log %s on disk only contains up to txid %s",
            startTxId, endTxId, elf.getFile(), elf.getLastTxId());
      }
      fjm.finalizeLogSegment(startTxId, endTxId);
    } else {
      Preconditions.checkArgument(endTxId == elf.getLastTxId(),
          "Trying to re-finalize already finalized log " +
              elf + " with different endTxId " + endTxId);
    }

    // Once logs are finalized, a different length will never be decided.
    // During recovery, we treat a finalized segment the same as an accepted
    // recovery. Thus, we no longer need to keep track of the previously-
    // accepted decision. The existence of the finalized log segment is enough.
    purgePaxosDecision(elf.getFirstTxId());
  }

  /**
   * @see JournalManager#purgeLogsOlderThan(long)
   */
  public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
      long minTxIdToKeep) throws IOException {
    checkFormatted();
    checkRequest(reqInfo);

    storage.purgeDataOlderThan(minTxIdToKeep);
  }

  /**
   * Remove the previously-recorded 'accepted recovery' information
   * for a given log segment, once it is no longer necessary.
   * @param segmentTxId the transaction ID to purge
   * @throws IOException if the file could not be deleted
   */
  private void purgePaxosDecision(long segmentTxId) throws IOException {
    File paxosFile = storage.getPaxosFile(segmentTxId);
    if (paxosFile.exists()) {
      if (!paxosFile.delete()) {
        throw new IOException("Unable to delete paxos file " + paxosFile);
      }
    }
  }

  /**
   * @see QJournalProtocol#getEditLogManifest(String, long, boolean)
   */
  public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
      boolean inProgressOk) throws IOException {
    // No need to checkRequest() here - anyone may ask for the list
    // of segments.
    checkFormatted();

    List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, inProgressOk);

    if (inProgressOk) {
      RemoteEditLog log = null;
      for (Iterator<RemoteEditLog> iter = logs.iterator(); iter.hasNext();) {
        log = iter.next();
        if (log.isInProgress()) {
          iter.remove();
          break;
        }
      }
      if (log != null && log.isInProgress()) {
        logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId(),
            true));
      }
    }

    return new RemoteEditLogManifest(logs);
  }

  /**
   * @return the current state of the given segment, or null if the
   * segment does not exist.
   */
  @VisibleForTesting
  SegmentStateProto getSegmentInfo(long segmentTxId)
      throws IOException {
    EditLogFile elf = fjm.getLogFile(segmentTxId);
    if (elf == null) {
      return null;
    }
    if (elf.isInProgress()) {
      elf.scanLog(Long.MAX_VALUE, false);
    }
    if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
      LOG.info("Edit log file " + elf + " appears to be empty. " +
          "Moving it aside...");
      elf.moveAsideEmptyFile();
      return null;
    }
    SegmentStateProto ret = SegmentStateProto.newBuilder()
        .setStartTxId(segmentTxId)
        .setEndTxId(elf.getLastTxId())
        .setIsInProgress(elf.isInProgress())
        .build();
    LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
        TextFormat.shortDebugString(ret));
    return ret;
  }

  /**
   * @see QJournalProtocol#prepareRecovery(RequestInfo, long)
   */
  public synchronized PrepareRecoveryResponseProto prepareRecovery(
      RequestInfo reqInfo, long segmentTxId) throws IOException {
    checkFormatted();
    checkRequest(reqInfo);

    abortCurSegment();

    PrepareRecoveryResponseProto.Builder builder =
        PrepareRecoveryResponseProto.newBuilder();

    PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
    completeHalfDoneAcceptRecovery(previouslyAccepted);

    SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
    boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress();

    if (previouslyAccepted != null && !hasFinalizedSegment) {
      SegmentStateProto acceptedState = previouslyAccepted.getSegmentState();
      assert acceptedState.getEndTxId() == segInfo.getEndTxId() :
          "prev accepted: " + TextFormat.shortDebugString(previouslyAccepted) + "\n" +
          "on disk:       " + TextFormat.shortDebugString(segInfo);

      builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch())
          .setSegmentState(previouslyAccepted.getSegmentState());
    } else {
      if (segInfo != null) {
        builder.setSegmentState(segInfo);
      }
    }

    builder.setLastWriterEpoch(lastWriterEpoch.get());
    if (committedTxnId.get() != HdfsServerConstants.INVALID_TXID) {
      builder.setLastCommittedTxId(committedTxnId.get());
    }

    PrepareRecoveryResponseProto resp = builder.build();
    LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
        TextFormat.shortDebugString(resp));
    return resp;
  }

  /**
   * @see QJournalProtocol#acceptRecovery(RequestInfo, QJournalProtocolProtos.SegmentStateProto, URL)
   */
  public synchronized void acceptRecovery(RequestInfo reqInfo,
      SegmentStateProto segment, URL fromUrl)
      throws IOException {
    checkFormatted();
    checkRequest(reqInfo);

    abortCurSegment();

    long segmentTxId = segment.getStartTxId();

    // Basic sanity checks that the segment is well-formed and contains
    // at least one transaction.
    Preconditions.checkArgument(segment.getEndTxId() > 0 &&
        segment.getEndTxId() >= segmentTxId,
        "bad recovery state for segment %s: %s",
        segmentTxId, TextFormat.shortDebugString(segment));

    PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
    PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
        .setAcceptedInEpoch(reqInfo.getEpoch())
        .setSegmentState(segment)
        .build();

    // If we previously acted on acceptRecovery() from a higher-numbered writer,
    // this call is out of sync. We should never actually trigger this, since the
    // checkRequest() call above should filter non-increasing epoch numbers.
    if (oldData != null) {
      alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
          "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
          oldData, newData);
    }

    File syncedFile = null;

    SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
    if (currentSegment == null ||
        currentSegment.getEndTxId() != segment.getEndTxId()) {
      if (currentSegment == null) {
        LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
            ": no current segment in place");

        // Update the highest txid for lag metrics
        updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
            highestWrittenTxId));
      } else {
        LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
            ": old segment " + TextFormat.shortDebugString(currentSegment) +
            " is not the right length");

        // Paranoid sanity check: if the new log is shorter than the log we
        // currently have, we should not end up discarding any transactions
        // which are already committed.
        if (txnRange(currentSegment).containsLong(committedTxnId.get()) &&
            !txnRange(segment).containsLong(committedTxnId.get())) {
          throw new AssertionError(
              "Cannot replace segment " +
              TextFormat.shortDebugString(currentSegment) +
              " with new segment " +
              TextFormat.shortDebugString(segment) +
              ": would discard already-committed txn " +
              committedTxnId.get());
        }

        // Another paranoid check: we should not be asked to synchronize a log
        // on top of a finalized segment.
        alwaysAssert(currentSegment.getIsInProgress(),
            "Should never be asked to synchronize a different log on top of an " +
            "already-finalized segment");

        // If we're shortening the log, update our highest txid
        // used for lag metrics.
        if (txnRange(currentSegment).containsLong(highestWrittenTxId)) {
          updateHighestWrittenTxId(segment.getEndTxId());
        }
      }
      syncedFile = syncLog(reqInfo, segment, fromUrl);

    } else {
      LOG.info("Skipping download of log " +
          TextFormat.shortDebugString(segment) +
          ": already have up-to-date logs");
    }

    // This is one of the few places in the protocol where we have a single
    // RPC that results in two distinct actions:
    //
    // - 1) Downloads the new log segment data (above)
    // - 2) Records the new Paxos data about the synchronized segment (below)
    //
    // These need to be treated as a transaction from the perspective
    // of any external process. We do this by treating the persistPaxosData()
    // success as the "commit" of an atomic transaction. If we fail before
    // this point, the downloaded edit log will only exist at a temporary
    // path, and thus not change any externally visible state. If we fail
    // after this point, then any future prepareRecovery() call will see
    // the Paxos data, and by calling completeHalfDoneAcceptRecovery() will
    // roll forward the rename of the referenced log file.
    //
    // See also: HDFS-3955
    //
    // The fault points here are exercised by the randomized fault injection
    // test case to ensure that this atomic "transaction" operates correctly.
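    // Persist the accepted recovery data first; only once it is durably on
    // disk is the downloaded segment moved into place below.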
    JournalFaultInjector.get().beforePersistPaxosData();
    persistPaxosData(segmentTxId, newData);
    JournalFaultInjector.get().afterPersistPaxosData();

    if (syncedFile != null) {
      FileUtil.replaceFile(syncedFile,
          storage.getInProgressEditLog(segmentTxId));
    }

    LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
        TextFormat.shortDebugString(newData));
  }

  private LongRange txnRange(SegmentStateProto seg) {
    Preconditions.checkArgument(seg.hasEndTxId(),
        "invalid segment: %s", seg);
    return new LongRange(seg.getStartTxId(), seg.getEndTxId());
  }

  /**
   * Synchronize a log segment from another JournalNode. The log is
   * downloaded from the provided URL into a temporary location on disk,
   * which is named based on the current request's epoch.
   *
   * @return the temporary location of the downloaded file
   */
  private File syncLog(RequestInfo reqInfo,
      final SegmentStateProto segment, final URL url) throws IOException {
    final File tmpFile = storage.getSyncLogTemporaryFile(
        segment.getStartTxId(), reqInfo.getEpoch());
    final List<File> localPaths = ImmutableList.of(tmpFile);

    LOG.info("Synchronizing log " +
        TextFormat.shortDebugString(segment) + " from " + url);
    SecurityUtil.doAsLoginUser(
        new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws IOException {
            // We may have lost our ticket since the last checkpoint, so log in
            // again, just in case.
            if (UserGroupInformation.isSecurityEnabled()) {
              UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
            }

            boolean success = false;
            try {
              TransferFsImage.doGetUrl(url, localPaths, storage, true);
              assert tmpFile.exists();
              success = true;
            } finally {
              if (!success) {
                if (!tmpFile.delete()) {
                  LOG.warn("Failed to delete temporary file " + tmpFile);
                }
              }
            }
            return null;
          }
        });
    return tmpFile;
  }

  /**
   * If the node crashes in between downloading a log segment and persisting
   * the associated paxos recovery data, the log segment will be left in its
   * temporary location on disk. Given the paxos data, we can check if this
   * was indeed the case, and "roll forward" the atomic operation.
   *
   * See the inline comments in
   * {@link #acceptRecovery(RequestInfo, SegmentStateProto, URL)} for more
   * details.
   *
   * @throws IOException if the temporary file is unable to be renamed into
   *         place
   */
  private void completeHalfDoneAcceptRecovery(
      PersistedRecoveryPaxosData paxosData) throws IOException {
    if (paxosData == null) {
      return;
    }

    long segmentId = paxosData.getSegmentState().getStartTxId();
    long epoch = paxosData.getAcceptedInEpoch();

    File tmp = storage.getSyncLogTemporaryFile(segmentId, epoch);

    if (tmp.exists()) {
      File dst = storage.getInProgressEditLog(segmentId);
      LOG.info("Rolling forward previously half-completed synchronization: " +
          tmp + " -> " + dst);
      FileUtil.replaceFile(tmp, dst);
    }
  }

  /**
   * Retrieve the persisted data for recovering the given segment from disk.
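   * @return the persisted recovery decision, or null if none has been
   *         persisted for this segment (i.e. its paxos file does not exist)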
   */
  private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId)
      throws IOException {
    File f = storage.getPaxosFile(segmentTxId);
    if (!f.exists()) {
      // Default instance has no fields filled in (they're optional)
      return null;
    }

    InputStream in = new FileInputStream(f);
    try {
      PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
      Preconditions.checkState(ret != null &&
          ret.getSegmentState().getStartTxId() == segmentTxId,
          "Bad persisted data for segment %s: %s",
          segmentTxId, ret);
      return ret;
    } finally {
      IOUtils.closeStream(in);
    }
  }

  /**
   * Persist data for recovering the given segment from disk.
   */
  private void persistPaxosData(long segmentTxId,
      PersistedRecoveryPaxosData newData) throws IOException {
    File f = storage.getPaxosFile(segmentTxId);
    boolean success = false;
    AtomicFileOutputStream fos = new AtomicFileOutputStream(f);
    try {
      newData.writeDelimitedTo(fos);
      fos.write('\n');
      // Write human-readable data after the protobuf. This is only
      // to assist in debugging -- it's not parsed at all.
      OutputStreamWriter writer = new OutputStreamWriter(fos, Charsets.UTF_8);

      writer.write(String.valueOf(newData));
      writer.write('\n');
      writer.flush();

      fos.flush();
      success = true;
    } finally {
      if (success) {
        IOUtils.closeStream(fos);
      } else {
        fos.abort();
      }
    }
  }

  synchronized void discardSegments(long startTxId) throws IOException {
    storage.getJournalManager().discardSegments(startTxId);
    // We delete all the segments after startTxId, so reset committedTxnId
    // accordingly.
    committedTxnId.set(startTxId - 1);
  }

  public synchronized void doPreUpgrade() throws IOException {
    // Do not hold file lock on committedTxnId, because the containing
    // directory will be renamed. It will be reopened lazily on next access.
    IOUtils.cleanup(LOG, committedTxnId);
    storage.getJournalManager().doPreUpgrade();
  }

  public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
    long oldCTime = storage.getCTime();
    storage.cTime = sInfo.cTime;
    int oldLV = storage.getLayoutVersion();
    storage.layoutVersion = sInfo.layoutVersion;
    LOG.info("Starting upgrade of edits directory: "
        + ".\n   old LV = " + oldLV
        + "; old CTime = " + oldCTime
        + ".\n   new LV = " + storage.getLayoutVersion()
        + "; new CTime = " + storage.getCTime());
    storage.getJournalManager().doUpgrade(storage);
    storage.createPaxosDir();

    // Copy over the contents of the epoch data files to the new dir.
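    // At this point the pre-upgrade values still live under the previous/
    // directory, so read them from there and re-write them into the newly
    // created current/ directory.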
    File currentDir = storage.getSingularStorageDir().getCurrentDir();
    File previousDir = storage.getSingularStorageDir().getPreviousDir();

    PersistentLongFile prevLastPromisedEpoch = new PersistentLongFile(
        new File(previousDir, LAST_PROMISED_FILENAME), 0);
    PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
        new File(previousDir, LAST_WRITER_EPOCH), 0);
    BestEffortLongFile prevCommittedTxnId = new BestEffortLongFile(
        new File(previousDir, COMMITTED_TXID_FILENAME),
        HdfsServerConstants.INVALID_TXID);

    lastPromisedEpoch = new PersistentLongFile(
        new File(currentDir, LAST_PROMISED_FILENAME), 0);
    lastWriterEpoch = new PersistentLongFile(
        new File(currentDir, LAST_WRITER_EPOCH), 0);
    committedTxnId = new BestEffortLongFile(
        new File(currentDir, COMMITTED_TXID_FILENAME),
        HdfsServerConstants.INVALID_TXID);

    try {
      lastPromisedEpoch.set(prevLastPromisedEpoch.get());
      lastWriterEpoch.set(prevLastWriterEpoch.get());
      committedTxnId.set(prevCommittedTxnId.get());
    } finally {
      IOUtils.cleanup(LOG, prevCommittedTxnId);
    }
  }

  public synchronized void doFinalize() throws IOException {
    LOG.info("Finalizing upgrade for journal "
        + storage.getRoot() + "."
        + (storage.getLayoutVersion() == 0 ? "" :
            "\n   cur LV = " + storage.getLayoutVersion()
            + "; cur CTime = " + storage.getCTime()));
    storage.getJournalManager().doFinalize();
  }

  public Boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
      int targetLayoutVersion) throws IOException {
    return this.storage.getJournalManager().canRollBack(storage, prevStorage,
        targetLayoutVersion);
  }

  public synchronized void doRollback() throws IOException {
    // Do not hold file lock on committedTxnId, because the containing
    // directory will be renamed. It will be reopened lazily on next access.
    IOUtils.cleanup(LOG, committedTxnId);
    storage.getJournalManager().doRollback();
  }

  public Long getJournalCTime() throws IOException {
    return storage.getJournalManager().getJournalCTime();
  }
}