/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.hs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownThreadsHelper;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class provides a way to interact with history files in a thread safe
 * manner.
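 *
 * History files first land in an intermediate done directory and are moved
 * from there into the permanent done directory that this manager owns.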
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class HistoryFileManager extends AbstractService {
  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);

  private static enum HistoryInfoState {
    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
  };

  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
      .doneSubdirsBeforeSerialTail();

  /**
   * Maps between a serial number (generated based on jobId) and the timestamp
   * component(s) to which it belongs. Facilitates jobId based searches. If a
   * jobId is not found in this index, it will not be found by jobId-based
   * searches.
   */
  private static class SerialNumberIndex {
    private SortedMap<String, Set<String>> cache;
    private int maxSize;

    public SerialNumberIndex(int maxSize) {
      this.cache = new TreeMap<String, Set<String>>();
      this.maxSize = maxSize;
    }

    public synchronized void add(String serialPart, String timestampPart) {
      if (!cache.containsKey(serialPart)) {
        cache.put(serialPart, new HashSet<String>());
        if (cache.size() > maxSize) {
          // Evict the lowest serial number when the index is over capacity.
          String key = cache.firstKey();
          LOG.error("Dropping " + key
              + " from the SerialNumberIndex. We will no "
              + "longer be able to see jobs that are in that serial index for "
              + cache.get(key));
          cache.remove(key);
        }
      }
      Set<String> datePartSet = cache.get(serialPart);
      datePartSet.add(timestampPart);
    }

    public synchronized void remove(String serialPart, String timeStampPart) {
      if (cache.containsKey(serialPart)) {
        Set<String> set = cache.get(serialPart);
        set.remove(timeStampPart);
        if (set.isEmpty()) {
          cache.remove(serialPart);
        }
      }
    }

    public synchronized Set<String> get(String serialPart) {
      Set<String> found = cache.get(serialPart);
      if (found != null) {
        return new HashSet<String>(found);
      }
      return null;
    }
  }

  /**
   * Wrapper around {@link ConcurrentSkipListMap} that maintains the size
   * alongside for an O(1) size() implementation for use in JobListCache.
   *
   * Note: The size is not updated atomically with additions/removals. This
   * race can lead to size() returning an incorrect size at times.
   */
  static class JobIdHistoryFileInfoMap {
    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
    private AtomicInteger mapSize;

    JobIdHistoryFileInfoMap() {
      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
      mapSize = new AtomicInteger();
    }

    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
      HistoryFileInfo ret = cache.putIfAbsent(key, value);
      if (ret == null) {
        mapSize.incrementAndGet();
      }
      return ret;
    }

    public HistoryFileInfo remove(JobId key) {
      HistoryFileInfo ret = cache.remove(key);
      if (ret != null) {
        mapSize.decrementAndGet();
      }
      return ret;
    }

    /**
     * Returns the recorded size of the internal map. Note that this could be
     * out of sync with the actual size of the map.
     *
     * @return the "recorded" size
     */
    public int size() {
      return mapSize.get();
    }

    public HistoryFileInfo get(JobId key) {
      return cache.get(key);
    }

    public NavigableSet<JobId> navigableKeySet() {
      return cache.navigableKeySet();
    }

    public Collection<HistoryFileInfo> values() {
      return cache.values();
    }
  }

  static class JobListCache {
    private JobIdHistoryFileInfoMap cache;
    private int maxSize;
    private long maxAge;

    public JobListCache(int maxSize, long maxAge) {
      this.maxSize = maxSize;
      this.maxAge = maxAge;
      this.cache = new JobIdHistoryFileInfoMap();
    }

    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
      JobId jobId = fileInfo.getJobId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding " + jobId + " to job list cache with "
            + fileInfo.getJobIndexInfo());
      }
      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
      if (cache.size() > maxSize) {
        // There is a race here, where more than one thread could be trying to
        // remove entries. This could result in too many entries being removed
        // from the cache. This is considered OK as the size of the cache
        // should be rather large, and we would rather have performance over
        // keeping the cache size exactly at the maximum.
        Iterator<JobId> keys = cache.navigableKeySet().iterator();
        long cutoff = System.currentTimeMillis() - maxAge;

        // MAPREDUCE-6436: reduce the number of log messages written when
        // there are many move-pending histories.
        JobId firstInIntermediateKey = null;
        int inIntermediateCount = 0;
        JobId firstMoveFailedKey = null;
        int moveFailedCount = 0;

        while (cache.size() > maxSize && keys.hasNext()) {
          JobId key = keys.next();
          HistoryFileInfo firstValue = cache.get(key);
          if (firstValue != null) {
            synchronized (firstValue) {
              if (firstValue.isMovePending()) {
                if (firstValue.didMoveFail() &&
                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
                  cache.remove(key);
                  // Now let's try to delete it
                  try {
                    firstValue.delete();
                  } catch (IOException e) {
                    LOG.error("Error while trying to delete history files" +
                        " that could not be moved to done.", e);
                  }
                } else {
                  if (firstValue.didMoveFail()) {
                    if (moveFailedCount == 0) {
                      firstMoveFailedKey = key;
                    }
                    moveFailedCount += 1;
                  } else {
                    if (inIntermediateCount == 0) {
                      firstInIntermediateKey = key;
                    }
                    inIntermediateCount += 1;
                  }
                }
              } else {
                cache.remove(key);
              }
            }
          }
        }
        // Log only the first job history among the pending ones to restrict
        // the total number of log messages.
        if (inIntermediateCount > 0) {
          LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " +
              "(e.g. " + firstInIntermediateKey + ") from JobListCache " +
              "because they are not in done yet. Total count is " +
              inIntermediateCount + ".");
        }
        if (moveFailedCount > 0) {
          LOG.warn("Waiting to remove MOVE_FAILED state histories " +
              "(e.g. " + firstMoveFailedKey + ") from JobListCache " +
              "because they are not in done yet. Total count is " +
              moveFailedCount + ".");
        }
      }
      return old;
    }

    public void delete(HistoryFileInfo fileInfo) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Removing from cache " + fileInfo);
      }
      cache.remove(fileInfo.getJobId());
    }

    public Collection<HistoryFileInfo> values() {
      return new ArrayList<HistoryFileInfo>(cache.values());
    }

    public HistoryFileInfo get(JobId jobId) {
      return cache.get(jobId);
    }

    public boolean isFull() {
      return cache.size() >= maxSize;
    }
  }

  /**
   * This class represents a user dir in the intermediate done directory. This
   * is mostly for locking purposes.
   */
  private class UserLogDir {
    long modTime = 0;
    private long scanTime = 0;

    public synchronized void scanIfNeeded(FileStatus fs) {
      long newModTime = fs.getModificationTime();
      // MAPREDUCE-6680: In some cloud FileSystems, like Azure FS or S3, a
      // file's modification time is truncated into seconds. In that case,
      // modTime == newModTime doesn't mean there was no file update in the
      // directory, so we need an additional check.
      // Note: modTime (X seconds Y milliseconds) could be rounded to X
      // seconds or X+1 seconds.
      if (modTime != newModTime
          || (scanTime / 1000) == (modTime / 1000)
          || (scanTime / 1000 + 1) == (modTime / 1000)) {
        // reset scanTime before scanning happens
        scanTime = System.currentTimeMillis();
        Path p = fs.getPath();
        try {
          scanIntermediateDirectory(p);
          // If scanning fails, we will scan again. We assume the failure is
          // temporary.
          modTime = newModTime;
        } catch (IOException e) {
          LOG.error("Error while trying to scan the directory " + p, e);
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Scan not needed of " + fs.getPath());
        }
        // reset scanTime
        scanTime = System.currentTimeMillis();
      }
    }
  }

  public class HistoryFileInfo {
    private Path historyFile;
    private Path confFile;
    private Path summaryFile;
    private JobIndexInfo jobIndexInfo;
    private HistoryInfoState state;

    @VisibleForTesting
    protected HistoryFileInfo(Path historyFile, Path confFile,
        Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) {
      this.historyFile = historyFile;
      this.confFile = confFile;
      this.summaryFile = summaryFile;
      this.jobIndexInfo = jobIndexInfo;
      state = isInDone ? HistoryInfoState.IN_DONE
          : HistoryInfoState.IN_INTERMEDIATE;
    }

    @VisibleForTesting
    synchronized boolean isMovePending() {
      return state == HistoryInfoState.IN_INTERMEDIATE
          || state == HistoryInfoState.MOVE_FAILED;
    }

    @VisibleForTesting
    synchronized boolean didMoveFail() {
      return state == HistoryInfoState.MOVE_FAILED;
    }

    /**
     * @return true if the files backed by this were deleted.
     */
    public synchronized boolean isDeleted() {
      return state == HistoryInfoState.DELETED;
    }

    @Override
    public String toString() {
      return "HistoryFileInfo jobID " + getJobId()
          + " historyFile = " + historyFile;
    }
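
    // moveToDone() runs on the moveToDoneExecutor threads; it is synchronized
    // on this HistoryFileInfo (as are delete() and the state checks) so a
    // concurrent delete or a second move cannot interleave with the renames.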
    @VisibleForTesting
    synchronized void moveToDone() throws IOException {
      if (LOG.isDebugEnabled()) {
        LOG.debug("moveToDone: " + historyFile);
      }
      if (!isMovePending()) {
        // It was either deleted or is already in done. Either way, do nothing.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Move no longer pending");
        }
        return;
      }
      try {
        long completeTime = jobIndexInfo.getFinishTime();
        if (completeTime == 0) {
          completeTime = System.currentTimeMillis();
        }
        JobId jobId = jobIndexInfo.getJobId();

        List<Path> paths = new ArrayList<Path>(2);
        if (historyFile == null) {
          LOG.info("No file for job-history with " + jobId
              + " found in cache!");
        } else {
          paths.add(historyFile);
        }

        if (confFile == null) {
          LOG.info("No file for jobConf with " + jobId + " found in cache!");
        } else {
          paths.add(confFile);
        }

        if (summaryFile == null || !intermediateDoneDirFc.util().exists(
            summaryFile)) {
          LOG.info("No summary file for job: " + jobId);
        } else {
          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
              summaryFile);
          SUMMARY_LOG.info(jobSummaryString);
          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
          intermediateDoneDirFc.delete(summaryFile, false);
          summaryFile = null;
        }

        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
        addDirectoryToSerialNumberIndex(targetDir);
        makeDoneSubdir(targetDir);
        if (historyFile != null) {
          Path toPath = doneDirFc.makeQualified(new Path(targetDir,
              historyFile.getName()));
          if (!toPath.equals(historyFile)) {
            moveToDoneNow(historyFile, toPath);
            historyFile = toPath;
          }
        }
        if (confFile != null) {
          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
              .getName()));
          if (!toPath.equals(confFile)) {
            moveToDoneNow(confFile, toPath);
            confFile = toPath;
          }
        }
        state = HistoryInfoState.IN_DONE;
      } catch (Throwable t) {
        LOG.error("Error while trying to move a job to done", t);
        this.state = HistoryInfoState.MOVE_FAILED;
      }
    }

    /**
     * Parse a job from the JobHistoryFile, if the underlying file is not going
     * to be deleted.
     *
     * @return the Job or null if the underlying file was deleted.
     * @throws IOException
     *           if there is an error trying to read the file.
     */
    public synchronized Job loadJob() throws IOException {
      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
          false, jobIndexInfo.getUser(), this, aclsMgr);
    }

    /**
     * Return the history file. This should only be used for testing.
     *
     * @return the history file.
     */
    synchronized Path getHistoryFile() {
      return historyFile;
    }
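
    // The state is set to DELETED before the files are removed so that
    // isDeleted() reflects the intent even if one of the removals fails.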
471 */ 472 synchronized Path getHistoryFile() { 473 return historyFile; 474 } 475 476 protected synchronized void delete() throws IOException { 477 if (LOG.isDebugEnabled()) { 478 LOG.debug("deleting " + historyFile + " and " + confFile); 479 } 480 state = HistoryInfoState.DELETED; 481 doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); 482 doneDirFc.delete(doneDirFc.makeQualified(confFile), false); 483 } 484 485 public JobIndexInfo getJobIndexInfo() { 486 return jobIndexInfo; 487 } 488 489 public JobId getJobId() { 490 return jobIndexInfo.getJobId(); 491 } 492 493 public synchronized Path getConfFile() { 494 return confFile; 495 } 496 497 public synchronized Configuration loadConfFile() throws IOException { 498 FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); 499 Configuration jobConf = new Configuration(false); 500 jobConf.addResource(fc.open(confFile), confFile.toString()); 501 return jobConf; 502 } 503 } 504 505 private SerialNumberIndex serialNumberIndex = null; 506 protected JobListCache jobListCache = null; 507 508 // Maintains a list of known done subdirectories. 509 private final Set<Path> existingDoneSubdirs = Collections 510 .synchronizedSet(new HashSet<Path>()); 511 512 /** 513 * Maintains a mapping between intermediate user directories and the last 514 * known modification time. 515 */ 516 private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 517 new ConcurrentHashMap<String, UserLogDir>(); 518 519 private JobACLsManager aclsMgr; 520 521 @VisibleForTesting 522 Configuration conf; 523 524 private String serialNumberFormat; 525 526 private Path doneDirPrefixPath = null; // folder for completed jobs 527 private FileContext doneDirFc; // done Dir FileContext 528 529 private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path 530 private FileContext intermediateDoneDirFc; // Intermediate Done Dir 531 // FileContext 532 @VisibleForTesting 533 protected ThreadPoolExecutor moveToDoneExecutor = null; 534 private long maxHistoryAge = 0; 535 536 public HistoryFileManager() { 537 super(HistoryFileManager.class.getName()); 538 } 539 540 @Override 541 protected void serviceInit(Configuration conf) throws Exception { 542 this.conf = conf; 543 544 int serialNumberLowDigits = 3; 545 serialNumberFormat = ("%0" 546 + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) 547 + "d"); 548 549 long maxFSWaitTime = conf.getLong( 550 JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, 551 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); 552 createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime); 553 554 this.aclsMgr = new JobACLsManager(conf); 555 556 maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 557 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); 558 559 jobListCache = createJobListCache(); 560 561 serialNumberIndex = new SerialNumberIndex(conf.getInt( 562 JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, 563 JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE)); 564 565 int numMoveThreads = conf.getInt( 566 JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, 567 JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); 568 ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( 569 "MoveIntermediateToDone Thread #%d").build(); 570 moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads, 571 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); 572 573 super.serviceInit(conf); 574 } 575 576 @VisibleForTesting 577 void createHistoryDirs(Clock clock, long intervalCheckMillis, 578 
  @VisibleForTesting
  void createHistoryDirs(Clock clock, long intervalCheckMillis,
      long timeOutMillis) throws IOException {
    long start = clock.getTime();
    boolean done = false;
    int counter = 0;
    while (!done &&
        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
      try {
        Thread.sleep(intervalCheckMillis);
      } catch (InterruptedException ex) {
        throw new YarnRuntimeException(ex);
      }
    }
    if (!done) {
      throw new YarnRuntimeException("Timed out '" + timeOutMillis +
          "ms' waiting for FileSystem to become available");
    }
  }

  /**
   * DistributedFileSystem returns a RemoteException with a message stating
   * SafeModeException in it, so this is the only way to check whether the
   * failure was caused by being in safe mode.
   */
  private boolean isBecauseSafeMode(Throwable ex) {
    return ex.toString().contains("SafeModeException");
  }

  /**
   * Returns TRUE if the history dirs were created, FALSE if they could not
   * be created because the FileSystem is not reachable or is in safe mode,
   * and throws an exception otherwise.
   */
  @VisibleForTesting
  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
    boolean succeeded = true;
    String doneDirPrefix = JobHistoryUtils
        .getConfiguredHistoryServerDoneDirPrefix(conf);
    try {
      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
          new Path(doneDirPrefix));
      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
    } catch (ConnectException ex) {
      if (logWait) {
        LOG.info("Waiting for FileSystem at " +
            doneDirPrefixPath.toUri().getAuthority() + " to be available");
      }
      succeeded = false;
    } catch (IOException e) {
      if (isBecauseSafeMode(e)) {
        succeeded = false;
        if (logWait) {
          LOG.info("Waiting for FileSystem at " +
              doneDirPrefixPath.toUri().getAuthority() +
              " to be out of safe mode");
        }
      } else {
        throw new YarnRuntimeException("Error creating done directory: ["
            + doneDirPrefixPath + "]", e);
      }
    }
    if (succeeded) {
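      // Only set up the intermediate done directory once the final done
      // directory is known to be reachable.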
      String intermediateDoneDirPrefix = JobHistoryUtils
          .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
      try {
        intermediateDoneDirPath = FileContext.getFileContext(conf)
            .makeQualified(new Path(intermediateDoneDirPrefix));
        intermediateDoneDirFc = FileContext.getFileContext(
            intermediateDoneDirPath.toUri(), conf);
        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS
                .toShort()));
      } catch (ConnectException ex) {
        succeeded = false;
        if (logWait) {
          LOG.info("Waiting for FileSystem at " +
              intermediateDoneDirPath.toUri().getAuthority() +
              " to be available");
        }
      } catch (IOException e) {
        if (isBecauseSafeMode(e)) {
          succeeded = false;
          if (logWait) {
            LOG.info("Waiting for FileSystem at " +
                intermediateDoneDirPath.toUri().getAuthority() +
                " to be out of safe mode");
          }
        } else {
          throw new YarnRuntimeException(
              "Error creating intermediate done directory: ["
                  + intermediateDoneDirPath + "]", e);
        }
      }
    }
    return succeeded;
  }

  @Override
  public void serviceStop() throws Exception {
    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
    super.serviceStop();
  }

  protected JobListCache createJobListCache() {
    return new JobListCache(conf.getInt(
        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
  }

  private void mkdir(FileContext fc, Path path, FsPermission fsp)
      throws IOException {
    if (!fc.util().exists(path)) {
      try {
        fc.mkdir(path, fsp, true);

        FileStatus fsStatus = fc.getFileStatus(path);
        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
            + ", Expected: " + fsp.toShort());
        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
              + ", " + fsp);
          fc.setPermission(path, fsp);
        }
      } catch (FileAlreadyExistsException e) {
        LOG.info("Directory: [" + path + "] already exists.");
      }
    }
  }

  /**
   * Populates index data structures. Should only be called at initialization
   * times.
   */
  @SuppressWarnings("unchecked")
  void initExisting() throws IOException {
    LOG.info("Initializing Existing Jobs...");
    List<FileStatus> timestampedDirList = findTimestampedDirectories();
    // Sort first just so insertion is in a consistent order
    Collections.sort(timestampedDirList);
    for (FileStatus fs : timestampedDirList) {
      // TODO Could verify the correct format for these directories.
      addDirectoryToSerialNumberIndex(fs.getPath());
    }
    // Fill the cache with the most recent jobs first, until it is full.
    for (int i = timestampedDirList.size() - 1;
        i >= 0 && !jobListCache.isFull(); i--) {
      FileStatus fs = timestampedDirList.get(i);
      addDirectoryToJobListCache(fs.getPath());
    }
  }
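
  // The next two methods keep the SerialNumberIndex in sync as done
  // directories are added and cleaned up.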
  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
    String serialPart = serialDirPath.getName();
    String timeStampPart = JobHistoryUtils
        .getTimestampPartFromPath(serialDirPath.toString());
    if (timeStampPart == null) {
      LOG.warn("Could not find timestamp portion from path: "
          + serialDirPath.toString() + ". Continuing with next");
      return;
    }
    if (serialPart == null) {
      LOG.warn("Could not find serial portion from path: "
          + serialDirPath.toString() + ". Continuing with next");
      return;
    }
    serialNumberIndex.remove(serialPart, timeStampPart);
  }

  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + serialDirPath + " to serial index");
    }
    String serialPart = serialDirPath.getName();
    String timestampPart = JobHistoryUtils
        .getTimestampPartFromPath(serialDirPath.toString());
    if (timestampPart == null) {
      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
          + ". Continuing with next");
      return;
    }
    if (serialPart == null) {
      LOG.warn("Could not find serial portion from path: "
          + serialDirPath.toString() + ". Continuing with next");
    } else {
      serialNumberIndex.add(serialPart, timestampPart);
    }
  }

  private void addDirectoryToJobListCache(Path path) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + path + " to job list cache.");
    }
    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
        doneDirFc);
    for (FileStatus fs : historyFileList) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding in history for " + fs.getPath());
      }
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      String confFileName = JobHistoryUtils
          .getIntermediateConfFileName(jobIndexInfo.getJobId());
      String summaryFileName = JobHistoryUtils
          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
          .getPath().getParent(), confFileName), new Path(fs.getPath()
          .getParent(), summaryFileName), jobIndexInfo, true);
      jobListCache.addIfAbsent(fileInfo);
    }
  }

  @VisibleForTesting
  protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
      PathFilter pathFilter) throws IOException {
    path = fc.makeQualified(path);
    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
    try {
      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
      while (fileStatusIter.hasNext()) {
        FileStatus fileStatus = fileStatusIter.next();
        Path filePath = fileStatus.getPath();
        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
          jhStatusList.add(fileStatus);
        }
      }
    } catch (FileNotFoundException fe) {
      LOG.error("Error while scanning directory " + path, fe);
    }
    return jhStatusList;
  }

  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
      FileContext fc) throws IOException {
    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
  }

  /**
   * Finds all history directories with a timestamp component by scanning the
   * filesystem. Used when the JobHistory server is started.
   *
   * @return list of history directories
   */
  protected List<FileStatus> findTimestampedDirectories() throws IOException {
    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
    return fsList;
  }
  /**
   * Scans the intermediate directory to find user directories. Scans these
   * for history files if the modification time for the directory has changed.
   * Once it finds history files, it starts the process of moving them to the
   * done directory.
   *
   * @throws IOException
   *           if there was an error while scanning
   */
  void scanIntermediateDirectory() throws IOException {
    // TODO it would be great to limit how often this happens, except in the
    // case where we are looking for a particular job.
    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
        intermediateDoneDirFc, intermediateDoneDirPath, "");
    LOG.debug("Scanning intermediate dirs");
    for (FileStatus userDir : userDirList) {
      String name = userDir.getPath().getName();
      UserLogDir dir = userDirModificationTimeMap.get(name);
      if (dir == null) {
        dir = new UserLogDir();
        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
        if (old != null) {
          dir = old;
        }
      }
      dir.scanIfNeeded(userDir);
    }
  }

  /**
   * Scans the specified path and populates the intermediate cache.
   *
   * @param absPath
   *          the intermediate user directory to scan
   * @throws IOException
   *           if the scan fails
   */
  private void scanIntermediateDirectory(final Path absPath)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Scanning intermediate dir " + absPath);
    }
    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
        intermediateDoneDirFc);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Found " + fileStatusList.size() + " files");
    }
    for (FileStatus fs : fileStatusList) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("scanning file: " + fs.getPath());
      }
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      String confFileName = JobHistoryUtils
          .getIntermediateConfFileName(jobIndexInfo.getJobId());
      String summaryFileName = JobHistoryUtils
          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
          .getPath().getParent(), confFileName), new Path(fs.getPath()
          .getParent(), summaryFileName), jobIndexInfo, false);

      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
      if (old == null || old.didMoveFail()) {
        final HistoryFileInfo found = (old == null) ? fileInfo : old;
        long cutoff = System.currentTimeMillis() - maxHistoryAge;
        if (found.getJobIndexInfo().getFinishTime() <= cutoff) {
          // The job is already beyond the retention age; delete it instead
          // of moving it to done.
          try {
            found.delete();
          } catch (IOException e) {
            LOG.warn("Error cleaning up a HistoryFile that is out of date.",
                e);
          }
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Scheduling move to done of " + found);
          }
          moveToDoneExecutor.execute(new Runnable() {
            @Override
            public void run() {
              try {
                found.moveToDone();
              } catch (IOException e) {
                LOG.info("Failed to process fileInfo for job: " +
                    found.getJobId(), e);
              }
            }
          });
        }
      } else if (!old.isMovePending()) {
        // This is a duplicate, so just delete it.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Duplicate: deleting");
        }
        fileInfo.delete();
      }
    }
  }

  /**
   * Searches the job history file FileStatus list for the specified JobId.
   *
   * @param fileStatusList
   *          fileStatus list of Job History Files.
   * @param jobId
   *          The JobId to find.
   * @return A FileInfo object for the jobId, null if not found.
   * @throws IOException
   */
  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
      JobId jobId) throws IOException {
    for (FileStatus fs : fileStatusList) {
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      if (jobIndexInfo.getJobId().equals(jobId)) {
        String confFileName = JobHistoryUtils
            .getIntermediateConfFileName(jobIndexInfo.getJobId());
        String summaryFileName = JobHistoryUtils
            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
            .getParent(), summaryFileName), jobIndexInfo, true);
        return fileInfo;
      }
    }
    return null;
  }

  /**
   * Scans old directories known by the idToDateString map for the specified
   * jobId. If the number of directories is higher than the supported size of
   * the idToDateString cache, the jobId will not be found.
   *
   * @param jobId
   *          the jobId.
   * @return the HistoryFileInfo for the job, or null if it was not found.
   * @throws IOException
   */
  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
        jobId, serialNumberFormat);
    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
    if (dateStringSet == null) {
      return null;
    }
    for (String timestampPart : dateStringSet) {
      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
          doneDirFc);
      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
      if (fileInfo != null) {
        return fileInfo;
      }
    }
    return null;
  }

  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
    scanIntermediateDirectory();
    return jobListCache.values();
  }

  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
    // FileInfo available in cache.
    HistoryFileInfo fileInfo = jobListCache.get(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }
    // Not in cache, so scan the intermediate directory to be sure we did not
    // miss it there.
    scanIntermediateDirectory();
    fileInfo = jobListCache.get(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }

    // The intermediate directory does not contain the job. Search through
    // the older done directories.
    fileInfo = scanOldDirsForJob(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }
    return null;
  }

  private void moveToDoneNow(final Path src, final Path target)
      throws IOException {
    LOG.info("Moving " + src.toString() + " to " + target.toString());
    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
  }

  private String getJobSummary(FileContext fc, Path path) throws IOException {
    Path qPath = fc.makeQualified(path);
    FSDataInputStream in = null;
    String jobSummaryString = null;
    try {
      in = fc.open(qPath);
      jobSummaryString = in.readUTF();
    } finally {
      if (in != null) {
        in.close();
      }
    }
    return jobSummaryString;
  }

  private void makeDoneSubdir(Path path) throws IOException {
    try {
      doneDirFc.getFileStatus(path);
      existingDoneSubdirs.add(path);
    } catch (FileNotFoundException fnfE) {
      try {
        FsPermission fsp = new FsPermission(
            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
        doneDirFc.mkdir(path, fsp, true);
        FileStatus fsStatus = doneDirFc.getFileStatus(path);
        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
            + ", Expected: " + fsp.toShort());
        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
              + ", " + fsp);
          doneDirFc.setPermission(path, fsp);
        }
        existingDoneSubdirs.add(path);
      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
      }
    }
  }

  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
        id, timestampComponent, serialNumberFormat));
  }

  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
    String timestampComponent = JobHistoryUtils
        .timestampDirectoryComponent(millisecondTime);
    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
        id, timestampComponent, serialNumberFormat));
  }

  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
    if (finishTime == 0) {
      return fileStatus.getModificationTime();
    }
    return finishTime;
  }

  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
    jobListCache.delete(fileInfo);
    fileInfo.delete();
  }

  List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
    return JobHistoryUtils
        .getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
  }

  /**
   * Clean up older history files.
   *
   * @throws IOException
   *           on any error trying to remove the entries.
   */
  @SuppressWarnings("unchecked")
  void clean() throws IOException {
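    // Walk the done directories in timestamp/serial order, deleting each job
    // whose effective finish time is older than maxHistoryAge, and stop at
    // the first directory that still contains a young-enough job.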
    long cutoff = System.currentTimeMillis() - maxHistoryAge;
    boolean halted = false;
    List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
    Collections.sort(serialDirList);
    for (FileStatus serialDir : serialDirList) {
      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
          serialDir.getPath(), doneDirFc);
      for (FileStatus historyFile : historyFileList) {
        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
            .getPath().getName());
        long effectiveTimestamp = getEffectiveTimestamp(
            jobIndexInfo.getFinishTime(), historyFile);
        if (effectiveTimestamp <= cutoff) {
          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
              .getJobId());
          if (fileInfo == null) {
            String confFileName = JobHistoryUtils
                .getIntermediateConfFileName(jobIndexInfo.getJobId());

            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
                historyFile.getPath().getParent(), confFileName), null,
                jobIndexInfo, true);
          }
          deleteJobFromDone(fileInfo);
        } else {
          halted = true;
          break;
        }
      }
      if (!halted) {
        deleteDir(serialDir);
        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
        existingDoneSubdirs.remove(serialDir.getPath());
      } else {
        break; // Don't scan any more directories.
      }
    }
  }

  protected boolean deleteDir(FileStatus serialDir)
      throws AccessControlException, FileNotFoundException,
      UnsupportedFileSystemException, IOException {
    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
  }

  @VisibleForTesting
  protected void setMaxHistoryAge(long newValue) {
    maxHistoryAge = newValue;
  }
}