001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.v2.hs;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.NavigableSet;
031import java.util.Set;
032import java.util.SortedMap;
033import java.util.TreeMap;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.ConcurrentSkipListMap;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.ThreadFactory;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.apache.hadoop.classification.InterfaceAudience;
046import org.apache.hadoop.classification.InterfaceStability;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FSDataInputStream;
049import org.apache.hadoop.fs.FileAlreadyExistsException;
050import org.apache.hadoop.fs.FileContext;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.Options;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.fs.PathFilter;
055import org.apache.hadoop.fs.RemoteIterator;
056import org.apache.hadoop.fs.UnsupportedFileSystemException;
057import org.apache.hadoop.fs.permission.FsPermission;
058import org.apache.hadoop.mapred.JobACLsManager;
059import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
060import org.apache.hadoop.mapreduce.v2.api.records.JobId;
061import org.apache.hadoop.mapreduce.v2.app.job.Job;
062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
066import org.apache.hadoop.security.AccessControlException;
067import org.apache.hadoop.service.AbstractService;
068import org.apache.hadoop.util.ShutdownThreadsHelper;
069import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
070
071import com.google.common.annotations.VisibleForTesting;
072import com.google.common.util.concurrent.ThreadFactoryBuilder;
073import org.apache.hadoop.yarn.util.Clock;
074import org.apache.hadoop.yarn.util.SystemClock;
075
076/**
077 * This class provides a way to interact with history files in a thread safe
078 * manner.
079 */
080@InterfaceAudience.Public
081@InterfaceStability.Unstable
082public class HistoryFileManager extends AbstractService {
083  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
084  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
085
086  private static enum HistoryInfoState {
087    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
088  };
089  
090  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
091      .doneSubdirsBeforeSerialTail();
092
093  /**
094   * Maps between a serial number (generated based on jobId) and the timestamp
095   * component(s) to which it belongs. Facilitates jobId based searches. If a
096   * jobId is not found in this list - it will not be found.
097   */
098  private static class SerialNumberIndex {
099    private SortedMap<String, Set<String>> cache;
100    private int maxSize;
101
102    public SerialNumberIndex(int maxSize) {
103      this.cache = new TreeMap<String, Set<String>>();
104      this.maxSize = maxSize;
105    }
106
107    public synchronized void add(String serialPart, String timestampPart) {
108      if (!cache.containsKey(serialPart)) {
109        cache.put(serialPart, new HashSet<String>());
110        if (cache.size() > maxSize) {
111          String key = cache.firstKey();
112          LOG.error("Dropping " + key
113              + " from the SerialNumberIndex. We will no "
114              + "longer be able to see jobs that are in that serial index for "
115              + cache.get(key));
116          cache.remove(key);
117        }
118      }
119      Set<String> datePartSet = cache.get(serialPart);
120      datePartSet.add(timestampPart);
121    }
122
123    public synchronized void remove(String serialPart, String timeStampPart) {
124      if (cache.containsKey(serialPart)) {
125        Set<String> set = cache.get(serialPart);
126        set.remove(timeStampPart);
127        if (set.isEmpty()) {
128          cache.remove(serialPart);
129        }
130      }
131    }
132
133    public synchronized Set<String> get(String serialPart) {
134      Set<String> found = cache.get(serialPart);
135      if (found != null) {
136        return new HashSet<String>(found);
137      }
138      return null;
139    }
140  }
141
142  /**
143   * Wrapper around {@link ConcurrentSkipListMap} that maintains size along
144   * side for O(1) size() implementation for use in JobListCache.
145   *
146   * Note: The size is not updated atomically with changes additions/removals.
147   * This race can lead to size() returning an incorrect size at times.
148   */
149  static class JobIdHistoryFileInfoMap {
150    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
151    private AtomicInteger mapSize;
152
153    JobIdHistoryFileInfoMap() {
154      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
155      mapSize = new AtomicInteger();
156    }
157
158    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
159      HistoryFileInfo ret = cache.putIfAbsent(key, value);
160      if (ret == null) {
161        mapSize.incrementAndGet();
162      }
163      return ret;
164    }
165
166    public HistoryFileInfo remove(JobId key) {
167      HistoryFileInfo ret = cache.remove(key);
168      if (ret != null) {
169        mapSize.decrementAndGet();
170      }
171      return ret;
172    }
173
174    /**
175     * Returns the recorded size of the internal map. Note that this could be out
176     * of sync with the actual size of the map
177     * @return "recorded" size
178     */
179    public int size() {
180      return mapSize.get();
181    }
182
183    public HistoryFileInfo get(JobId key) {
184      return cache.get(key);
185    }
186
187    public NavigableSet<JobId> navigableKeySet() {
188      return cache.navigableKeySet();
189    }
190
191    public Collection<HistoryFileInfo> values() {
192      return cache.values();
193    }
194  }
195
196  static class JobListCache {
197    private JobIdHistoryFileInfoMap cache;
198    private int maxSize;
199    private long maxAge;
200
201    public JobListCache(int maxSize, long maxAge) {
202      this.maxSize = maxSize;
203      this.maxAge = maxAge;
204      this.cache = new JobIdHistoryFileInfoMap();
205    }
206
207    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
208      JobId jobId = fileInfo.getJobId();
209      if (LOG.isDebugEnabled()) {
210        LOG.debug("Adding " + jobId + " to job list cache with "
211            + fileInfo.getJobIndexInfo());
212      }
213      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
214      if (cache.size() > maxSize) {
215        //There is a race here, where more then one thread could be trying to
216        // remove entries.  This could result in too many entries being removed
217        // from the cache.  This is considered OK as the size of the cache
218        // should be rather large, and we would rather have performance over
219        // keeping the cache size exactly at the maximum.
220        Iterator<JobId> keys = cache.navigableKeySet().iterator();
221        long cutoff = System.currentTimeMillis() - maxAge;
222
223        // MAPREDUCE-6436: In order to reduce the number of logs written
224        // in case of a lot of move pending histories.
225        JobId firstInIntermediateKey = null;
226        int inIntermediateCount = 0;
227        JobId firstMoveFailedKey = null;
228        int moveFailedCount = 0;
229
230        while(cache.size() > maxSize && keys.hasNext()) {
231          JobId key = keys.next();
232          HistoryFileInfo firstValue = cache.get(key);
233          if(firstValue != null) {
234            synchronized(firstValue) {
235              if (firstValue.isMovePending()) {
236                if(firstValue.didMoveFail() &&
237                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
238                  cache.remove(key);
239                  //Now lets try to delete it
240                  try {
241                    firstValue.delete();
242                  } catch (IOException e) {
243                    LOG.error("Error while trying to delete history files" +
244                                " that could not be moved to done.", e);
245                  }
246                } else {
247                  if (firstValue.didMoveFail()) {
248                    if (moveFailedCount == 0) {
249                      firstMoveFailedKey = key;
250                    }
251                    moveFailedCount += 1;
252                  } else {
253                    if (inIntermediateCount == 0) {
254                      firstInIntermediateKey = key;
255                    }
256                    inIntermediateCount += 1;
257                  }
258                }
259              } else {
260                cache.remove(key);
261              }
262            }
263          }
264        }
265        // Log output only for first jobhisotry in pendings to restrict
266        // the total number of logs.
267        if (inIntermediateCount > 0) {
268          LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " +
269                  "(e.g. " + firstInIntermediateKey + ") from JobListCache " +
270                  "because it is not in done yet. Total count is " +
271                  inIntermediateCount + ".");
272        }
273        if (moveFailedCount > 0) {
274          LOG.warn("Waiting to remove MOVE_FAILED state histories " +
275                  "(e.g. " + firstMoveFailedKey + ") from JobListCache " +
276                  "because it is not in done yet. Total count is " +
277                  moveFailedCount + ".");
278        }
279      }
280      return old;
281    }
282
283    public void delete(HistoryFileInfo fileInfo) {
284      if (LOG.isDebugEnabled()) {
285        LOG.debug("Removing from cache " + fileInfo);
286      }
287      cache.remove(fileInfo.getJobId());
288    }
289
290    public Collection<HistoryFileInfo> values() {
291      return new ArrayList<HistoryFileInfo>(cache.values());
292    }
293
294    public HistoryFileInfo get(JobId jobId) {
295      return cache.get(jobId);
296    }
297
298    public boolean isFull() {
299      return cache.size() >= maxSize;
300    }
301  }
302
303  /**
304   * This class represents a user dir in the intermediate done directory.  This
305   * is mostly for locking purposes. 
306   */
307  private class UserLogDir {
308    long modTime = 0;
309    private long scanTime = 0;
310
311    public synchronized void scanIfNeeded(FileStatus fs) {
312      long newModTime = fs.getModificationTime();
313      // MAPREDUCE-6680: In some Cloud FileSystem, like Azure FS or S3, file's
314      // modification time is truncated into seconds. In that case,
315      // modTime == newModTime doesn't means no file update in the directory,
316      // so we need to have additional check.
317      // Note: modTime (X second Y millisecond) could be casted to X second or
318      // X+1 second.
319      if (modTime != newModTime
320          || (scanTime/1000) == (modTime/1000)
321          || (scanTime/1000 + 1) == (modTime/1000)) {
322        // reset scanTime before scanning happens
323        scanTime = System.currentTimeMillis();
324        Path p = fs.getPath();
325        try {
326          scanIntermediateDirectory(p);
327          //If scanning fails, we will scan again.  We assume the failure is
328          // temporary.
329          modTime = newModTime;
330        } catch (IOException e) {
331          LOG.error("Error while trying to scan the directory " + p, e);
332        }
333      } else {
334        if (LOG.isDebugEnabled()) {
335          LOG.debug("Scan not needed of " + fs.getPath());
336        }
337        // reset scanTime
338        scanTime = System.currentTimeMillis();
339      }
340    }
341  }
342
343  public class HistoryFileInfo {
344    private Path historyFile;
345    private Path confFile;
346    private Path summaryFile;
347    private JobIndexInfo jobIndexInfo;
348    private HistoryInfoState state;
349
350    @VisibleForTesting
351    protected HistoryFileInfo(Path historyFile, Path confFile,
352        Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) {
353      this.historyFile = historyFile;
354      this.confFile = confFile;
355      this.summaryFile = summaryFile;
356      this.jobIndexInfo = jobIndexInfo;
357      state = isInDone ? HistoryInfoState.IN_DONE
358          : HistoryInfoState.IN_INTERMEDIATE;
359    }
360
361    @VisibleForTesting
362    synchronized boolean isMovePending() {
363      return state == HistoryInfoState.IN_INTERMEDIATE
364          || state == HistoryInfoState.MOVE_FAILED;
365    }
366
367    @VisibleForTesting
368    synchronized boolean didMoveFail() {
369      return state == HistoryInfoState.MOVE_FAILED;
370    }
371    
372    /**
373     * @return true if the files backed by this were deleted.
374     */
375    public synchronized boolean isDeleted() {
376      return state == HistoryInfoState.DELETED;
377    }
378
379    @Override
380    public String toString() {
381      return "HistoryFileInfo jobID " + getJobId()
382             + " historyFile = " + historyFile;
383    }
384
385    @VisibleForTesting
386    synchronized void moveToDone() throws IOException {
387      if (LOG.isDebugEnabled()) {
388        LOG.debug("moveToDone: " + historyFile);
389      }
390      if (!isMovePending()) {
391        // It was either deleted or is already in done. Either way do nothing
392        if (LOG.isDebugEnabled()) {
393          LOG.debug("Move no longer pending");
394        }
395        return;
396      }
397      try {
398        long completeTime = jobIndexInfo.getFinishTime();
399        if (completeTime == 0) {
400          completeTime = System.currentTimeMillis();
401        }
402        JobId jobId = jobIndexInfo.getJobId();
403
404        List<Path> paths = new ArrayList<Path>(2);
405        if (historyFile == null) {
406          LOG.info("No file for job-history with " + jobId + " found in cache!");
407        } else {
408          paths.add(historyFile);
409        }
410
411        if (confFile == null) {
412          LOG.info("No file for jobConf with " + jobId + " found in cache!");
413        } else {
414          paths.add(confFile);
415        }
416
417        if (summaryFile == null || !intermediateDoneDirFc.util().exists(
418            summaryFile)) {
419          LOG.info("No summary file for job: " + jobId);
420        } else {
421          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
422              summaryFile);
423          SUMMARY_LOG.info(jobSummaryString);
424          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
425          intermediateDoneDirFc.delete(summaryFile, false);
426          summaryFile = null;
427        }
428
429        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
430        addDirectoryToSerialNumberIndex(targetDir);
431        makeDoneSubdir(targetDir);
432        if (historyFile != null) {
433          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
434              .getName()));
435          if (!toPath.equals(historyFile)) {
436            moveToDoneNow(historyFile, toPath);
437            historyFile = toPath;
438          }
439        }
440        if (confFile != null) {
441          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
442              .getName()));
443          if (!toPath.equals(confFile)) {
444            moveToDoneNow(confFile, toPath);
445            confFile = toPath;
446          }
447        }
448        state = HistoryInfoState.IN_DONE;
449      } catch (Throwable t) {
450        LOG.error("Error while trying to move a job to done", t);
451        this.state = HistoryInfoState.MOVE_FAILED;
452      }
453    }
454
455    /**
456     * Parse a job from the JobHistoryFile, if the underlying file is not going
457     * to be deleted.
458     * 
459     * @return the Job or null if the underlying file was deleted.
460     * @throws IOException
461     *           if there is an error trying to read the file.
462     */
463    public synchronized Job loadJob() throws IOException {
464      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
465          false, jobIndexInfo.getUser(), this, aclsMgr);
466    }
467
468    /**
469     * Return the history file.  This should only be used for testing.
470     * @return the history file.
471     */
472    synchronized Path getHistoryFile() {
473      return historyFile;
474    }
475    
476    protected synchronized void delete() throws IOException {
477      if (LOG.isDebugEnabled()) {
478        LOG.debug("deleting " + historyFile + " and " + confFile);
479      }
480      state = HistoryInfoState.DELETED;
481      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
482      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
483    }
484
485    public JobIndexInfo getJobIndexInfo() {
486      return jobIndexInfo;
487    }
488
489    public JobId getJobId() {
490      return jobIndexInfo.getJobId();
491    }
492
493    public synchronized Path getConfFile() {
494      return confFile;
495    }
496    
497    public synchronized Configuration loadConfFile() throws IOException {
498      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
499      Configuration jobConf = new Configuration(false);
500      jobConf.addResource(fc.open(confFile), confFile.toString(), true);
501      return jobConf;
502    }
503  }
504
505  private SerialNumberIndex serialNumberIndex = null;
506  protected JobListCache jobListCache = null;
507
508  // Maintains a list of known done subdirectories.
509  private final Set<Path> existingDoneSubdirs = Collections
510      .synchronizedSet(new HashSet<Path>());
511
512  /**
513   * Maintains a mapping between intermediate user directories and the last
514   * known modification time.
515   */
516  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 
517    new ConcurrentHashMap<String, UserLogDir>();
518
519  private JobACLsManager aclsMgr;
520
521  @VisibleForTesting
522  Configuration conf;
523
524  private String serialNumberFormat;
525
526  private Path doneDirPrefixPath = null; // folder for completed jobs
527  private FileContext doneDirFc; // done Dir FileContext
528
529  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
530  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
531                                             // FileContext
532  @VisibleForTesting
533  protected ThreadPoolExecutor moveToDoneExecutor = null;
534  private long maxHistoryAge = 0;
535  
/**
 * Creates the manager, passing the fully qualified name of this class to
 * the {@link AbstractService} constructor.
 */
public HistoryFileManager() {
  super(HistoryFileManager.class.getName());
}
539
540  @Override
541  protected void serviceInit(Configuration conf) throws Exception {
542    this.conf = conf;
543
544    int serialNumberLowDigits = 3;
545    serialNumberFormat = ("%0"
546        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
547        + "d");
548
549    long maxFSWaitTime = conf.getLong(
550        JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
551        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
552    createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime);
553
554    this.aclsMgr = new JobACLsManager(conf);
555
556    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
557        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
558    
559    jobListCache = createJobListCache();
560
561    serialNumberIndex = new SerialNumberIndex(conf.getInt(
562        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
563        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
564
565    int numMoveThreads = conf.getInt(
566        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
567        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
568    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
569        "MoveIntermediateToDone Thread #%d").build();
570    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
571        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
572
573    super.serviceInit(conf);
574  }
575
576  @VisibleForTesting
577  void createHistoryDirs(Clock clock, long intervalCheckMillis,
578      long timeOutMillis) throws IOException {
579    long start = clock.getTime();
580    boolean done = false;
581    int counter = 0;
582    while (!done &&
583        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
584      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
585      try {
586        Thread.sleep(intervalCheckMillis);
587      } catch (InterruptedException ex) {
588        throw new YarnRuntimeException(ex);
589      }
590    }
591    if (!done) {
592      throw new YarnRuntimeException("Timed out '" + timeOutMillis+
593              "ms' waiting for FileSystem to become available");
594    }
595  }
596
597  /**
598   * DistributedFileSystem returns a RemoteException with a message stating
599   * SafeModeException in it. So this is only way to check it is because of
600   * being in safe mode.
601   */
602  private boolean isBecauseSafeMode(Throwable ex) {
603    return ex.toString().contains("SafeModeException");
604  }
605
606  /**
607   * Returns TRUE if the history dirs were created, FALSE if they could not
608   * be created because the FileSystem is not reachable or in safe mode and
609   * throws and exception otherwise.
610   */
611  @VisibleForTesting
612  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
613    boolean succeeded = true;
614    String doneDirPrefix = JobHistoryUtils.
615        getConfiguredHistoryServerDoneDirPrefix(conf);
616    try {
617      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
618          new Path(doneDirPrefix));
619      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
620      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
621      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
622          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
623    } catch (ConnectException ex) {
624      if (logWait) {
625        LOG.info("Waiting for FileSystem at " +
626            doneDirPrefixPath.toUri().getAuthority()  + "to be available");
627      }
628      succeeded = false;
629    } catch (IOException e) {
630      if (isBecauseSafeMode(e)) {
631        succeeded = false;
632        if (logWait) {
633          LOG.info("Waiting for FileSystem at " +
634              doneDirPrefixPath.toUri().getAuthority() +
635              "to be out of safe mode");
636        }
637      } else {
638        throw new YarnRuntimeException("Error creating done directory: ["
639            + doneDirPrefixPath + "]", e);
640      }
641    }
642    if (succeeded) {
643      String intermediateDoneDirPrefix = JobHistoryUtils.
644          getConfiguredHistoryIntermediateDoneDirPrefix(conf);
645      try {
646        intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
647            new Path(intermediateDoneDirPrefix));
648        intermediateDoneDirFc = FileContext.getFileContext(
649            intermediateDoneDirPath.toUri(), conf);
650        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
651            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
652      } catch (ConnectException ex) {
653        succeeded = false;
654        if (logWait) {
655          LOG.info("Waiting for FileSystem at " +
656              intermediateDoneDirPath.toUri().getAuthority() +
657              "to be available");
658        }
659      } catch (IOException e) {
660        if (isBecauseSafeMode(e)) {
661          succeeded = false;
662          if (logWait) {
663            LOG.info("Waiting for FileSystem at " +
664                intermediateDoneDirPath.toUri().getAuthority() +
665                "to be out of safe mode");
666          }
667        } else {
668          throw new YarnRuntimeException(
669              "Error creating intermediate done directory: ["
670              + intermediateDoneDirPath + "]", e);
671        }
672      }
673    }
674    return succeeded;
675  }
676
677  @Override
678  public void serviceStop() throws Exception {
679    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
680    super.serviceStop();
681  }
682
683  protected JobListCache createJobListCache() {
684    return new JobListCache(conf.getInt(
685        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
686        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
687  }
688
689  private void mkdir(FileContext fc, Path path, FsPermission fsp)
690      throws IOException {
691    if (!fc.util().exists(path)) {
692      try {
693        fc.mkdir(path, fsp, true);
694
695        FileStatus fsStatus = fc.getFileStatus(path);
696        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
697            + ", Expected: " + fsp.toShort());
698        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
699          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
700              + ", " + fsp);
701          fc.setPermission(path, fsp);
702        }
703      } catch (FileAlreadyExistsException e) {
704        LOG.info("Directory: [" + path + "] already exists.");
705      }
706    }
707  }
708
709  /**
710   * Populates index data structures. Should only be called at initialization
711   * times.
712   */
713  @SuppressWarnings("unchecked")
714  void initExisting() throws IOException {
715    LOG.info("Initializing Existing Jobs...");
716    List<FileStatus> timestampedDirList = findTimestampedDirectories();
717    // Sort first just so insertion is in a consistent order
718    Collections.sort(timestampedDirList);
719    for (FileStatus fs : timestampedDirList) {
720      // TODO Could verify the correct format for these directories.
721      addDirectoryToSerialNumberIndex(fs.getPath());
722    }
723    for (int i= timestampedDirList.size() - 1;
724        i >= 0 && !jobListCache.isFull(); i--) {
725      FileStatus fs = timestampedDirList.get(i); 
726      addDirectoryToJobListCache(fs.getPath());
727    }
728  }
729
730  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
731    String serialPart = serialDirPath.getName();
732    String timeStampPart = JobHistoryUtils
733        .getTimestampPartFromPath(serialDirPath.toString());
734    if (timeStampPart == null) {
735      LOG.warn("Could not find timestamp portion from path: "
736          + serialDirPath.toString() + ". Continuing with next");
737      return;
738    }
739    if (serialPart == null) {
740      LOG.warn("Could not find serial portion from path: "
741          + serialDirPath.toString() + ". Continuing with next");
742      return;
743    }
744    serialNumberIndex.remove(serialPart, timeStampPart);
745  }
746
747  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
748    if (LOG.isDebugEnabled()) {
749      LOG.debug("Adding " + serialDirPath + " to serial index");
750    }
751    String serialPart = serialDirPath.getName();
752    String timestampPart = JobHistoryUtils
753        .getTimestampPartFromPath(serialDirPath.toString());
754    if (timestampPart == null) {
755      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
756          + ". Continuing with next");
757      return;
758    }
759    if (serialPart == null) {
760      LOG.warn("Could not find serial portion from path: "
761          + serialDirPath.toString() + ". Continuing with next");
762    } else {
763      serialNumberIndex.add(serialPart, timestampPart);
764    }
765  }
766
767  private void addDirectoryToJobListCache(Path path) throws IOException {
768    if (LOG.isDebugEnabled()) {
769      LOG.debug("Adding " + path + " to job list cache.");
770    }
771    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
772        doneDirFc);
773    for (FileStatus fs : historyFileList) {
774      if (LOG.isDebugEnabled()) {
775        LOG.debug("Adding in history for " + fs.getPath());
776      }
777      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
778          .getName());
779      String confFileName = JobHistoryUtils
780          .getIntermediateConfFileName(jobIndexInfo.getJobId());
781      String summaryFileName = JobHistoryUtils
782          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
783      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
784          .getPath().getParent(), confFileName), new Path(fs.getPath()
785          .getParent(), summaryFileName), jobIndexInfo, true);
786      jobListCache.addIfAbsent(fileInfo);
787    }
788  }
789
790  @VisibleForTesting
791  protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
792      PathFilter pathFilter) throws IOException {
793    path = fc.makeQualified(path);
794    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
795    try {
796      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
797      while (fileStatusIter.hasNext()) {
798        FileStatus fileStatus = fileStatusIter.next();
799        Path filePath = fileStatus.getPath();
800        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
801          jhStatusList.add(fileStatus);
802        }
803      }
804    } catch (FileNotFoundException fe) {
805      LOG.error("Error while scanning directory " + path, fe);
806    }
807    return jhStatusList;
808  }
809
810  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
811      FileContext fc) throws IOException {
812    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
813  }
814  
815  /**
816   * Finds all history directories with a timestamp component by scanning the
817   * filesystem. Used when the JobHistory server is started.
818   * 
819   * @return list of history directories
820   */
821  protected List<FileStatus> findTimestampedDirectories() throws IOException {
822    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
823        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
824    return fsList;
825  }
826
827  /**
828   * Scans the intermediate directory to find user directories. Scans these for
829   * history files if the modification time for the directory has changed. Once
830   * it finds history files it starts the process of moving them to the done 
831   * directory.
832   * 
833   * @throws IOException
834   *           if there was a error while scanning
835   */
836  void scanIntermediateDirectory() throws IOException {
837    // TODO it would be great to limit how often this happens, except in the
838    // case where we are looking for a particular job.
839    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
840        intermediateDoneDirFc, intermediateDoneDirPath, "");
841    LOG.debug("Scanning intermediate dirs");
842    for (FileStatus userDir : userDirList) {
843      String name = userDir.getPath().getName();
844      UserLogDir dir = userDirModificationTimeMap.get(name);
845      if(dir == null) {
846        dir = new UserLogDir();
847        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
848        if(old != null) {
849          dir = old;
850        }
851      }
852      dir.scanIfNeeded(userDir);
853    }
854  }
855
856  /**
857   * Scans the specified path and populates the intermediate cache.
858   * 
859   * @param absPath
860   * @throws IOException
861   */
862  private void scanIntermediateDirectory(final Path absPath) throws IOException {
863    if (LOG.isDebugEnabled()) {
864      LOG.debug("Scanning intermediate dir " + absPath);
865    }
866    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
867        intermediateDoneDirFc);
868    if (LOG.isDebugEnabled()) {
869      LOG.debug("Found " + fileStatusList.size() + " files");
870    }
871    for (FileStatus fs : fileStatusList) {
872      if (LOG.isDebugEnabled()) {
873        LOG.debug("scanning file: "+ fs.getPath());
874      }
875      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
876          .getName());
877      String confFileName = JobHistoryUtils
878          .getIntermediateConfFileName(jobIndexInfo.getJobId());
879      String summaryFileName = JobHistoryUtils
880          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
881      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
882          .getPath().getParent(), confFileName), new Path(fs.getPath()
883          .getParent(), summaryFileName), jobIndexInfo, false);
884
885      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
886      if (old == null || old.didMoveFail()) {
887        final HistoryFileInfo found = (old == null) ? fileInfo : old;
888        long cutoff = System.currentTimeMillis() - maxHistoryAge;
889        if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
890          try {
891            found.delete();
892          } catch (IOException e) {
893            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
894          }
895        } else {
896          if (LOG.isDebugEnabled()) {
897            LOG.debug("Scheduling move to done of " +found);
898          }
899          moveToDoneExecutor.execute(new Runnable() {
900            @Override
901            public void run() {
902              try {
903                found.moveToDone();
904              } catch (IOException e) {
905                LOG.info("Failed to process fileInfo for job: " + 
906                    found.getJobId(), e);
907              }
908            }
909          });
910        }
911      } else if (!old.isMovePending()) {
912        //This is a duplicate so just delete it
913        if (LOG.isDebugEnabled()) {
914          LOG.debug("Duplicate: deleting");
915        }
916        fileInfo.delete();
917      }
918    }
919  }
920
921  /**
922   * Searches the job history file FileStatus list for the specified JobId.
923   * 
924   * @param fileStatusList
925   *          fileStatus list of Job History Files.
926   * @param jobId
927   *          The JobId to find.
928   * @return A FileInfo object for the jobId, null if not found.
929   * @throws IOException
930   */
931  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
932      JobId jobId) throws IOException {
933    for (FileStatus fs : fileStatusList) {
934      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
935          .getName());
936      if (jobIndexInfo.getJobId().equals(jobId)) {
937        String confFileName = JobHistoryUtils
938            .getIntermediateConfFileName(jobIndexInfo.getJobId());
939        String summaryFileName = JobHistoryUtils
940            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
941        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
942            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
943            .getParent(), summaryFileName), jobIndexInfo, true);
944        return fileInfo;
945      }
946    }
947    return null;
948  }
949
950  /**
951   * Scans old directories known by the idToDateString map for the specified
952   * jobId. If the number of directories is higher than the supported size of
953   * the idToDateString cache, the jobId will not be found.
954   * 
955   * @param jobId
956   *          the jobId.
957   * @return
958   * @throws IOException
959   */
960  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
961    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
962        jobId, serialNumberFormat);
963    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
964    if (dateStringSet == null) {
965      return null;
966    }
967    for (String timestampPart : dateStringSet) {
968      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
969      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
970          doneDirFc);
971      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
972      if (fileInfo != null) {
973        return fileInfo;
974      }
975    }
976    return null;
977  }
978
979  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
980    scanIntermediateDirectory();
981    return jobListCache.values();
982  }
983
984  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
985    // FileInfo available in cache.
986    HistoryFileInfo fileInfo = jobListCache.get(jobId);
987    if (fileInfo != null) {
988      return fileInfo;
989    }
990    // OK so scan the intermediate to be sure we did not lose it that way
991    scanIntermediateDirectory();
992    fileInfo = jobListCache.get(jobId);
993    if (fileInfo != null) {
994      return fileInfo;
995    }
996
997    // Intermediate directory does not contain job. Search through older ones.
998    fileInfo = scanOldDirsForJob(jobId);
999    if (fileInfo != null) {
1000      return fileInfo;
1001    }
1002    return null;
1003  }
1004
1005  private void moveToDoneNow(final Path src, final Path target)
1006      throws IOException {
1007    LOG.info("Moving " + src.toString() + " to " + target.toString());
1008    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
1009  }
1010
1011  private String getJobSummary(FileContext fc, Path path) throws IOException {
1012    Path qPath = fc.makeQualified(path);
1013    FSDataInputStream in = null;
1014    String jobSummaryString = null;
1015    try {
1016      in = fc.open(qPath);
1017      jobSummaryString = in.readUTF();
1018    } finally {
1019      if (in != null) {
1020        in.close();
1021      }
1022    }
1023    return jobSummaryString;
1024  }
1025
1026  private void makeDoneSubdir(Path path) throws IOException {
1027    try {
1028      doneDirFc.getFileStatus(path);
1029      existingDoneSubdirs.add(path);
1030    } catch (FileNotFoundException fnfE) {
1031      try {
1032        FsPermission fsp = new FsPermission(
1033            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
1034        doneDirFc.mkdir(path, fsp, true);
1035        FileStatus fsStatus = doneDirFc.getFileStatus(path);
1036        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
1037            + ", Expected: " + fsp.toShort());
1038        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
1039          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
1040              + ", " + fsp);
1041          doneDirFc.setPermission(path, fsp);
1042        }
1043        existingDoneSubdirs.add(path);
1044      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
1045      }
1046    }
1047  }
1048
1049  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
1050    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1051        id, timestampComponent, serialNumberFormat));
1052  }
1053
1054  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
1055    String timestampComponent = JobHistoryUtils
1056        .timestampDirectoryComponent(millisecondTime);
1057    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1058        id, timestampComponent, serialNumberFormat));
1059  }
1060
1061  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
1062    if (finishTime == 0) {
1063      return fileStatus.getModificationTime();
1064    }
1065    return finishTime;
1066  }
1067
1068  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
1069    jobListCache.delete(fileInfo);
1070    fileInfo.delete();
1071  }
1072
1073  List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
1074      return JobHistoryUtils.
1075        getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
1076  }
1077
1078  /**
1079   * Clean up older history files.
1080   * 
1081   * @throws IOException
1082   *           on any error trying to remove the entries.
1083   */
1084  @SuppressWarnings("unchecked")
1085  void clean() throws IOException {
1086    long cutoff = System.currentTimeMillis() - maxHistoryAge;
1087    boolean halted = false;
1088    List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
1089    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
1090    Collections.sort(serialDirList);
1091    for (FileStatus serialDir : serialDirList) {
1092      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
1093          serialDir.getPath(), doneDirFc);
1094      for (FileStatus historyFile : historyFileList) {
1095        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
1096            .getPath().getName());
1097        long effectiveTimestamp = getEffectiveTimestamp(
1098            jobIndexInfo.getFinishTime(), historyFile);
1099        if (effectiveTimestamp <= cutoff) {
1100          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
1101              .getJobId());
1102          if (fileInfo == null) {
1103            String confFileName = JobHistoryUtils
1104                .getIntermediateConfFileName(jobIndexInfo.getJobId());
1105
1106            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
1107                historyFile.getPath().getParent(), confFileName), null,
1108                jobIndexInfo, true);
1109          }
1110          deleteJobFromDone(fileInfo);
1111        } else {
1112          halted = true;
1113          break;
1114        }
1115      }
1116      if (!halted) {
1117        deleteDir(serialDir);
1118        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
1119        existingDoneSubdirs.remove(serialDir.getPath());
1120      } else {
1121        break; // Don't scan any more directories.
1122      }
1123    }
1124  }
1125  
1126  protected boolean deleteDir(FileStatus serialDir)
1127      throws AccessControlException, FileNotFoundException,
1128      UnsupportedFileSystemException, IOException {
1129    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
1130  }
1131
1132  @VisibleForTesting
1133  protected void setMaxHistoryAge(long newValue){
1134    maxHistoryAge=newValue;
1135  } 
1136}