001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.v2.hs; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.NavigableSet; 031import java.util.Set; 032import java.util.SortedMap; 033import java.util.TreeMap; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.ConcurrentSkipListMap; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.ThreadFactory; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.apache.hadoop.classification.InterfaceAudience; 046import org.apache.hadoop.classification.InterfaceStability; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FSDataInputStream; 049import org.apache.hadoop.fs.FileAlreadyExistsException; 050import org.apache.hadoop.fs.FileContext; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.Options; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.fs.PathFilter; 055import org.apache.hadoop.fs.RemoteIterator; 056import org.apache.hadoop.fs.UnsupportedFileSystemException; 057import org.apache.hadoop.fs.permission.FsPermission; 058import org.apache.hadoop.mapred.JobACLsManager; 059import org.apache.hadoop.mapreduce.jobhistory.JobSummary; 060import org.apache.hadoop.mapreduce.v2.api.records.JobId; 061import org.apache.hadoop.mapreduce.v2.app.job.Job; 062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; 063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; 064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; 065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; 066import org.apache.hadoop.security.AccessControlException; 067import org.apache.hadoop.service.AbstractService; 068import org.apache.hadoop.util.ShutdownThreadsHelper; 069import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 070 071import com.google.common.annotations.VisibleForTesting; 072import com.google.common.util.concurrent.ThreadFactoryBuilder; 073import org.apache.hadoop.yarn.util.Clock; 074import org.apache.hadoop.yarn.util.SystemClock; 075 076/** 077 * This class provides a way to interact with history files in a thread safe 078 * manor. 079 */ 080@InterfaceAudience.Public 081@InterfaceStability.Unstable 082public class HistoryFileManager extends AbstractService { 083 private static final Log LOG = LogFactory.getLog(HistoryFileManager.class); 084 private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); 085 086 private static enum HistoryInfoState { 087 IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED 088 }; 089 090 private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils 091 .doneSubdirsBeforeSerialTail(); 092 093 /** 094 * Maps between a serial number (generated based on jobId) and the timestamp 095 * component(s) to which it belongs. Facilitates jobId based searches. If a 096 * jobId is not found in this list - it will not be found. 097 */ 098 private static class SerialNumberIndex { 099 private SortedMap<String, Set<String>> cache; 100 private int maxSize; 101 102 public SerialNumberIndex(int maxSize) { 103 this.cache = new TreeMap<String, Set<String>>(); 104 this.maxSize = maxSize; 105 } 106 107 public synchronized void add(String serialPart, String timestampPart) { 108 if (!cache.containsKey(serialPart)) { 109 cache.put(serialPart, new HashSet<String>()); 110 if (cache.size() > maxSize) { 111 String key = cache.firstKey(); 112 LOG.error("Dropping " + key 113 + " from the SerialNumberIndex. We will no " 114 + "longer be able to see jobs that are in that serial index for " 115 + cache.get(key)); 116 cache.remove(key); 117 } 118 } 119 Set<String> datePartSet = cache.get(serialPart); 120 datePartSet.add(timestampPart); 121 } 122 123 public synchronized void remove(String serialPart, String timeStampPart) { 124 if (cache.containsKey(serialPart)) { 125 Set<String> set = cache.get(serialPart); 126 set.remove(timeStampPart); 127 if (set.isEmpty()) { 128 cache.remove(serialPart); 129 } 130 } 131 } 132 133 public synchronized Set<String> get(String serialPart) { 134 Set<String> found = cache.get(serialPart); 135 if (found != null) { 136 return new HashSet<String>(found); 137 } 138 return null; 139 } 140 } 141 142 /** 143 * Wrapper around {@link ConcurrentSkipListMap} that maintains size along 144 * side for O(1) size() implementation for use in JobListCache. 145 * 146 * Note: The size is not updated atomically with changes additions/removals. 147 * This race can lead to size() returning an incorrect size at times. 148 */ 149 static class JobIdHistoryFileInfoMap { 150 private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache; 151 private AtomicInteger mapSize; 152 153 JobIdHistoryFileInfoMap() { 154 cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>(); 155 mapSize = new AtomicInteger(); 156 } 157 158 public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) { 159 HistoryFileInfo ret = cache.putIfAbsent(key, value); 160 if (ret == null) { 161 mapSize.incrementAndGet(); 162 } 163 return ret; 164 } 165 166 public HistoryFileInfo remove(JobId key) { 167 HistoryFileInfo ret = cache.remove(key); 168 if (ret != null) { 169 mapSize.decrementAndGet(); 170 } 171 return ret; 172 } 173 174 /** 175 * Returns the recorded size of the internal map. Note that this could be out 176 * of sync with the actual size of the map 177 * @return "recorded" size 178 */ 179 public int size() { 180 return mapSize.get(); 181 } 182 183 public HistoryFileInfo get(JobId key) { 184 return cache.get(key); 185 } 186 187 public NavigableSet<JobId> navigableKeySet() { 188 return cache.navigableKeySet(); 189 } 190 191 public Collection<HistoryFileInfo> values() { 192 return cache.values(); 193 } 194 } 195 196 static class JobListCache { 197 private JobIdHistoryFileInfoMap cache; 198 private int maxSize; 199 private long maxAge; 200 201 public JobListCache(int maxSize, long maxAge) { 202 this.maxSize = maxSize; 203 this.maxAge = maxAge; 204 this.cache = new JobIdHistoryFileInfoMap(); 205 } 206 207 public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { 208 JobId jobId = fileInfo.getJobId(); 209 if (LOG.isDebugEnabled()) { 210 LOG.debug("Adding " + jobId + " to job list cache with " 211 + fileInfo.getJobIndexInfo()); 212 } 213 HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo); 214 if (cache.size() > maxSize) { 215 //There is a race here, where more then one thread could be trying to 216 // remove entries. This could result in too many entries being removed 217 // from the cache. This is considered OK as the size of the cache 218 // should be rather large, and we would rather have performance over 219 // keeping the cache size exactly at the maximum. 220 Iterator<JobId> keys = cache.navigableKeySet().iterator(); 221 long cutoff = System.currentTimeMillis() - maxAge; 222 223 // MAPREDUCE-6436: In order to reduce the number of logs written 224 // in case of a lot of move pending histories. 225 JobId firstInIntermediateKey = null; 226 int inIntermediateCount = 0; 227 JobId firstMoveFailedKey = null; 228 int moveFailedCount = 0; 229 230 while (cache.size() > maxSize && keys.hasNext()) { 231 JobId key = keys.next(); 232 HistoryFileInfo firstValue = cache.get(key); 233 if (firstValue != null) { 234 if (firstValue.isMovePending()) { 235 if (firstValue.didMoveFail() && 236 firstValue.jobIndexInfo.getFinishTime() <= cutoff) { 237 cache.remove(key); 238 // Now lets try to delete it 239 try { 240 firstValue.delete(); 241 } catch (IOException e) { 242 LOG.error("Error while trying to delete history files" + 243 " that could not be moved to done.", e); 244 } 245 } else { 246 if (firstValue.didMoveFail()) { 247 if (moveFailedCount == 0) { 248 firstMoveFailedKey = key; 249 } 250 moveFailedCount += 1; 251 } else { 252 if (inIntermediateCount == 0) { 253 firstInIntermediateKey = key; 254 } 255 inIntermediateCount += 1; 256 } 257 } 258 } else { 259 cache.remove(key); 260 } 261 } 262 } 263 // Log output only for first jobhisotry in pendings to restrict 264 // the total number of logs. 265 if (inIntermediateCount > 0) { 266 LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " + 267 "(e.g. " + firstInIntermediateKey + ") from JobListCache " + 268 "because it is not in done yet. Total count is " + 269 inIntermediateCount + "."); 270 } 271 if (moveFailedCount > 0) { 272 LOG.warn("Waiting to remove MOVE_FAILED state histories " + 273 "(e.g. " + firstMoveFailedKey + ") from JobListCache " + 274 "because it is not in done yet. Total count is " + 275 moveFailedCount + "."); 276 } 277 } 278 return old; 279 } 280 281 public void delete(HistoryFileInfo fileInfo) { 282 if (LOG.isDebugEnabled()) { 283 LOG.debug("Removing from cache " + fileInfo); 284 } 285 cache.remove(fileInfo.getJobId()); 286 } 287 288 public Collection<HistoryFileInfo> values() { 289 return new ArrayList<HistoryFileInfo>(cache.values()); 290 } 291 292 public HistoryFileInfo get(JobId jobId) { 293 return cache.get(jobId); 294 } 295 296 public boolean isFull() { 297 return cache.size() >= maxSize; 298 } 299 } 300 301 /** 302 * This class represents a user dir in the intermediate done directory. This 303 * is mostly for locking purposes. 304 */ 305 private class UserLogDir { 306 long modTime = 0; 307 private long scanTime = 0; 308 309 public synchronized void scanIfNeeded(FileStatus fs) { 310 long newModTime = fs.getModificationTime(); 311 // MAPREDUCE-6680: In some Cloud FileSystem, like Azure FS or S3, file's 312 // modification time is truncated into seconds. In that case, 313 // modTime == newModTime doesn't means no file update in the directory, 314 // so we need to have additional check. 315 // Note: modTime (X second Y millisecond) could be casted to X second or 316 // X+1 second. 317 if (modTime != newModTime 318 || (scanTime/1000) == (modTime/1000) 319 || (scanTime/1000 + 1) == (modTime/1000)) { 320 // reset scanTime before scanning happens 321 scanTime = System.currentTimeMillis(); 322 Path p = fs.getPath(); 323 try { 324 scanIntermediateDirectory(p); 325 //If scanning fails, we will scan again. We assume the failure is 326 // temporary. 327 modTime = newModTime; 328 } catch (IOException e) { 329 LOG.error("Error while trying to scan the directory " + p, e); 330 } 331 } else { 332 if (LOG.isDebugEnabled()) { 333 LOG.debug("Scan not needed of " + fs.getPath()); 334 } 335 // reset scanTime 336 scanTime = System.currentTimeMillis(); 337 } 338 } 339 } 340 341 public class HistoryFileInfo { 342 private Path historyFile; 343 private Path confFile; 344 private Path summaryFile; 345 private JobIndexInfo jobIndexInfo; 346 private volatile HistoryInfoState state; 347 348 @VisibleForTesting 349 protected HistoryFileInfo(Path historyFile, Path confFile, 350 Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) { 351 this.historyFile = historyFile; 352 this.confFile = confFile; 353 this.summaryFile = summaryFile; 354 this.jobIndexInfo = jobIndexInfo; 355 state = isInDone ? HistoryInfoState.IN_DONE 356 : HistoryInfoState.IN_INTERMEDIATE; 357 } 358 359 @VisibleForTesting 360 boolean isMovePending() { 361 return state == HistoryInfoState.IN_INTERMEDIATE 362 || state == HistoryInfoState.MOVE_FAILED; 363 } 364 365 @VisibleForTesting 366 boolean didMoveFail() { 367 return state == HistoryInfoState.MOVE_FAILED; 368 } 369 370 /** 371 * @return true if the files backed by this were deleted. 372 */ 373 public boolean isDeleted() { 374 return state == HistoryInfoState.DELETED; 375 } 376 377 @Override 378 public String toString() { 379 return "HistoryFileInfo jobID " + getJobId() 380 + " historyFile = " + historyFile; 381 } 382 383 @VisibleForTesting 384 synchronized void moveToDone() throws IOException { 385 if (LOG.isDebugEnabled()) { 386 LOG.debug("moveToDone: " + historyFile); 387 } 388 if (!isMovePending()) { 389 // It was either deleted or is already in done. Either way do nothing 390 if (LOG.isDebugEnabled()) { 391 LOG.debug("Move no longer pending"); 392 } 393 return; 394 } 395 try { 396 long completeTime = jobIndexInfo.getFinishTime(); 397 if (completeTime == 0) { 398 completeTime = System.currentTimeMillis(); 399 } 400 JobId jobId = jobIndexInfo.getJobId(); 401 402 List<Path> paths = new ArrayList<Path>(2); 403 if (historyFile == null) { 404 LOG.info("No file for job-history with " + jobId + " found in cache!"); 405 } else { 406 paths.add(historyFile); 407 } 408 409 if (confFile == null) { 410 LOG.info("No file for jobConf with " + jobId + " found in cache!"); 411 } else { 412 paths.add(confFile); 413 } 414 415 if (summaryFile == null || !intermediateDoneDirFc.util().exists( 416 summaryFile)) { 417 LOG.info("No summary file for job: " + jobId); 418 } else { 419 String jobSummaryString = getJobSummary(intermediateDoneDirFc, 420 summaryFile); 421 SUMMARY_LOG.info(jobSummaryString); 422 LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); 423 intermediateDoneDirFc.delete(summaryFile, false); 424 summaryFile = null; 425 } 426 427 Path targetDir = canonicalHistoryLogPath(jobId, completeTime); 428 addDirectoryToSerialNumberIndex(targetDir); 429 makeDoneSubdir(targetDir); 430 if (historyFile != null) { 431 Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile 432 .getName())); 433 if (!toPath.equals(historyFile)) { 434 moveToDoneNow(historyFile, toPath); 435 historyFile = toPath; 436 } 437 } 438 if (confFile != null) { 439 Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile 440 .getName())); 441 if (!toPath.equals(confFile)) { 442 moveToDoneNow(confFile, toPath); 443 confFile = toPath; 444 } 445 } 446 state = HistoryInfoState.IN_DONE; 447 } catch (Throwable t) { 448 LOG.error("Error while trying to move a job to done", t); 449 this.state = HistoryInfoState.MOVE_FAILED; 450 } 451 } 452 453 /** 454 * Parse a job from the JobHistoryFile, if the underlying file is not going 455 * to be deleted. 456 * 457 * @return the Job or null if the underlying file was deleted. 458 * @throws IOException 459 * if there is an error trying to read the file. 460 */ 461 public synchronized Job loadJob() throws IOException { 462 return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile, 463 false, jobIndexInfo.getUser(), this, aclsMgr); 464 } 465 466 /** 467 * Return the history file. This should only be used for testing. 468 * @return the history file. 469 */ 470 synchronized Path getHistoryFile() { 471 return historyFile; 472 } 473 474 protected synchronized void delete() throws IOException { 475 if (LOG.isDebugEnabled()) { 476 LOG.debug("deleting " + historyFile + " and " + confFile); 477 } 478 state = HistoryInfoState.DELETED; 479 doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); 480 doneDirFc.delete(doneDirFc.makeQualified(confFile), false); 481 } 482 483 public JobIndexInfo getJobIndexInfo() { 484 return jobIndexInfo; 485 } 486 487 public JobId getJobId() { 488 return jobIndexInfo.getJobId(); 489 } 490 491 public synchronized Path getConfFile() { 492 return confFile; 493 } 494 495 public synchronized Configuration loadConfFile() throws IOException { 496 FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); 497 Configuration jobConf = new Configuration(false); 498 jobConf.addResource(fc.open(confFile), confFile.toString()); 499 return jobConf; 500 } 501 } 502 503 private SerialNumberIndex serialNumberIndex = null; 504 protected JobListCache jobListCache = null; 505 506 // Maintains a list of known done subdirectories. 507 private final Set<Path> existingDoneSubdirs = Collections 508 .synchronizedSet(new HashSet<Path>()); 509 510 /** 511 * Maintains a mapping between intermediate user directories and the last 512 * known modification time. 513 */ 514 private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 515 new ConcurrentHashMap<String, UserLogDir>(); 516 517 private JobACLsManager aclsMgr; 518 519 @VisibleForTesting 520 Configuration conf; 521 522 private String serialNumberFormat; 523 524 private Path doneDirPrefixPath = null; // folder for completed jobs 525 private FileContext doneDirFc; // done Dir FileContext 526 527 private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path 528 private FileContext intermediateDoneDirFc; // Intermediate Done Dir 529 // FileContext 530 @VisibleForTesting 531 protected ThreadPoolExecutor moveToDoneExecutor = null; 532 private long maxHistoryAge = 0; 533 534 public HistoryFileManager() { 535 super(HistoryFileManager.class.getName()); 536 } 537 538 @Override 539 protected void serviceInit(Configuration conf) throws Exception { 540 this.conf = conf; 541 542 int serialNumberLowDigits = 3; 543 serialNumberFormat = ("%0" 544 + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) 545 + "d"); 546 547 long maxFSWaitTime = conf.getLong( 548 JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, 549 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); 550 createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime); 551 552 this.aclsMgr = new JobACLsManager(conf); 553 554 maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 555 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); 556 557 jobListCache = createJobListCache(); 558 559 serialNumberIndex = new SerialNumberIndex(conf.getInt( 560 JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, 561 JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE)); 562 563 int numMoveThreads = conf.getInt( 564 JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, 565 JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); 566 moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads); 567 super.serviceInit(conf); 568 } 569 570 protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) { 571 ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( 572 "MoveIntermediateToDone Thread #%d").build(); 573 return new ThreadPoolExecutor(numMoveThreads, numMoveThreads, 574 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); 575 } 576 577 @VisibleForTesting 578 void createHistoryDirs(Clock clock, long intervalCheckMillis, 579 long timeOutMillis) throws IOException { 580 long start = clock.getTime(); 581 boolean done = false; 582 int counter = 0; 583 while (!done && 584 ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) { 585 done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec 586 try { 587 Thread.sleep(intervalCheckMillis); 588 } catch (InterruptedException ex) { 589 throw new YarnRuntimeException(ex); 590 } 591 } 592 if (!done) { 593 throw new YarnRuntimeException("Timed out '" + timeOutMillis+ 594 "ms' waiting for FileSystem to become available"); 595 } 596 } 597 598 /** 599 * DistributedFileSystem returns a RemoteException with a message stating 600 * SafeModeException in it. So this is only way to check it is because of 601 * being in safe mode. 602 */ 603 private boolean isBecauseSafeMode(Throwable ex) { 604 return ex.toString().contains("SafeModeException"); 605 } 606 607 /** 608 * Returns TRUE if the history dirs were created, FALSE if they could not 609 * be created because the FileSystem is not reachable or in safe mode and 610 * throws and exception otherwise. 611 */ 612 @VisibleForTesting 613 boolean tryCreatingHistoryDirs(boolean logWait) throws IOException { 614 boolean succeeded = true; 615 String doneDirPrefix = JobHistoryUtils. 616 getConfiguredHistoryServerDoneDirPrefix(conf); 617 try { 618 doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( 619 new Path(doneDirPrefix)); 620 doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); 621 doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK); 622 mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( 623 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); 624 } catch (ConnectException ex) { 625 if (logWait) { 626 LOG.info("Waiting for FileSystem at " + 627 doneDirPrefixPath.toUri().getAuthority() + "to be available"); 628 } 629 succeeded = false; 630 } catch (IOException e) { 631 if (isBecauseSafeMode(e)) { 632 succeeded = false; 633 if (logWait) { 634 LOG.info("Waiting for FileSystem at " + 635 doneDirPrefixPath.toUri().getAuthority() + 636 "to be out of safe mode"); 637 } 638 } else { 639 throw new YarnRuntimeException("Error creating done directory: [" 640 + doneDirPrefixPath + "]", e); 641 } 642 } 643 if (succeeded) { 644 String intermediateDoneDirPrefix = JobHistoryUtils. 645 getConfiguredHistoryIntermediateDoneDirPrefix(conf); 646 try { 647 intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified( 648 new Path(intermediateDoneDirPrefix)); 649 intermediateDoneDirFc = FileContext.getFileContext( 650 intermediateDoneDirPath.toUri(), conf); 651 mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( 652 JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); 653 } catch (ConnectException ex) { 654 succeeded = false; 655 if (logWait) { 656 LOG.info("Waiting for FileSystem at " + 657 intermediateDoneDirPath.toUri().getAuthority() + 658 "to be available"); 659 } 660 } catch (IOException e) { 661 if (isBecauseSafeMode(e)) { 662 succeeded = false; 663 if (logWait) { 664 LOG.info("Waiting for FileSystem at " + 665 intermediateDoneDirPath.toUri().getAuthority() + 666 "to be out of safe mode"); 667 } 668 } else { 669 throw new YarnRuntimeException( 670 "Error creating intermediate done directory: [" 671 + intermediateDoneDirPath + "]", e); 672 } 673 } 674 } 675 return succeeded; 676 } 677 678 @Override 679 public void serviceStop() throws Exception { 680 ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor); 681 super.serviceStop(); 682 } 683 684 protected JobListCache createJobListCache() { 685 return new JobListCache(conf.getInt( 686 JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, 687 JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge); 688 } 689 690 private void mkdir(FileContext fc, Path path, FsPermission fsp) 691 throws IOException { 692 if (!fc.util().exists(path)) { 693 try { 694 fc.mkdir(path, fsp, true); 695 696 FileStatus fsStatus = fc.getFileStatus(path); 697 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 698 + ", Expected: " + fsp.toShort()); 699 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 700 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 701 + ", " + fsp); 702 fc.setPermission(path, fsp); 703 } 704 } catch (FileAlreadyExistsException e) { 705 LOG.info("Directory: [" + path + "] already exists."); 706 } 707 } 708 } 709 710 protected HistoryFileInfo createHistoryFileInfo(Path historyFile, 711 Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo, 712 boolean isInDone) { 713 return new HistoryFileInfo( 714 historyFile, confFile, summaryFile, jobIndexInfo, isInDone); 715 } 716 717 /** 718 * Populates index data structures. Should only be called at initialization 719 * times. 720 */ 721 @SuppressWarnings("unchecked") 722 void initExisting() throws IOException { 723 LOG.info("Initializing Existing Jobs..."); 724 List<FileStatus> timestampedDirList = findTimestampedDirectories(); 725 // Sort first just so insertion is in a consistent order 726 Collections.sort(timestampedDirList); 727 for (FileStatus fs : timestampedDirList) { 728 // TODO Could verify the correct format for these directories. 729 addDirectoryToSerialNumberIndex(fs.getPath()); 730 } 731 for (int i= timestampedDirList.size() - 1; 732 i >= 0 && !jobListCache.isFull(); i--) { 733 FileStatus fs = timestampedDirList.get(i); 734 addDirectoryToJobListCache(fs.getPath()); 735 } 736 } 737 738 private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { 739 String serialPart = serialDirPath.getName(); 740 String timeStampPart = JobHistoryUtils 741 .getTimestampPartFromPath(serialDirPath.toString()); 742 if (timeStampPart == null) { 743 LOG.warn("Could not find timestamp portion from path: " 744 + serialDirPath.toString() + ". Continuing with next"); 745 return; 746 } 747 if (serialPart == null) { 748 LOG.warn("Could not find serial portion from path: " 749 + serialDirPath.toString() + ". Continuing with next"); 750 return; 751 } 752 serialNumberIndex.remove(serialPart, timeStampPart); 753 } 754 755 private void addDirectoryToSerialNumberIndex(Path serialDirPath) { 756 if (LOG.isDebugEnabled()) { 757 LOG.debug("Adding " + serialDirPath + " to serial index"); 758 } 759 String serialPart = serialDirPath.getName(); 760 String timestampPart = JobHistoryUtils 761 .getTimestampPartFromPath(serialDirPath.toString()); 762 if (timestampPart == null) { 763 LOG.warn("Could not find timestamp portion from path: " + serialDirPath 764 + ". Continuing with next"); 765 return; 766 } 767 if (serialPart == null) { 768 LOG.warn("Could not find serial portion from path: " 769 + serialDirPath.toString() + ". Continuing with next"); 770 } else { 771 serialNumberIndex.add(serialPart, timestampPart); 772 } 773 } 774 775 private void addDirectoryToJobListCache(Path path) throws IOException { 776 if (LOG.isDebugEnabled()) { 777 LOG.debug("Adding " + path + " to job list cache."); 778 } 779 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, 780 doneDirFc); 781 for (FileStatus fs : historyFileList) { 782 if (LOG.isDebugEnabled()) { 783 LOG.debug("Adding in history for " + fs.getPath()); 784 } 785 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 786 .getName()); 787 String confFileName = JobHistoryUtils 788 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 789 String summaryFileName = JobHistoryUtils 790 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 791 HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs 792 .getPath().getParent(), confFileName), new Path(fs.getPath() 793 .getParent(), summaryFileName), jobIndexInfo, true); 794 jobListCache.addIfAbsent(fileInfo); 795 } 796 } 797 798 @VisibleForTesting 799 protected static List<FileStatus> scanDirectory(Path path, FileContext fc, 800 PathFilter pathFilter) throws IOException { 801 path = fc.makeQualified(path); 802 List<FileStatus> jhStatusList = new ArrayList<FileStatus>(); 803 try { 804 RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path); 805 while (fileStatusIter.hasNext()) { 806 FileStatus fileStatus = fileStatusIter.next(); 807 Path filePath = fileStatus.getPath(); 808 if (fileStatus.isFile() && pathFilter.accept(filePath)) { 809 jhStatusList.add(fileStatus); 810 } 811 } 812 } catch (FileNotFoundException fe) { 813 LOG.error("Error while scanning directory " + path, fe); 814 } 815 return jhStatusList; 816 } 817 818 protected List<FileStatus> scanDirectoryForHistoryFiles(Path path, 819 FileContext fc) throws IOException { 820 return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); 821 } 822 823 /** 824 * Finds all history directories with a timestamp component by scanning the 825 * filesystem. Used when the JobHistory server is started. 826 * 827 * @return list of history directories 828 */ 829 protected List<FileStatus> findTimestampedDirectories() throws IOException { 830 List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 831 doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); 832 return fsList; 833 } 834 835 /** 836 * Scans the intermediate directory to find user directories. Scans these for 837 * history files if the modification time for the directory has changed. Once 838 * it finds history files it starts the process of moving them to the done 839 * directory. 840 * 841 * @throws IOException 842 * if there was a error while scanning 843 */ 844 void scanIntermediateDirectory() throws IOException { 845 // TODO it would be great to limit how often this happens, except in the 846 // case where we are looking for a particular job. 847 List<FileStatus> userDirList = JobHistoryUtils.localGlobber( 848 intermediateDoneDirFc, intermediateDoneDirPath, ""); 849 LOG.debug("Scanning intermediate dirs"); 850 for (FileStatus userDir : userDirList) { 851 String name = userDir.getPath().getName(); 852 UserLogDir dir = userDirModificationTimeMap.get(name); 853 if(dir == null) { 854 dir = new UserLogDir(); 855 UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir); 856 if(old != null) { 857 dir = old; 858 } 859 } 860 dir.scanIfNeeded(userDir); 861 } 862 } 863 864 /** 865 * Scans the specified path and populates the intermediate cache. 866 * 867 * @param absPath 868 * @throws IOException 869 */ 870 private void scanIntermediateDirectory(final Path absPath) throws IOException { 871 if (LOG.isDebugEnabled()) { 872 LOG.debug("Scanning intermediate dir " + absPath); 873 } 874 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath, 875 intermediateDoneDirFc); 876 if (LOG.isDebugEnabled()) { 877 LOG.debug("Found " + fileStatusList.size() + " files"); 878 } 879 for (FileStatus fs : fileStatusList) { 880 if (LOG.isDebugEnabled()) { 881 LOG.debug("scanning file: "+ fs.getPath()); 882 } 883 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 884 .getName()); 885 String confFileName = JobHistoryUtils 886 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 887 String summaryFileName = JobHistoryUtils 888 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 889 HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs 890 .getPath().getParent(), confFileName), new Path(fs.getPath() 891 .getParent(), summaryFileName), jobIndexInfo, false); 892 893 final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo); 894 if (old == null || old.didMoveFail()) { 895 final HistoryFileInfo found = (old == null) ? fileInfo : old; 896 long cutoff = System.currentTimeMillis() - maxHistoryAge; 897 if(found.getJobIndexInfo().getFinishTime() <= cutoff) { 898 try { 899 found.delete(); 900 } catch (IOException e) { 901 LOG.warn("Error cleaning up a HistoryFile that is out of date.", e); 902 } 903 } else { 904 if (LOG.isDebugEnabled()) { 905 LOG.debug("Scheduling move to done of " +found); 906 } 907 moveToDoneExecutor.execute(new Runnable() { 908 @Override 909 public void run() { 910 try { 911 found.moveToDone(); 912 } catch (IOException e) { 913 LOG.info("Failed to process fileInfo for job: " + 914 found.getJobId(), e); 915 } 916 } 917 }); 918 } 919 } else if (!old.isMovePending()) { 920 //This is a duplicate so just delete it 921 if (LOG.isDebugEnabled()) { 922 LOG.debug("Duplicate: deleting"); 923 } 924 fileInfo.delete(); 925 } 926 } 927 } 928 929 /** 930 * Searches the job history file FileStatus list for the specified JobId. 931 * 932 * @param fileStatusList 933 * fileStatus list of Job History Files. 934 * @param jobId 935 * The JobId to find. 936 * @return A FileInfo object for the jobId, null if not found. 937 * @throws IOException 938 */ 939 private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList, 940 JobId jobId) throws IOException { 941 for (FileStatus fs : fileStatusList) { 942 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 943 .getName()); 944 if (jobIndexInfo.getJobId().equals(jobId)) { 945 String confFileName = JobHistoryUtils 946 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 947 String summaryFileName = JobHistoryUtils 948 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 949 HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path( 950 fs.getPath().getParent(), confFileName), new Path(fs.getPath() 951 .getParent(), summaryFileName), jobIndexInfo, true); 952 return fileInfo; 953 } 954 } 955 return null; 956 } 957 958 /** 959 * Scans old directories known by the idToDateString map for the specified 960 * jobId. If the number of directories is higher than the supported size of 961 * the idToDateString cache, the jobId will not be found. 962 * 963 * @param jobId 964 * the jobId. 965 * @return 966 * @throws IOException 967 */ 968 private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { 969 String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( 970 jobId, serialNumberFormat); 971 Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber); 972 if (dateStringSet == null) { 973 return null; 974 } 975 for (String timestampPart : dateStringSet) { 976 Path logDir = canonicalHistoryLogPath(jobId, timestampPart); 977 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, 978 doneDirFc); 979 HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId); 980 if (fileInfo != null) { 981 return fileInfo; 982 } 983 } 984 return null; 985 } 986 987 public Collection<HistoryFileInfo> getAllFileInfo() throws IOException { 988 scanIntermediateDirectory(); 989 return jobListCache.values(); 990 } 991 992 public HistoryFileInfo getFileInfo(JobId jobId) throws IOException { 993 // FileInfo available in cache. 994 HistoryFileInfo fileInfo = jobListCache.get(jobId); 995 if (fileInfo != null) { 996 return fileInfo; 997 } 998 // OK so scan the intermediate to be sure we did not lose it that way 999 scanIntermediateDirectory(); 1000 fileInfo = jobListCache.get(jobId); 1001 if (fileInfo != null) { 1002 return fileInfo; 1003 } 1004 1005 // Intermediate directory does not contain job. Search through older ones. 1006 fileInfo = scanOldDirsForJob(jobId); 1007 if (fileInfo != null) { 1008 return fileInfo; 1009 } 1010 return null; 1011 } 1012 1013 private void moveToDoneNow(final Path src, final Path target) 1014 throws IOException { 1015 LOG.info("Moving " + src.toString() + " to " + target.toString()); 1016 intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); 1017 } 1018 1019 private String getJobSummary(FileContext fc, Path path) throws IOException { 1020 Path qPath = fc.makeQualified(path); 1021 FSDataInputStream in = null; 1022 String jobSummaryString = null; 1023 try { 1024 in = fc.open(qPath); 1025 jobSummaryString = in.readUTF(); 1026 } finally { 1027 if (in != null) { 1028 in.close(); 1029 } 1030 } 1031 return jobSummaryString; 1032 } 1033 1034 private void makeDoneSubdir(Path path) throws IOException { 1035 try { 1036 doneDirFc.getFileStatus(path); 1037 existingDoneSubdirs.add(path); 1038 } catch (FileNotFoundException fnfE) { 1039 try { 1040 FsPermission fsp = new FsPermission( 1041 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); 1042 doneDirFc.mkdir(path, fsp, true); 1043 FileStatus fsStatus = doneDirFc.getFileStatus(path); 1044 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 1045 + ", Expected: " + fsp.toShort()); 1046 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 1047 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 1048 + ", " + fsp); 1049 doneDirFc.setPermission(path, fsp); 1050 } 1051 existingDoneSubdirs.add(path); 1052 } catch (FileAlreadyExistsException faeE) { // Nothing to do. 1053 } 1054 } 1055 } 1056 1057 private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { 1058 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1059 id, timestampComponent, serialNumberFormat)); 1060 } 1061 1062 private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { 1063 String timestampComponent = JobHistoryUtils 1064 .timestampDirectoryComponent(millisecondTime); 1065 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1066 id, timestampComponent, serialNumberFormat)); 1067 } 1068 1069 private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { 1070 if (finishTime == 0) { 1071 return fileStatus.getModificationTime(); 1072 } 1073 return finishTime; 1074 } 1075 1076 private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException { 1077 jobListCache.delete(fileInfo); 1078 fileInfo.delete(); 1079 } 1080 1081 List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException { 1082 return JobHistoryUtils. 1083 getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff); 1084 } 1085 1086 /** 1087 * Clean up older history files. 1088 * 1089 * @throws IOException 1090 * on any error trying to remove the entries. 1091 */ 1092 @SuppressWarnings("unchecked") 1093 void clean() throws IOException { 1094 long cutoff = System.currentTimeMillis() - maxHistoryAge; 1095 boolean halted = false; 1096 List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff); 1097 // Sort in ascending order. Relies on YYYY/MM/DD/Serial 1098 Collections.sort(serialDirList); 1099 for (FileStatus serialDir : serialDirList) { 1100 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( 1101 serialDir.getPath(), doneDirFc); 1102 for (FileStatus historyFile : historyFileList) { 1103 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile 1104 .getPath().getName()); 1105 long effectiveTimestamp = getEffectiveTimestamp( 1106 jobIndexInfo.getFinishTime(), historyFile); 1107 if (effectiveTimestamp <= cutoff) { 1108 HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo 1109 .getJobId()); 1110 if (fileInfo == null) { 1111 String confFileName = JobHistoryUtils 1112 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 1113 1114 fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path( 1115 historyFile.getPath().getParent(), confFileName), null, 1116 jobIndexInfo, true); 1117 } 1118 deleteJobFromDone(fileInfo); 1119 } else { 1120 halted = true; 1121 break; 1122 } 1123 } 1124 if (!halted) { 1125 deleteDir(serialDir); 1126 removeDirectoryFromSerialNumberIndex(serialDir.getPath()); 1127 existingDoneSubdirs.remove(serialDir.getPath()); 1128 } else { 1129 break; // Don't scan any more directories. 1130 } 1131 } 1132 } 1133 1134 protected boolean deleteDir(FileStatus serialDir) 1135 throws AccessControlException, FileNotFoundException, 1136 UnsupportedFileSystemException, IOException { 1137 return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); 1138 } 1139 1140 @VisibleForTesting 1141 protected void setMaxHistoryAge(long newValue){ 1142 maxHistoryAge=newValue; 1143 } 1144}