001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.v2.hs;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.NavigableSet;
031import java.util.Set;
032import java.util.SortedMap;
033import java.util.TreeMap;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.ConcurrentSkipListMap;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.ThreadFactory;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.apache.hadoop.classification.InterfaceAudience;
046import org.apache.hadoop.classification.InterfaceStability;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FSDataInputStream;
049import org.apache.hadoop.fs.FileAlreadyExistsException;
050import org.apache.hadoop.fs.FileContext;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.Options;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.fs.PathFilter;
055import org.apache.hadoop.fs.RemoteIterator;
056import org.apache.hadoop.fs.UnsupportedFileSystemException;
057import org.apache.hadoop.fs.permission.FsPermission;
058import org.apache.hadoop.mapred.JobACLsManager;
059import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
060import org.apache.hadoop.mapreduce.v2.api.records.JobId;
061import org.apache.hadoop.mapreduce.v2.app.job.Job;
062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
066import org.apache.hadoop.security.AccessControlException;
067import org.apache.hadoop.service.AbstractService;
068import org.apache.hadoop.util.ShutdownThreadsHelper;
069import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
070
071import com.google.common.annotations.VisibleForTesting;
072import com.google.common.util.concurrent.ThreadFactoryBuilder;
073import org.apache.hadoop.yarn.util.Clock;
074import org.apache.hadoop.yarn.util.SystemClock;
075
076/**
 * This class provides a way to interact with history files in a thread-safe
 * manner.
079 */
080@InterfaceAudience.Public
081@InterfaceStability.Unstable
082public class HistoryFileManager extends AbstractService {
083  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
084  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
085
086  private static enum HistoryInfoState {
087    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
088  };
089  
090  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
091      .doneSubdirsBeforeSerialTail();
092
093  /**
094   * Maps between a serial number (generated based on jobId) and the timestamp
095   * component(s) to which it belongs. Facilitates jobId based searches. If a
096   * jobId is not found in this list - it will not be found.
097   */
098  private static class SerialNumberIndex {
099    private SortedMap<String, Set<String>> cache;
100    private int maxSize;
101
102    public SerialNumberIndex(int maxSize) {
103      this.cache = new TreeMap<String, Set<String>>();
104      this.maxSize = maxSize;
105    }
106
107    public synchronized void add(String serialPart, String timestampPart) {
108      if (!cache.containsKey(serialPart)) {
109        cache.put(serialPart, new HashSet<String>());
110        if (cache.size() > maxSize) {
111          String key = cache.firstKey();
112          LOG.error("Dropping " + key
113              + " from the SerialNumberIndex. We will no "
114              + "longer be able to see jobs that are in that serial index for "
115              + cache.get(key));
116          cache.remove(key);
117        }
118      }
119      Set<String> datePartSet = cache.get(serialPart);
120      datePartSet.add(timestampPart);
121    }
122
123    public synchronized void remove(String serialPart, String timeStampPart) {
124      if (cache.containsKey(serialPart)) {
125        Set<String> set = cache.get(serialPart);
126        set.remove(timeStampPart);
127        if (set.isEmpty()) {
128          cache.remove(serialPart);
129        }
130      }
131    }
132
133    public synchronized Set<String> get(String serialPart) {
134      Set<String> found = cache.get(serialPart);
135      if (found != null) {
136        return new HashSet<String>(found);
137      }
138      return null;
139    }
140  }
141
142  /**
143   * Wrapper around {@link ConcurrentSkipListMap} that maintains size along
144   * side for O(1) size() implementation for use in JobListCache.
145   *
146   * Note: The size is not updated atomically with changes additions/removals.
147   * This race can lead to size() returning an incorrect size at times.
148   */
149  static class JobIdHistoryFileInfoMap {
150    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
151    private AtomicInteger mapSize;
152
153    JobIdHistoryFileInfoMap() {
154      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
155      mapSize = new AtomicInteger();
156    }
157
158    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
159      HistoryFileInfo ret = cache.putIfAbsent(key, value);
160      if (ret == null) {
161        mapSize.incrementAndGet();
162      }
163      return ret;
164    }
165
166    public HistoryFileInfo remove(JobId key) {
167      HistoryFileInfo ret = cache.remove(key);
168      if (ret != null) {
169        mapSize.decrementAndGet();
170      }
171      return ret;
172    }
173
174    /**
175     * Returns the recorded size of the internal map. Note that this could be out
176     * of sync with the actual size of the map
177     * @return "recorded" size
178     */
179    public int size() {
180      return mapSize.get();
181    }
182
183    public HistoryFileInfo get(JobId key) {
184      return cache.get(key);
185    }
186
187    public NavigableSet<JobId> navigableKeySet() {
188      return cache.navigableKeySet();
189    }
190
191    public Collection<HistoryFileInfo> values() {
192      return cache.values();
193    }
194  }
195
196  static class JobListCache {
197    private JobIdHistoryFileInfoMap cache;
198    private int maxSize;
199    private long maxAge;
200
201    public JobListCache(int maxSize, long maxAge) {
202      this.maxSize = maxSize;
203      this.maxAge = maxAge;
204      this.cache = new JobIdHistoryFileInfoMap();
205    }
206
207    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
208      JobId jobId = fileInfo.getJobId();
209      if (LOG.isDebugEnabled()) {
210        LOG.debug("Adding " + jobId + " to job list cache with "
211            + fileInfo.getJobIndexInfo());
212      }
213      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
214      if (cache.size() > maxSize) {
215        //There is a race here, where more then one thread could be trying to
216        // remove entries.  This could result in too many entries being removed
217        // from the cache.  This is considered OK as the size of the cache
218        // should be rather large, and we would rather have performance over
219        // keeping the cache size exactly at the maximum.
220        Iterator<JobId> keys = cache.navigableKeySet().iterator();
221        long cutoff = System.currentTimeMillis() - maxAge;
222
223        // MAPREDUCE-6436: In order to reduce the number of logs written
224        // in case of a lot of move pending histories.
225        JobId firstInIntermediateKey = null;
226        int inIntermediateCount = 0;
227        JobId firstMoveFailedKey = null;
228        int moveFailedCount = 0;
229
230        while (cache.size() > maxSize && keys.hasNext()) {
231          JobId key = keys.next();
232          HistoryFileInfo firstValue = cache.get(key);
233          if (firstValue != null) {
234            if (firstValue.isMovePending()) {
235              if (firstValue.didMoveFail() &&
236                  firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
237                cache.remove(key);
238                // Now lets try to delete it
239                try {
240                  firstValue.delete();
241                } catch (IOException e) {
242                  LOG.error("Error while trying to delete history files" +
243                      " that could not be moved to done.", e);
244                }
245              } else {
246                if (firstValue.didMoveFail()) {
247                  if (moveFailedCount == 0) {
248                    firstMoveFailedKey = key;
249                  }
250                  moveFailedCount += 1;
251                } else {
252                  if (inIntermediateCount == 0) {
253                    firstInIntermediateKey = key;
254                  }
255                  inIntermediateCount += 1;
256                }
257              }
258            } else {
259              cache.remove(key);
260            }
261          }
262        }
263        // Log output only for first jobhisotry in pendings to restrict
264        // the total number of logs.
265        if (inIntermediateCount > 0) {
266          LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " +
267                  "(e.g. " + firstInIntermediateKey + ") from JobListCache " +
268                  "because it is not in done yet. Total count is " +
269                  inIntermediateCount + ".");
270        }
271        if (moveFailedCount > 0) {
272          LOG.warn("Waiting to remove MOVE_FAILED state histories " +
273                  "(e.g. " + firstMoveFailedKey + ") from JobListCache " +
274                  "because it is not in done yet. Total count is " +
275                  moveFailedCount + ".");
276        }
277      }
278      return old;
279    }
280
281    public void delete(HistoryFileInfo fileInfo) {
282      if (LOG.isDebugEnabled()) {
283        LOG.debug("Removing from cache " + fileInfo);
284      }
285      cache.remove(fileInfo.getJobId());
286    }
287
288    public Collection<HistoryFileInfo> values() {
289      return new ArrayList<HistoryFileInfo>(cache.values());
290    }
291
292    public HistoryFileInfo get(JobId jobId) {
293      return cache.get(jobId);
294    }
295
296    public boolean isFull() {
297      return cache.size() >= maxSize;
298    }
299  }
300
301  /**
302   * This class represents a user dir in the intermediate done directory.  This
303   * is mostly for locking purposes. 
304   */
305  private class UserLogDir {
306    long modTime = 0;
307    private long scanTime = 0;
308
309    public synchronized void scanIfNeeded(FileStatus fs) {
310      long newModTime = fs.getModificationTime();
311      // MAPREDUCE-6680: In some Cloud FileSystem, like Azure FS or S3, file's
312      // modification time is truncated into seconds. In that case,
313      // modTime == newModTime doesn't means no file update in the directory,
314      // so we need to have additional check.
315      // Note: modTime (X second Y millisecond) could be casted to X second or
316      // X+1 second.
317      if (modTime != newModTime
318          || (scanTime/1000) == (modTime/1000)
319          || (scanTime/1000 + 1) == (modTime/1000)) {
320        // reset scanTime before scanning happens
321        scanTime = System.currentTimeMillis();
322        Path p = fs.getPath();
323        try {
324          scanIntermediateDirectory(p);
325          //If scanning fails, we will scan again.  We assume the failure is
326          // temporary.
327          modTime = newModTime;
328        } catch (IOException e) {
329          LOG.error("Error while trying to scan the directory " + p, e);
330        }
331      } else {
332        if (LOG.isDebugEnabled()) {
333          LOG.debug("Scan not needed of " + fs.getPath());
334        }
335        // reset scanTime
336        scanTime = System.currentTimeMillis();
337      }
338    }
339  }
340
341  public class HistoryFileInfo {
342    private Path historyFile;
343    private Path confFile;
344    private Path summaryFile;
345    private JobIndexInfo jobIndexInfo;
346    private volatile HistoryInfoState state;
347
348    @VisibleForTesting
349    protected HistoryFileInfo(Path historyFile, Path confFile,
350        Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) {
351      this.historyFile = historyFile;
352      this.confFile = confFile;
353      this.summaryFile = summaryFile;
354      this.jobIndexInfo = jobIndexInfo;
355      state = isInDone ? HistoryInfoState.IN_DONE
356          : HistoryInfoState.IN_INTERMEDIATE;
357    }
358
359    @VisibleForTesting
360    boolean isMovePending() {
361      return state == HistoryInfoState.IN_INTERMEDIATE
362          || state == HistoryInfoState.MOVE_FAILED;
363    }
364
365    @VisibleForTesting
366    boolean didMoveFail() {
367      return state == HistoryInfoState.MOVE_FAILED;
368    }
369
370    /**
371     * @return true if the files backed by this were deleted.
372     */
373    public boolean isDeleted() {
374      return state == HistoryInfoState.DELETED;
375    }
376
377    @Override
378    public String toString() {
379      return "HistoryFileInfo jobID " + getJobId()
380             + " historyFile = " + historyFile;
381    }
382
383    @VisibleForTesting
384    synchronized void moveToDone() throws IOException {
385      if (LOG.isDebugEnabled()) {
386        LOG.debug("moveToDone: " + historyFile);
387      }
388      if (!isMovePending()) {
389        // It was either deleted or is already in done. Either way do nothing
390        if (LOG.isDebugEnabled()) {
391          LOG.debug("Move no longer pending");
392        }
393        return;
394      }
395      try {
396        long completeTime = jobIndexInfo.getFinishTime();
397        if (completeTime == 0) {
398          completeTime = System.currentTimeMillis();
399        }
400        JobId jobId = jobIndexInfo.getJobId();
401
402        List<Path> paths = new ArrayList<Path>(2);
403        if (historyFile == null) {
404          LOG.info("No file for job-history with " + jobId + " found in cache!");
405        } else {
406          paths.add(historyFile);
407        }
408
409        if (confFile == null) {
410          LOG.info("No file for jobConf with " + jobId + " found in cache!");
411        } else {
412          paths.add(confFile);
413        }
414
415        if (summaryFile == null || !intermediateDoneDirFc.util().exists(
416            summaryFile)) {
417          LOG.info("No summary file for job: " + jobId);
418        } else {
419          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
420              summaryFile);
421          SUMMARY_LOG.info(jobSummaryString);
422          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
423          intermediateDoneDirFc.delete(summaryFile, false);
424          summaryFile = null;
425        }
426
427        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
428        addDirectoryToSerialNumberIndex(targetDir);
429        makeDoneSubdir(targetDir);
430        if (historyFile != null) {
431          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
432              .getName()));
433          if (!toPath.equals(historyFile)) {
434            moveToDoneNow(historyFile, toPath);
435            historyFile = toPath;
436          }
437        }
438        if (confFile != null) {
439          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
440              .getName()));
441          if (!toPath.equals(confFile)) {
442            moveToDoneNow(confFile, toPath);
443            confFile = toPath;
444          }
445        }
446        state = HistoryInfoState.IN_DONE;
447      } catch (Throwable t) {
448        LOG.error("Error while trying to move a job to done", t);
449        this.state = HistoryInfoState.MOVE_FAILED;
450      }
451    }
452
453    /**
454     * Parse a job from the JobHistoryFile, if the underlying file is not going
455     * to be deleted.
456     * 
457     * @return the Job or null if the underlying file was deleted.
458     * @throws IOException
459     *           if there is an error trying to read the file.
460     */
461    public synchronized Job loadJob() throws IOException {
462      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
463          false, jobIndexInfo.getUser(), this, aclsMgr);
464    }
465
466    /**
467     * Return the history file.  This should only be used for testing.
468     * @return the history file.
469     */
470    synchronized Path getHistoryFile() {
471      return historyFile;
472    }
473    
474    protected synchronized void delete() throws IOException {
475      if (LOG.isDebugEnabled()) {
476        LOG.debug("deleting " + historyFile + " and " + confFile);
477      }
478      state = HistoryInfoState.DELETED;
479      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
480      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
481    }
482
483    public JobIndexInfo getJobIndexInfo() {
484      return jobIndexInfo;
485    }
486
487    public JobId getJobId() {
488      return jobIndexInfo.getJobId();
489    }
490
491    public synchronized Path getConfFile() {
492      return confFile;
493    }
494    
495    public synchronized Configuration loadConfFile() throws IOException {
496      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
497      Configuration jobConf = new Configuration(false);
498      jobConf.addResource(fc.open(confFile), confFile.toString());
499      return jobConf;
500    }
501  }
502
503  private SerialNumberIndex serialNumberIndex = null;
504  protected JobListCache jobListCache = null;
505
506  // Maintains a list of known done subdirectories.
507  private final Set<Path> existingDoneSubdirs = Collections
508      .synchronizedSet(new HashSet<Path>());
509
510  /**
511   * Maintains a mapping between intermediate user directories and the last
512   * known modification time.
513   */
514  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 
515    new ConcurrentHashMap<String, UserLogDir>();
516
517  private JobACLsManager aclsMgr;
518
519  @VisibleForTesting
520  Configuration conf;
521
522  private String serialNumberFormat;
523
524  private Path doneDirPrefixPath = null; // folder for completed jobs
525  private FileContext doneDirFc; // done Dir FileContext
526
527  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
528  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
529                                             // FileContext
530  @VisibleForTesting
531  protected ThreadPoolExecutor moveToDoneExecutor = null;
532  private long maxHistoryAge = 0;
533  
534  public HistoryFileManager() {
535    super(HistoryFileManager.class.getName());
536  }
537
538  @Override
539  protected void serviceInit(Configuration conf) throws Exception {
540    this.conf = conf;
541
542    int serialNumberLowDigits = 3;
543    serialNumberFormat = ("%0"
544        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
545        + "d");
546
547    long maxFSWaitTime = conf.getLong(
548        JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
549        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
550    createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime);
551
552    this.aclsMgr = new JobACLsManager(conf);
553
554    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
555        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
556    
557    jobListCache = createJobListCache();
558
559    serialNumberIndex = new SerialNumberIndex(conf.getInt(
560        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
561        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
562
563    int numMoveThreads = conf.getInt(
564        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
565        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
566    moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
567    super.serviceInit(conf);
568  }
569
570  protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) {
571    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
572        "MoveIntermediateToDone Thread #%d").build();
573    return new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
574        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
575  }
576
577  @VisibleForTesting
578  void createHistoryDirs(Clock clock, long intervalCheckMillis,
579      long timeOutMillis) throws IOException {
580    long start = clock.getTime();
581    boolean done = false;
582    int counter = 0;
583    while (!done &&
584        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
585      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
586      try {
587        Thread.sleep(intervalCheckMillis);
588      } catch (InterruptedException ex) {
589        throw new YarnRuntimeException(ex);
590      }
591    }
592    if (!done) {
593      throw new YarnRuntimeException("Timed out '" + timeOutMillis+
594              "ms' waiting for FileSystem to become available");
595    }
596  }
597
598  /**
599   * DistributedFileSystem returns a RemoteException with a message stating
600   * SafeModeException in it. So this is only way to check it is because of
601   * being in safe mode.
602   */
603  private boolean isBecauseSafeMode(Throwable ex) {
604    return ex.toString().contains("SafeModeException");
605  }
606
607  /**
608   * Returns TRUE if the history dirs were created, FALSE if they could not
609   * be created because the FileSystem is not reachable or in safe mode and
610   * throws and exception otherwise.
611   */
612  @VisibleForTesting
613  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
614    boolean succeeded = true;
615    String doneDirPrefix = JobHistoryUtils.
616        getConfiguredHistoryServerDoneDirPrefix(conf);
617    try {
618      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
619          new Path(doneDirPrefix));
620      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
621      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
622      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
623          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
624    } catch (ConnectException ex) {
625      if (logWait) {
626        LOG.info("Waiting for FileSystem at " +
627            doneDirPrefixPath.toUri().getAuthority()  + "to be available");
628      }
629      succeeded = false;
630    } catch (IOException e) {
631      if (isBecauseSafeMode(e)) {
632        succeeded = false;
633        if (logWait) {
634          LOG.info("Waiting for FileSystem at " +
635              doneDirPrefixPath.toUri().getAuthority() +
636              "to be out of safe mode");
637        }
638      } else {
639        throw new YarnRuntimeException("Error creating done directory: ["
640            + doneDirPrefixPath + "]", e);
641      }
642    }
643    if (succeeded) {
644      String intermediateDoneDirPrefix = JobHistoryUtils.
645          getConfiguredHistoryIntermediateDoneDirPrefix(conf);
646      try {
647        intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
648            new Path(intermediateDoneDirPrefix));
649        intermediateDoneDirFc = FileContext.getFileContext(
650            intermediateDoneDirPath.toUri(), conf);
651        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
652            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
653      } catch (ConnectException ex) {
654        succeeded = false;
655        if (logWait) {
656          LOG.info("Waiting for FileSystem at " +
657              intermediateDoneDirPath.toUri().getAuthority() +
658              "to be available");
659        }
660      } catch (IOException e) {
661        if (isBecauseSafeMode(e)) {
662          succeeded = false;
663          if (logWait) {
664            LOG.info("Waiting for FileSystem at " +
665                intermediateDoneDirPath.toUri().getAuthority() +
666                "to be out of safe mode");
667          }
668        } else {
669          throw new YarnRuntimeException(
670              "Error creating intermediate done directory: ["
671              + intermediateDoneDirPath + "]", e);
672        }
673      }
674    }
675    return succeeded;
676  }
677
678  @Override
679  public void serviceStop() throws Exception {
680    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
681    super.serviceStop();
682  }
683
684  protected JobListCache createJobListCache() {
685    return new JobListCache(conf.getInt(
686        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
687        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
688  }
689
690  private void mkdir(FileContext fc, Path path, FsPermission fsp)
691      throws IOException {
692    if (!fc.util().exists(path)) {
693      try {
694        fc.mkdir(path, fsp, true);
695
696        FileStatus fsStatus = fc.getFileStatus(path);
697        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
698            + ", Expected: " + fsp.toShort());
699        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
700          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
701              + ", " + fsp);
702          fc.setPermission(path, fsp);
703        }
704      } catch (FileAlreadyExistsException e) {
705        LOG.info("Directory: [" + path + "] already exists.");
706      }
707    }
708  }
709
710  protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
711      Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
712      boolean isInDone) {
713    return new HistoryFileInfo(
714        historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
715  }
716
717  /**
718   * Populates index data structures. Should only be called at initialization
719   * times.
720   */
721  @SuppressWarnings("unchecked")
722  void initExisting() throws IOException {
723    LOG.info("Initializing Existing Jobs...");
724    List<FileStatus> timestampedDirList = findTimestampedDirectories();
725    // Sort first just so insertion is in a consistent order
726    Collections.sort(timestampedDirList);
727    for (FileStatus fs : timestampedDirList) {
728      // TODO Could verify the correct format for these directories.
729      addDirectoryToSerialNumberIndex(fs.getPath());
730    }
731    for (int i= timestampedDirList.size() - 1;
732        i >= 0 && !jobListCache.isFull(); i--) {
733      FileStatus fs = timestampedDirList.get(i); 
734      addDirectoryToJobListCache(fs.getPath());
735    }
736  }
737
738  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
739    String serialPart = serialDirPath.getName();
740    String timeStampPart = JobHistoryUtils
741        .getTimestampPartFromPath(serialDirPath.toString());
742    if (timeStampPart == null) {
743      LOG.warn("Could not find timestamp portion from path: "
744          + serialDirPath.toString() + ". Continuing with next");
745      return;
746    }
747    if (serialPart == null) {
748      LOG.warn("Could not find serial portion from path: "
749          + serialDirPath.toString() + ". Continuing with next");
750      return;
751    }
752    serialNumberIndex.remove(serialPart, timeStampPart);
753  }
754
755  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
756    if (LOG.isDebugEnabled()) {
757      LOG.debug("Adding " + serialDirPath + " to serial index");
758    }
759    String serialPart = serialDirPath.getName();
760    String timestampPart = JobHistoryUtils
761        .getTimestampPartFromPath(serialDirPath.toString());
762    if (timestampPart == null) {
763      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
764          + ". Continuing with next");
765      return;
766    }
767    if (serialPart == null) {
768      LOG.warn("Could not find serial portion from path: "
769          + serialDirPath.toString() + ". Continuing with next");
770    } else {
771      serialNumberIndex.add(serialPart, timestampPart);
772    }
773  }
774
775  private void addDirectoryToJobListCache(Path path) throws IOException {
776    if (LOG.isDebugEnabled()) {
777      LOG.debug("Adding " + path + " to job list cache.");
778    }
779    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
780        doneDirFc);
781    for (FileStatus fs : historyFileList) {
782      if (LOG.isDebugEnabled()) {
783        LOG.debug("Adding in history for " + fs.getPath());
784      }
785      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
786          .getName());
787      String confFileName = JobHistoryUtils
788          .getIntermediateConfFileName(jobIndexInfo.getJobId());
789      String summaryFileName = JobHistoryUtils
790          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
791      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
792          .getPath().getParent(), confFileName), new Path(fs.getPath()
793          .getParent(), summaryFileName), jobIndexInfo, true);
794      jobListCache.addIfAbsent(fileInfo);
795    }
796  }
797
798  @VisibleForTesting
799  protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
800      PathFilter pathFilter) throws IOException {
801    path = fc.makeQualified(path);
802    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
803    try {
804      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
805      while (fileStatusIter.hasNext()) {
806        FileStatus fileStatus = fileStatusIter.next();
807        Path filePath = fileStatus.getPath();
808        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
809          jhStatusList.add(fileStatus);
810        }
811      }
812    } catch (FileNotFoundException fe) {
813      LOG.error("Error while scanning directory " + path, fe);
814    }
815    return jhStatusList;
816  }
817
818  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
819      FileContext fc) throws IOException {
820    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
821  }
822  
823  /**
824   * Finds all history directories with a timestamp component by scanning the
825   * filesystem. Used when the JobHistory server is started.
826   * 
827   * @return list of history directories
828   */
829  protected List<FileStatus> findTimestampedDirectories() throws IOException {
830    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
831        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
832    return fsList;
833  }
834
835  /**
836   * Scans the intermediate directory to find user directories. Scans these for
837   * history files if the modification time for the directory has changed. Once
838   * it finds history files it starts the process of moving them to the done 
839   * directory.
840   * 
841   * @throws IOException
842   *           if there was a error while scanning
843   */
844  void scanIntermediateDirectory() throws IOException {
845    // TODO it would be great to limit how often this happens, except in the
846    // case where we are looking for a particular job.
847    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
848        intermediateDoneDirFc, intermediateDoneDirPath, "");
849    LOG.debug("Scanning intermediate dirs");
850    for (FileStatus userDir : userDirList) {
851      String name = userDir.getPath().getName();
852      UserLogDir dir = userDirModificationTimeMap.get(name);
853      if(dir == null) {
854        dir = new UserLogDir();
855        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
856        if(old != null) {
857          dir = old;
858        }
859      }
860      dir.scanIfNeeded(userDir);
861    }
862  }
863
864  /**
865   * Scans the specified path and populates the intermediate cache.
866   * 
867   * @param absPath
868   * @throws IOException
869   */
870  private void scanIntermediateDirectory(final Path absPath) throws IOException {
871    if (LOG.isDebugEnabled()) {
872      LOG.debug("Scanning intermediate dir " + absPath);
873    }
874    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
875        intermediateDoneDirFc);
876    if (LOG.isDebugEnabled()) {
877      LOG.debug("Found " + fileStatusList.size() + " files");
878    }
879    for (FileStatus fs : fileStatusList) {
880      if (LOG.isDebugEnabled()) {
881        LOG.debug("scanning file: "+ fs.getPath());
882      }
883      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
884          .getName());
885      String confFileName = JobHistoryUtils
886          .getIntermediateConfFileName(jobIndexInfo.getJobId());
887      String summaryFileName = JobHistoryUtils
888          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
889      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
890          .getPath().getParent(), confFileName), new Path(fs.getPath()
891          .getParent(), summaryFileName), jobIndexInfo, false);
892
893      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
894      if (old == null || old.didMoveFail()) {
895        final HistoryFileInfo found = (old == null) ? fileInfo : old;
896        long cutoff = System.currentTimeMillis() - maxHistoryAge;
897        if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
898          try {
899            found.delete();
900          } catch (IOException e) {
901            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
902          }
903        } else {
904          if (LOG.isDebugEnabled()) {
905            LOG.debug("Scheduling move to done of " +found);
906          }
907          moveToDoneExecutor.execute(new Runnable() {
908            @Override
909            public void run() {
910              try {
911                found.moveToDone();
912              } catch (IOException e) {
913                LOG.info("Failed to process fileInfo for job: " + 
914                    found.getJobId(), e);
915              }
916            }
917          });
918        }
919      } else if (!old.isMovePending()) {
920        //This is a duplicate so just delete it
921        if (LOG.isDebugEnabled()) {
922          LOG.debug("Duplicate: deleting");
923        }
924        fileInfo.delete();
925      }
926    }
927  }
928
929  /**
930   * Searches the job history file FileStatus list for the specified JobId.
931   * 
932   * @param fileStatusList
933   *          fileStatus list of Job History Files.
934   * @param jobId
935   *          The JobId to find.
936   * @return A FileInfo object for the jobId, null if not found.
937   * @throws IOException
938   */
939  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
940      JobId jobId) throws IOException {
941    for (FileStatus fs : fileStatusList) {
942      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
943          .getName());
944      if (jobIndexInfo.getJobId().equals(jobId)) {
945        String confFileName = JobHistoryUtils
946            .getIntermediateConfFileName(jobIndexInfo.getJobId());
947        String summaryFileName = JobHistoryUtils
948            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
949        HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
950            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
951            .getParent(), summaryFileName), jobIndexInfo, true);
952        return fileInfo;
953      }
954    }
955    return null;
956  }
957
958  /**
959   * Scans old directories known by the idToDateString map for the specified
960   * jobId. If the number of directories is higher than the supported size of
961   * the idToDateString cache, the jobId will not be found.
962   * 
963   * @param jobId
964   *          the jobId.
965   * @return
966   * @throws IOException
967   */
968  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
969    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
970        jobId, serialNumberFormat);
971    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
972    if (dateStringSet == null) {
973      return null;
974    }
975    for (String timestampPart : dateStringSet) {
976      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
977      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
978          doneDirFc);
979      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
980      if (fileInfo != null) {
981        return fileInfo;
982      }
983    }
984    return null;
985  }
986
987  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
988    scanIntermediateDirectory();
989    return jobListCache.values();
990  }
991
992  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
993    // FileInfo available in cache.
994    HistoryFileInfo fileInfo = jobListCache.get(jobId);
995    if (fileInfo != null) {
996      return fileInfo;
997    }
998    // OK so scan the intermediate to be sure we did not lose it that way
999    scanIntermediateDirectory();
1000    fileInfo = jobListCache.get(jobId);
1001    if (fileInfo != null) {
1002      return fileInfo;
1003    }
1004
1005    // Intermediate directory does not contain job. Search through older ones.
1006    fileInfo = scanOldDirsForJob(jobId);
1007    if (fileInfo != null) {
1008      return fileInfo;
1009    }
1010    return null;
1011  }
1012
1013  private void moveToDoneNow(final Path src, final Path target)
1014      throws IOException {
1015    LOG.info("Moving " + src.toString() + " to " + target.toString());
1016    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
1017  }
1018
1019  private String getJobSummary(FileContext fc, Path path) throws IOException {
1020    Path qPath = fc.makeQualified(path);
1021    FSDataInputStream in = null;
1022    String jobSummaryString = null;
1023    try {
1024      in = fc.open(qPath);
1025      jobSummaryString = in.readUTF();
1026    } finally {
1027      if (in != null) {
1028        in.close();
1029      }
1030    }
1031    return jobSummaryString;
1032  }
1033
1034  private void makeDoneSubdir(Path path) throws IOException {
1035    try {
1036      doneDirFc.getFileStatus(path);
1037      existingDoneSubdirs.add(path);
1038    } catch (FileNotFoundException fnfE) {
1039      try {
1040        FsPermission fsp = new FsPermission(
1041            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
1042        doneDirFc.mkdir(path, fsp, true);
1043        FileStatus fsStatus = doneDirFc.getFileStatus(path);
1044        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
1045            + ", Expected: " + fsp.toShort());
1046        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
1047          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
1048              + ", " + fsp);
1049          doneDirFc.setPermission(path, fsp);
1050        }
1051        existingDoneSubdirs.add(path);
1052      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
1053      }
1054    }
1055  }
1056
1057  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
1058    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1059        id, timestampComponent, serialNumberFormat));
1060  }
1061
1062  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
1063    String timestampComponent = JobHistoryUtils
1064        .timestampDirectoryComponent(millisecondTime);
1065    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1066        id, timestampComponent, serialNumberFormat));
1067  }
1068
1069  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
1070    if (finishTime == 0) {
1071      return fileStatus.getModificationTime();
1072    }
1073    return finishTime;
1074  }
1075
1076  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
1077    jobListCache.delete(fileInfo);
1078    fileInfo.delete();
1079  }
1080
1081  List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
1082      return JobHistoryUtils.
1083        getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
1084  }
1085
1086  /**
1087   * Clean up older history files.
1088   * 
1089   * @throws IOException
1090   *           on any error trying to remove the entries.
1091   */
1092  @SuppressWarnings("unchecked")
1093  void clean() throws IOException {
1094    long cutoff = System.currentTimeMillis() - maxHistoryAge;
1095    boolean halted = false;
1096    List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
1097    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
1098    Collections.sort(serialDirList);
1099    for (FileStatus serialDir : serialDirList) {
1100      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
1101          serialDir.getPath(), doneDirFc);
1102      for (FileStatus historyFile : historyFileList) {
1103        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
1104            .getPath().getName());
1105        long effectiveTimestamp = getEffectiveTimestamp(
1106            jobIndexInfo.getFinishTime(), historyFile);
1107        if (effectiveTimestamp <= cutoff) {
1108          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
1109              .getJobId());
1110          if (fileInfo == null) {
1111            String confFileName = JobHistoryUtils
1112                .getIntermediateConfFileName(jobIndexInfo.getJobId());
1113
1114            fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
1115                historyFile.getPath().getParent(), confFileName), null,
1116                jobIndexInfo, true);
1117          }
1118          deleteJobFromDone(fileInfo);
1119        } else {
1120          halted = true;
1121          break;
1122        }
1123      }
1124      if (!halted) {
1125        deleteDir(serialDir);
1126        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
1127        existingDoneSubdirs.remove(serialDir.getPath());
1128      } else {
1129        break; // Don't scan any more directories.
1130      }
1131    }
1132  }
1133  
1134  protected boolean deleteDir(FileStatus serialDir)
1135      throws AccessControlException, FileNotFoundException,
1136      UnsupportedFileSystemException, IOException {
1137    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
1138  }
1139
1140  @VisibleForTesting
1141  protected void setMaxHistoryAge(long newValue){
1142    maxHistoryAge=newValue;
1143  } 
1144}