/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.util.ExitUtil.terminate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;

/**
 * Manages a collection of Journals. None of the methods are synchronized;
 * it is assumed that FSEditLog methods that use this class provide proper
 * synchronization.
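 *
 * <p>A rough usage sketch (illustrative only; the variables
 * {@code localJournalManager}, {@code sharedJournalManager}, {@code txId},
 * {@code lastTxId}, and {@code layoutVersion} are hypothetical):
 * <pre>{@code
 * JournalSet js = new JournalSet(1);         // require >= 1 healthy journal
 * js.add(localJournalManager, false);        // optional local journal
 * js.add(sharedJournalManager, true, true);  // required, shared journal
 * EditLogOutputStream out = js.startLogSegment(txId, layoutVersion);
 * // ... write ops and sync through 'out', then:
 * js.finalizeLogSegment(txId, lastTxId);
 * }</pre>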
 */
@InterfaceAudience.Private
public class JournalSet implements JournalManager {

  // Uses FSEditLog's logger, so journal messages appear under the edit
  // log's log category.
  static final Log LOG = LogFactory.getLog(FSEditLog.class);

  private static final Comparator<EditLogInputStream>
    LOCAL_LOG_PREFERENCE_COMPARATOR = new Comparator<EditLogInputStream>() {
    @Override
    public int compare(EditLogInputStream elis1, EditLogInputStream elis2) {
      // We want local logs to be ordered earlier in the collection, and true
      // is considered larger than false, so we invert the booleans here.
      return ComparisonChain.start().compare(!elis1.isLocalLog(),
          !elis2.isLocalLog()).result();
    }
  };

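  /**
   * Orders streams by their first transaction ID, ascending. Among streams
   * that share a first transaction ID, the one with the larger last
   * transaction ID (i.e. the one covering more transactions) sorts first.
   */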
  public static final Comparator<EditLogInputStream>
    EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
      @Override
      public int compare(EditLogInputStream a, EditLogInputStream b) {
        return ComparisonChain.start().
          compare(a.getFirstTxId(), b.getFirstTxId()).
          compare(b.getLastTxId(), a.getLastTxId()).
          result();
      }
    };

  /**
   * Container for a JournalManager paired with its currently
   * active stream.
   *
   * If a Journal gets disabled due to an error writing to its
   * stream, then the stream will be aborted and set to null.
   */
  static class JournalAndStream implements CheckableNameNodeResource {
    private final JournalManager journal;
    private boolean disabled = false;
    private EditLogOutputStream stream;
    private final boolean required;
    private final boolean shared;

    public JournalAndStream(JournalManager manager, boolean required,
        boolean shared) {
      this.journal = manager;
      this.required = required;
      this.shared = shared;
    }

    public void startLogSegment(long txId, int layoutVersion) throws IOException {
      Preconditions.checkState(stream == null);
      disabled = false;
      stream = journal.startLogSegment(txId, layoutVersion);
    }

    /**
     * Closes the stream and sets it to null.
     */
    public void closeStream() throws IOException {
      if (stream == null) return;
      stream.close();
      stream = null;
    }

    /**
     * Closes the journal and its stream.
     */
    public void close() throws IOException {
      closeStream();

      journal.close();
    }

    /**
     * Aborts the stream and sets it to null.
     */
    public void abort() {
      if (stream == null) return;
      try {
        stream.abort();
      } catch (IOException ioe) {
        LOG.error("Unable to abort stream " + stream, ioe);
      }
      stream = null;
    }

    boolean isActive() {
      return stream != null;
    }

    /**
     * Should be used outside JournalSet only for testing.
     */
    EditLogOutputStream getCurrentStream() {
      return stream;
    }

    @Override
    public String toString() {
      return "JournalAndStream(mgr=" + journal + ", stream=" + stream + ")";
    }

    void setCurrentStreamForTests(EditLogOutputStream stream) {
      this.stream = stream;
    }

    JournalManager getManager() {
      return journal;
    }

    boolean isDisabled() {
      return disabled;
    }

    private void setDisabled(boolean disabled) {
      this.disabled = disabled;
    }

    @Override
    public boolean isResourceAvailable() {
      return !isDisabled();
    }

    @Override
    public boolean isRequired() {
      return required;
    }

    public boolean isShared() {
      return shared;
    }
  }

  // COW implementation is necessary since some users (e.g. the web UI) call
  // getAllJournalStreams() and then iterate. Since this is rarely
  // mutated, there is no performance concern.
  private final List<JournalAndStream> journals =
      new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
  final int minimumRedundantJournals;

  private boolean closed;

  JournalSet(int minimumRedundantResources) {
    this.minimumRedundantJournals = minimumRedundantResources;
  }

  @Override
  public void format(NamespaceInfo nsInfo) throws IOException {
    // The operation is done by FSEditLog itself
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean hasSomeData() throws IOException {
    // This is called individually on the underlying journals,
    // not on the JournalSet.
    throw new UnsupportedOperationException();
  }

  @Override
  public EditLogOutputStream startLogSegment(final long txId,
      final int layoutVersion) throws IOException {
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(JournalAndStream jas) throws IOException {
        jas.startLogSegment(txId, layoutVersion);
      }
    }, "starting log segment " + txId);
    return new JournalSetOutputStream();
  }

  @Override
  public void finalizeLogSegment(final long firstTxId, final long lastTxId)
      throws IOException {
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(JournalAndStream jas) throws IOException {
        if (jas.isActive()) {
          jas.closeStream();
          jas.getManager().finalizeLogSegment(firstTxId, lastTxId);
        }
      }
    }, "finalize log segment " + firstTxId + ", " + lastTxId);
  }

  @Override
  public void close() throws IOException {
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(JournalAndStream jas) throws IOException {
        jas.close();
      }
    }, "close journal");
    closed = true;
  }

  public boolean isOpen() {
    return !closed;
  }

  /**
   * Gets a collection of streams from each of our JournalManager objects,
   * then adds them, one by one, to the given collection.
   *
   * @param streams          The collection to add the streams to. It may or
   *                         may not be sorted; this is up to the caller.
   * @param fromTxId         The transaction ID to start looking for streams at
   * @param inProgressOk     Should we consider unfinalized streams?
   */
  @Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk) throws IOException {
    final PriorityQueue<EditLogInputStream> allStreams =
        new PriorityQueue<EditLogInputStream>(64,
            EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (JournalAndStream jas : journals) {
      if (jas.isDisabled()) {
        LOG.info("Skipping jas " + jas + " since it's disabled");
        continue;
      }
      try {
        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
      } catch (IOException ioe) {
        LOG.warn("Unable to determine input streams from " + jas.getManager() +
            ". Skipping.", ioe);
      }
    }
    chainAndMakeRedundantStreams(streams, allStreams, fromTxId);
  }

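  /**
   * Group the incoming streams by their starting transaction ID and wrap
   * each group in a {@link RedundantEditLogInputStream}, so that a read can
   * fail over between copies of the same segment. As a hypothetical
   * illustration: given streams covering txids [1, 100], [1, 100], and
   * [101, 200], the two copies of [1, 100] become a single redundant stream,
   * followed by a second stream covering [101, 200].
   */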
  public static void chainAndMakeRedundantStreams(
      Collection<EditLogInputStream> outStreams,
      PriorityQueue<EditLogInputStream> allStreams, long fromTxId) {
    // We want to group together all the streams that start on the same start
    // transaction ID.  To do this, we maintain an accumulator (acc) of all
    // the streams we've seen at a given start transaction ID.  When we see a
    // higher start transaction ID, we select a stream from the accumulator and
    // clear it.  Then we begin accumulating streams with the new, higher start
    // transaction ID.
    LinkedList<EditLogInputStream> acc =
        new LinkedList<EditLogInputStream>();
    EditLogInputStream elis;
    while ((elis = allStreams.poll()) != null) {
      if (acc.isEmpty()) {
        acc.add(elis);
      } else {
        EditLogInputStream accFirst = acc.get(0);
        long accFirstTxId = accFirst.getFirstTxId();
        if (accFirstTxId == elis.getFirstTxId()) {
          // If we have a finalized log segment available at this txid,
          // we should throw out all in-progress segments at this txid.
          if (elis.isInProgress()) {
            if (accFirst.isInProgress()) {
              acc.add(elis);
            }
          } else {
            if (accFirst.isInProgress()) {
              acc.clear();
            }
            acc.add(elis);
          }
        } else if (accFirstTxId < elis.getFirstTxId()) {
          // Try to read from the local logs first, since the throughput
          // should be higher.
          Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
          outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
          acc.clear();
          acc.add(elis);
        } else if (accFirstTxId > elis.getFirstTxId()) {
          throw new RuntimeException("sorted set invariants violated!  " +
              "Got stream with first txid " + elis.getFirstTxId() +
              ", but the last firstTxId was " + accFirstTxId);
        }
      }
    }
    if (!acc.isEmpty()) {
      Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
      outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
      acc.clear();
    }
  }

  /**
   * Returns true if there are no journals, all redundant journals are
   * disabled, or any required journal is disabled.
   *
   * @return True under the conditions described above.
   */
  public boolean isEmpty() {
    return !NameNodeResourcePolicy.areResourcesAvailable(journals,
        minimumRedundantJournals);
  }

  /**
   * Called when some journals experience an error in some operation.
   */
  private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
    if (badJournals == null || badJournals.isEmpty()) {
      return; // nothing to do
    }

    for (JournalAndStream j : badJournals) {
      LOG.error("Disabling journal " + j);
      j.abort();
      j.setDisabled(true);
    }
  }

  /**
   * Implementations of this interface encapsulate operations that can be
   * iteratively applied on all the journals. For example, see
   * {@link JournalSet#mapJournalsAndReportErrors}.
   */
  private interface JournalClosure {
    /**
     * The operation on JournalAndStream.
     * @param jas Object on which operations are performed.
     * @throws IOException
     */
    void apply(JournalAndStream jas) throws IOException;
  }

  /**
   * Apply the given operation across all of the journal managers, disabling
   * any for which the closure throws an IOException.
   * @param closure {@link JournalClosure} object encapsulating the operation.
   * @param status message used for logging errors (e.g. "opening journal")
   * @throws IOException If the operation fails on enough journals that fewer
   *         than the minimum number of redundant journals remain available.
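   *
   * <p>For example, {@link #close()} applies a closure over every journal
   * like this:
   * <pre>{@code
   * mapJournalsAndReportErrors(new JournalClosure() {
   *   public void apply(JournalAndStream jas) throws IOException {
   *     jas.close();
   *   }
   * }, "close journal");
   * }</pre>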
   */
  private void mapJournalsAndReportErrors(
      JournalClosure closure, String status) throws IOException {

    List<JournalAndStream> badJAS = Lists.newLinkedList();
    for (JournalAndStream jas : journals) {
      try {
        closure.apply(jas);
      } catch (Throwable t) {
        if (jas.isRequired()) {
          final String msg = "Error: " + status + " failed for required journal ("
            + jas + ")";
          LOG.fatal(msg, t);
          // If we fail on *any* of the required journals, then we must not
          // continue on any of the other journals. Abort them to ensure that
          // retry behavior doesn't allow them to keep going in any way.
          abortAllJournals();
          // The current policy is to shut down the NN on errors to the shared
          // edits dir. There are many code paths to shared edits failures -
          // syncs, roll of edits, etc. All of them go through this common
          // function where the isRequired() check is made. Applying the exit
          // policy here catches all code paths.
          terminate(1, msg);
        } else {
          LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
          badJAS.add(jas);
        }
      }
    }
    disableAndReportErrorOnJournals(badJAS);
    if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
        minimumRedundantJournals)) {
      String message = status + " failed for too many journals";
      LOG.error("Error: " + message);
      throw new IOException(message);
    }
  }

  /**
   * Abort all of the underlying streams.
   */
  private void abortAllJournals() {
    for (JournalAndStream jas : journals) {
      if (jas.isActive()) {
        jas.abort();
      }
    }
  }

  /**
   * An implementation of EditLogOutputStream that applies a requested method on
   * all the journals that are currently active.
   */
  private class JournalSetOutputStream extends EditLogOutputStream {

    JournalSetOutputStream() throws IOException {
      super();
    }

    @Override
    public void write(final FSEditLogOp op)
        throws IOException {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          if (jas.isActive()) {
            jas.getCurrentStream().write(op);
          }
        }
      }, "write op");
    }

    @Override
    public void writeRaw(final byte[] data, final int offset, final int length)
        throws IOException {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          if (jas.isActive()) {
            jas.getCurrentStream().writeRaw(data, offset, length);
          }
        }
      }, "write bytes");
    }

    @Override
    public void create(final int layoutVersion) throws IOException {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          if (jas.isActive()) {
            jas.getCurrentStream().create(layoutVersion);
          }
        }
      }, "create");
    }

    @Override
    public void close() throws IOException {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          jas.closeStream();
        }
      }, "close");
    }

    @Override
    public void abort() throws IOException {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          jas.abort();
        }
      }, "abort");
    }

    @Override
    public void setReadyToFlush() throws IOException {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          if (jas.isActive()) {
            jas.getCurrentStream().setReadyToFlush();
          }
        }
      }, "setReadyToFlush");
    }

    @Override
    protected void flushAndSync(final boolean durable) throws IOException {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          if (jas.isActive()) {
            jas.getCurrentStream().flushAndSync(durable);
          }
        }
      }, "flushAndSync");
    }

    @Override
    public void flush() throws IOException {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          if (jas.isActive()) {
            jas.getCurrentStream().flush();
          }
        }
      }, "flush");
    }

    @Override
    public boolean shouldForceSync() {
      for (JournalAndStream js : journals) {
        if (js.isActive() && js.getCurrentStream().shouldForceSync()) {
          return true;
        }
      }
      return false;
    }

    @Override
    protected long getNumSync() {
      // Active streams are flushed and synced together, so report the count
      // from the first active stream.
      for (JournalAndStream jas : journals) {
        if (jas.isActive()) {
          return jas.getCurrentStream().getNumSync();
        }
      }
      return 0;
    }
  }

  @Override
  public void setOutputBufferCapacity(final int size) {
    try {
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          jas.getManager().setOutputBufferCapacity(size);
        }
      }, "setOutputBufferCapacity");
    } catch (IOException e) {
      LOG.error("Error in setting output buffer capacity", e);
    }
  }

  List<JournalAndStream> getAllJournalStreams() {
    return journals;
  }

  List<JournalManager> getJournalManagers() {
    List<JournalManager> jList = new ArrayList<JournalManager>();
    for (JournalAndStream j : journals) {
      jList.add(j.getManager());
    }
    return jList;
  }

  void add(JournalManager j, boolean required) {
    add(j, required, false);
  }

  void add(JournalManager j, boolean required, boolean shared) {
    JournalAndStream jas = new JournalAndStream(j, required, shared);
    journals.add(jas);
  }

  void remove(JournalManager j) {
    JournalAndStream jasToRemove = null;
    for (JournalAndStream jas : journals) {
      if (jas.getManager().equals(j)) {
        jasToRemove = jas;
        break;
      }
    }
    if (jasToRemove != null) {
      jasToRemove.abort();
      journals.remove(jasToRemove);
    }
  }

  @Override
  public void purgeLogsOlderThan(final long minTxIdToKeep) throws IOException {
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(JournalAndStream jas) throws IOException {
        jas.getManager().purgeLogsOlderThan(minTxIdToKeep);
      }
    }, "purgeLogsOlderThan " + minTxIdToKeep);
  }

  @Override
  public void recoverUnfinalizedSegments() throws IOException {
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(JournalAndStream jas) throws IOException {
        jas.getManager().recoverUnfinalizedSegments();
      }
    }, "recoverUnfinalizedSegments");
  }

  /**
   * Return a manifest of what finalized edit logs are available. All available
   * edit logs are returned starting from the transaction id passed. If
   * 'fromTxId' falls in the middle of a log, that log is returned as well.
   *
   * @param fromTxId Starting transaction id to read the logs.
   * @return RemoteEditLogManifest object.
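   *
   * <p>A hypothetical illustration: if the journals expose finalized logs
   * covering txids [1, 100], [1, 100], [101, 150], and [101, 200], the logs
   * are grouped by start txid and the log extending farthest forward wins in
   * each group, so the manifest for {@code fromTxId = 1} contains
   * [1, 100] followed by [101, 200].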
   */
  public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
    // Collect RemoteEditLogs available from each FileJournalManager
    List<RemoteEditLog> allLogs = Lists.newArrayList();
    for (JournalAndStream j : journals) {
      if (j.getManager() instanceof FileJournalManager) {
        FileJournalManager fjm = (FileJournalManager)j.getManager();
        try {
          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, false));
        } catch (Throwable t) {
          LOG.warn("Cannot list edit logs in " + fjm, t);
        }
      }
    }

    // Group logs by their starting txid
    ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
      Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
    long curStartTxId = fromTxId;

    List<RemoteEditLog> logs = Lists.newArrayList();
    while (true) {
      ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
      if (logGroup.isEmpty()) {
        // We have a gap in logs - for example because we recovered some old
        // storage directory with ancient logs. Clear out any logs we've
        // accumulated so far, and then skip to the next segment of logs
        // after the gap.
        SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
        startTxIds = startTxIds.tailSet(curStartTxId);
        if (startTxIds.isEmpty()) {
          break;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Found gap in logs at " + curStartTxId + ": " +
                "not returning previous logs in manifest.");
          }
          logs.clear();
          curStartTxId = startTxIds.first();
          continue;
        }
      }

      // Find the one that extends the farthest forward
      RemoteEditLog bestLog = Collections.max(logGroup);
      logs.add(bestLog);
      // And then start looking from after that point
      curStartTxId = bestLog.getEndTxId() + 1;
    }
    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Generated manifest for logs since " + fromTxId + ":"
          + ret);
    }
    return ret;
  }

  /**
   * Returns the total sync times of all active streams, separated by spaces.
   */
  String getSyncTimes() {
    StringBuilder buf = new StringBuilder();
    for (JournalAndStream jas : journals) {
      if (jas.isActive()) {
        buf.append(jas.getCurrentStream().getTotalSyncTime());
        buf.append(" ");
      }
    }
    return buf.toString();
  }

  @Override
  public void discardSegments(long startTxId) throws IOException {
    // This operation is handled by FSEditLog directly.
    throw new UnsupportedOperationException();
  }

  @Override
  public void doPreUpgrade() throws IOException {
    // This operation is handled by FSEditLog directly.
    throw new UnsupportedOperationException();
  }

  @Override
  public void doUpgrade(Storage storage) throws IOException {
    // This operation is handled by FSEditLog directly.
    throw new UnsupportedOperationException();
  }

  @Override
  public void doFinalize() throws IOException {
    // This operation is handled by FSEditLog directly.
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
      int targetLayoutVersion) throws IOException {
    // This operation is handled by FSEditLog directly.
    throw new UnsupportedOperationException();
  }

  @Override
  public void doRollback() throws IOException {
    // This operation is handled by FSEditLog directly.
    throw new UnsupportedOperationException();
  }

  @Override
  public long getJournalCTime() throws IOException {
    // This operation is handled by FSEditLog directly.
    throw new UnsupportedOperationException();
  }
}