001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.v2.hs;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.NavigableSet;
031import java.util.Set;
032import java.util.SortedMap;
033import java.util.TreeMap;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.ConcurrentSkipListMap;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.ThreadFactory;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.apache.hadoop.classification.InterfaceAudience;
046import org.apache.hadoop.classification.InterfaceStability;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FSDataInputStream;
049import org.apache.hadoop.fs.FileAlreadyExistsException;
050import org.apache.hadoop.fs.FileContext;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.Options;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.fs.PathFilter;
055import org.apache.hadoop.fs.RemoteIterator;
056import org.apache.hadoop.fs.UnsupportedFileSystemException;
057import org.apache.hadoop.fs.permission.FsPermission;
058import org.apache.hadoop.mapred.JobACLsManager;
059import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
060import org.apache.hadoop.mapreduce.v2.api.records.JobId;
061import org.apache.hadoop.mapreduce.v2.app.job.Job;
062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
066import org.apache.hadoop.security.AccessControlException;
067import org.apache.hadoop.service.AbstractService;
068import org.apache.hadoop.util.ShutdownThreadsHelper;
069import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
070
071import com.google.common.annotations.VisibleForTesting;
072import com.google.common.util.concurrent.ThreadFactoryBuilder;
073import org.apache.hadoop.yarn.util.Clock;
074import org.apache.hadoop.yarn.util.SystemClock;
075
076/**
 * This class provides a way to interact with history files in a thread safe
 * manner.
079 */
080@InterfaceAudience.Public
081@InterfaceStability.Unstable
082public class HistoryFileManager extends AbstractService {
083  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
084  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
085
086  private static enum HistoryInfoState {
087    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
088  };
089  
090  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
091      .doneSubdirsBeforeSerialTail();
092
093  /**
094   * Maps between a serial number (generated based on jobId) and the timestamp
095   * component(s) to which it belongs. Facilitates jobId based searches. If a
096   * jobId is not found in this list - it will not be found.
097   */
098  private static class SerialNumberIndex {
099    private SortedMap<String, Set<String>> cache;
100    private int maxSize;
101
102    public SerialNumberIndex(int maxSize) {
103      this.cache = new TreeMap<String, Set<String>>();
104      this.maxSize = maxSize;
105    }
106
107    public synchronized void add(String serialPart, String timestampPart) {
108      if (!cache.containsKey(serialPart)) {
109        cache.put(serialPart, new HashSet<String>());
110        if (cache.size() > maxSize) {
111          String key = cache.firstKey();
112          LOG.error("Dropping " + key
113              + " from the SerialNumberIndex. We will no "
114              + "longer be able to see jobs that are in that serial index for "
115              + cache.get(key));
116          cache.remove(key);
117        }
118      }
119      Set<String> datePartSet = cache.get(serialPart);
120      datePartSet.add(timestampPart);
121    }
122
123    public synchronized void remove(String serialPart, String timeStampPart) {
124      if (cache.containsKey(serialPart)) {
125        Set<String> set = cache.get(serialPart);
126        set.remove(timeStampPart);
127        if (set.isEmpty()) {
128          cache.remove(serialPart);
129        }
130      }
131    }
132
133    public synchronized Set<String> get(String serialPart) {
134      Set<String> found = cache.get(serialPart);
135      if (found != null) {
136        return new HashSet<String>(found);
137      }
138      return null;
139    }
140  }
141
142  /**
143   * Wrapper around {@link ConcurrentSkipListMap} that maintains size along
144   * side for O(1) size() implementation for use in JobListCache.
145   *
146   * Note: The size is not updated atomically with changes additions/removals.
147   * This race can lead to size() returning an incorrect size at times.
148   */
149  static class JobIdHistoryFileInfoMap {
150    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
151    private AtomicInteger mapSize;
152
153    JobIdHistoryFileInfoMap() {
154      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
155      mapSize = new AtomicInteger();
156    }
157
158    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
159      HistoryFileInfo ret = cache.putIfAbsent(key, value);
160      if (ret == null) {
161        mapSize.incrementAndGet();
162      }
163      return ret;
164    }
165
166    public HistoryFileInfo remove(JobId key) {
167      HistoryFileInfo ret = cache.remove(key);
168      if (ret != null) {
169        mapSize.decrementAndGet();
170      }
171      return ret;
172    }
173
174    /**
175     * Returns the recorded size of the internal map. Note that this could be out
176     * of sync with the actual size of the map
177     * @return "recorded" size
178     */
179    public int size() {
180      return mapSize.get();
181    }
182
183    public HistoryFileInfo get(JobId key) {
184      return cache.get(key);
185    }
186
187    public NavigableSet<JobId> navigableKeySet() {
188      return cache.navigableKeySet();
189    }
190
191    public Collection<HistoryFileInfo> values() {
192      return cache.values();
193    }
194  }
195
196  static class JobListCache {
197    private JobIdHistoryFileInfoMap cache;
198    private int maxSize;
199    private long maxAge;
200
201    public JobListCache(int maxSize, long maxAge) {
202      this.maxSize = maxSize;
203      this.maxAge = maxAge;
204      this.cache = new JobIdHistoryFileInfoMap();
205    }
206
207    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
208      JobId jobId = fileInfo.getJobId();
209      if (LOG.isDebugEnabled()) {
210        LOG.debug("Adding " + jobId + " to job list cache with "
211            + fileInfo.getJobIndexInfo());
212      }
213      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
214      if (cache.size() > maxSize) {
215        //There is a race here, where more then one thread could be trying to
216        // remove entries.  This could result in too many entries being removed
217        // from the cache.  This is considered OK as the size of the cache
218        // should be rather large, and we would rather have performance over
219        // keeping the cache size exactly at the maximum.
220        Iterator<JobId> keys = cache.navigableKeySet().iterator();
221        long cutoff = System.currentTimeMillis() - maxAge;
222        while(cache.size() > maxSize && keys.hasNext()) {
223          JobId key = keys.next();
224          HistoryFileInfo firstValue = cache.get(key);
225          if(firstValue != null) {
226            synchronized(firstValue) {
227              if (firstValue.isMovePending()) {
228                if(firstValue.didMoveFail() && 
229                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
230                  cache.remove(key);
231                  //Now lets try to delete it
232                  try {
233                    firstValue.delete();
234                  } catch (IOException e) {
235                    LOG.error("Error while trying to delete history files" +
236                                " that could not be moved to done.", e);
237                  }
238                } else {
239                  LOG.warn("Waiting to remove " + key
240                      + " from JobListCache because it is not in done yet.");
241                }
242              } else {
243                cache.remove(key);
244              }
245            }
246          }
247        }
248      }
249      return old;
250    }
251
252    public void delete(HistoryFileInfo fileInfo) {
253      if (LOG.isDebugEnabled()) {
254        LOG.debug("Removing from cache " + fileInfo);
255      }
256      cache.remove(fileInfo.getJobId());
257    }
258
259    public Collection<HistoryFileInfo> values() {
260      return new ArrayList<HistoryFileInfo>(cache.values());
261    }
262
263    public HistoryFileInfo get(JobId jobId) {
264      return cache.get(jobId);
265    }
266
267    public boolean isFull() {
268      return cache.size() >= maxSize;
269    }
270  }
271
272  /**
273   * This class represents a user dir in the intermediate done directory.  This
274   * is mostly for locking purposes. 
275   */
276  private class UserLogDir {
277    long modTime = 0;
278    
279    public synchronized void scanIfNeeded(FileStatus fs) {
280      long newModTime = fs.getModificationTime();
281      if (modTime != newModTime) {
282        Path p = fs.getPath();
283        try {
284          scanIntermediateDirectory(p);
285          //If scanning fails, we will scan again.  We assume the failure is
286          // temporary.
287          modTime = newModTime;
288        } catch (IOException e) {
289          LOG.error("Error while trying to scan the directory " + p, e);
290        }
291      } else {
292        if (LOG.isDebugEnabled()) {
293          LOG.debug("Scan not needed of " + fs.getPath());
294        }
295      }
296    }
297  }
298  
299  public class HistoryFileInfo {
300    private Path historyFile;
301    private Path confFile;
302    private Path summaryFile;
303    private JobIndexInfo jobIndexInfo;
304    private HistoryInfoState state;
305
306    private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
307        JobIndexInfo jobIndexInfo, boolean isInDone) {
308      this.historyFile = historyFile;
309      this.confFile = confFile;
310      this.summaryFile = summaryFile;
311      this.jobIndexInfo = jobIndexInfo;
312      state = isInDone ? HistoryInfoState.IN_DONE
313          : HistoryInfoState.IN_INTERMEDIATE;
314    }
315
316    @VisibleForTesting
317    synchronized boolean isMovePending() {
318      return state == HistoryInfoState.IN_INTERMEDIATE
319          || state == HistoryInfoState.MOVE_FAILED;
320    }
321
322    @VisibleForTesting
323    synchronized boolean didMoveFail() {
324      return state == HistoryInfoState.MOVE_FAILED;
325    }
326    
327    /**
328     * @return true if the files backed by this were deleted.
329     */
330    public synchronized boolean isDeleted() {
331      return state == HistoryInfoState.DELETED;
332    }
333
334    @Override
335    public String toString() {
336      return "HistoryFileInfo jobID " + getJobId()
337             + " historyFile = " + historyFile;
338    }
339
340    private synchronized void moveToDone() throws IOException {
341      if (LOG.isDebugEnabled()) {
342        LOG.debug("moveToDone: " + historyFile);
343      }
344      if (!isMovePending()) {
345        // It was either deleted or is already in done. Either way do nothing
346        if (LOG.isDebugEnabled()) {
347          LOG.debug("Move no longer pending");
348        }
349        return;
350      }
351      try {
352        long completeTime = jobIndexInfo.getFinishTime();
353        if (completeTime == 0) {
354          completeTime = System.currentTimeMillis();
355        }
356        JobId jobId = jobIndexInfo.getJobId();
357
358        List<Path> paths = new ArrayList<Path>(2);
359        if (historyFile == null) {
360          LOG.info("No file for job-history with " + jobId + " found in cache!");
361        } else {
362          paths.add(historyFile);
363        }
364
365        if (confFile == null) {
366          LOG.info("No file for jobConf with " + jobId + " found in cache!");
367        } else {
368          paths.add(confFile);
369        }
370
371        if (summaryFile == null) {
372          LOG.info("No summary file for job: " + jobId);
373        } else {
374          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
375              summaryFile);
376          SUMMARY_LOG.info(jobSummaryString);
377          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
378          intermediateDoneDirFc.delete(summaryFile, false);
379          summaryFile = null;
380        }
381
382        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
383        addDirectoryToSerialNumberIndex(targetDir);
384        makeDoneSubdir(targetDir);
385        if (historyFile != null) {
386          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
387              .getName()));
388          if (!toPath.equals(historyFile)) {
389            moveToDoneNow(historyFile, toPath);
390            historyFile = toPath;
391          }
392        }
393        if (confFile != null) {
394          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
395              .getName()));
396          if (!toPath.equals(confFile)) {
397            moveToDoneNow(confFile, toPath);
398            confFile = toPath;
399          }
400        }
401        state = HistoryInfoState.IN_DONE;
402      } catch (Throwable t) {
403        LOG.error("Error while trying to move a job to done", t);
404        this.state = HistoryInfoState.MOVE_FAILED;
405      }
406    }
407
408    /**
409     * Parse a job from the JobHistoryFile, if the underlying file is not going
410     * to be deleted.
411     * 
412     * @return the Job or null if the underlying file was deleted.
413     * @throws IOException
414     *           if there is an error trying to read the file.
415     */
416    public synchronized Job loadJob() throws IOException {
417      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
418          false, jobIndexInfo.getUser(), this, aclsMgr);
419    }
420
421    /**
422     * Return the history file.  This should only be used for testing.
423     * @return the history file.
424     */
425    synchronized Path getHistoryFile() {
426      return historyFile;
427    }
428    
429    protected synchronized void delete() throws IOException {
430      if (LOG.isDebugEnabled()) {
431        LOG.debug("deleting " + historyFile + " and " + confFile);
432      }
433      state = HistoryInfoState.DELETED;
434      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
435      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
436    }
437
438    public JobIndexInfo getJobIndexInfo() {
439      return jobIndexInfo;
440    }
441
442    public JobId getJobId() {
443      return jobIndexInfo.getJobId();
444    }
445
446    public synchronized Path getConfFile() {
447      return confFile;
448    }
449    
450    public synchronized Configuration loadConfFile() throws IOException {
451      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
452      Configuration jobConf = new Configuration(false);
453      jobConf.addResource(fc.open(confFile), confFile.toString());
454      return jobConf;
455    }
456  }
457
458  private SerialNumberIndex serialNumberIndex = null;
459  protected JobListCache jobListCache = null;
460
461  // Maintains a list of known done subdirectories.
462  private final Set<Path> existingDoneSubdirs = Collections
463      .synchronizedSet(new HashSet<Path>());
464
465  /**
466   * Maintains a mapping between intermediate user directories and the last
467   * known modification time.
468   */
469  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 
470    new ConcurrentHashMap<String, UserLogDir>();
471
472  private JobACLsManager aclsMgr;
473
474  @VisibleForTesting
475  Configuration conf;
476
477  private String serialNumberFormat;
478
479  private Path doneDirPrefixPath = null; // folder for completed jobs
480  private FileContext doneDirFc; // done Dir FileContext
481
482  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
483  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
484                                             // FileContext
485  @VisibleForTesting
486  protected ThreadPoolExecutor moveToDoneExecutor = null;
487  private long maxHistoryAge = 0;
488  
489  public HistoryFileManager() {
490    super(HistoryFileManager.class.getName());
491  }
492
493  @Override
494  protected void serviceInit(Configuration conf) throws Exception {
495    this.conf = conf;
496
497    int serialNumberLowDigits = 3;
498    serialNumberFormat = ("%0"
499        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
500        + "d");
501
502    long maxFSWaitTime = conf.getLong(
503        JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
504        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
505    createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime);
506
507    this.aclsMgr = new JobACLsManager(conf);
508
509    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
510        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
511    
512    jobListCache = createJobListCache();
513
514    serialNumberIndex = new SerialNumberIndex(conf.getInt(
515        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
516        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
517
518    int numMoveThreads = conf.getInt(
519        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
520        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
521    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
522        "MoveIntermediateToDone Thread #%d").build();
523    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
524        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
525
526    super.serviceInit(conf);
527  }
528
529  @VisibleForTesting
530  void createHistoryDirs(Clock clock, long intervalCheckMillis,
531      long timeOutMillis) throws IOException {
532    long start = clock.getTime();
533    boolean done = false;
534    int counter = 0;
535    while (!done &&
536        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
537      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
538      try {
539        Thread.sleep(intervalCheckMillis);
540      } catch (InterruptedException ex) {
541        throw new YarnRuntimeException(ex);
542      }
543    }
544    if (!done) {
545      throw new YarnRuntimeException("Timed out '" + timeOutMillis+
546              "ms' waiting for FileSystem to become available");
547    }
548  }
549
550  /**
551   * DistributedFileSystem returns a RemoteException with a message stating
552   * SafeModeException in it. So this is only way to check it is because of
553   * being in safe mode.
554   */
555  private boolean isBecauseSafeMode(Throwable ex) {
556    return ex.toString().contains("SafeModeException");
557  }
558
559  /**
560   * Returns TRUE if the history dirs were created, FALSE if they could not
561   * be created because the FileSystem is not reachable or in safe mode and
562   * throws and exception otherwise.
563   */
564  @VisibleForTesting
565  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
566    boolean succeeded = true;
567    String doneDirPrefix = JobHistoryUtils.
568        getConfiguredHistoryServerDoneDirPrefix(conf);
569    try {
570      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
571          new Path(doneDirPrefix));
572      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
573      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
574      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
575          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
576    } catch (ConnectException ex) {
577      if (logWait) {
578        LOG.info("Waiting for FileSystem at " +
579            doneDirPrefixPath.toUri().getAuthority()  + "to be available");
580      }
581      succeeded = false;
582    } catch (IOException e) {
583      if (isBecauseSafeMode(e)) {
584        succeeded = false;
585        if (logWait) {
586          LOG.info("Waiting for FileSystem at " +
587              doneDirPrefixPath.toUri().getAuthority() +
588              "to be out of safe mode");
589        }
590      } else {
591        throw new YarnRuntimeException("Error creating done directory: ["
592            + doneDirPrefixPath + "]", e);
593      }
594    }
595    if (succeeded) {
596      String intermediateDoneDirPrefix = JobHistoryUtils.
597          getConfiguredHistoryIntermediateDoneDirPrefix(conf);
598      try {
599        intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
600            new Path(intermediateDoneDirPrefix));
601        intermediateDoneDirFc = FileContext.getFileContext(
602            intermediateDoneDirPath.toUri(), conf);
603        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
604            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
605      } catch (ConnectException ex) {
606        succeeded = false;
607        if (logWait) {
608          LOG.info("Waiting for FileSystem at " +
609              intermediateDoneDirPath.toUri().getAuthority() +
610              "to be available");
611        }
612      } catch (IOException e) {
613        if (isBecauseSafeMode(e)) {
614          succeeded = false;
615          if (logWait) {
616            LOG.info("Waiting for FileSystem at " +
617                intermediateDoneDirPath.toUri().getAuthority() +
618                "to be out of safe mode");
619          }
620        } else {
621          throw new YarnRuntimeException(
622              "Error creating intermediate done directory: ["
623              + intermediateDoneDirPath + "]", e);
624        }
625      }
626    }
627    return succeeded;
628  }
629
630  @Override
631  public void serviceStop() throws Exception {
632    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
633    super.serviceStop();
634  }
635
636  protected JobListCache createJobListCache() {
637    return new JobListCache(conf.getInt(
638        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
639        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
640  }
641
642  private void mkdir(FileContext fc, Path path, FsPermission fsp)
643      throws IOException {
644    if (!fc.util().exists(path)) {
645      try {
646        fc.mkdir(path, fsp, true);
647
648        FileStatus fsStatus = fc.getFileStatus(path);
649        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
650            + ", Expected: " + fsp.toShort());
651        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
652          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
653              + ", " + fsp);
654          fc.setPermission(path, fsp);
655        }
656      } catch (FileAlreadyExistsException e) {
657        LOG.info("Directory: [" + path + "] already exists.");
658      }
659    }
660  }
661
662  /**
663   * Populates index data structures. Should only be called at initialization
664   * times.
665   */
666  @SuppressWarnings("unchecked")
667  void initExisting() throws IOException {
668    LOG.info("Initializing Existing Jobs...");
669    List<FileStatus> timestampedDirList = findTimestampedDirectories();
670    // Sort first just so insertion is in a consistent order
671    Collections.sort(timestampedDirList);
672    for (FileStatus fs : timestampedDirList) {
673      // TODO Could verify the correct format for these directories.
674      addDirectoryToSerialNumberIndex(fs.getPath());
675    }
676    for (int i= timestampedDirList.size() - 1;
677        i >= 0 && !jobListCache.isFull(); i--) {
678      FileStatus fs = timestampedDirList.get(i); 
679      addDirectoryToJobListCache(fs.getPath());
680    }
681  }
682
683  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
684    String serialPart = serialDirPath.getName();
685    String timeStampPart = JobHistoryUtils
686        .getTimestampPartFromPath(serialDirPath.toString());
687    if (timeStampPart == null) {
688      LOG.warn("Could not find timestamp portion from path: "
689          + serialDirPath.toString() + ". Continuing with next");
690      return;
691    }
692    if (serialPart == null) {
693      LOG.warn("Could not find serial portion from path: "
694          + serialDirPath.toString() + ". Continuing with next");
695      return;
696    }
697    serialNumberIndex.remove(serialPart, timeStampPart);
698  }
699
700  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
701    if (LOG.isDebugEnabled()) {
702      LOG.debug("Adding " + serialDirPath + " to serial index");
703    }
704    String serialPart = serialDirPath.getName();
705    String timestampPart = JobHistoryUtils
706        .getTimestampPartFromPath(serialDirPath.toString());
707    if (timestampPart == null) {
708      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
709          + ". Continuing with next");
710      return;
711    }
712    if (serialPart == null) {
713      LOG.warn("Could not find serial portion from path: "
714          + serialDirPath.toString() + ". Continuing with next");
715    } else {
716      serialNumberIndex.add(serialPart, timestampPart);
717    }
718  }
719
720  private void addDirectoryToJobListCache(Path path) throws IOException {
721    if (LOG.isDebugEnabled()) {
722      LOG.debug("Adding " + path + " to job list cache.");
723    }
724    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
725        doneDirFc);
726    for (FileStatus fs : historyFileList) {
727      if (LOG.isDebugEnabled()) {
728        LOG.debug("Adding in history for " + fs.getPath());
729      }
730      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
731          .getName());
732      String confFileName = JobHistoryUtils
733          .getIntermediateConfFileName(jobIndexInfo.getJobId());
734      String summaryFileName = JobHistoryUtils
735          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
736      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
737          .getPath().getParent(), confFileName), new Path(fs.getPath()
738          .getParent(), summaryFileName), jobIndexInfo, true);
739      jobListCache.addIfAbsent(fileInfo);
740    }
741  }
742
743  @VisibleForTesting
744  protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
745      PathFilter pathFilter) throws IOException {
746    path = fc.makeQualified(path);
747    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
748    try {
749      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
750      while (fileStatusIter.hasNext()) {
751        FileStatus fileStatus = fileStatusIter.next();
752        Path filePath = fileStatus.getPath();
753        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
754          jhStatusList.add(fileStatus);
755        }
756      }
757    } catch (FileNotFoundException fe) {
758      LOG.error("Error while scanning directory " + path, fe);
759    }
760    return jhStatusList;
761  }
762
763  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
764      FileContext fc) throws IOException {
765    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
766  }
767  
768  /**
769   * Finds all history directories with a timestamp component by scanning the
770   * filesystem. Used when the JobHistory server is started.
771   * 
772   * @return list of history directories
773   */
774  protected List<FileStatus> findTimestampedDirectories() throws IOException {
775    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
776        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
777    return fsList;
778  }
779
780  /**
781   * Scans the intermediate directory to find user directories. Scans these for
782   * history files if the modification time for the directory has changed. Once
783   * it finds history files it starts the process of moving them to the done 
784   * directory.
785   * 
786   * @throws IOException
787   *           if there was a error while scanning
788   */
789  void scanIntermediateDirectory() throws IOException {
790    // TODO it would be great to limit how often this happens, except in the
791    // case where we are looking for a particular job.
792    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
793        intermediateDoneDirFc, intermediateDoneDirPath, "");
794    LOG.debug("Scanning intermediate dirs");
795    for (FileStatus userDir : userDirList) {
796      String name = userDir.getPath().getName();
797      UserLogDir dir = userDirModificationTimeMap.get(name);
798      if(dir == null) {
799        dir = new UserLogDir();
800        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
801        if(old != null) {
802          dir = old;
803        }
804      }
805      dir.scanIfNeeded(userDir);
806    }
807  }
808
809  /**
810   * Scans the specified path and populates the intermediate cache.
811   * 
812   * @param absPath
813   * @throws IOException
814   */
815  private void scanIntermediateDirectory(final Path absPath) throws IOException {
816    if (LOG.isDebugEnabled()) {
817      LOG.debug("Scanning intermediate dir " + absPath);
818    }
819    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
820        intermediateDoneDirFc);
821    if (LOG.isDebugEnabled()) {
822      LOG.debug("Found " + fileStatusList.size() + " files");
823    }
824    for (FileStatus fs : fileStatusList) {
825      if (LOG.isDebugEnabled()) {
826        LOG.debug("scanning file: "+ fs.getPath());
827      }
828      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
829          .getName());
830      String confFileName = JobHistoryUtils
831          .getIntermediateConfFileName(jobIndexInfo.getJobId());
832      String summaryFileName = JobHistoryUtils
833          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
834      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
835          .getPath().getParent(), confFileName), new Path(fs.getPath()
836          .getParent(), summaryFileName), jobIndexInfo, false);
837
838      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
839      if (old == null || old.didMoveFail()) {
840        final HistoryFileInfo found = (old == null) ? fileInfo : old;
841        long cutoff = System.currentTimeMillis() - maxHistoryAge;
842        if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
843          try {
844            found.delete();
845          } catch (IOException e) {
846            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
847          }
848        } else {
849          if (LOG.isDebugEnabled()) {
850            LOG.debug("Scheduling move to done of " +found);
851          }
852          moveToDoneExecutor.execute(new Runnable() {
853            @Override
854            public void run() {
855              try {
856                found.moveToDone();
857              } catch (IOException e) {
858                LOG.info("Failed to process fileInfo for job: " + 
859                    found.getJobId(), e);
860              }
861            }
862          });
863        }
864      } else if (!old.isMovePending()) {
865        //This is a duplicate so just delete it
866        if (LOG.isDebugEnabled()) {
867          LOG.debug("Duplicate: deleting");
868        }
869        fileInfo.delete();
870      }
871    }
872  }
873
874  /**
875   * Searches the job history file FileStatus list for the specified JobId.
876   * 
877   * @param fileStatusList
878   *          fileStatus list of Job History Files.
879   * @param jobId
880   *          The JobId to find.
881   * @return A FileInfo object for the jobId, null if not found.
882   * @throws IOException
883   */
884  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
885      JobId jobId) throws IOException {
886    for (FileStatus fs : fileStatusList) {
887      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
888          .getName());
889      if (jobIndexInfo.getJobId().equals(jobId)) {
890        String confFileName = JobHistoryUtils
891            .getIntermediateConfFileName(jobIndexInfo.getJobId());
892        String summaryFileName = JobHistoryUtils
893            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
894        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
895            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
896            .getParent(), summaryFileName), jobIndexInfo, true);
897        return fileInfo;
898      }
899    }
900    return null;
901  }
902
903  /**
904   * Scans old directories known by the idToDateString map for the specified
905   * jobId. If the number of directories is higher than the supported size of
906   * the idToDateString cache, the jobId will not be found.
907   * 
908   * @param jobId
909   *          the jobId.
910   * @return
911   * @throws IOException
912   */
913  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
914    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
915        jobId, serialNumberFormat);
916    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
917    if (dateStringSet == null) {
918      return null;
919    }
920    for (String timestampPart : dateStringSet) {
921      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
922      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
923          doneDirFc);
924      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
925      if (fileInfo != null) {
926        return fileInfo;
927      }
928    }
929    return null;
930  }
931
932  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
933    scanIntermediateDirectory();
934    return jobListCache.values();
935  }
936
937  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
938    // FileInfo available in cache.
939    HistoryFileInfo fileInfo = jobListCache.get(jobId);
940    if (fileInfo != null) {
941      return fileInfo;
942    }
943    // OK so scan the intermediate to be sure we did not lose it that way
944    scanIntermediateDirectory();
945    fileInfo = jobListCache.get(jobId);
946    if (fileInfo != null) {
947      return fileInfo;
948    }
949
950    // Intermediate directory does not contain job. Search through older ones.
951    fileInfo = scanOldDirsForJob(jobId);
952    if (fileInfo != null) {
953      return fileInfo;
954    }
955    return null;
956  }
957
958  private void moveToDoneNow(final Path src, final Path target)
959      throws IOException {
960    LOG.info("Moving " + src.toString() + " to " + target.toString());
961    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
962  }
963
964  private String getJobSummary(FileContext fc, Path path) throws IOException {
965    Path qPath = fc.makeQualified(path);
966    FSDataInputStream in = fc.open(qPath);
967    String jobSummaryString = in.readUTF();
968    in.close();
969    return jobSummaryString;
970  }
971
972  private void makeDoneSubdir(Path path) throws IOException {
973    try {
974      doneDirFc.getFileStatus(path);
975      existingDoneSubdirs.add(path);
976    } catch (FileNotFoundException fnfE) {
977      try {
978        FsPermission fsp = new FsPermission(
979            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
980        doneDirFc.mkdir(path, fsp, true);
981        FileStatus fsStatus = doneDirFc.getFileStatus(path);
982        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
983            + ", Expected: " + fsp.toShort());
984        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
985          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
986              + ", " + fsp);
987          doneDirFc.setPermission(path, fsp);
988        }
989        existingDoneSubdirs.add(path);
990      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
991      }
992    }
993  }
994
995  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
996    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
997        id, timestampComponent, serialNumberFormat));
998  }
999
1000  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
1001    String timestampComponent = JobHistoryUtils
1002        .timestampDirectoryComponent(millisecondTime);
1003    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1004        id, timestampComponent, serialNumberFormat));
1005  }
1006
1007  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
1008    if (finishTime == 0) {
1009      return fileStatus.getModificationTime();
1010    }
1011    return finishTime;
1012  }
1013
1014  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
1015    jobListCache.delete(fileInfo);
1016    fileInfo.delete();
1017  }
1018
1019  List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
1020      return JobHistoryUtils.
1021        getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
1022  }
1023
1024  /**
1025   * Clean up older history files.
1026   * 
1027   * @throws IOException
1028   *           on any error trying to remove the entries.
1029   */
1030  @SuppressWarnings("unchecked")
1031  void clean() throws IOException {
1032    long cutoff = System.currentTimeMillis() - maxHistoryAge;
1033    boolean halted = false;
1034    List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
1035    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
1036    Collections.sort(serialDirList);
1037    for (FileStatus serialDir : serialDirList) {
1038      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
1039          serialDir.getPath(), doneDirFc);
1040      for (FileStatus historyFile : historyFileList) {
1041        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
1042            .getPath().getName());
1043        long effectiveTimestamp = getEffectiveTimestamp(
1044            jobIndexInfo.getFinishTime(), historyFile);
1045        if (effectiveTimestamp <= cutoff) {
1046          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
1047              .getJobId());
1048          if (fileInfo == null) {
1049            String confFileName = JobHistoryUtils
1050                .getIntermediateConfFileName(jobIndexInfo.getJobId());
1051
1052            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
1053                historyFile.getPath().getParent(), confFileName), null,
1054                jobIndexInfo, true);
1055          }
1056          deleteJobFromDone(fileInfo);
1057        } else {
1058          halted = true;
1059          break;
1060        }
1061      }
1062      if (!halted) {
1063        deleteDir(serialDir);
1064        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
1065        existingDoneSubdirs.remove(serialDir.getPath());
1066      } else {
1067        break; // Don't scan any more directories.
1068      }
1069    }
1070  }
1071  
1072  protected boolean deleteDir(FileStatus serialDir)
1073      throws AccessControlException, FileNotFoundException,
1074      UnsupportedFileSystemException, IOException {
1075    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
1076  }
1077
1078  @VisibleForTesting
1079  protected void setMaxHistoryAge(long newValue){
1080    maxHistoryAge=newValue;
1081  } 
1082}