001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.qjournal.server;
019
020import java.io.Closeable;
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStreamWriter;
026import java.net.URL;
027import java.security.PrivilegedExceptionAction;
028import java.util.Iterator;
029import java.util.List;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.commons.lang.math.LongRange;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileUtil;
037import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
038import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
039import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
040import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
041import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
042import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
043import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
044import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
045import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
046import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
047import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
048import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
049import org.apache.hadoop.hdfs.server.common.StorageInfo;
050import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
051import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
052import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
053import org.apache.hadoop.hdfs.server.namenode.JournalManager;
054import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
055import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
056import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
057import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
058import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
059import org.apache.hadoop.hdfs.util.BestEffortLongFile;
060import org.apache.hadoop.hdfs.util.PersistentLongFile;
061import org.apache.hadoop.io.IOUtils;
062import org.apache.hadoop.ipc.Server;
063import org.apache.hadoop.security.SecurityUtil;
064import org.apache.hadoop.security.UserGroupInformation;
065import org.apache.hadoop.util.StopWatch;
066import org.apache.hadoop.util.Time;
067
068import com.google.common.annotations.VisibleForTesting;
069import com.google.common.base.Charsets;
070import com.google.common.base.Preconditions;
071import com.google.common.collect.ImmutableList;
072import com.google.protobuf.TextFormat;
073
074/**
075 * A JournalNode can manage journals for several clusters at once.
076 * Each such journal is entirely independent despite being hosted by
077 * the same JVM.
078 */
079public class Journal implements Closeable {
  /** Logger for this class. */
  static final Log LOG = LogFactory.getLog(Journal.class);


  // Current writing state
  /** Output stream of the segment currently open for writing; null when no segment is open. */
  private EditLogOutputStream curSegment;
  /** First txid of the segment in {@link #curSegment}; INVALID_TXID when no segment is open. */
  private long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
  /** The txid that the next journal() batch is required to start with. */
  private long nextTxId = HdfsServerConstants.INVALID_TXID;
  /** Highest txid known to be written locally; maintained via updateHighestWrittenTxId(). */
  private long highestWrittenTxId = 0;
  
  /** Identifier of this journal, as supplied to the constructor. */
  private final String journalId;
  
  /** On-disk storage backing this journal. */
  private final JNStorage storage;

  /**
   * When a new writer comes along, it asks each node to promise
   * to ignore requests from any previous writer, as identified
   * by epoch number. In order to make such a promise, the epoch
   * number of that writer is stored persistently on disk.
   */
  private PersistentLongFile lastPromisedEpoch;

  /**
   * Each IPC that comes from a given client contains a serial number
   * which only increases from the client's perspective. Whenever
   * we switch epochs, we reset this back to -1. Whenever an IPC
   * comes from a client, we ensure that it is strictly higher
   * than any previous IPC. This guards against any bugs in the IPC
   * layer that would re-order IPCs or cause a stale retry from an old
   * request to resurface and confuse things.
   */
  private long currentEpochIpcSerial = -1;
  
  /**
   * The epoch number of the last writer to actually write a transaction.
   * This is used to differentiate log segments after a crash at the very
   * beginning of a segment. See the 'testNewerVersionOfSegmentWins'
   * test case.
   */
  private PersistentLongFile lastWriterEpoch;
  
  /**
   * Lower-bound on the last committed transaction ID. This is not
   * depended upon for correctness, but acts as a sanity check
   * during the recovery procedures, and as a visibility mark
   * for clients reading in-progress logs.
   */
  private BestEffortLongFile committedTxnId;
  
  /** Name of the file (in the storage's current dir) holding the last promised epoch. */
  public static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
  /** Name of the file (in the storage's current dir) holding the last writer epoch. */
  public static final String LAST_WRITER_EPOCH = "last-writer-epoch";
  /** Name of the file (in the storage's current dir) holding the committed txid. */
  private static final String COMMITTED_TXID_FILENAME = "committed-txid";
  
  /** Journal manager obtained from {@link #storage}; performs the segment file operations. */
  private final FileJournalManager fjm;

  /** Metrics source for this journal. */
  private final JournalMetrics metrics;

  /** Value of Time.now() taken at the end of the most recent journal() call; 0 if none yet. */
  private long lastJournalTimestamp = 0;

  /**
   * Time threshold for sync calls, beyond which a warning should be logged to the console.
   */
  private static final int WARN_SYNC_MILLIS_THRESHOLD = 1000;
142
143  Journal(Configuration conf, File logDir, String journalId,
144      StartupOption startOpt, StorageErrorReporter errorReporter)
145      throws IOException {
146    storage = new JNStorage(conf, logDir, startOpt, errorReporter);
147    this.journalId = journalId;
148
149    refreshCachedData();
150    
151    this.fjm = storage.getJournalManager();
152    
153    this.metrics = JournalMetrics.create(this);
154    
155    EditLogFile latest = scanStorageForLatestEdits();
156    if (latest != null) {
157      updateHighestWrittenTxId(latest.getLastTxId());
158    }
159  }
160
161  /**
162   * Reload any data that may have been cached. This is necessary
163   * when we first load the Journal, but also after any formatting
164   * operation, since the cached data is no longer relevant.
165   */
166  private synchronized void refreshCachedData() {
167    IOUtils.closeStream(committedTxnId);
168    
169    File currentDir = storage.getSingularStorageDir().getCurrentDir();
170    this.lastPromisedEpoch = new PersistentLongFile(
171        new File(currentDir, LAST_PROMISED_FILENAME), 0);
172    this.lastWriterEpoch = new PersistentLongFile(
173        new File(currentDir, LAST_WRITER_EPOCH), 0);
174    this.committedTxnId = new BestEffortLongFile(
175        new File(currentDir, COMMITTED_TXID_FILENAME),
176        HdfsServerConstants.INVALID_TXID);
177  }
178  
179  /**
180   * Scan the local storage directory, and return the segment containing
181   * the highest transaction.
182   * @return the EditLogFile with the highest transactions, or null
183   * if no files exist.
184   */
185  private synchronized EditLogFile scanStorageForLatestEdits() throws IOException {
186    if (!fjm.getStorageDirectory().getCurrentDir().exists()) {
187      return null;
188    }
189    
190    LOG.info("Scanning storage " + fjm);
191    List<EditLogFile> files = fjm.getLogFiles(0);
192    
193    while (!files.isEmpty()) {
194      EditLogFile latestLog = files.remove(files.size() - 1);
195      latestLog.scanLog(Long.MAX_VALUE, false);
196      LOG.info("Latest log is " + latestLog);
197      if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
198        // the log contains no transactions
199        LOG.warn("Latest log " + latestLog + " has no transactions. " +
200            "moving it aside and looking for previous log");
201        latestLog.moveAsideEmptyFile();
202      } else {
203        return latestLog;
204      }
205    }
206    
207    LOG.info("No files in " + fjm);
208    return null;
209  }
210
211  /**
212   * Format the local storage with the given namespace.
213   */
214  void format(NamespaceInfo nsInfo) throws IOException {
215    Preconditions.checkState(nsInfo.getNamespaceID() != 0,
216        "can't format with uninitialized namespace info: %s",
217        nsInfo);
218    LOG.info("Formatting " + this + " with namespace info: " +
219        nsInfo);
220    storage.format(nsInfo);
221    refreshCachedData();
222  }
223
224  /**
225   * Unlock and release resources.
226   */
227  @Override // Closeable
228  public void close() throws IOException {
229    storage.close();
230    IOUtils.closeStream(committedTxnId);
231    IOUtils.closeStream(curSegment);
232  }
233  
234  JNStorage getStorage() {
235    return storage;
236  }
237  
238  String getJournalId() {
239    return journalId;
240  }
241
242  /**
243   * @return the last epoch which this node has promised not to accept
244   * any lower epoch, or 0 if no promises have been made.
245   */
246  synchronized long getLastPromisedEpoch() throws IOException {
247    checkFormatted();
248    return lastPromisedEpoch.get();
249  }
250
251  synchronized public long getLastWriterEpoch() throws IOException {
252    checkFormatted();
253    return lastWriterEpoch.get();
254  }
255  
256  synchronized long getCommittedTxnIdForTests() throws IOException {
257    return committedTxnId.get();
258  }
259
260  synchronized long getLastJournalTimestamp() {
261    return lastJournalTimestamp;
262  }
263
264  synchronized long getCurrentLagTxns() throws IOException {
265    long committed = committedTxnId.get();
266    if (committed == 0) {
267      return 0;
268    }
269    
270    return Math.max(committed - highestWrittenTxId, 0L);
271  }
272  
273  synchronized long getHighestWrittenTxId() {
274    return highestWrittenTxId;
275  }
276
277  /**
278   * Update the highest Tx ID that has been written to the journal. Also update
279   * the {@link FileJournalManager#lastReadableTxId} of the underlying fjm.
280   * @param val The new value
281   */
282  private void updateHighestWrittenTxId(long val) {
283    highestWrittenTxId = val;
284    fjm.setLastReadableTxId(val);
285  }
286
287  @VisibleForTesting
288  JournalMetrics getMetricsForTests() {
289    return metrics;
290  }
291
292  /**
293   * Try to create a new epoch for this journal.
294   * @param nsInfo the namespace, which is verified for consistency or used to
295   * format, if the Journal has not yet been written to.
296   * @param epoch the epoch to start
297   * @return the status information necessary to begin recovery
298   * @throws IOException if the node has already made a promise to another
299   * writer with a higher epoch number, if the namespace is inconsistent,
300   * or if a disk error occurs.
301   */
302  synchronized NewEpochResponseProto newEpoch(
303      NamespaceInfo nsInfo, long epoch) throws IOException {
304
305    checkFormatted();
306    storage.checkConsistentNamespace(nsInfo);
307
308    // Check that the new epoch being proposed is in fact newer than
309    // any other that we've promised. 
310    if (epoch <= getLastPromisedEpoch()) {
311      throw new IOException("Proposed epoch " + epoch + " <= last promise " +
312          getLastPromisedEpoch());
313    }
314    
315    updateLastPromisedEpoch(epoch);
316    abortCurSegment();
317    
318    NewEpochResponseProto.Builder builder =
319        NewEpochResponseProto.newBuilder();
320
321    EditLogFile latestFile = scanStorageForLatestEdits();
322
323    if (latestFile != null) {
324      builder.setLastSegmentTxId(latestFile.getFirstTxId());
325    }
326    
327    return builder.build();
328  }
329
330  private void updateLastPromisedEpoch(long newEpoch) throws IOException {
331    LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
332        " to " + newEpoch + " for client " + Server.getRemoteIp());
333    lastPromisedEpoch.set(newEpoch);
334    
335    // Since we have a new writer, reset the IPC serial - it will start
336    // counting again from 0 for this writer.
337    currentEpochIpcSerial = -1;
338  }
339
340  private void abortCurSegment() throws IOException {
341    if (curSegment == null) {
342      return;
343    }
344    
345    curSegment.abort();
346    curSegment = null;
347    curSegmentTxId = HdfsServerConstants.INVALID_TXID;
348  }
349
350  /**
351   * Write a batch of edits to the journal.
352   * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
353   */
354  synchronized void journal(RequestInfo reqInfo,
355      long segmentTxId, long firstTxnId,
356      int numTxns, byte[] records) throws IOException {
357    checkFormatted();
358    checkWriteRequest(reqInfo);
359
360    checkSync(curSegment != null,
361        "Can't write, no segment open");
362    
363    if (curSegmentTxId != segmentTxId) {
364      // Sanity check: it is possible that the writer will fail IPCs
365      // on both the finalize() and then the start() of the next segment.
366      // This could cause us to continue writing to an old segment
367      // instead of rolling to a new one, which breaks one of the
368      // invariants in the design. If it happens, abort the segment
369      // and throw an exception.
370      JournalOutOfSyncException e = new JournalOutOfSyncException(
371          "Writer out of sync: it thinks it is writing segment " + segmentTxId
372          + " but current segment is " + curSegmentTxId);
373      abortCurSegment();
374      throw e;
375    }
376      
377    checkSync(nextTxId == firstTxnId,
378        "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
379    
380    long lastTxnId = firstTxnId + numTxns - 1;
381    if (LOG.isTraceEnabled()) {
382      LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
383    }
384
385    // If the edit has already been marked as committed, we know
386    // it has been fsynced on a quorum of other nodes, and we are
387    // "catching up" with the rest. Hence we do not need to fsync.
388    boolean isLagging = lastTxnId <= committedTxnId.get();
389    boolean shouldFsync = !isLagging;
390    
391    curSegment.writeRaw(records, 0, records.length);
392    curSegment.setReadyToFlush();
393    StopWatch sw = new StopWatch();
394    sw.start();
395    curSegment.flush(shouldFsync);
396    sw.stop();
397
398    long nanoSeconds = sw.now();
399    metrics.addSync(
400        TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
401    long milliSeconds = TimeUnit.MILLISECONDS.convert(
402        nanoSeconds, TimeUnit.NANOSECONDS);
403
404    if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
405      LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
406               " took " + milliSeconds + "ms");
407    }
408
409    if (isLagging) {
410      // This batch of edits has already been committed on a quorum of other
411      // nodes. So, we are in "catch up" mode. This gets its own metric.
412      metrics.batchesWrittenWhileLagging.incr(1);
413    }
414    
415    metrics.batchesWritten.incr(1);
416    metrics.bytesWritten.incr(records.length);
417    metrics.txnsWritten.incr(numTxns);
418    
419    updateHighestWrittenTxId(lastTxnId);
420    nextTxId = lastTxnId + 1;
421    lastJournalTimestamp = Time.now();
422  }
423
424  public void heartbeat(RequestInfo reqInfo) throws IOException {
425    checkRequest(reqInfo);
426  }
427  
428  /**
429   * Ensure that the given request is coming from the correct writer and in-order.
430   * @param reqInfo the request info
431   * @throws IOException if the request is invalid.
432   */
433  private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
434    // Invariant 25 from ZAB paper
435    if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
436      throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
437          " is less than the last promised epoch " +
438          lastPromisedEpoch.get());
439    } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
440      // A newer client has arrived. Fence any previous writers by updating
441      // the promise.
442      updateLastPromisedEpoch(reqInfo.getEpoch());
443    }
444    
445    // Ensure that the IPCs are arriving in-order as expected.
446    checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
447        "IPC serial %s from client %s was not higher than prior highest " +
448        "IPC serial %s", reqInfo.getIpcSerialNumber(),
449        Server.getRemoteIp(),
450        currentEpochIpcSerial);
451    currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
452
453    if (reqInfo.hasCommittedTxId()) {
454      Preconditions.checkArgument(
455          reqInfo.getCommittedTxId() >= committedTxnId.get(),
456          "Client trying to move committed txid backward from " +
457          committedTxnId.get() + " to " + reqInfo.getCommittedTxId());
458      
459      committedTxnId.set(reqInfo.getCommittedTxId());
460    }
461  }
462  
463  private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
464    checkRequest(reqInfo);
465    
466    if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
467      throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
468          " is not the current writer epoch  " +
469          lastWriterEpoch.get());
470    }
471  }
472  
473  public synchronized boolean isFormatted() {
474    return storage.isFormatted();
475  }
476
477  private void checkFormatted() throws JournalNotFormattedException {
478    if (!isFormatted()) {
479      throw new JournalNotFormattedException("Journal " +
480          storage.getSingularStorageDir() + " not formatted");
481    }
482  }
483
484  /**
485   * @throws JournalOutOfSyncException if the given expression is not true.
486   * The message of the exception is formatted using the 'msg' and
487   * 'formatArgs' parameters.
488   */
489  private void checkSync(boolean expression, String msg,
490      Object... formatArgs) throws JournalOutOfSyncException {
491    if (!expression) {
492      throw new JournalOutOfSyncException(String.format(msg, formatArgs));
493    }
494  }
495
496  /**
497   * @throws AssertionError if the given expression is not true.
498   * The message of the exception is formatted using the 'msg' and
499   * 'formatArgs' parameters.
500   * 
501   * This should be used in preference to Java's built-in assert in
502   * non-performance-critical paths, where a failure of this invariant
503   * might cause the protocol to lose data. 
504   */
505  private void alwaysAssert(boolean expression, String msg,
506      Object... formatArgs) {
507    if (!expression) {
508      throw new AssertionError(String.format(msg, formatArgs));
509    }
510  }
511  
512  /**
513   * Start a new segment at the given txid. The previous segment
514   * must have already been finalized.
515   */
516  public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
517      int layoutVersion) throws IOException {
518    assert fjm != null;
519    checkFormatted();
520    checkRequest(reqInfo);
521    
522    if (curSegment != null) {
523      LOG.warn("Client is requesting a new log segment " + txid + 
524          " though we are already writing " + curSegment + ". " +
525          "Aborting the current segment in order to begin the new one.");
526      // The writer may have lost a connection to us and is now
527      // re-connecting after the connection came back.
528      // We should abort our own old segment.
529      abortCurSegment();
530    }
531
532    // Paranoid sanity check: we should never overwrite a finalized log file.
533    // Additionally, if it's in-progress, it should have at most 1 transaction.
534    // This can happen if the writer crashes exactly at the start of a segment.
535    EditLogFile existing = fjm.getLogFile(txid);
536    if (existing != null) {
537      if (!existing.isInProgress()) {
538        throw new IllegalStateException("Already have a finalized segment " +
539            existing + " beginning at " + txid);
540      }
541      
542      // If it's in-progress, it should only contain one transaction,
543      // because the "startLogSegment" transaction is written alone at the
544      // start of each segment. 
545      existing.scanLog(Long.MAX_VALUE, false);
546      if (existing.getLastTxId() != existing.getFirstTxId()) {
547        throw new IllegalStateException("The log file " +
548            existing + " seems to contain valid transactions");
549      }
550    }
551    
552    long curLastWriterEpoch = lastWriterEpoch.get();
553    if (curLastWriterEpoch != reqInfo.getEpoch()) {
554      LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
555          " to " + reqInfo.getEpoch() + " for client " +
556          Server.getRemoteIp());
557      lastWriterEpoch.set(reqInfo.getEpoch());
558    }
559
560    // The fact that we are starting a segment at this txid indicates
561    // that any previous recovery for this same segment was aborted.
562    // Otherwise, no writer would have started writing. So, we can
563    // remove the record of the older segment here.
564    purgePaxosDecision(txid);
565    
566    curSegment = fjm.startLogSegment(txid, layoutVersion);
567    curSegmentTxId = txid;
568    nextTxId = txid;
569  }
570  
571  /**
572   * Finalize the log segment at the given transaction ID.
573   */
574  public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
575      long endTxId) throws IOException {
576    checkFormatted();
577    checkRequest(reqInfo);
578
579    boolean needsValidation = true;
580
581    // Finalizing the log that the writer was just writing.
582    if (startTxId == curSegmentTxId) {
583      if (curSegment != null) {
584        curSegment.close();
585        curSegment = null;
586        curSegmentTxId = HdfsServerConstants.INVALID_TXID;
587      }
588      
589      checkSync(nextTxId == endTxId + 1,
590          "Trying to finalize in-progress log segment %s to end at " +
591          "txid %s but only written up to txid %s",
592          startTxId, endTxId, nextTxId - 1);
593      // No need to validate the edit log if the client is finalizing
594      // the log segment that it was just writing to.
595      needsValidation = false;
596    }
597    
598    FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
599    if (elf == null) {
600      throw new JournalOutOfSyncException("No log file to finalize at " +
601          "transaction ID " + startTxId);
602    }
603
604    if (elf.isInProgress()) {
605      if (needsValidation) {
606        LOG.info("Validating log segment " + elf.getFile() + " about to be " +
607            "finalized");
608        elf.scanLog(Long.MAX_VALUE, false);
609  
610        checkSync(elf.getLastTxId() == endTxId,
611            "Trying to finalize in-progress log segment %s to end at " +
612            "txid %s but log %s on disk only contains up to txid %s",
613            startTxId, endTxId, elf.getFile(), elf.getLastTxId());
614      }
615      fjm.finalizeLogSegment(startTxId, endTxId);
616    } else {
617      Preconditions.checkArgument(endTxId == elf.getLastTxId(),
618          "Trying to re-finalize already finalized log " +
619              elf + " with different endTxId " + endTxId);
620    }
621
622    // Once logs are finalized, a different length will never be decided.
623    // During recovery, we treat a finalized segment the same as an accepted
624    // recovery. Thus, we no longer need to keep track of the previously-
625    // accepted decision. The existence of the finalized log segment is enough.
626    purgePaxosDecision(elf.getFirstTxId());
627  }
628  
629  /**
630   * @see JournalManager#purgeLogsOlderThan(long)
631   */
632  public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
633      long minTxIdToKeep) throws IOException {
634    checkFormatted();
635    checkRequest(reqInfo);
636    
637    storage.purgeDataOlderThan(minTxIdToKeep);
638  }
639  
640  /**
641   * Remove the previously-recorded 'accepted recovery' information
642   * for a given log segment, once it is no longer necessary. 
643   * @param segmentTxId the transaction ID to purge
644   * @throws IOException if the file could not be deleted
645   */
646  private void purgePaxosDecision(long segmentTxId) throws IOException {
647    File paxosFile = storage.getPaxosFile(segmentTxId);
648    if (paxosFile.exists()) {
649      if (!paxosFile.delete()) {
650        throw new IOException("Unable to delete paxos file " + paxosFile);
651      }
652    }
653  }
654
655  /**
656   * @see QJournalProtocol#getEditLogManifest(String, long, boolean)
657   */
658  public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
659      boolean inProgressOk) throws IOException {
660    // No need to checkRequest() here - anyone may ask for the list
661    // of segments.
662    checkFormatted();
663    
664    List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, inProgressOk);
665    
666    if (inProgressOk) {
667      RemoteEditLog log = null;
668      for (Iterator<RemoteEditLog> iter = logs.iterator(); iter.hasNext();) {
669        log = iter.next();
670        if (log.isInProgress()) {
671          iter.remove();
672          break;
673        }
674      }
675      if (log != null && log.isInProgress()) {
676        logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId(),
677            true));
678      }
679    }
680    
681    return new RemoteEditLogManifest(logs);
682  }
683
684  /**
685   * @return the current state of the given segment, or null if the
686   * segment does not exist.
687   */
688  @VisibleForTesting
689  SegmentStateProto getSegmentInfo(long segmentTxId)
690      throws IOException {
691    EditLogFile elf = fjm.getLogFile(segmentTxId);
692    if (elf == null) {
693      return null;
694    }
695    if (elf.isInProgress()) {
696      elf.scanLog(Long.MAX_VALUE, false);
697    }
698    if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
699      LOG.info("Edit log file " + elf + " appears to be empty. " +
700          "Moving it aside...");
701      elf.moveAsideEmptyFile();
702      return null;
703    }
704    SegmentStateProto ret = SegmentStateProto.newBuilder()
705        .setStartTxId(segmentTxId)
706        .setEndTxId(elf.getLastTxId())
707        .setIsInProgress(elf.isInProgress())
708        .build();
709    LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
710        TextFormat.shortDebugString(ret));
711    return ret;
712  }
713
714  /**
715   * @see QJournalProtocol#prepareRecovery(RequestInfo, long)
716   */
717  public synchronized PrepareRecoveryResponseProto prepareRecovery(
718      RequestInfo reqInfo, long segmentTxId) throws IOException {
719    checkFormatted();
720    checkRequest(reqInfo);
721    
722    abortCurSegment();
723    
724    PrepareRecoveryResponseProto.Builder builder =
725        PrepareRecoveryResponseProto.newBuilder();
726
727    PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
728    completeHalfDoneAcceptRecovery(previouslyAccepted);
729
730    SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
731    boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress();
732
733    if (previouslyAccepted != null && !hasFinalizedSegment) {
734      SegmentStateProto acceptedState = previouslyAccepted.getSegmentState();
735      assert acceptedState.getEndTxId() == segInfo.getEndTxId() :
736            "prev accepted: " + TextFormat.shortDebugString(previouslyAccepted)+ "\n" +
737            "on disk:       " + TextFormat.shortDebugString(segInfo);
738            
739      builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch())
740        .setSegmentState(previouslyAccepted.getSegmentState());
741    } else {
742      if (segInfo != null) {
743        builder.setSegmentState(segInfo);
744      }
745    }
746    
747    builder.setLastWriterEpoch(lastWriterEpoch.get());
748    if (committedTxnId.get() != HdfsServerConstants.INVALID_TXID) {
749      builder.setLastCommittedTxId(committedTxnId.get());
750    }
751    
752    PrepareRecoveryResponseProto resp = builder.build();
753    LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
754        TextFormat.shortDebugString(resp));
755    return resp;
756  }
757  
758  /**
759   * @see QJournalProtocol#acceptRecovery(RequestInfo, QJournalProtocolProtos.SegmentStateProto, URL)
760   */
761  public synchronized void acceptRecovery(RequestInfo reqInfo,
762      SegmentStateProto segment, URL fromUrl)
763      throws IOException {
764    checkFormatted();
765    checkRequest(reqInfo);
766    
767    abortCurSegment();
768
769    long segmentTxId = segment.getStartTxId();
770
771    // Basic sanity checks that the segment is well-formed and contains
772    // at least one transaction.
773    Preconditions.checkArgument(segment.getEndTxId() > 0 &&
774        segment.getEndTxId() >= segmentTxId,
775        "bad recovery state for segment %s: %s",
776        segmentTxId, TextFormat.shortDebugString(segment));
777    
778    PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
779    PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
780        .setAcceptedInEpoch(reqInfo.getEpoch())
781        .setSegmentState(segment)
782        .build();
783    
784    // If we previously acted on acceptRecovery() from a higher-numbered writer,
785    // this call is out of sync. We should never actually trigger this, since the
786    // checkRequest() call above should filter non-increasing epoch numbers.
787    if (oldData != null) {
788      alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
789          "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
790          oldData, newData);
791    }
792    
793    File syncedFile = null;
794    
795    SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
796    if (currentSegment == null ||
797        currentSegment.getEndTxId() != segment.getEndTxId()) {
798      if (currentSegment == null) {
799        LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
800            ": no current segment in place");
801        
802        // Update the highest txid for lag metrics
803        updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
804            highestWrittenTxId));
805      } else {
806        LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
807            ": old segment " + TextFormat.shortDebugString(currentSegment) +
808            " is not the right length");
809        
810        // Paranoid sanity check: if the new log is shorter than the log we
811        // currently have, we should not end up discarding any transactions
812        // which are already Committed.
813        if (txnRange(currentSegment).containsLong(committedTxnId.get()) &&
814            !txnRange(segment).containsLong(committedTxnId.get())) {
815          throw new AssertionError(
816              "Cannot replace segment " +
817              TextFormat.shortDebugString(currentSegment) +
818              " with new segment " +
819              TextFormat.shortDebugString(segment) + 
820              ": would discard already-committed txn " +
821              committedTxnId.get());
822        }
823        
824        // Another paranoid check: we should not be asked to synchronize a log
825        // on top of a finalized segment.
826        alwaysAssert(currentSegment.getIsInProgress(),
827            "Should never be asked to synchronize a different log on top of an " +
828            "already-finalized segment");
829        
830        // If we're shortening the log, update our highest txid
831        // used for lag metrics.
832        if (txnRange(currentSegment).containsLong(highestWrittenTxId)) {
833          updateHighestWrittenTxId(segment.getEndTxId());
834        }
835      }
836      syncedFile = syncLog(reqInfo, segment, fromUrl);
837      
838    } else {
839      LOG.info("Skipping download of log " +
840          TextFormat.shortDebugString(segment) +
841          ": already have up-to-date logs");
842    }
843    
844    // This is one of the few places in the protocol where we have a single
845    // RPC that results in two distinct actions:
846    //
847    // - 1) Downloads the new log segment data (above)
848    // - 2) Records the new Paxos data about the synchronized segment (below)
849    //
850    // These need to be treated as a transaction from the perspective
851    // of any external process. We do this by treating the persistPaxosData()
852    // success as the "commit" of an atomic transaction. If we fail before
853    // this point, the downloaded edit log will only exist at a temporary
854    // path, and thus not change any externally visible state. If we fail
855    // after this point, then any future prepareRecovery() call will see
856    // the Paxos data, and by calling completeHalfDoneAcceptRecovery() will
857    // roll forward the rename of the referenced log file.
858    //
859    // See also: HDFS-3955
860    //
861    // The fault points here are exercised by the randomized fault injection
862    // test case to ensure that this atomic "transaction" operates correctly.
863    JournalFaultInjector.get().beforePersistPaxosData();
864    persistPaxosData(segmentTxId, newData);
865    JournalFaultInjector.get().afterPersistPaxosData();
866
867    if (syncedFile != null) {
868      FileUtil.replaceFile(syncedFile,
869          storage.getInProgressEditLog(segmentTxId));
870    }
871
872    LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
873        TextFormat.shortDebugString(newData));
874  }
875
876  private LongRange txnRange(SegmentStateProto seg) {
877    Preconditions.checkArgument(seg.hasEndTxId(),
878        "invalid segment: %s", seg);
879    return new LongRange(seg.getStartTxId(), seg.getEndTxId());
880  }
881
882  /**
883   * Synchronize a log segment from another JournalNode. The log is
884   * downloaded from the provided URL into a temporary location on disk,
885   * which is named based on the current request's epoch.
886   *
887   * @return the temporary location of the downloaded file
888   */
889  private File syncLog(RequestInfo reqInfo,
890      final SegmentStateProto segment, final URL url) throws IOException {
891    final File tmpFile = storage.getSyncLogTemporaryFile(
892        segment.getStartTxId(), reqInfo.getEpoch());
893    final List<File> localPaths = ImmutableList.of(tmpFile);
894
895    LOG.info("Synchronizing log " +
896        TextFormat.shortDebugString(segment) + " from " + url);
897    SecurityUtil.doAsLoginUser(
898        new PrivilegedExceptionAction<Void>() {
899          @Override
900          public Void run() throws IOException {
901            // We may have lost our ticket since last checkpoint, log in again, just in case
902            if (UserGroupInformation.isSecurityEnabled()) {
903              UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
904            }
905
906            boolean success = false;
907            try {
908              TransferFsImage.doGetUrl(url, localPaths, storage, true);
909              assert tmpFile.exists();
910              success = true;
911            } finally {
912              if (!success) {
913                if (!tmpFile.delete()) {
914                  LOG.warn("Failed to delete temporary file " + tmpFile);
915                }
916              }
917            }
918            return null;
919          }
920        });
921    return tmpFile;
922  }
923  
924
925  /**
926   * In the case the node crashes in between downloading a log segment
927   * and persisting the associated paxos recovery data, the log segment
928   * will be left in its temporary location on disk. Given the paxos data,
929   * we can check if this was indeed the case, and &quot;roll forward&quot;
930   * the atomic operation.
931   * 
932   * See the inline comments in
933   * {@link #acceptRecovery(RequestInfo, SegmentStateProto, URL)} for more
934   * details.
935   *
936   * @throws IOException if the temporary file is unable to be renamed into
937   * place
938   */
939  private void completeHalfDoneAcceptRecovery(
940      PersistedRecoveryPaxosData paxosData) throws IOException {
941    if (paxosData == null) {
942      return;
943    }
944
945    long segmentId = paxosData.getSegmentState().getStartTxId();
946    long epoch = paxosData.getAcceptedInEpoch();
947    
948    File tmp = storage.getSyncLogTemporaryFile(segmentId, epoch);
949    
950    if (tmp.exists()) {
951      File dst = storage.getInProgressEditLog(segmentId);
952      LOG.info("Rolling forward previously half-completed synchronization: " +
953          tmp + " -> " + dst);
954      FileUtil.replaceFile(tmp, dst);
955    }
956  }
957
958  /**
959   * Retrieve the persisted data for recovering the given segment from disk.
960   */
961  private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId)
962      throws IOException {
963    File f = storage.getPaxosFile(segmentTxId);
964    if (!f.exists()) {
965      // Default instance has no fields filled in (they're optional)
966      return null;
967    }
968    
969    InputStream in = new FileInputStream(f);
970    try {
971      PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
972      Preconditions.checkState(ret != null &&
973          ret.getSegmentState().getStartTxId() == segmentTxId,
974          "Bad persisted data for segment %s: %s",
975          segmentTxId, ret);
976      return ret;
977    } finally {
978      IOUtils.closeStream(in);
979    }
980  }
981
982  /**
983   * Persist data for recovering the given segment from disk.
984   */
985  private void persistPaxosData(long segmentTxId,
986      PersistedRecoveryPaxosData newData) throws IOException {
987    File f = storage.getPaxosFile(segmentTxId);
988    boolean success = false;
989    AtomicFileOutputStream fos = new AtomicFileOutputStream(f);
990    try {
991      newData.writeDelimitedTo(fos);
992      fos.write('\n');
993      // Write human-readable data after the protobuf. This is only
994      // to assist in debugging -- it's not parsed at all.
995      OutputStreamWriter writer = new OutputStreamWriter(fos, Charsets.UTF_8);
996      
997      writer.write(String.valueOf(newData));
998      writer.write('\n');
999      writer.flush();
1000      
1001      fos.flush();
1002      success = true;
1003    } finally {
1004      if (success) {
1005        IOUtils.closeStream(fos);
1006      } else {
1007        fos.abort();
1008      }
1009    }
1010  }
1011
  /**
   * Ask the journal manager to discard segments starting at the given txid,
   * then reset the committed txid to the transaction just before it.
   *
   * @param startTxId the txid from which segments are discarded
   * @throws IOException if the journal manager fails to discard the
   *         segments or the committed txid cannot be updated
   */
  synchronized void discardSegments(long startTxId) throws IOException {
    storage.getJournalManager().discardSegments(startTxId);
    // All segments after startTxId have been deleted, so reset committedTxnId
    // to the last transaction before startTxId.
    committedTxnId.set(startTxId - 1);
  }
1017
  /**
   * Release the committed-txid file and then delegate the pre-upgrade step
   * to the journal manager.
   *
   * @throws IOException if the journal manager's pre-upgrade step fails
   */
  public synchronized void doPreUpgrade() throws IOException {
    // Do not hold file lock on committedTxnId, because the containing
    // directory will be renamed.  It will be reopened lazily on next access.
    IOUtils.cleanup(LOG, committedTxnId);
    storage.getJournalManager().doPreUpgrade();
  }
1024
1025  public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
1026    long oldCTime = storage.getCTime();
1027    storage.cTime = sInfo.cTime;
1028    int oldLV = storage.getLayoutVersion();
1029    storage.layoutVersion = sInfo.layoutVersion;
1030    LOG.info("Starting upgrade of edits directory: "
1031        + ".\n   old LV = " + oldLV
1032        + "; old CTime = " + oldCTime
1033        + ".\n   new LV = " + storage.getLayoutVersion()
1034        + "; new CTime = " + storage.getCTime());
1035    storage.getJournalManager().doUpgrade(storage);
1036    storage.createPaxosDir();
1037    
1038    // Copy over the contents of the epoch data files to the new dir.
1039    File currentDir = storage.getSingularStorageDir().getCurrentDir();
1040    File previousDir = storage.getSingularStorageDir().getPreviousDir();
1041    
1042    PersistentLongFile prevLastPromisedEpoch = new PersistentLongFile(
1043        new File(previousDir, LAST_PROMISED_FILENAME), 0);
1044    PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
1045        new File(previousDir, LAST_WRITER_EPOCH), 0);
1046    BestEffortLongFile prevCommittedTxnId = new BestEffortLongFile(
1047        new File(previousDir, COMMITTED_TXID_FILENAME),
1048        HdfsServerConstants.INVALID_TXID);
1049
1050    lastPromisedEpoch = new PersistentLongFile(
1051        new File(currentDir, LAST_PROMISED_FILENAME), 0);
1052    lastWriterEpoch = new PersistentLongFile(
1053        new File(currentDir, LAST_WRITER_EPOCH), 0);
1054    committedTxnId = new BestEffortLongFile(
1055        new File(currentDir, COMMITTED_TXID_FILENAME),
1056        HdfsServerConstants.INVALID_TXID);
1057
1058    try {
1059      lastPromisedEpoch.set(prevLastPromisedEpoch.get());
1060      lastWriterEpoch.set(prevLastWriterEpoch.get());
1061      committedTxnId.set(prevCommittedTxnId.get());
1062    } finally {
1063      IOUtils.cleanup(LOG, prevCommittedTxnId);
1064    }
1065  }
1066
1067  public synchronized void doFinalize() throws IOException {
1068    LOG.info("Finalizing upgrade for journal " 
1069          + storage.getRoot() + "."
1070          + (storage.getLayoutVersion()==0 ? "" :
1071            "\n   cur LV = " + storage.getLayoutVersion()
1072            + "; cur CTime = " + storage.getCTime()));
1073    storage.getJournalManager().doFinalize();
1074  }
1075
  /**
   * Ask this journal's journal manager whether a rollback is possible.
   * The arguments are passed through unchanged.
   *
   * @param storage the storage info forwarded to the journal manager; note
   *        that this parameter shadows the journal's own storage field
   * @param prevStorage the previous storage info forwarded to the journal
   *        manager
   * @param targetLayoutVersion the layout version forwarded to the journal
   *        manager
   * @return the journal manager's answer
   * @throws IOException if the journal manager's check fails
   */
  public Boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
      int targetLayoutVersion) throws IOException {
    // this.storage is the journal's own storage; the bare name is the argument.
    return this.storage.getJournalManager().canRollBack(storage, prevStorage,
        targetLayoutVersion);
  }
1081
  /**
   * Release the committed-txid file and then delegate the rollback to the
   * journal manager.
   *
   * @throws IOException if the journal manager's rollback fails
   */
  public synchronized void doRollback() throws IOException {
    // Do not hold file lock on committedTxnId, because the containing
    // directory will be renamed.  It will be reopened lazily on next access.
    IOUtils.cleanup(LOG, committedTxnId);
    storage.getJournalManager().doRollback();
  }
1088
  /**
   * Return the journal cTime as reported by the journal manager.
   *
   * @return the value of the journal manager's getJournalCTime()
   * @throws IOException if the journal manager cannot determine the cTime
   */
  public Long getJournalCTime() throws IOException {
    return storage.getJournalManager().getJournalCTime();
  }
1092}