001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.v2.hs; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.NavigableSet; 031import java.util.Set; 032import java.util.SortedMap; 033import java.util.TreeMap; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.ConcurrentSkipListMap; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.ThreadFactory; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.apache.hadoop.classification.InterfaceAudience; 046import org.apache.hadoop.classification.InterfaceStability; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FSDataInputStream; 049import org.apache.hadoop.fs.FileAlreadyExistsException; 050import org.apache.hadoop.fs.FileContext; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.Options; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.fs.PathFilter; 055import org.apache.hadoop.fs.RemoteIterator; 056import org.apache.hadoop.fs.UnsupportedFileSystemException; 057import org.apache.hadoop.fs.permission.FsPermission; 058import org.apache.hadoop.mapred.JobACLsManager; 059import org.apache.hadoop.mapreduce.jobhistory.JobSummary; 060import org.apache.hadoop.mapreduce.v2.api.records.JobId; 061import org.apache.hadoop.mapreduce.v2.app.job.Job; 062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; 063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; 064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; 065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; 066import org.apache.hadoop.security.AccessControlException; 067import org.apache.hadoop.service.AbstractService; 068import org.apache.hadoop.util.ShutdownThreadsHelper; 069import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 070 071import com.google.common.annotations.VisibleForTesting; 072import com.google.common.util.concurrent.ThreadFactoryBuilder; 073import org.apache.hadoop.yarn.util.Clock; 074import org.apache.hadoop.yarn.util.SystemClock; 075 076/** 077 * This class provides a way to interact with history files in a thread safe 078 * manor. 079 */ 080@InterfaceAudience.Public 081@InterfaceStability.Unstable 082public class HistoryFileManager extends AbstractService { 083 private static final Log LOG = LogFactory.getLog(HistoryFileManager.class); 084 private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); 085 086 private static enum HistoryInfoState { 087 IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED 088 }; 089 090 private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils 091 .doneSubdirsBeforeSerialTail(); 092 093 /** 094 * Maps between a serial number (generated based on jobId) and the timestamp 095 * component(s) to which it belongs. Facilitates jobId based searches. If a 096 * jobId is not found in this list - it will not be found. 097 */ 098 private static class SerialNumberIndex { 099 private SortedMap<String, Set<String>> cache; 100 private int maxSize; 101 102 public SerialNumberIndex(int maxSize) { 103 this.cache = new TreeMap<String, Set<String>>(); 104 this.maxSize = maxSize; 105 } 106 107 public synchronized void add(String serialPart, String timestampPart) { 108 if (!cache.containsKey(serialPart)) { 109 cache.put(serialPart, new HashSet<String>()); 110 if (cache.size() > maxSize) { 111 String key = cache.firstKey(); 112 LOG.error("Dropping " + key 113 + " from the SerialNumberIndex. We will no " 114 + "longer be able to see jobs that are in that serial index for " 115 + cache.get(key)); 116 cache.remove(key); 117 } 118 } 119 Set<String> datePartSet = cache.get(serialPart); 120 datePartSet.add(timestampPart); 121 } 122 123 public synchronized void remove(String serialPart, String timeStampPart) { 124 if (cache.containsKey(serialPart)) { 125 Set<String> set = cache.get(serialPart); 126 set.remove(timeStampPart); 127 if (set.isEmpty()) { 128 cache.remove(serialPart); 129 } 130 } 131 } 132 133 public synchronized Set<String> get(String serialPart) { 134 Set<String> found = cache.get(serialPart); 135 if (found != null) { 136 return new HashSet<String>(found); 137 } 138 return null; 139 } 140 } 141 142 /** 143 * Wrapper around {@link ConcurrentSkipListMap} that maintains size along 144 * side for O(1) size() implementation for use in JobListCache. 145 * 146 * Note: The size is not updated atomically with changes additions/removals. 147 * This race can lead to size() returning an incorrect size at times. 148 */ 149 static class JobIdHistoryFileInfoMap { 150 private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache; 151 private AtomicInteger mapSize; 152 153 JobIdHistoryFileInfoMap() { 154 cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>(); 155 mapSize = new AtomicInteger(); 156 } 157 158 public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) { 159 HistoryFileInfo ret = cache.putIfAbsent(key, value); 160 if (ret == null) { 161 mapSize.incrementAndGet(); 162 } 163 return ret; 164 } 165 166 public HistoryFileInfo remove(JobId key) { 167 HistoryFileInfo ret = cache.remove(key); 168 if (ret != null) { 169 mapSize.decrementAndGet(); 170 } 171 return ret; 172 } 173 174 /** 175 * Returns the recorded size of the internal map. Note that this could be out 176 * of sync with the actual size of the map 177 * @return "recorded" size 178 */ 179 public int size() { 180 return mapSize.get(); 181 } 182 183 public HistoryFileInfo get(JobId key) { 184 return cache.get(key); 185 } 186 187 public NavigableSet<JobId> navigableKeySet() { 188 return cache.navigableKeySet(); 189 } 190 191 public Collection<HistoryFileInfo> values() { 192 return cache.values(); 193 } 194 } 195 196 static class JobListCache { 197 private JobIdHistoryFileInfoMap cache; 198 private int maxSize; 199 private long maxAge; 200 201 public JobListCache(int maxSize, long maxAge) { 202 this.maxSize = maxSize; 203 this.maxAge = maxAge; 204 this.cache = new JobIdHistoryFileInfoMap(); 205 } 206 207 public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { 208 JobId jobId = fileInfo.getJobId(); 209 if (LOG.isDebugEnabled()) { 210 LOG.debug("Adding " + jobId + " to job list cache with " 211 + fileInfo.getJobIndexInfo()); 212 } 213 HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo); 214 if (cache.size() > maxSize) { 215 //There is a race here, where more then one thread could be trying to 216 // remove entries. This could result in too many entries being removed 217 // from the cache. This is considered OK as the size of the cache 218 // should be rather large, and we would rather have performance over 219 // keeping the cache size exactly at the maximum. 220 Iterator<JobId> keys = cache.navigableKeySet().iterator(); 221 long cutoff = System.currentTimeMillis() - maxAge; 222 while(cache.size() > maxSize && keys.hasNext()) { 223 JobId key = keys.next(); 224 HistoryFileInfo firstValue = cache.get(key); 225 if(firstValue != null) { 226 synchronized(firstValue) { 227 if (firstValue.isMovePending()) { 228 if(firstValue.didMoveFail() && 229 firstValue.jobIndexInfo.getFinishTime() <= cutoff) { 230 cache.remove(key); 231 //Now lets try to delete it 232 try { 233 firstValue.delete(); 234 } catch (IOException e) { 235 LOG.error("Error while trying to delete history files" + 236 " that could not be moved to done.", e); 237 } 238 } else { 239 LOG.warn("Waiting to remove " + key 240 + " from JobListCache because it is not in done yet."); 241 } 242 } else { 243 cache.remove(key); 244 } 245 } 246 } 247 } 248 } 249 return old; 250 } 251 252 public void delete(HistoryFileInfo fileInfo) { 253 if (LOG.isDebugEnabled()) { 254 LOG.debug("Removing from cache " + fileInfo); 255 } 256 cache.remove(fileInfo.getJobId()); 257 } 258 259 public Collection<HistoryFileInfo> values() { 260 return new ArrayList<HistoryFileInfo>(cache.values()); 261 } 262 263 public HistoryFileInfo get(JobId jobId) { 264 return cache.get(jobId); 265 } 266 267 public boolean isFull() { 268 return cache.size() >= maxSize; 269 } 270 } 271 272 /** 273 * This class represents a user dir in the intermediate done directory. This 274 * is mostly for locking purposes. 275 */ 276 private class UserLogDir { 277 long modTime = 0; 278 279 public synchronized void scanIfNeeded(FileStatus fs) { 280 long newModTime = fs.getModificationTime(); 281 if (modTime != newModTime) { 282 Path p = fs.getPath(); 283 try { 284 scanIntermediateDirectory(p); 285 //If scanning fails, we will scan again. We assume the failure is 286 // temporary. 287 modTime = newModTime; 288 } catch (IOException e) { 289 LOG.error("Error while trying to scan the directory " + p, e); 290 } 291 } else { 292 if (LOG.isDebugEnabled()) { 293 LOG.debug("Scan not needed of " + fs.getPath()); 294 } 295 } 296 } 297 } 298 299 public class HistoryFileInfo { 300 private Path historyFile; 301 private Path confFile; 302 private Path summaryFile; 303 private JobIndexInfo jobIndexInfo; 304 private HistoryInfoState state; 305 306 private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile, 307 JobIndexInfo jobIndexInfo, boolean isInDone) { 308 this.historyFile = historyFile; 309 this.confFile = confFile; 310 this.summaryFile = summaryFile; 311 this.jobIndexInfo = jobIndexInfo; 312 state = isInDone ? HistoryInfoState.IN_DONE 313 : HistoryInfoState.IN_INTERMEDIATE; 314 } 315 316 @VisibleForTesting 317 synchronized boolean isMovePending() { 318 return state == HistoryInfoState.IN_INTERMEDIATE 319 || state == HistoryInfoState.MOVE_FAILED; 320 } 321 322 @VisibleForTesting 323 synchronized boolean didMoveFail() { 324 return state == HistoryInfoState.MOVE_FAILED; 325 } 326 327 /** 328 * @return true if the files backed by this were deleted. 329 */ 330 public synchronized boolean isDeleted() { 331 return state == HistoryInfoState.DELETED; 332 } 333 334 @Override 335 public String toString() { 336 return "HistoryFileInfo jobID " + getJobId() 337 + " historyFile = " + historyFile; 338 } 339 340 private synchronized void moveToDone() throws IOException { 341 if (LOG.isDebugEnabled()) { 342 LOG.debug("moveToDone: " + historyFile); 343 } 344 if (!isMovePending()) { 345 // It was either deleted or is already in done. Either way do nothing 346 if (LOG.isDebugEnabled()) { 347 LOG.debug("Move no longer pending"); 348 } 349 return; 350 } 351 try { 352 long completeTime = jobIndexInfo.getFinishTime(); 353 if (completeTime == 0) { 354 completeTime = System.currentTimeMillis(); 355 } 356 JobId jobId = jobIndexInfo.getJobId(); 357 358 List<Path> paths = new ArrayList<Path>(2); 359 if (historyFile == null) { 360 LOG.info("No file for job-history with " + jobId + " found in cache!"); 361 } else { 362 paths.add(historyFile); 363 } 364 365 if (confFile == null) { 366 LOG.info("No file for jobConf with " + jobId + " found in cache!"); 367 } else { 368 paths.add(confFile); 369 } 370 371 if (summaryFile == null) { 372 LOG.info("No summary file for job: " + jobId); 373 } else { 374 String jobSummaryString = getJobSummary(intermediateDoneDirFc, 375 summaryFile); 376 SUMMARY_LOG.info(jobSummaryString); 377 LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); 378 intermediateDoneDirFc.delete(summaryFile, false); 379 summaryFile = null; 380 } 381 382 Path targetDir = canonicalHistoryLogPath(jobId, completeTime); 383 addDirectoryToSerialNumberIndex(targetDir); 384 makeDoneSubdir(targetDir); 385 if (historyFile != null) { 386 Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile 387 .getName())); 388 if (!toPath.equals(historyFile)) { 389 moveToDoneNow(historyFile, toPath); 390 historyFile = toPath; 391 } 392 } 393 if (confFile != null) { 394 Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile 395 .getName())); 396 if (!toPath.equals(confFile)) { 397 moveToDoneNow(confFile, toPath); 398 confFile = toPath; 399 } 400 } 401 state = HistoryInfoState.IN_DONE; 402 } catch (Throwable t) { 403 LOG.error("Error while trying to move a job to done", t); 404 this.state = HistoryInfoState.MOVE_FAILED; 405 } 406 } 407 408 /** 409 * Parse a job from the JobHistoryFile, if the underlying file is not going 410 * to be deleted. 411 * 412 * @return the Job or null if the underlying file was deleted. 413 * @throws IOException 414 * if there is an error trying to read the file. 415 */ 416 public synchronized Job loadJob() throws IOException { 417 return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile, 418 false, jobIndexInfo.getUser(), this, aclsMgr); 419 } 420 421 /** 422 * Return the history file. This should only be used for testing. 423 * @return the history file. 424 */ 425 synchronized Path getHistoryFile() { 426 return historyFile; 427 } 428 429 protected synchronized void delete() throws IOException { 430 if (LOG.isDebugEnabled()) { 431 LOG.debug("deleting " + historyFile + " and " + confFile); 432 } 433 state = HistoryInfoState.DELETED; 434 doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); 435 doneDirFc.delete(doneDirFc.makeQualified(confFile), false); 436 } 437 438 public JobIndexInfo getJobIndexInfo() { 439 return jobIndexInfo; 440 } 441 442 public JobId getJobId() { 443 return jobIndexInfo.getJobId(); 444 } 445 446 public synchronized Path getConfFile() { 447 return confFile; 448 } 449 450 public synchronized Configuration loadConfFile() throws IOException { 451 FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); 452 Configuration jobConf = new Configuration(false); 453 jobConf.addResource(fc.open(confFile), confFile.toString()); 454 return jobConf; 455 } 456 } 457 458 private SerialNumberIndex serialNumberIndex = null; 459 protected JobListCache jobListCache = null; 460 461 // Maintains a list of known done subdirectories. 462 private final Set<Path> existingDoneSubdirs = Collections 463 .synchronizedSet(new HashSet<Path>()); 464 465 /** 466 * Maintains a mapping between intermediate user directories and the last 467 * known modification time. 468 */ 469 private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 470 new ConcurrentHashMap<String, UserLogDir>(); 471 472 private JobACLsManager aclsMgr; 473 474 @VisibleForTesting 475 Configuration conf; 476 477 private String serialNumberFormat; 478 479 private Path doneDirPrefixPath = null; // folder for completed jobs 480 private FileContext doneDirFc; // done Dir FileContext 481 482 private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path 483 private FileContext intermediateDoneDirFc; // Intermediate Done Dir 484 // FileContext 485 @VisibleForTesting 486 protected ThreadPoolExecutor moveToDoneExecutor = null; 487 private long maxHistoryAge = 0; 488 489 public HistoryFileManager() { 490 super(HistoryFileManager.class.getName()); 491 } 492 493 @Override 494 protected void serviceInit(Configuration conf) throws Exception { 495 this.conf = conf; 496 497 int serialNumberLowDigits = 3; 498 serialNumberFormat = ("%0" 499 + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) 500 + "d"); 501 502 long maxFSWaitTime = conf.getLong( 503 JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, 504 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); 505 createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime); 506 507 this.aclsMgr = new JobACLsManager(conf); 508 509 maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 510 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); 511 512 jobListCache = createJobListCache(); 513 514 serialNumberIndex = new SerialNumberIndex(conf.getInt( 515 JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, 516 JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE)); 517 518 int numMoveThreads = conf.getInt( 519 JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, 520 JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); 521 ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( 522 "MoveIntermediateToDone Thread #%d").build(); 523 moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads, 524 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); 525 526 super.serviceInit(conf); 527 } 528 529 @VisibleForTesting 530 void createHistoryDirs(Clock clock, long intervalCheckMillis, 531 long timeOutMillis) throws IOException { 532 long start = clock.getTime(); 533 boolean done = false; 534 int counter = 0; 535 while (!done && 536 ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) { 537 done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec 538 try { 539 Thread.sleep(intervalCheckMillis); 540 } catch (InterruptedException ex) { 541 throw new YarnRuntimeException(ex); 542 } 543 } 544 if (!done) { 545 throw new YarnRuntimeException("Timed out '" + timeOutMillis+ 546 "ms' waiting for FileSystem to become available"); 547 } 548 } 549 550 /** 551 * DistributedFileSystem returns a RemoteException with a message stating 552 * SafeModeException in it. So this is only way to check it is because of 553 * being in safe mode. 554 */ 555 private boolean isBecauseSafeMode(Throwable ex) { 556 return ex.toString().contains("SafeModeException"); 557 } 558 559 /** 560 * Returns TRUE if the history dirs were created, FALSE if they could not 561 * be created because the FileSystem is not reachable or in safe mode and 562 * throws and exception otherwise. 563 */ 564 @VisibleForTesting 565 boolean tryCreatingHistoryDirs(boolean logWait) throws IOException { 566 boolean succeeded = true; 567 String doneDirPrefix = JobHistoryUtils. 568 getConfiguredHistoryServerDoneDirPrefix(conf); 569 try { 570 doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( 571 new Path(doneDirPrefix)); 572 doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); 573 doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK); 574 mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( 575 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); 576 } catch (ConnectException ex) { 577 if (logWait) { 578 LOG.info("Waiting for FileSystem at " + 579 doneDirPrefixPath.toUri().getAuthority() + "to be available"); 580 } 581 succeeded = false; 582 } catch (IOException e) { 583 if (isBecauseSafeMode(e)) { 584 succeeded = false; 585 if (logWait) { 586 LOG.info("Waiting for FileSystem at " + 587 doneDirPrefixPath.toUri().getAuthority() + 588 "to be out of safe mode"); 589 } 590 } else { 591 throw new YarnRuntimeException("Error creating done directory: [" 592 + doneDirPrefixPath + "]", e); 593 } 594 } 595 if (succeeded) { 596 String intermediateDoneDirPrefix = JobHistoryUtils. 597 getConfiguredHistoryIntermediateDoneDirPrefix(conf); 598 try { 599 intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified( 600 new Path(intermediateDoneDirPrefix)); 601 intermediateDoneDirFc = FileContext.getFileContext( 602 intermediateDoneDirPath.toUri(), conf); 603 mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( 604 JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); 605 } catch (ConnectException ex) { 606 succeeded = false; 607 if (logWait) { 608 LOG.info("Waiting for FileSystem at " + 609 intermediateDoneDirPath.toUri().getAuthority() + 610 "to be available"); 611 } 612 } catch (IOException e) { 613 if (isBecauseSafeMode(e)) { 614 succeeded = false; 615 if (logWait) { 616 LOG.info("Waiting for FileSystem at " + 617 intermediateDoneDirPath.toUri().getAuthority() + 618 "to be out of safe mode"); 619 } 620 } else { 621 throw new YarnRuntimeException( 622 "Error creating intermediate done directory: [" 623 + intermediateDoneDirPath + "]", e); 624 } 625 } 626 } 627 return succeeded; 628 } 629 630 @Override 631 public void serviceStop() throws Exception { 632 ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor); 633 super.serviceStop(); 634 } 635 636 protected JobListCache createJobListCache() { 637 return new JobListCache(conf.getInt( 638 JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, 639 JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge); 640 } 641 642 private void mkdir(FileContext fc, Path path, FsPermission fsp) 643 throws IOException { 644 if (!fc.util().exists(path)) { 645 try { 646 fc.mkdir(path, fsp, true); 647 648 FileStatus fsStatus = fc.getFileStatus(path); 649 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 650 + ", Expected: " + fsp.toShort()); 651 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 652 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 653 + ", " + fsp); 654 fc.setPermission(path, fsp); 655 } 656 } catch (FileAlreadyExistsException e) { 657 LOG.info("Directory: [" + path + "] already exists."); 658 } 659 } 660 } 661 662 /** 663 * Populates index data structures. Should only be called at initialization 664 * times. 665 */ 666 @SuppressWarnings("unchecked") 667 void initExisting() throws IOException { 668 LOG.info("Initializing Existing Jobs..."); 669 List<FileStatus> timestampedDirList = findTimestampedDirectories(); 670 // Sort first just so insertion is in a consistent order 671 Collections.sort(timestampedDirList); 672 for (FileStatus fs : timestampedDirList) { 673 // TODO Could verify the correct format for these directories. 674 addDirectoryToSerialNumberIndex(fs.getPath()); 675 } 676 for (int i= timestampedDirList.size() - 1; 677 i >= 0 && !jobListCache.isFull(); i--) { 678 FileStatus fs = timestampedDirList.get(i); 679 addDirectoryToJobListCache(fs.getPath()); 680 } 681 } 682 683 private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { 684 String serialPart = serialDirPath.getName(); 685 String timeStampPart = JobHistoryUtils 686 .getTimestampPartFromPath(serialDirPath.toString()); 687 if (timeStampPart == null) { 688 LOG.warn("Could not find timestamp portion from path: " 689 + serialDirPath.toString() + ". Continuing with next"); 690 return; 691 } 692 if (serialPart == null) { 693 LOG.warn("Could not find serial portion from path: " 694 + serialDirPath.toString() + ". Continuing with next"); 695 return; 696 } 697 serialNumberIndex.remove(serialPart, timeStampPart); 698 } 699 700 private void addDirectoryToSerialNumberIndex(Path serialDirPath) { 701 if (LOG.isDebugEnabled()) { 702 LOG.debug("Adding " + serialDirPath + " to serial index"); 703 } 704 String serialPart = serialDirPath.getName(); 705 String timestampPart = JobHistoryUtils 706 .getTimestampPartFromPath(serialDirPath.toString()); 707 if (timestampPart == null) { 708 LOG.warn("Could not find timestamp portion from path: " + serialDirPath 709 + ". Continuing with next"); 710 return; 711 } 712 if (serialPart == null) { 713 LOG.warn("Could not find serial portion from path: " 714 + serialDirPath.toString() + ". Continuing with next"); 715 } else { 716 serialNumberIndex.add(serialPart, timestampPart); 717 } 718 } 719 720 private void addDirectoryToJobListCache(Path path) throws IOException { 721 if (LOG.isDebugEnabled()) { 722 LOG.debug("Adding " + path + " to job list cache."); 723 } 724 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, 725 doneDirFc); 726 for (FileStatus fs : historyFileList) { 727 if (LOG.isDebugEnabled()) { 728 LOG.debug("Adding in history for " + fs.getPath()); 729 } 730 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 731 .getName()); 732 String confFileName = JobHistoryUtils 733 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 734 String summaryFileName = JobHistoryUtils 735 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 736 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs 737 .getPath().getParent(), confFileName), new Path(fs.getPath() 738 .getParent(), summaryFileName), jobIndexInfo, true); 739 jobListCache.addIfAbsent(fileInfo); 740 } 741 } 742 743 private static List<FileStatus> scanDirectory(Path path, FileContext fc, 744 PathFilter pathFilter) throws IOException { 745 path = fc.makeQualified(path); 746 List<FileStatus> jhStatusList = new ArrayList<FileStatus>(); 747 RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path); 748 while (fileStatusIter.hasNext()) { 749 FileStatus fileStatus = fileStatusIter.next(); 750 Path filePath = fileStatus.getPath(); 751 if (fileStatus.isFile() && pathFilter.accept(filePath)) { 752 jhStatusList.add(fileStatus); 753 } 754 } 755 return jhStatusList; 756 } 757 758 protected List<FileStatus> scanDirectoryForHistoryFiles(Path path, 759 FileContext fc) throws IOException { 760 return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); 761 } 762 763 /** 764 * Finds all history directories with a timestamp component by scanning the 765 * filesystem. Used when the JobHistory server is started. 766 * 767 * @return list of history directories 768 */ 769 protected List<FileStatus> findTimestampedDirectories() throws IOException { 770 List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 771 doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); 772 return fsList; 773 } 774 775 /** 776 * Scans the intermediate directory to find user directories. Scans these for 777 * history files if the modification time for the directory has changed. Once 778 * it finds history files it starts the process of moving them to the done 779 * directory. 780 * 781 * @throws IOException 782 * if there was a error while scanning 783 */ 784 void scanIntermediateDirectory() throws IOException { 785 // TODO it would be great to limit how often this happens, except in the 786 // case where we are looking for a particular job. 787 List<FileStatus> userDirList = JobHistoryUtils.localGlobber( 788 intermediateDoneDirFc, intermediateDoneDirPath, ""); 789 LOG.debug("Scanning intermediate dirs"); 790 for (FileStatus userDir : userDirList) { 791 String name = userDir.getPath().getName(); 792 UserLogDir dir = userDirModificationTimeMap.get(name); 793 if(dir == null) { 794 dir = new UserLogDir(); 795 UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir); 796 if(old != null) { 797 dir = old; 798 } 799 } 800 dir.scanIfNeeded(userDir); 801 } 802 } 803 804 /** 805 * Scans the specified path and populates the intermediate cache. 806 * 807 * @param absPath 808 * @throws IOException 809 */ 810 private void scanIntermediateDirectory(final Path absPath) throws IOException { 811 if (LOG.isDebugEnabled()) { 812 LOG.debug("Scanning intermediate dir " + absPath); 813 } 814 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath, 815 intermediateDoneDirFc); 816 if (LOG.isDebugEnabled()) { 817 LOG.debug("Found " + fileStatusList.size() + " files"); 818 } 819 for (FileStatus fs : fileStatusList) { 820 if (LOG.isDebugEnabled()) { 821 LOG.debug("scanning file: "+ fs.getPath()); 822 } 823 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 824 .getName()); 825 String confFileName = JobHistoryUtils 826 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 827 String summaryFileName = JobHistoryUtils 828 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 829 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs 830 .getPath().getParent(), confFileName), new Path(fs.getPath() 831 .getParent(), summaryFileName), jobIndexInfo, false); 832 833 final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo); 834 if (old == null || old.didMoveFail()) { 835 final HistoryFileInfo found = (old == null) ? fileInfo : old; 836 long cutoff = System.currentTimeMillis() - maxHistoryAge; 837 if(found.getJobIndexInfo().getFinishTime() <= cutoff) { 838 try { 839 found.delete(); 840 } catch (IOException e) { 841 LOG.warn("Error cleaning up a HistoryFile that is out of date.", e); 842 } 843 } else { 844 if (LOG.isDebugEnabled()) { 845 LOG.debug("Scheduling move to done of " +found); 846 } 847 moveToDoneExecutor.execute(new Runnable() { 848 @Override 849 public void run() { 850 try { 851 found.moveToDone(); 852 } catch (IOException e) { 853 LOG.info("Failed to process fileInfo for job: " + 854 found.getJobId(), e); 855 } 856 } 857 }); 858 } 859 } else if (!old.isMovePending()) { 860 //This is a duplicate so just delete it 861 if (LOG.isDebugEnabled()) { 862 LOG.debug("Duplicate: deleting"); 863 } 864 fileInfo.delete(); 865 } 866 } 867 } 868 869 /** 870 * Searches the job history file FileStatus list for the specified JobId. 871 * 872 * @param fileStatusList 873 * fileStatus list of Job History Files. 874 * @param jobId 875 * The JobId to find. 876 * @return A FileInfo object for the jobId, null if not found. 877 * @throws IOException 878 */ 879 private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList, 880 JobId jobId) throws IOException { 881 for (FileStatus fs : fileStatusList) { 882 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 883 .getName()); 884 if (jobIndexInfo.getJobId().equals(jobId)) { 885 String confFileName = JobHistoryUtils 886 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 887 String summaryFileName = JobHistoryUtils 888 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 889 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path( 890 fs.getPath().getParent(), confFileName), new Path(fs.getPath() 891 .getParent(), summaryFileName), jobIndexInfo, true); 892 return fileInfo; 893 } 894 } 895 return null; 896 } 897 898 /** 899 * Scans old directories known by the idToDateString map for the specified 900 * jobId. If the number of directories is higher than the supported size of 901 * the idToDateString cache, the jobId will not be found. 902 * 903 * @param jobId 904 * the jobId. 905 * @return 906 * @throws IOException 907 */ 908 private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { 909 String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( 910 jobId, serialNumberFormat); 911 Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber); 912 if (dateStringSet == null) { 913 return null; 914 } 915 for (String timestampPart : dateStringSet) { 916 Path logDir = canonicalHistoryLogPath(jobId, timestampPart); 917 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, 918 doneDirFc); 919 HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId); 920 if (fileInfo != null) { 921 return fileInfo; 922 } 923 } 924 return null; 925 } 926 927 public Collection<HistoryFileInfo> getAllFileInfo() throws IOException { 928 scanIntermediateDirectory(); 929 return jobListCache.values(); 930 } 931 932 public HistoryFileInfo getFileInfo(JobId jobId) throws IOException { 933 // FileInfo available in cache. 934 HistoryFileInfo fileInfo = jobListCache.get(jobId); 935 if (fileInfo != null) { 936 return fileInfo; 937 } 938 // OK so scan the intermediate to be sure we did not lose it that way 939 scanIntermediateDirectory(); 940 fileInfo = jobListCache.get(jobId); 941 if (fileInfo != null) { 942 return fileInfo; 943 } 944 945 // Intermediate directory does not contain job. Search through older ones. 946 fileInfo = scanOldDirsForJob(jobId); 947 if (fileInfo != null) { 948 return fileInfo; 949 } 950 return null; 951 } 952 953 private void moveToDoneNow(final Path src, final Path target) 954 throws IOException { 955 LOG.info("Moving " + src.toString() + " to " + target.toString()); 956 intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); 957 } 958 959 private String getJobSummary(FileContext fc, Path path) throws IOException { 960 Path qPath = fc.makeQualified(path); 961 FSDataInputStream in = fc.open(qPath); 962 String jobSummaryString = in.readUTF(); 963 in.close(); 964 return jobSummaryString; 965 } 966 967 private void makeDoneSubdir(Path path) throws IOException { 968 try { 969 doneDirFc.getFileStatus(path); 970 existingDoneSubdirs.add(path); 971 } catch (FileNotFoundException fnfE) { 972 try { 973 FsPermission fsp = new FsPermission( 974 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); 975 doneDirFc.mkdir(path, fsp, true); 976 FileStatus fsStatus = doneDirFc.getFileStatus(path); 977 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 978 + ", Expected: " + fsp.toShort()); 979 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 980 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 981 + ", " + fsp); 982 doneDirFc.setPermission(path, fsp); 983 } 984 existingDoneSubdirs.add(path); 985 } catch (FileAlreadyExistsException faeE) { // Nothing to do. 986 } 987 } 988 } 989 990 private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { 991 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 992 id, timestampComponent, serialNumberFormat)); 993 } 994 995 private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { 996 String timestampComponent = JobHistoryUtils 997 .timestampDirectoryComponent(millisecondTime); 998 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 999 id, timestampComponent, serialNumberFormat)); 1000 } 1001 1002 private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { 1003 if (finishTime == 0) { 1004 return fileStatus.getModificationTime(); 1005 } 1006 return finishTime; 1007 } 1008 1009 private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException { 1010 jobListCache.delete(fileInfo); 1011 fileInfo.delete(); 1012 } 1013 1014 List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException { 1015 return JobHistoryUtils. 1016 getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff); 1017 } 1018 1019 /** 1020 * Clean up older history files. 1021 * 1022 * @throws IOException 1023 * on any error trying to remove the entries. 1024 */ 1025 @SuppressWarnings("unchecked") 1026 void clean() throws IOException { 1027 long cutoff = System.currentTimeMillis() - maxHistoryAge; 1028 boolean halted = false; 1029 List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff); 1030 // Sort in ascending order. Relies on YYYY/MM/DD/Serial 1031 Collections.sort(serialDirList); 1032 for (FileStatus serialDir : serialDirList) { 1033 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( 1034 serialDir.getPath(), doneDirFc); 1035 for (FileStatus historyFile : historyFileList) { 1036 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile 1037 .getPath().getName()); 1038 long effectiveTimestamp = getEffectiveTimestamp( 1039 jobIndexInfo.getFinishTime(), historyFile); 1040 if (effectiveTimestamp <= cutoff) { 1041 HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo 1042 .getJobId()); 1043 if (fileInfo == null) { 1044 String confFileName = JobHistoryUtils 1045 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 1046 1047 fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path( 1048 historyFile.getPath().getParent(), confFileName), null, 1049 jobIndexInfo, true); 1050 } 1051 deleteJobFromDone(fileInfo); 1052 } else { 1053 halted = true; 1054 break; 1055 } 1056 } 1057 if (!halted) { 1058 deleteDir(serialDir); 1059 removeDirectoryFromSerialNumberIndex(serialDir.getPath()); 1060 existingDoneSubdirs.remove(serialDir.getPath()); 1061 } else { 1062 break; // Don't scan any more directories. 1063 } 1064 } 1065 } 1066 1067 protected boolean deleteDir(FileStatus serialDir) 1068 throws AccessControlException, FileNotFoundException, 1069 UnsupportedFileSystemException, IOException { 1070 return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); 1071 } 1072 1073 @VisibleForTesting 1074 protected void setMaxHistoryAge(long newValue){ 1075 maxHistoryAge=newValue; 1076 } 1077}