001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.v2.hs; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.NavigableSet; 031import java.util.Set; 032import java.util.SortedMap; 033import java.util.TreeMap; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.ConcurrentSkipListMap; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.ThreadFactory; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.apache.hadoop.classification.InterfaceAudience; 046import org.apache.hadoop.classification.InterfaceStability; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FSDataInputStream; 049import org.apache.hadoop.fs.FileAlreadyExistsException; 050import org.apache.hadoop.fs.FileContext; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.Options; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.fs.PathFilter; 055import org.apache.hadoop.fs.RemoteIterator; 056import org.apache.hadoop.fs.UnsupportedFileSystemException; 057import org.apache.hadoop.fs.permission.FsPermission; 058import org.apache.hadoop.mapred.JobACLsManager; 059import org.apache.hadoop.mapreduce.jobhistory.JobSummary; 060import org.apache.hadoop.mapreduce.v2.api.records.JobId; 061import org.apache.hadoop.mapreduce.v2.app.job.Job; 062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; 063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; 064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; 065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; 066import org.apache.hadoop.security.AccessControlException; 067import org.apache.hadoop.service.AbstractService; 068import org.apache.hadoop.util.ShutdownThreadsHelper; 069import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 070 071import com.google.common.annotations.VisibleForTesting; 072import com.google.common.util.concurrent.ThreadFactoryBuilder; 073import org.apache.hadoop.yarn.util.Clock; 074import org.apache.hadoop.yarn.util.SystemClock; 075 076/** 077 * This class provides a way to interact with history files in a thread safe 078 * manor. 079 */ 080@InterfaceAudience.Public 081@InterfaceStability.Unstable 082public class HistoryFileManager extends AbstractService { 083 private static final Log LOG = LogFactory.getLog(HistoryFileManager.class); 084 private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); 085 086 private static enum HistoryInfoState { 087 IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED 088 }; 089 090 private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils 091 .doneSubdirsBeforeSerialTail(); 092 093 /** 094 * Maps between a serial number (generated based on jobId) and the timestamp 095 * component(s) to which it belongs. Facilitates jobId based searches. If a 096 * jobId is not found in this list - it will not be found. 097 */ 098 private static class SerialNumberIndex { 099 private SortedMap<String, Set<String>> cache; 100 private int maxSize; 101 102 public SerialNumberIndex(int maxSize) { 103 this.cache = new TreeMap<String, Set<String>>(); 104 this.maxSize = maxSize; 105 } 106 107 public synchronized void add(String serialPart, String timestampPart) { 108 if (!cache.containsKey(serialPart)) { 109 cache.put(serialPart, new HashSet<String>()); 110 if (cache.size() > maxSize) { 111 String key = cache.firstKey(); 112 LOG.error("Dropping " + key 113 + " from the SerialNumberIndex. We will no " 114 + "longer be able to see jobs that are in that serial index for " 115 + cache.get(key)); 116 cache.remove(key); 117 } 118 } 119 Set<String> datePartSet = cache.get(serialPart); 120 datePartSet.add(timestampPart); 121 } 122 123 public synchronized void remove(String serialPart, String timeStampPart) { 124 if (cache.containsKey(serialPart)) { 125 Set<String> set = cache.get(serialPart); 126 set.remove(timeStampPart); 127 if (set.isEmpty()) { 128 cache.remove(serialPart); 129 } 130 } 131 } 132 133 public synchronized Set<String> get(String serialPart) { 134 Set<String> found = cache.get(serialPart); 135 if (found != null) { 136 return new HashSet<String>(found); 137 } 138 return null; 139 } 140 } 141 142 /** 143 * Wrapper around {@link ConcurrentSkipListMap} that maintains size along 144 * side for O(1) size() implementation for use in JobListCache. 145 * 146 * Note: The size is not updated atomically with changes additions/removals. 147 * This race can lead to size() returning an incorrect size at times. 148 */ 149 static class JobIdHistoryFileInfoMap { 150 private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache; 151 private AtomicInteger mapSize; 152 153 JobIdHistoryFileInfoMap() { 154 cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>(); 155 mapSize = new AtomicInteger(); 156 } 157 158 public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) { 159 HistoryFileInfo ret = cache.putIfAbsent(key, value); 160 if (ret == null) { 161 mapSize.incrementAndGet(); 162 } 163 return ret; 164 } 165 166 public HistoryFileInfo remove(JobId key) { 167 HistoryFileInfo ret = cache.remove(key); 168 if (ret != null) { 169 mapSize.decrementAndGet(); 170 } 171 return ret; 172 } 173 174 /** 175 * Returns the recorded size of the internal map. Note that this could be out 176 * of sync with the actual size of the map 177 * @return "recorded" size 178 */ 179 public int size() { 180 return mapSize.get(); 181 } 182 183 public HistoryFileInfo get(JobId key) { 184 return cache.get(key); 185 } 186 187 public NavigableSet<JobId> navigableKeySet() { 188 return cache.navigableKeySet(); 189 } 190 191 public Collection<HistoryFileInfo> values() { 192 return cache.values(); 193 } 194 } 195 196 static class JobListCache { 197 private JobIdHistoryFileInfoMap cache; 198 private int maxSize; 199 private long maxAge; 200 201 public JobListCache(int maxSize, long maxAge) { 202 this.maxSize = maxSize; 203 this.maxAge = maxAge; 204 this.cache = new JobIdHistoryFileInfoMap(); 205 } 206 207 public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { 208 JobId jobId = fileInfo.getJobId(); 209 if (LOG.isDebugEnabled()) { 210 LOG.debug("Adding " + jobId + " to job list cache with " 211 + fileInfo.getJobIndexInfo()); 212 } 213 HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo); 214 if (cache.size() > maxSize) { 215 //There is a race here, where more then one thread could be trying to 216 // remove entries. This could result in too many entries being removed 217 // from the cache. This is considered OK as the size of the cache 218 // should be rather large, and we would rather have performance over 219 // keeping the cache size exactly at the maximum. 220 Iterator<JobId> keys = cache.navigableKeySet().iterator(); 221 long cutoff = System.currentTimeMillis() - maxAge; 222 while(cache.size() > maxSize && keys.hasNext()) { 223 JobId key = keys.next(); 224 HistoryFileInfo firstValue = cache.get(key); 225 if(firstValue != null) { 226 synchronized(firstValue) { 227 if (firstValue.isMovePending()) { 228 if(firstValue.didMoveFail() && 229 firstValue.jobIndexInfo.getFinishTime() <= cutoff) { 230 cache.remove(key); 231 //Now lets try to delete it 232 try { 233 firstValue.delete(); 234 } catch (IOException e) { 235 LOG.error("Error while trying to delete history files" + 236 " that could not be moved to done.", e); 237 } 238 } else { 239 LOG.warn("Waiting to remove " + key 240 + " from JobListCache because it is not in done yet."); 241 } 242 } else { 243 cache.remove(key); 244 } 245 } 246 } 247 } 248 } 249 return old; 250 } 251 252 public void delete(HistoryFileInfo fileInfo) { 253 if (LOG.isDebugEnabled()) { 254 LOG.debug("Removing from cache " + fileInfo); 255 } 256 cache.remove(fileInfo.getJobId()); 257 } 258 259 public Collection<HistoryFileInfo> values() { 260 return new ArrayList<HistoryFileInfo>(cache.values()); 261 } 262 263 public HistoryFileInfo get(JobId jobId) { 264 return cache.get(jobId); 265 } 266 267 public boolean isFull() { 268 return cache.size() >= maxSize; 269 } 270 } 271 272 /** 273 * This class represents a user dir in the intermediate done directory. This 274 * is mostly for locking purposes. 275 */ 276 private class UserLogDir { 277 long modTime = 0; 278 279 public synchronized void scanIfNeeded(FileStatus fs) { 280 long newModTime = fs.getModificationTime(); 281 if (modTime != newModTime) { 282 Path p = fs.getPath(); 283 try { 284 scanIntermediateDirectory(p); 285 //If scanning fails, we will scan again. We assume the failure is 286 // temporary. 287 modTime = newModTime; 288 } catch (IOException e) { 289 LOG.error("Error while trying to scan the directory " + p, e); 290 } 291 } else { 292 if (LOG.isDebugEnabled()) { 293 LOG.debug("Scan not needed of " + fs.getPath()); 294 } 295 } 296 } 297 } 298 299 public class HistoryFileInfo { 300 private Path historyFile; 301 private Path confFile; 302 private Path summaryFile; 303 private JobIndexInfo jobIndexInfo; 304 private HistoryInfoState state; 305 306 @VisibleForTesting 307 protected HistoryFileInfo(Path historyFile, Path confFile, 308 Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) { 309 this.historyFile = historyFile; 310 this.confFile = confFile; 311 this.summaryFile = summaryFile; 312 this.jobIndexInfo = jobIndexInfo; 313 state = isInDone ? HistoryInfoState.IN_DONE 314 : HistoryInfoState.IN_INTERMEDIATE; 315 } 316 317 @VisibleForTesting 318 synchronized boolean isMovePending() { 319 return state == HistoryInfoState.IN_INTERMEDIATE 320 || state == HistoryInfoState.MOVE_FAILED; 321 } 322 323 @VisibleForTesting 324 synchronized boolean didMoveFail() { 325 return state == HistoryInfoState.MOVE_FAILED; 326 } 327 328 /** 329 * @return true if the files backed by this were deleted. 330 */ 331 public synchronized boolean isDeleted() { 332 return state == HistoryInfoState.DELETED; 333 } 334 335 @Override 336 public String toString() { 337 return "HistoryFileInfo jobID " + getJobId() 338 + " historyFile = " + historyFile; 339 } 340 341 @VisibleForTesting 342 synchronized void moveToDone() throws IOException { 343 if (LOG.isDebugEnabled()) { 344 LOG.debug("moveToDone: " + historyFile); 345 } 346 if (!isMovePending()) { 347 // It was either deleted or is already in done. Either way do nothing 348 if (LOG.isDebugEnabled()) { 349 LOG.debug("Move no longer pending"); 350 } 351 return; 352 } 353 try { 354 long completeTime = jobIndexInfo.getFinishTime(); 355 if (completeTime == 0) { 356 completeTime = System.currentTimeMillis(); 357 } 358 JobId jobId = jobIndexInfo.getJobId(); 359 360 List<Path> paths = new ArrayList<Path>(2); 361 if (historyFile == null) { 362 LOG.info("No file for job-history with " + jobId + " found in cache!"); 363 } else { 364 paths.add(historyFile); 365 } 366 367 if (confFile == null) { 368 LOG.info("No file for jobConf with " + jobId + " found in cache!"); 369 } else { 370 paths.add(confFile); 371 } 372 373 if (summaryFile == null || !intermediateDoneDirFc.util().exists( 374 summaryFile)) { 375 LOG.info("No summary file for job: " + jobId); 376 } else { 377 String jobSummaryString = getJobSummary(intermediateDoneDirFc, 378 summaryFile); 379 SUMMARY_LOG.info(jobSummaryString); 380 LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); 381 intermediateDoneDirFc.delete(summaryFile, false); 382 summaryFile = null; 383 } 384 385 Path targetDir = canonicalHistoryLogPath(jobId, completeTime); 386 addDirectoryToSerialNumberIndex(targetDir); 387 makeDoneSubdir(targetDir); 388 if (historyFile != null) { 389 Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile 390 .getName())); 391 if (!toPath.equals(historyFile)) { 392 moveToDoneNow(historyFile, toPath); 393 historyFile = toPath; 394 } 395 } 396 if (confFile != null) { 397 Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile 398 .getName())); 399 if (!toPath.equals(confFile)) { 400 moveToDoneNow(confFile, toPath); 401 confFile = toPath; 402 } 403 } 404 state = HistoryInfoState.IN_DONE; 405 } catch (Throwable t) { 406 LOG.error("Error while trying to move a job to done", t); 407 this.state = HistoryInfoState.MOVE_FAILED; 408 } 409 } 410 411 /** 412 * Parse a job from the JobHistoryFile, if the underlying file is not going 413 * to be deleted. 414 * 415 * @return the Job or null if the underlying file was deleted. 416 * @throws IOException 417 * if there is an error trying to read the file. 418 */ 419 public synchronized Job loadJob() throws IOException { 420 return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile, 421 false, jobIndexInfo.getUser(), this, aclsMgr); 422 } 423 424 /** 425 * Return the history file. This should only be used for testing. 426 * @return the history file. 427 */ 428 synchronized Path getHistoryFile() { 429 return historyFile; 430 } 431 432 protected synchronized void delete() throws IOException { 433 if (LOG.isDebugEnabled()) { 434 LOG.debug("deleting " + historyFile + " and " + confFile); 435 } 436 state = HistoryInfoState.DELETED; 437 doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); 438 doneDirFc.delete(doneDirFc.makeQualified(confFile), false); 439 } 440 441 public JobIndexInfo getJobIndexInfo() { 442 return jobIndexInfo; 443 } 444 445 public JobId getJobId() { 446 return jobIndexInfo.getJobId(); 447 } 448 449 public synchronized Path getConfFile() { 450 return confFile; 451 } 452 453 public synchronized Configuration loadConfFile() throws IOException { 454 FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); 455 Configuration jobConf = new Configuration(false); 456 jobConf.addResource(fc.open(confFile), confFile.toString()); 457 return jobConf; 458 } 459 } 460 461 private SerialNumberIndex serialNumberIndex = null; 462 protected JobListCache jobListCache = null; 463 464 // Maintains a list of known done subdirectories. 465 private final Set<Path> existingDoneSubdirs = Collections 466 .synchronizedSet(new HashSet<Path>()); 467 468 /** 469 * Maintains a mapping between intermediate user directories and the last 470 * known modification time. 471 */ 472 private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 473 new ConcurrentHashMap<String, UserLogDir>(); 474 475 private JobACLsManager aclsMgr; 476 477 @VisibleForTesting 478 Configuration conf; 479 480 private String serialNumberFormat; 481 482 private Path doneDirPrefixPath = null; // folder for completed jobs 483 private FileContext doneDirFc; // done Dir FileContext 484 485 private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path 486 private FileContext intermediateDoneDirFc; // Intermediate Done Dir 487 // FileContext 488 @VisibleForTesting 489 protected ThreadPoolExecutor moveToDoneExecutor = null; 490 private long maxHistoryAge = 0; 491 492 public HistoryFileManager() { 493 super(HistoryFileManager.class.getName()); 494 } 495 496 @Override 497 protected void serviceInit(Configuration conf) throws Exception { 498 this.conf = conf; 499 500 int serialNumberLowDigits = 3; 501 serialNumberFormat = ("%0" 502 + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) 503 + "d"); 504 505 long maxFSWaitTime = conf.getLong( 506 JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, 507 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); 508 createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime); 509 510 this.aclsMgr = new JobACLsManager(conf); 511 512 maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 513 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); 514 515 jobListCache = createJobListCache(); 516 517 serialNumberIndex = new SerialNumberIndex(conf.getInt( 518 JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, 519 JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE)); 520 521 int numMoveThreads = conf.getInt( 522 JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, 523 JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); 524 ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( 525 "MoveIntermediateToDone Thread #%d").build(); 526 moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads, 527 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); 528 529 super.serviceInit(conf); 530 } 531 532 @VisibleForTesting 533 void createHistoryDirs(Clock clock, long intervalCheckMillis, 534 long timeOutMillis) throws IOException { 535 long start = clock.getTime(); 536 boolean done = false; 537 int counter = 0; 538 while (!done && 539 ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) { 540 done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec 541 try { 542 Thread.sleep(intervalCheckMillis); 543 } catch (InterruptedException ex) { 544 throw new YarnRuntimeException(ex); 545 } 546 } 547 if (!done) { 548 throw new YarnRuntimeException("Timed out '" + timeOutMillis+ 549 "ms' waiting for FileSystem to become available"); 550 } 551 } 552 553 /** 554 * DistributedFileSystem returns a RemoteException with a message stating 555 * SafeModeException in it. So this is only way to check it is because of 556 * being in safe mode. 557 */ 558 private boolean isBecauseSafeMode(Throwable ex) { 559 return ex.toString().contains("SafeModeException"); 560 } 561 562 /** 563 * Returns TRUE if the history dirs were created, FALSE if they could not 564 * be created because the FileSystem is not reachable or in safe mode and 565 * throws and exception otherwise. 566 */ 567 @VisibleForTesting 568 boolean tryCreatingHistoryDirs(boolean logWait) throws IOException { 569 boolean succeeded = true; 570 String doneDirPrefix = JobHistoryUtils. 571 getConfiguredHistoryServerDoneDirPrefix(conf); 572 try { 573 doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( 574 new Path(doneDirPrefix)); 575 doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); 576 doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK); 577 mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( 578 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); 579 } catch (ConnectException ex) { 580 if (logWait) { 581 LOG.info("Waiting for FileSystem at " + 582 doneDirPrefixPath.toUri().getAuthority() + "to be available"); 583 } 584 succeeded = false; 585 } catch (IOException e) { 586 if (isBecauseSafeMode(e)) { 587 succeeded = false; 588 if (logWait) { 589 LOG.info("Waiting for FileSystem at " + 590 doneDirPrefixPath.toUri().getAuthority() + 591 "to be out of safe mode"); 592 } 593 } else { 594 throw new YarnRuntimeException("Error creating done directory: [" 595 + doneDirPrefixPath + "]", e); 596 } 597 } 598 if (succeeded) { 599 String intermediateDoneDirPrefix = JobHistoryUtils. 600 getConfiguredHistoryIntermediateDoneDirPrefix(conf); 601 try { 602 intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified( 603 new Path(intermediateDoneDirPrefix)); 604 intermediateDoneDirFc = FileContext.getFileContext( 605 intermediateDoneDirPath.toUri(), conf); 606 mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( 607 JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); 608 } catch (ConnectException ex) { 609 succeeded = false; 610 if (logWait) { 611 LOG.info("Waiting for FileSystem at " + 612 intermediateDoneDirPath.toUri().getAuthority() + 613 "to be available"); 614 } 615 } catch (IOException e) { 616 if (isBecauseSafeMode(e)) { 617 succeeded = false; 618 if (logWait) { 619 LOG.info("Waiting for FileSystem at " + 620 intermediateDoneDirPath.toUri().getAuthority() + 621 "to be out of safe mode"); 622 } 623 } else { 624 throw new YarnRuntimeException( 625 "Error creating intermediate done directory: [" 626 + intermediateDoneDirPath + "]", e); 627 } 628 } 629 } 630 return succeeded; 631 } 632 633 @Override 634 public void serviceStop() throws Exception { 635 ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor); 636 super.serviceStop(); 637 } 638 639 protected JobListCache createJobListCache() { 640 return new JobListCache(conf.getInt( 641 JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, 642 JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge); 643 } 644 645 private void mkdir(FileContext fc, Path path, FsPermission fsp) 646 throws IOException { 647 if (!fc.util().exists(path)) { 648 try { 649 fc.mkdir(path, fsp, true); 650 651 FileStatus fsStatus = fc.getFileStatus(path); 652 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 653 + ", Expected: " + fsp.toShort()); 654 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 655 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 656 + ", " + fsp); 657 fc.setPermission(path, fsp); 658 } 659 } catch (FileAlreadyExistsException e) { 660 LOG.info("Directory: [" + path + "] already exists."); 661 } 662 } 663 } 664 665 /** 666 * Populates index data structures. Should only be called at initialization 667 * times. 668 */ 669 @SuppressWarnings("unchecked") 670 void initExisting() throws IOException { 671 LOG.info("Initializing Existing Jobs..."); 672 List<FileStatus> timestampedDirList = findTimestampedDirectories(); 673 // Sort first just so insertion is in a consistent order 674 Collections.sort(timestampedDirList); 675 for (FileStatus fs : timestampedDirList) { 676 // TODO Could verify the correct format for these directories. 677 addDirectoryToSerialNumberIndex(fs.getPath()); 678 } 679 for (int i= timestampedDirList.size() - 1; 680 i >= 0 && !jobListCache.isFull(); i--) { 681 FileStatus fs = timestampedDirList.get(i); 682 addDirectoryToJobListCache(fs.getPath()); 683 } 684 } 685 686 private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { 687 String serialPart = serialDirPath.getName(); 688 String timeStampPart = JobHistoryUtils 689 .getTimestampPartFromPath(serialDirPath.toString()); 690 if (timeStampPart == null) { 691 LOG.warn("Could not find timestamp portion from path: " 692 + serialDirPath.toString() + ". Continuing with next"); 693 return; 694 } 695 if (serialPart == null) { 696 LOG.warn("Could not find serial portion from path: " 697 + serialDirPath.toString() + ". Continuing with next"); 698 return; 699 } 700 serialNumberIndex.remove(serialPart, timeStampPart); 701 } 702 703 private void addDirectoryToSerialNumberIndex(Path serialDirPath) { 704 if (LOG.isDebugEnabled()) { 705 LOG.debug("Adding " + serialDirPath + " to serial index"); 706 } 707 String serialPart = serialDirPath.getName(); 708 String timestampPart = JobHistoryUtils 709 .getTimestampPartFromPath(serialDirPath.toString()); 710 if (timestampPart == null) { 711 LOG.warn("Could not find timestamp portion from path: " + serialDirPath 712 + ". Continuing with next"); 713 return; 714 } 715 if (serialPart == null) { 716 LOG.warn("Could not find serial portion from path: " 717 + serialDirPath.toString() + ". Continuing with next"); 718 } else { 719 serialNumberIndex.add(serialPart, timestampPart); 720 } 721 } 722 723 private void addDirectoryToJobListCache(Path path) throws IOException { 724 if (LOG.isDebugEnabled()) { 725 LOG.debug("Adding " + path + " to job list cache."); 726 } 727 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, 728 doneDirFc); 729 for (FileStatus fs : historyFileList) { 730 if (LOG.isDebugEnabled()) { 731 LOG.debug("Adding in history for " + fs.getPath()); 732 } 733 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 734 .getName()); 735 String confFileName = JobHistoryUtils 736 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 737 String summaryFileName = JobHistoryUtils 738 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 739 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs 740 .getPath().getParent(), confFileName), new Path(fs.getPath() 741 .getParent(), summaryFileName), jobIndexInfo, true); 742 jobListCache.addIfAbsent(fileInfo); 743 } 744 } 745 746 @VisibleForTesting 747 protected static List<FileStatus> scanDirectory(Path path, FileContext fc, 748 PathFilter pathFilter) throws IOException { 749 path = fc.makeQualified(path); 750 List<FileStatus> jhStatusList = new ArrayList<FileStatus>(); 751 try { 752 RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path); 753 while (fileStatusIter.hasNext()) { 754 FileStatus fileStatus = fileStatusIter.next(); 755 Path filePath = fileStatus.getPath(); 756 if (fileStatus.isFile() && pathFilter.accept(filePath)) { 757 jhStatusList.add(fileStatus); 758 } 759 } 760 } catch (FileNotFoundException fe) { 761 LOG.error("Error while scanning directory " + path, fe); 762 } 763 return jhStatusList; 764 } 765 766 protected List<FileStatus> scanDirectoryForHistoryFiles(Path path, 767 FileContext fc) throws IOException { 768 return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); 769 } 770 771 /** 772 * Finds all history directories with a timestamp component by scanning the 773 * filesystem. Used when the JobHistory server is started. 774 * 775 * @return list of history directories 776 */ 777 protected List<FileStatus> findTimestampedDirectories() throws IOException { 778 List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 779 doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); 780 return fsList; 781 } 782 783 /** 784 * Scans the intermediate directory to find user directories. Scans these for 785 * history files if the modification time for the directory has changed. Once 786 * it finds history files it starts the process of moving them to the done 787 * directory. 788 * 789 * @throws IOException 790 * if there was a error while scanning 791 */ 792 void scanIntermediateDirectory() throws IOException { 793 // TODO it would be great to limit how often this happens, except in the 794 // case where we are looking for a particular job. 795 List<FileStatus> userDirList = JobHistoryUtils.localGlobber( 796 intermediateDoneDirFc, intermediateDoneDirPath, ""); 797 LOG.debug("Scanning intermediate dirs"); 798 for (FileStatus userDir : userDirList) { 799 String name = userDir.getPath().getName(); 800 UserLogDir dir = userDirModificationTimeMap.get(name); 801 if(dir == null) { 802 dir = new UserLogDir(); 803 UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir); 804 if(old != null) { 805 dir = old; 806 } 807 } 808 dir.scanIfNeeded(userDir); 809 } 810 } 811 812 /** 813 * Scans the specified path and populates the intermediate cache. 814 * 815 * @param absPath 816 * @throws IOException 817 */ 818 private void scanIntermediateDirectory(final Path absPath) throws IOException { 819 if (LOG.isDebugEnabled()) { 820 LOG.debug("Scanning intermediate dir " + absPath); 821 } 822 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath, 823 intermediateDoneDirFc); 824 if (LOG.isDebugEnabled()) { 825 LOG.debug("Found " + fileStatusList.size() + " files"); 826 } 827 for (FileStatus fs : fileStatusList) { 828 if (LOG.isDebugEnabled()) { 829 LOG.debug("scanning file: "+ fs.getPath()); 830 } 831 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 832 .getName()); 833 String confFileName = JobHistoryUtils 834 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 835 String summaryFileName = JobHistoryUtils 836 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 837 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs 838 .getPath().getParent(), confFileName), new Path(fs.getPath() 839 .getParent(), summaryFileName), jobIndexInfo, false); 840 841 final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo); 842 if (old == null || old.didMoveFail()) { 843 final HistoryFileInfo found = (old == null) ? fileInfo : old; 844 long cutoff = System.currentTimeMillis() - maxHistoryAge; 845 if(found.getJobIndexInfo().getFinishTime() <= cutoff) { 846 try { 847 found.delete(); 848 } catch (IOException e) { 849 LOG.warn("Error cleaning up a HistoryFile that is out of date.", e); 850 } 851 } else { 852 if (LOG.isDebugEnabled()) { 853 LOG.debug("Scheduling move to done of " +found); 854 } 855 moveToDoneExecutor.execute(new Runnable() { 856 @Override 857 public void run() { 858 try { 859 found.moveToDone(); 860 } catch (IOException e) { 861 LOG.info("Failed to process fileInfo for job: " + 862 found.getJobId(), e); 863 } 864 } 865 }); 866 } 867 } else if (!old.isMovePending()) { 868 //This is a duplicate so just delete it 869 if (LOG.isDebugEnabled()) { 870 LOG.debug("Duplicate: deleting"); 871 } 872 fileInfo.delete(); 873 } 874 } 875 } 876 877 /** 878 * Searches the job history file FileStatus list for the specified JobId. 879 * 880 * @param fileStatusList 881 * fileStatus list of Job History Files. 882 * @param jobId 883 * The JobId to find. 884 * @return A FileInfo object for the jobId, null if not found. 885 * @throws IOException 886 */ 887 private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList, 888 JobId jobId) throws IOException { 889 for (FileStatus fs : fileStatusList) { 890 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 891 .getName()); 892 if (jobIndexInfo.getJobId().equals(jobId)) { 893 String confFileName = JobHistoryUtils 894 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 895 String summaryFileName = JobHistoryUtils 896 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 897 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path( 898 fs.getPath().getParent(), confFileName), new Path(fs.getPath() 899 .getParent(), summaryFileName), jobIndexInfo, true); 900 return fileInfo; 901 } 902 } 903 return null; 904 } 905 906 /** 907 * Scans old directories known by the idToDateString map for the specified 908 * jobId. If the number of directories is higher than the supported size of 909 * the idToDateString cache, the jobId will not be found. 910 * 911 * @param jobId 912 * the jobId. 913 * @return 914 * @throws IOException 915 */ 916 private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { 917 String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( 918 jobId, serialNumberFormat); 919 Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber); 920 if (dateStringSet == null) { 921 return null; 922 } 923 for (String timestampPart : dateStringSet) { 924 Path logDir = canonicalHistoryLogPath(jobId, timestampPart); 925 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, 926 doneDirFc); 927 HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId); 928 if (fileInfo != null) { 929 return fileInfo; 930 } 931 } 932 return null; 933 } 934 935 public Collection<HistoryFileInfo> getAllFileInfo() throws IOException { 936 scanIntermediateDirectory(); 937 return jobListCache.values(); 938 } 939 940 public HistoryFileInfo getFileInfo(JobId jobId) throws IOException { 941 // FileInfo available in cache. 942 HistoryFileInfo fileInfo = jobListCache.get(jobId); 943 if (fileInfo != null) { 944 return fileInfo; 945 } 946 // OK so scan the intermediate to be sure we did not lose it that way 947 scanIntermediateDirectory(); 948 fileInfo = jobListCache.get(jobId); 949 if (fileInfo != null) { 950 return fileInfo; 951 } 952 953 // Intermediate directory does not contain job. Search through older ones. 954 fileInfo = scanOldDirsForJob(jobId); 955 if (fileInfo != null) { 956 return fileInfo; 957 } 958 return null; 959 } 960 961 private void moveToDoneNow(final Path src, final Path target) 962 throws IOException { 963 LOG.info("Moving " + src.toString() + " to " + target.toString()); 964 intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); 965 } 966 967 private String getJobSummary(FileContext fc, Path path) throws IOException { 968 Path qPath = fc.makeQualified(path); 969 FSDataInputStream in = null; 970 String jobSummaryString = null; 971 try { 972 in = fc.open(qPath); 973 jobSummaryString = in.readUTF(); 974 } finally { 975 if (in != null) { 976 in.close(); 977 } 978 } 979 return jobSummaryString; 980 } 981 982 private void makeDoneSubdir(Path path) throws IOException { 983 try { 984 doneDirFc.getFileStatus(path); 985 existingDoneSubdirs.add(path); 986 } catch (FileNotFoundException fnfE) { 987 try { 988 FsPermission fsp = new FsPermission( 989 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); 990 doneDirFc.mkdir(path, fsp, true); 991 FileStatus fsStatus = doneDirFc.getFileStatus(path); 992 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 993 + ", Expected: " + fsp.toShort()); 994 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 995 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 996 + ", " + fsp); 997 doneDirFc.setPermission(path, fsp); 998 } 999 existingDoneSubdirs.add(path); 1000 } catch (FileAlreadyExistsException faeE) { // Nothing to do. 1001 } 1002 } 1003 } 1004 1005 private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { 1006 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1007 id, timestampComponent, serialNumberFormat)); 1008 } 1009 1010 private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { 1011 String timestampComponent = JobHistoryUtils 1012 .timestampDirectoryComponent(millisecondTime); 1013 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1014 id, timestampComponent, serialNumberFormat)); 1015 } 1016 1017 private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { 1018 if (finishTime == 0) { 1019 return fileStatus.getModificationTime(); 1020 } 1021 return finishTime; 1022 } 1023 1024 private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException { 1025 jobListCache.delete(fileInfo); 1026 fileInfo.delete(); 1027 } 1028 1029 List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException { 1030 return JobHistoryUtils. 1031 getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff); 1032 } 1033 1034 /** 1035 * Clean up older history files. 1036 * 1037 * @throws IOException 1038 * on any error trying to remove the entries. 1039 */ 1040 @SuppressWarnings("unchecked") 1041 void clean() throws IOException { 1042 long cutoff = System.currentTimeMillis() - maxHistoryAge; 1043 boolean halted = false; 1044 List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff); 1045 // Sort in ascending order. Relies on YYYY/MM/DD/Serial 1046 Collections.sort(serialDirList); 1047 for (FileStatus serialDir : serialDirList) { 1048 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( 1049 serialDir.getPath(), doneDirFc); 1050 for (FileStatus historyFile : historyFileList) { 1051 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile 1052 .getPath().getName()); 1053 long effectiveTimestamp = getEffectiveTimestamp( 1054 jobIndexInfo.getFinishTime(), historyFile); 1055 if (effectiveTimestamp <= cutoff) { 1056 HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo 1057 .getJobId()); 1058 if (fileInfo == null) { 1059 String confFileName = JobHistoryUtils 1060 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 1061 1062 fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path( 1063 historyFile.getPath().getParent(), confFileName), null, 1064 jobIndexInfo, true); 1065 } 1066 deleteJobFromDone(fileInfo); 1067 } else { 1068 halted = true; 1069 break; 1070 } 1071 } 1072 if (!halted) { 1073 deleteDir(serialDir); 1074 removeDirectoryFromSerialNumberIndex(serialDir.getPath()); 1075 existingDoneSubdirs.remove(serialDir.getPath()); 1076 } else { 1077 break; // Don't scan any more directories. 1078 } 1079 } 1080 } 1081 1082 protected boolean deleteDir(FileStatus serialDir) 1083 throws AccessControlException, FileNotFoundException, 1084 UnsupportedFileSystemException, IOException { 1085 return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); 1086 } 1087 1088 @VisibleForTesting 1089 protected void setMaxHistoryAge(long newValue){ 1090 maxHistoryAge=newValue; 1091 } 1092}