001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.v2.hs; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.NavigableSet; 031import java.util.Set; 032import java.util.SortedMap; 033import java.util.TreeMap; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.ConcurrentSkipListMap; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.ThreadFactory; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.apache.hadoop.classification.InterfaceAudience; 046import org.apache.hadoop.classification.InterfaceStability; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FSDataInputStream; 049import org.apache.hadoop.fs.FileAlreadyExistsException; 050import org.apache.hadoop.fs.FileContext; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.Options; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.fs.PathFilter; 055import org.apache.hadoop.fs.RemoteIterator; 056import org.apache.hadoop.fs.UnsupportedFileSystemException; 057import org.apache.hadoop.fs.permission.FsPermission; 058import org.apache.hadoop.mapred.JobACLsManager; 059import org.apache.hadoop.mapreduce.jobhistory.JobSummary; 060import org.apache.hadoop.mapreduce.v2.api.records.JobId; 061import org.apache.hadoop.mapreduce.v2.app.job.Job; 062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; 063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; 064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; 065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; 066import org.apache.hadoop.security.AccessControlException; 067import org.apache.hadoop.service.AbstractService; 068import org.apache.hadoop.util.ShutdownThreadsHelper; 069import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 070 071import com.google.common.annotations.VisibleForTesting; 072import com.google.common.util.concurrent.ThreadFactoryBuilder; 073import org.apache.hadoop.yarn.util.Clock; 074import org.apache.hadoop.yarn.util.SystemClock; 075 076/** 077 * This class provides a way to interact with history files in a thread safe 078 * manor. 079 */ 080@InterfaceAudience.Public 081@InterfaceStability.Unstable 082public class HistoryFileManager extends AbstractService { 083 private static final Log LOG = LogFactory.getLog(HistoryFileManager.class); 084 private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); 085 086 private static enum HistoryInfoState { 087 IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED 088 }; 089 090 private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils 091 .doneSubdirsBeforeSerialTail(); 092 093 /** 094 * Maps between a serial number (generated based on jobId) and the timestamp 095 * component(s) to which it belongs. Facilitates jobId based searches. If a 096 * jobId is not found in this list - it will not be found. 097 */ 098 private static class SerialNumberIndex { 099 private SortedMap<String, Set<String>> cache; 100 private int maxSize; 101 102 public SerialNumberIndex(int maxSize) { 103 this.cache = new TreeMap<String, Set<String>>(); 104 this.maxSize = maxSize; 105 } 106 107 public synchronized void add(String serialPart, String timestampPart) { 108 if (!cache.containsKey(serialPart)) { 109 cache.put(serialPart, new HashSet<String>()); 110 if (cache.size() > maxSize) { 111 String key = cache.firstKey(); 112 LOG.error("Dropping " + key 113 + " from the SerialNumberIndex. We will no " 114 + "longer be able to see jobs that are in that serial index for " 115 + cache.get(key)); 116 cache.remove(key); 117 } 118 } 119 Set<String> datePartSet = cache.get(serialPart); 120 datePartSet.add(timestampPart); 121 } 122 123 public synchronized void remove(String serialPart, String timeStampPart) { 124 if (cache.containsKey(serialPart)) { 125 Set<String> set = cache.get(serialPart); 126 set.remove(timeStampPart); 127 if (set.isEmpty()) { 128 cache.remove(serialPart); 129 } 130 } 131 } 132 133 public synchronized Set<String> get(String serialPart) { 134 Set<String> found = cache.get(serialPart); 135 if (found != null) { 136 return new HashSet<String>(found); 137 } 138 return null; 139 } 140 } 141 142 /** 143 * Wrapper around {@link ConcurrentSkipListMap} that maintains size along 144 * side for O(1) size() implementation for use in JobListCache. 145 * 146 * Note: The size is not updated atomically with changes additions/removals. 147 * This race can lead to size() returning an incorrect size at times. 148 */ 149 static class JobIdHistoryFileInfoMap { 150 private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache; 151 private AtomicInteger mapSize; 152 153 JobIdHistoryFileInfoMap() { 154 cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>(); 155 mapSize = new AtomicInteger(); 156 } 157 158 public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) { 159 HistoryFileInfo ret = cache.putIfAbsent(key, value); 160 if (ret == null) { 161 mapSize.incrementAndGet(); 162 } 163 return ret; 164 } 165 166 public HistoryFileInfo remove(JobId key) { 167 HistoryFileInfo ret = cache.remove(key); 168 if (ret != null) { 169 mapSize.decrementAndGet(); 170 } 171 return ret; 172 } 173 174 /** 175 * Returns the recorded size of the internal map. Note that this could be out 176 * of sync with the actual size of the map 177 * @return "recorded" size 178 */ 179 public int size() { 180 return mapSize.get(); 181 } 182 183 public HistoryFileInfo get(JobId key) { 184 return cache.get(key); 185 } 186 187 public NavigableSet<JobId> navigableKeySet() { 188 return cache.navigableKeySet(); 189 } 190 191 public Collection<HistoryFileInfo> values() { 192 return cache.values(); 193 } 194 } 195 196 static class JobListCache { 197 private JobIdHistoryFileInfoMap cache; 198 private int maxSize; 199 private long maxAge; 200 201 public JobListCache(int maxSize, long maxAge) { 202 this.maxSize = maxSize; 203 this.maxAge = maxAge; 204 this.cache = new JobIdHistoryFileInfoMap(); 205 } 206 207 public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { 208 JobId jobId = fileInfo.getJobId(); 209 if (LOG.isDebugEnabled()) { 210 LOG.debug("Adding " + jobId + " to job list cache with " 211 + fileInfo.getJobIndexInfo()); 212 } 213 HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo); 214 if (cache.size() > maxSize) { 215 //There is a race here, where more then one thread could be trying to 216 // remove entries. This could result in too many entries being removed 217 // from the cache. This is considered OK as the size of the cache 218 // should be rather large, and we would rather have performance over 219 // keeping the cache size exactly at the maximum. 220 Iterator<JobId> keys = cache.navigableKeySet().iterator(); 221 long cutoff = System.currentTimeMillis() - maxAge; 222 while(cache.size() > maxSize && keys.hasNext()) { 223 JobId key = keys.next(); 224 HistoryFileInfo firstValue = cache.get(key); 225 if(firstValue != null) { 226 synchronized(firstValue) { 227 if (firstValue.isMovePending()) { 228 if(firstValue.didMoveFail() && 229 firstValue.jobIndexInfo.getFinishTime() <= cutoff) { 230 cache.remove(key); 231 //Now lets try to delete it 232 try { 233 firstValue.delete(); 234 } catch (IOException e) { 235 LOG.error("Error while trying to delete history files" + 236 " that could not be moved to done.", e); 237 } 238 } else { 239 LOG.warn("Waiting to remove " + key 240 + " from JobListCache because it is not in done yet."); 241 } 242 } else { 243 cache.remove(key); 244 } 245 } 246 } 247 } 248 } 249 return old; 250 } 251 252 public void delete(HistoryFileInfo fileInfo) { 253 if (LOG.isDebugEnabled()) { 254 LOG.debug("Removing from cache " + fileInfo); 255 } 256 cache.remove(fileInfo.getJobId()); 257 } 258 259 public Collection<HistoryFileInfo> values() { 260 return new ArrayList<HistoryFileInfo>(cache.values()); 261 } 262 263 public HistoryFileInfo get(JobId jobId) { 264 return cache.get(jobId); 265 } 266 267 public boolean isFull() { 268 return cache.size() >= maxSize; 269 } 270 } 271 272 /** 273 * This class represents a user dir in the intermediate done directory. This 274 * is mostly for locking purposes. 275 */ 276 private class UserLogDir { 277 long modTime = 0; 278 279 public synchronized void scanIfNeeded(FileStatus fs) { 280 long newModTime = fs.getModificationTime(); 281 if (modTime != newModTime) { 282 Path p = fs.getPath(); 283 try { 284 scanIntermediateDirectory(p); 285 //If scanning fails, we will scan again. We assume the failure is 286 // temporary. 287 modTime = newModTime; 288 } catch (IOException e) { 289 LOG.error("Error while trying to scan the directory " + p, e); 290 } 291 } else { 292 if (LOG.isDebugEnabled()) { 293 LOG.debug("Scan not needed of " + fs.getPath()); 294 } 295 } 296 } 297 } 298 299 public class HistoryFileInfo { 300 private Path historyFile; 301 private Path confFile; 302 private Path summaryFile; 303 private JobIndexInfo jobIndexInfo; 304 private HistoryInfoState state; 305 306 private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile, 307 JobIndexInfo jobIndexInfo, boolean isInDone) { 308 this.historyFile = historyFile; 309 this.confFile = confFile; 310 this.summaryFile = summaryFile; 311 this.jobIndexInfo = jobIndexInfo; 312 state = isInDone ? HistoryInfoState.IN_DONE 313 : HistoryInfoState.IN_INTERMEDIATE; 314 } 315 316 @VisibleForTesting 317 synchronized boolean isMovePending() { 318 return state == HistoryInfoState.IN_INTERMEDIATE 319 || state == HistoryInfoState.MOVE_FAILED; 320 } 321 322 @VisibleForTesting 323 synchronized boolean didMoveFail() { 324 return state == HistoryInfoState.MOVE_FAILED; 325 } 326 327 /** 328 * @return true if the files backed by this were deleted. 329 */ 330 public synchronized boolean isDeleted() { 331 return state == HistoryInfoState.DELETED; 332 } 333 334 @Override 335 public String toString() { 336 return "HistoryFileInfo jobID " + getJobId() 337 + " historyFile = " + historyFile; 338 } 339 340 private synchronized void moveToDone() throws IOException { 341 if (LOG.isDebugEnabled()) { 342 LOG.debug("moveToDone: " + historyFile); 343 } 344 if (!isMovePending()) { 345 // It was either deleted or is already in done. Either way do nothing 346 if (LOG.isDebugEnabled()) { 347 LOG.debug("Move no longer pending"); 348 } 349 return; 350 } 351 try { 352 long completeTime = jobIndexInfo.getFinishTime(); 353 if (completeTime == 0) { 354 completeTime = System.currentTimeMillis(); 355 } 356 JobId jobId = jobIndexInfo.getJobId(); 357 358 List<Path> paths = new ArrayList<Path>(2); 359 if (historyFile == null) { 360 LOG.info("No file for job-history with " + jobId + " found in cache!"); 361 } else { 362 paths.add(historyFile); 363 } 364 365 if (confFile == null) { 366 LOG.info("No file for jobConf with " + jobId + " found in cache!"); 367 } else { 368 paths.add(confFile); 369 } 370 371 if (summaryFile == null) { 372 LOG.info("No summary file for job: " + jobId); 373 } else { 374 String jobSummaryString = getJobSummary(intermediateDoneDirFc, 375 summaryFile); 376 SUMMARY_LOG.info(jobSummaryString); 377 LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); 378 intermediateDoneDirFc.delete(summaryFile, false); 379 summaryFile = null; 380 } 381 382 Path targetDir = canonicalHistoryLogPath(jobId, completeTime); 383 addDirectoryToSerialNumberIndex(targetDir); 384 makeDoneSubdir(targetDir); 385 if (historyFile != null) { 386 Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile 387 .getName())); 388 if (!toPath.equals(historyFile)) { 389 moveToDoneNow(historyFile, toPath); 390 historyFile = toPath; 391 } 392 } 393 if (confFile != null) { 394 Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile 395 .getName())); 396 if (!toPath.equals(confFile)) { 397 moveToDoneNow(confFile, toPath); 398 confFile = toPath; 399 } 400 } 401 state = HistoryInfoState.IN_DONE; 402 } catch (Throwable t) { 403 LOG.error("Error while trying to move a job to done", t); 404 this.state = HistoryInfoState.MOVE_FAILED; 405 } 406 } 407 408 /** 409 * Parse a job from the JobHistoryFile, if the underlying file is not going 410 * to be deleted. 411 * 412 * @return the Job or null if the underlying file was deleted. 413 * @throws IOException 414 * if there is an error trying to read the file. 415 */ 416 public synchronized Job loadJob() throws IOException { 417 return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile, 418 false, jobIndexInfo.getUser(), this, aclsMgr); 419 } 420 421 /** 422 * Return the history file. This should only be used for testing. 423 * @return the history file. 424 */ 425 synchronized Path getHistoryFile() { 426 return historyFile; 427 } 428 429 protected synchronized void delete() throws IOException { 430 if (LOG.isDebugEnabled()) { 431 LOG.debug("deleting " + historyFile + " and " + confFile); 432 } 433 state = HistoryInfoState.DELETED; 434 doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); 435 doneDirFc.delete(doneDirFc.makeQualified(confFile), false); 436 } 437 438 public JobIndexInfo getJobIndexInfo() { 439 return jobIndexInfo; 440 } 441 442 public JobId getJobId() { 443 return jobIndexInfo.getJobId(); 444 } 445 446 public synchronized Path getConfFile() { 447 return confFile; 448 } 449 450 public synchronized Configuration loadConfFile() throws IOException { 451 FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); 452 Configuration jobConf = new Configuration(false); 453 jobConf.addResource(fc.open(confFile), confFile.toString()); 454 return jobConf; 455 } 456 } 457 458 private SerialNumberIndex serialNumberIndex = null; 459 protected JobListCache jobListCache = null; 460 461 // Maintains a list of known done subdirectories. 462 private final Set<Path> existingDoneSubdirs = Collections 463 .synchronizedSet(new HashSet<Path>()); 464 465 /** 466 * Maintains a mapping between intermediate user directories and the last 467 * known modification time. 468 */ 469 private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 470 new ConcurrentHashMap<String, UserLogDir>(); 471 472 private JobACLsManager aclsMgr; 473 474 @VisibleForTesting 475 Configuration conf; 476 477 private String serialNumberFormat; 478 479 private Path doneDirPrefixPath = null; // folder for completed jobs 480 private FileContext doneDirFc; // done Dir FileContext 481 482 private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path 483 private FileContext intermediateDoneDirFc; // Intermediate Done Dir 484 // FileContext 485 @VisibleForTesting 486 protected ThreadPoolExecutor moveToDoneExecutor = null; 487 private long maxHistoryAge = 0; 488 489 public HistoryFileManager() { 490 super(HistoryFileManager.class.getName()); 491 } 492 493 @Override 494 protected void serviceInit(Configuration conf) throws Exception { 495 this.conf = conf; 496 497 int serialNumberLowDigits = 3; 498 serialNumberFormat = ("%0" 499 + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) 500 + "d"); 501 502 long maxFSWaitTime = conf.getLong( 503 JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, 504 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); 505 createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime); 506 507 this.aclsMgr = new JobACLsManager(conf); 508 509 maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 510 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); 511 512 jobListCache = createJobListCache(); 513 514 serialNumberIndex = new SerialNumberIndex(conf.getInt( 515 JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, 516 JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE)); 517 518 int numMoveThreads = conf.getInt( 519 JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, 520 JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); 521 ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( 522 "MoveIntermediateToDone Thread #%d").build(); 523 moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads, 524 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); 525 526 super.serviceInit(conf); 527 } 528 529 @VisibleForTesting 530 void createHistoryDirs(Clock clock, long intervalCheckMillis, 531 long timeOutMillis) throws IOException { 532 long start = clock.getTime(); 533 boolean done = false; 534 int counter = 0; 535 while (!done && 536 ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) { 537 done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec 538 try { 539 Thread.sleep(intervalCheckMillis); 540 } catch (InterruptedException ex) { 541 throw new YarnRuntimeException(ex); 542 } 543 } 544 if (!done) { 545 throw new YarnRuntimeException("Timed out '" + timeOutMillis+ 546 "ms' waiting for FileSystem to become available"); 547 } 548 } 549 550 /** 551 * DistributedFileSystem returns a RemoteException with a message stating 552 * SafeModeException in it. So this is only way to check it is because of 553 * being in safe mode. 554 */ 555 private boolean isBecauseSafeMode(Throwable ex) { 556 return ex.toString().contains("SafeModeException"); 557 } 558 559 /** 560 * Returns TRUE if the history dirs were created, FALSE if they could not 561 * be created because the FileSystem is not reachable or in safe mode and 562 * throws and exception otherwise. 563 */ 564 @VisibleForTesting 565 boolean tryCreatingHistoryDirs(boolean logWait) throws IOException { 566 boolean succeeded = true; 567 String doneDirPrefix = JobHistoryUtils. 568 getConfiguredHistoryServerDoneDirPrefix(conf); 569 try { 570 doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( 571 new Path(doneDirPrefix)); 572 doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); 573 doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK); 574 mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( 575 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); 576 } catch (ConnectException ex) { 577 if (logWait) { 578 LOG.info("Waiting for FileSystem at " + 579 doneDirPrefixPath.toUri().getAuthority() + "to be available"); 580 } 581 succeeded = false; 582 } catch (IOException e) { 583 if (isBecauseSafeMode(e)) { 584 succeeded = false; 585 if (logWait) { 586 LOG.info("Waiting for FileSystem at " + 587 doneDirPrefixPath.toUri().getAuthority() + 588 "to be out of safe mode"); 589 } 590 } else { 591 throw new YarnRuntimeException("Error creating done directory: [" 592 + doneDirPrefixPath + "]", e); 593 } 594 } 595 if (succeeded) { 596 String intermediateDoneDirPrefix = JobHistoryUtils. 597 getConfiguredHistoryIntermediateDoneDirPrefix(conf); 598 try { 599 intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified( 600 new Path(intermediateDoneDirPrefix)); 601 intermediateDoneDirFc = FileContext.getFileContext( 602 intermediateDoneDirPath.toUri(), conf); 603 mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( 604 JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); 605 } catch (ConnectException ex) { 606 succeeded = false; 607 if (logWait) { 608 LOG.info("Waiting for FileSystem at " + 609 intermediateDoneDirPath.toUri().getAuthority() + 610 "to be available"); 611 } 612 } catch (IOException e) { 613 if (isBecauseSafeMode(e)) { 614 succeeded = false; 615 if (logWait) { 616 LOG.info("Waiting for FileSystem at " + 617 intermediateDoneDirPath.toUri().getAuthority() + 618 "to be out of safe mode"); 619 } 620 } else { 621 throw new YarnRuntimeException( 622 "Error creating intermediate done directory: [" 623 + intermediateDoneDirPath + "]", e); 624 } 625 } 626 } 627 return succeeded; 628 } 629 630 @Override 631 public void serviceStop() throws Exception { 632 ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor); 633 super.serviceStop(); 634 } 635 636 protected JobListCache createJobListCache() { 637 return new JobListCache(conf.getInt( 638 JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, 639 JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge); 640 } 641 642 private void mkdir(FileContext fc, Path path, FsPermission fsp) 643 throws IOException { 644 if (!fc.util().exists(path)) { 645 try { 646 fc.mkdir(path, fsp, true); 647 648 FileStatus fsStatus = fc.getFileStatus(path); 649 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 650 + ", Expected: " + fsp.toShort()); 651 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 652 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 653 + ", " + fsp); 654 fc.setPermission(path, fsp); 655 } 656 } catch (FileAlreadyExistsException e) { 657 LOG.info("Directory: [" + path + "] already exists."); 658 } 659 } 660 } 661 662 /** 663 * Populates index data structures. Should only be called at initialization 664 * times. 665 */ 666 @SuppressWarnings("unchecked") 667 void initExisting() throws IOException { 668 LOG.info("Initializing Existing Jobs..."); 669 List<FileStatus> timestampedDirList = findTimestampedDirectories(); 670 // Sort first just so insertion is in a consistent order 671 Collections.sort(timestampedDirList); 672 for (FileStatus fs : timestampedDirList) { 673 // TODO Could verify the correct format for these directories. 674 addDirectoryToSerialNumberIndex(fs.getPath()); 675 } 676 for (int i= timestampedDirList.size() - 1; 677 i >= 0 && !jobListCache.isFull(); i--) { 678 FileStatus fs = timestampedDirList.get(i); 679 addDirectoryToJobListCache(fs.getPath()); 680 } 681 } 682 683 private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { 684 String serialPart = serialDirPath.getName(); 685 String timeStampPart = JobHistoryUtils 686 .getTimestampPartFromPath(serialDirPath.toString()); 687 if (timeStampPart == null) { 688 LOG.warn("Could not find timestamp portion from path: " 689 + serialDirPath.toString() + ". Continuing with next"); 690 return; 691 } 692 if (serialPart == null) { 693 LOG.warn("Could not find serial portion from path: " 694 + serialDirPath.toString() + ". Continuing with next"); 695 return; 696 } 697 serialNumberIndex.remove(serialPart, timeStampPart); 698 } 699 700 private void addDirectoryToSerialNumberIndex(Path serialDirPath) { 701 if (LOG.isDebugEnabled()) { 702 LOG.debug("Adding " + serialDirPath + " to serial index"); 703 } 704 String serialPart = serialDirPath.getName(); 705 String timestampPart = JobHistoryUtils 706 .getTimestampPartFromPath(serialDirPath.toString()); 707 if (timestampPart == null) { 708 LOG.warn("Could not find timestamp portion from path: " + serialDirPath 709 + ". Continuing with next"); 710 return; 711 } 712 if (serialPart == null) { 713 LOG.warn("Could not find serial portion from path: " 714 + serialDirPath.toString() + ". Continuing with next"); 715 } else { 716 serialNumberIndex.add(serialPart, timestampPart); 717 } 718 } 719 720 private void addDirectoryToJobListCache(Path path) throws IOException { 721 if (LOG.isDebugEnabled()) { 722 LOG.debug("Adding " + path + " to job list cache."); 723 } 724 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, 725 doneDirFc); 726 for (FileStatus fs : historyFileList) { 727 if (LOG.isDebugEnabled()) { 728 LOG.debug("Adding in history for " + fs.getPath()); 729 } 730 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 731 .getName()); 732 String confFileName = JobHistoryUtils 733 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 734 String summaryFileName = JobHistoryUtils 735 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 736 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs 737 .getPath().getParent(), confFileName), new Path(fs.getPath() 738 .getParent(), summaryFileName), jobIndexInfo, true); 739 jobListCache.addIfAbsent(fileInfo); 740 } 741 } 742 743 @VisibleForTesting 744 protected static List<FileStatus> scanDirectory(Path path, FileContext fc, 745 PathFilter pathFilter) throws IOException { 746 path = fc.makeQualified(path); 747 List<FileStatus> jhStatusList = new ArrayList<FileStatus>(); 748 try { 749 RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path); 750 while (fileStatusIter.hasNext()) { 751 FileStatus fileStatus = fileStatusIter.next(); 752 Path filePath = fileStatus.getPath(); 753 if (fileStatus.isFile() && pathFilter.accept(filePath)) { 754 jhStatusList.add(fileStatus); 755 } 756 } 757 } catch (FileNotFoundException fe) { 758 LOG.error("Error while scanning directory " + path, fe); 759 } 760 return jhStatusList; 761 } 762 763 protected List<FileStatus> scanDirectoryForHistoryFiles(Path path, 764 FileContext fc) throws IOException { 765 return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); 766 } 767 768 /** 769 * Finds all history directories with a timestamp component by scanning the 770 * filesystem. Used when the JobHistory server is started. 771 * 772 * @return list of history directories 773 */ 774 protected List<FileStatus> findTimestampedDirectories() throws IOException { 775 List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 776 doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); 777 return fsList; 778 } 779 780 /** 781 * Scans the intermediate directory to find user directories. Scans these for 782 * history files if the modification time for the directory has changed. Once 783 * it finds history files it starts the process of moving them to the done 784 * directory. 785 * 786 * @throws IOException 787 * if there was a error while scanning 788 */ 789 void scanIntermediateDirectory() throws IOException { 790 // TODO it would be great to limit how often this happens, except in the 791 // case where we are looking for a particular job. 792 List<FileStatus> userDirList = JobHistoryUtils.localGlobber( 793 intermediateDoneDirFc, intermediateDoneDirPath, ""); 794 LOG.debug("Scanning intermediate dirs"); 795 for (FileStatus userDir : userDirList) { 796 String name = userDir.getPath().getName(); 797 UserLogDir dir = userDirModificationTimeMap.get(name); 798 if(dir == null) { 799 dir = new UserLogDir(); 800 UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir); 801 if(old != null) { 802 dir = old; 803 } 804 } 805 dir.scanIfNeeded(userDir); 806 } 807 } 808 809 /** 810 * Scans the specified path and populates the intermediate cache. 811 * 812 * @param absPath 813 * @throws IOException 814 */ 815 private void scanIntermediateDirectory(final Path absPath) throws IOException { 816 if (LOG.isDebugEnabled()) { 817 LOG.debug("Scanning intermediate dir " + absPath); 818 } 819 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath, 820 intermediateDoneDirFc); 821 if (LOG.isDebugEnabled()) { 822 LOG.debug("Found " + fileStatusList.size() + " files"); 823 } 824 for (FileStatus fs : fileStatusList) { 825 if (LOG.isDebugEnabled()) { 826 LOG.debug("scanning file: "+ fs.getPath()); 827 } 828 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 829 .getName()); 830 String confFileName = JobHistoryUtils 831 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 832 String summaryFileName = JobHistoryUtils 833 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 834 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs 835 .getPath().getParent(), confFileName), new Path(fs.getPath() 836 .getParent(), summaryFileName), jobIndexInfo, false); 837 838 final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo); 839 if (old == null || old.didMoveFail()) { 840 final HistoryFileInfo found = (old == null) ? fileInfo : old; 841 long cutoff = System.currentTimeMillis() - maxHistoryAge; 842 if(found.getJobIndexInfo().getFinishTime() <= cutoff) { 843 try { 844 found.delete(); 845 } catch (IOException e) { 846 LOG.warn("Error cleaning up a HistoryFile that is out of date.", e); 847 } 848 } else { 849 if (LOG.isDebugEnabled()) { 850 LOG.debug("Scheduling move to done of " +found); 851 } 852 moveToDoneExecutor.execute(new Runnable() { 853 @Override 854 public void run() { 855 try { 856 found.moveToDone(); 857 } catch (IOException e) { 858 LOG.info("Failed to process fileInfo for job: " + 859 found.getJobId(), e); 860 } 861 } 862 }); 863 } 864 } else if (!old.isMovePending()) { 865 //This is a duplicate so just delete it 866 if (LOG.isDebugEnabled()) { 867 LOG.debug("Duplicate: deleting"); 868 } 869 fileInfo.delete(); 870 } 871 } 872 } 873 874 /** 875 * Searches the job history file FileStatus list for the specified JobId. 876 * 877 * @param fileStatusList 878 * fileStatus list of Job History Files. 879 * @param jobId 880 * The JobId to find. 881 * @return A FileInfo object for the jobId, null if not found. 882 * @throws IOException 883 */ 884 private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList, 885 JobId jobId) throws IOException { 886 for (FileStatus fs : fileStatusList) { 887 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 888 .getName()); 889 if (jobIndexInfo.getJobId().equals(jobId)) { 890 String confFileName = JobHistoryUtils 891 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 892 String summaryFileName = JobHistoryUtils 893 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 894 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path( 895 fs.getPath().getParent(), confFileName), new Path(fs.getPath() 896 .getParent(), summaryFileName), jobIndexInfo, true); 897 return fileInfo; 898 } 899 } 900 return null; 901 } 902 903 /** 904 * Scans old directories known by the idToDateString map for the specified 905 * jobId. If the number of directories is higher than the supported size of 906 * the idToDateString cache, the jobId will not be found. 907 * 908 * @param jobId 909 * the jobId. 910 * @return 911 * @throws IOException 912 */ 913 private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { 914 String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( 915 jobId, serialNumberFormat); 916 Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber); 917 if (dateStringSet == null) { 918 return null; 919 } 920 for (String timestampPart : dateStringSet) { 921 Path logDir = canonicalHistoryLogPath(jobId, timestampPart); 922 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, 923 doneDirFc); 924 HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId); 925 if (fileInfo != null) { 926 return fileInfo; 927 } 928 } 929 return null; 930 } 931 932 public Collection<HistoryFileInfo> getAllFileInfo() throws IOException { 933 scanIntermediateDirectory(); 934 return jobListCache.values(); 935 } 936 937 public HistoryFileInfo getFileInfo(JobId jobId) throws IOException { 938 // FileInfo available in cache. 939 HistoryFileInfo fileInfo = jobListCache.get(jobId); 940 if (fileInfo != null) { 941 return fileInfo; 942 } 943 // OK so scan the intermediate to be sure we did not lose it that way 944 scanIntermediateDirectory(); 945 fileInfo = jobListCache.get(jobId); 946 if (fileInfo != null) { 947 return fileInfo; 948 } 949 950 // Intermediate directory does not contain job. Search through older ones. 951 fileInfo = scanOldDirsForJob(jobId); 952 if (fileInfo != null) { 953 return fileInfo; 954 } 955 return null; 956 } 957 958 private void moveToDoneNow(final Path src, final Path target) 959 throws IOException { 960 LOG.info("Moving " + src.toString() + " to " + target.toString()); 961 intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); 962 } 963 964 private String getJobSummary(FileContext fc, Path path) throws IOException { 965 Path qPath = fc.makeQualified(path); 966 FSDataInputStream in = fc.open(qPath); 967 String jobSummaryString = in.readUTF(); 968 in.close(); 969 return jobSummaryString; 970 } 971 972 private void makeDoneSubdir(Path path) throws IOException { 973 try { 974 doneDirFc.getFileStatus(path); 975 existingDoneSubdirs.add(path); 976 } catch (FileNotFoundException fnfE) { 977 try { 978 FsPermission fsp = new FsPermission( 979 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); 980 doneDirFc.mkdir(path, fsp, true); 981 FileStatus fsStatus = doneDirFc.getFileStatus(path); 982 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 983 + ", Expected: " + fsp.toShort()); 984 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 985 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 986 + ", " + fsp); 987 doneDirFc.setPermission(path, fsp); 988 } 989 existingDoneSubdirs.add(path); 990 } catch (FileAlreadyExistsException faeE) { // Nothing to do. 991 } 992 } 993 } 994 995 private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { 996 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 997 id, timestampComponent, serialNumberFormat)); 998 } 999 1000 private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { 1001 String timestampComponent = JobHistoryUtils 1002 .timestampDirectoryComponent(millisecondTime); 1003 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1004 id, timestampComponent, serialNumberFormat)); 1005 } 1006 1007 private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { 1008 if (finishTime == 0) { 1009 return fileStatus.getModificationTime(); 1010 } 1011 return finishTime; 1012 } 1013 1014 private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException { 1015 jobListCache.delete(fileInfo); 1016 fileInfo.delete(); 1017 } 1018 1019 List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException { 1020 return JobHistoryUtils. 1021 getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff); 1022 } 1023 1024 /** 1025 * Clean up older history files. 1026 * 1027 * @throws IOException 1028 * on any error trying to remove the entries. 1029 */ 1030 @SuppressWarnings("unchecked") 1031 void clean() throws IOException { 1032 long cutoff = System.currentTimeMillis() - maxHistoryAge; 1033 boolean halted = false; 1034 List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff); 1035 // Sort in ascending order. Relies on YYYY/MM/DD/Serial 1036 Collections.sort(serialDirList); 1037 for (FileStatus serialDir : serialDirList) { 1038 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( 1039 serialDir.getPath(), doneDirFc); 1040 for (FileStatus historyFile : historyFileList) { 1041 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile 1042 .getPath().getName()); 1043 long effectiveTimestamp = getEffectiveTimestamp( 1044 jobIndexInfo.getFinishTime(), historyFile); 1045 if (effectiveTimestamp <= cutoff) { 1046 HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo 1047 .getJobId()); 1048 if (fileInfo == null) { 1049 String confFileName = JobHistoryUtils 1050 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 1051 1052 fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path( 1053 historyFile.getPath().getParent(), confFileName), null, 1054 jobIndexInfo, true); 1055 } 1056 deleteJobFromDone(fileInfo); 1057 } else { 1058 halted = true; 1059 break; 1060 } 1061 } 1062 if (!halted) { 1063 deleteDir(serialDir); 1064 removeDirectoryFromSerialNumberIndex(serialDir.getPath()); 1065 existingDoneSubdirs.remove(serialDir.getPath()); 1066 } else { 1067 break; // Don't scan any more directories. 1068 } 1069 } 1070 } 1071 1072 protected boolean deleteDir(FileStatus serialDir) 1073 throws AccessControlException, FileNotFoundException, 1074 UnsupportedFileSystemException, IOException { 1075 return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); 1076 } 1077 1078 @VisibleForTesting 1079 protected void setMaxHistoryAge(long newValue){ 1080 maxHistoryAge=newValue; 1081 } 1082}