001/** 002* Licensed to the Apache Software Foundation (ASF) under one 003* or more contributor license agreements. See the NOTICE file 004* distributed with this work for additional information 005* regarding copyright ownership. The ASF licenses this file 006* to you under the Apache License, Version 2.0 (the 007* "License"); you may not use this file except in compliance 008* with the License. You may obtain a copy of the License at 009* 010* http://www.apache.org/licenses/LICENSE-2.0 011* 012* Unless required by applicable law or agreed to in writing, software 013* distributed under the License is distributed on an "AS IS" BASIS, 014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015* See the License for the specific language governing permissions and 016* limitations under the License. 017*/ 018 019package org.apache.hadoop.yarn.logaggregation; 020 021import java.io.DataInput; 022import java.io.DataInputStream; 023import java.io.DataOutput; 024import java.io.DataOutputStream; 025import java.io.EOFException; 026import java.io.File; 027import java.io.FileInputStream; 028import java.io.IOException; 029import java.io.InputStreamReader; 030import java.io.OutputStream; 031import java.io.PrintStream; 032import java.io.Writer; 033import java.security.PrivilegedExceptionAction; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collections; 037import java.util.EnumSet; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.List; 042import java.util.Map; 043import java.util.Map.Entry; 044import java.util.Set; 045import java.util.regex.Pattern; 046 047import org.apache.commons.io.input.BoundedInputStream; 048import org.apache.commons.io.output.WriterOutputStream; 049import org.apache.commons.logging.Log; 050import org.apache.commons.logging.LogFactory; 051import org.apache.hadoop.classification.InterfaceAudience.Private; 052import org.apache.hadoop.classification.InterfaceAudience.Public; 053import org.apache.hadoop.classification.InterfaceStability.Evolving; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.fs.CreateFlag; 056import org.apache.hadoop.fs.FSDataInputStream; 057import org.apache.hadoop.fs.FSDataOutputStream; 058import org.apache.hadoop.fs.FileContext; 059import org.apache.hadoop.fs.Options; 060import org.apache.hadoop.fs.Path; 061import org.apache.hadoop.fs.permission.FsPermission; 062import org.apache.hadoop.io.IOUtils; 063import org.apache.hadoop.io.SecureIOUtils; 064import org.apache.hadoop.io.Writable; 065import org.apache.hadoop.io.file.tfile.TFile; 066import org.apache.hadoop.security.UserGroupInformation; 067import org.apache.hadoop.yarn.api.records.ApplicationAccessType; 068import org.apache.hadoop.yarn.api.records.ContainerId; 069import org.apache.hadoop.yarn.api.records.LogAggregationContext; 070import org.apache.hadoop.yarn.conf.YarnConfiguration; 071import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 072import org.apache.hadoop.yarn.util.ConverterUtils; 073import org.apache.hadoop.yarn.util.Times; 074 075import com.google.common.annotations.VisibleForTesting; 076import com.google.common.base.Predicate; 077import com.google.common.collect.Iterables; 078import com.google.common.collect.Sets; 079 080@Public 081@Evolving 082public class AggregatedLogFormat { 083 084 private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class); 085 private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL"); 086 private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER"); 087 private static final LogKey VERSION_KEY = new LogKey("VERSION"); 088 private static final Map<String, LogKey> RESERVED_KEYS; 089 //Maybe write out the retention policy. 090 //Maybe write out a list of containerLogs skipped by the retention policy. 091 private static final int VERSION = 1; 092 093 /** 094 * Umask for the log file. 095 */ 096 private static final FsPermission APP_LOG_FILE_UMASK = FsPermission 097 .createImmutable((short) (0640 ^ 0777)); 098 099 100 static { 101 RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>(); 102 RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY); 103 RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY); 104 RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY); 105 } 106 107 @Public 108 public static class LogKey implements Writable { 109 110 private String keyString; 111 112 public LogKey() { 113 114 } 115 116 public LogKey(ContainerId containerId) { 117 this.keyString = containerId.toString(); 118 } 119 120 public LogKey(String keyString) { 121 this.keyString = keyString; 122 } 123 124 @Override 125 public int hashCode() { 126 return keyString == null ? 0 : keyString.hashCode(); 127 } 128 129 @Override 130 public boolean equals(Object obj) { 131 if (obj instanceof LogKey) { 132 LogKey other = (LogKey) obj; 133 if (this.keyString == null) { 134 return other.keyString == null; 135 } 136 return this.keyString.equals(other.keyString); 137 } 138 return false; 139 } 140 141 @Private 142 @Override 143 public void write(DataOutput out) throws IOException { 144 out.writeUTF(this.keyString); 145 } 146 147 @Private 148 @Override 149 public void readFields(DataInput in) throws IOException { 150 this.keyString = in.readUTF(); 151 } 152 153 @Override 154 public String toString() { 155 return this.keyString; 156 } 157 } 158 159 @Private 160 public static class LogValue { 161 162 private final List<String> rootLogDirs; 163 private final ContainerId containerId; 164 private final String user; 165 private final LogAggregationContext logAggregationContext; 166 private Set<File> uploadedFiles = new HashSet<File>(); 167 private final Set<String> alreadyUploadedLogFiles; 168 private Set<String> allExistingFileMeta = new HashSet<String>(); 169 // TODO Maybe add a version string here. Instead of changing the version of 170 // the entire k-v format 171 172 public LogValue(List<String> rootLogDirs, ContainerId containerId, 173 String user) { 174 this(rootLogDirs, containerId, user, null, new HashSet<String>()); 175 } 176 177 public LogValue(List<String> rootLogDirs, ContainerId containerId, 178 String user, LogAggregationContext logAggregationContext, 179 Set<String> alreadyUploadedLogFiles) { 180 this.rootLogDirs = new ArrayList<String>(rootLogDirs); 181 this.containerId = containerId; 182 this.user = user; 183 184 // Ensure logs are processed in lexical order 185 Collections.sort(this.rootLogDirs); 186 this.logAggregationContext = logAggregationContext; 187 this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; 188 } 189 190 private Set<File> getPendingLogFilesToUploadForThisContainer() { 191 Set<File> pendingUploadFiles = new HashSet<File>(); 192 for (String rootLogDir : this.rootLogDirs) { 193 File appLogDir = 194 new File(rootLogDir, 195 ConverterUtils.toString( 196 this.containerId.getApplicationAttemptId(). 197 getApplicationId()) 198 ); 199 File containerLogDir = 200 new File(appLogDir, ConverterUtils.toString(this.containerId)); 201 202 if (!containerLogDir.isDirectory()) { 203 continue; // ContainerDir may have been deleted by the user. 204 } 205 206 pendingUploadFiles 207 .addAll(getPendingLogFilesToUpload(containerLogDir)); 208 } 209 return pendingUploadFiles; 210 } 211 212 public void write(DataOutputStream out, Set<File> pendingUploadFiles) 213 throws IOException { 214 List<File> fileList = new ArrayList<File>(pendingUploadFiles); 215 Collections.sort(fileList); 216 217 for (File logFile : fileList) { 218 // We only aggregate top level files. 219 // Ignore anything inside sub-folders. 220 if (logFile.isDirectory()) { 221 LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it."); 222 continue; 223 } 224 225 FileInputStream in = null; 226 try { 227 in = secureOpenFile(logFile); 228 } catch (IOException e) { 229 logErrorMessage(logFile, e); 230 IOUtils.cleanup(LOG, in); 231 continue; 232 } 233 234 final long fileLength = logFile.length(); 235 // Write the logFile Type 236 out.writeUTF(logFile.getName()); 237 238 // Write the log length as UTF so that it is printable 239 out.writeUTF(String.valueOf(fileLength)); 240 241 // Write the log itself 242 try { 243 byte[] buf = new byte[65535]; 244 int len = 0; 245 long bytesLeft = fileLength; 246 while ((len = in.read(buf)) != -1) { 247 //If buffer contents within fileLength, write 248 if (len < bytesLeft) { 249 out.write(buf, 0, len); 250 bytesLeft-=len; 251 } 252 //else only write contents within fileLength, then exit early 253 else { 254 out.write(buf, 0, (int)bytesLeft); 255 break; 256 } 257 } 258 long newLength = logFile.length(); 259 if(fileLength < newLength) { 260 LOG.warn("Aggregated logs truncated by approximately "+ 261 (newLength-fileLength) +" bytes."); 262 } 263 this.uploadedFiles.add(logFile); 264 } catch (IOException e) { 265 String message = logErrorMessage(logFile, e); 266 out.write(message.getBytes()); 267 } finally { 268 IOUtils.cleanup(LOG, in); 269 } 270 } 271 } 272 273 @VisibleForTesting 274 public FileInputStream secureOpenFile(File logFile) throws IOException { 275 return SecureIOUtils.openForRead(logFile, getUser(), null); 276 } 277 278 private static String logErrorMessage(File logFile, Exception e) { 279 String message = "Error aggregating log file. Log file : " 280 + logFile.getAbsolutePath() + ". " + e.getMessage(); 281 LOG.error(message, e); 282 return message; 283 } 284 285 // Added for testing purpose. 286 public String getUser() { 287 return user; 288 } 289 290 private Set<File> getPendingLogFilesToUpload(File containerLogDir) { 291 Set<File> candidates = 292 new HashSet<File>(Arrays.asList(containerLogDir.listFiles())); 293 for (File logFile : candidates) { 294 this.allExistingFileMeta.add(getLogFileMetaData(logFile)); 295 } 296 297 if (this.logAggregationContext != null && candidates.size() > 0) { 298 if (this.logAggregationContext.getIncludePattern() != null 299 && !this.logAggregationContext.getIncludePattern().isEmpty()) { 300 filterFiles(this.logAggregationContext.getIncludePattern(), 301 candidates, false); 302 } 303 304 if (this.logAggregationContext.getExcludePattern() != null 305 && !this.logAggregationContext.getExcludePattern().isEmpty()) { 306 filterFiles(this.logAggregationContext.getExcludePattern(), 307 candidates, true); 308 } 309 310 Iterable<File> mask = 311 Iterables.filter(candidates, new Predicate<File>() { 312 @Override 313 public boolean apply(File next) { 314 return !alreadyUploadedLogFiles 315 .contains(getLogFileMetaData(next)); 316 } 317 }); 318 candidates = Sets.newHashSet(mask); 319 } 320 return candidates; 321 } 322 323 private void filterFiles(String pattern, Set<File> candidates, 324 boolean exclusion) { 325 Pattern filterPattern = 326 Pattern.compile(pattern); 327 for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr 328 .hasNext();) { 329 File candidate = candidatesItr.next(); 330 boolean match = filterPattern.matcher(candidate.getName()).find(); 331 if ((!match && !exclusion) || (match && exclusion)) { 332 candidatesItr.remove(); 333 } 334 } 335 } 336 337 public Set<Path> getCurrentUpLoadedFilesPath() { 338 Set<Path> path = new HashSet<Path>(); 339 for (File file : this.uploadedFiles) { 340 path.add(new Path(file.getAbsolutePath())); 341 } 342 return path; 343 } 344 345 public Set<String> getCurrentUpLoadedFileMeta() { 346 Set<String> info = new HashSet<String>(); 347 for (File file : this.uploadedFiles) { 348 info.add(getLogFileMetaData(file)); 349 } 350 return info; 351 } 352 353 public Set<String> getAllExistingFilesMeta() { 354 return this.allExistingFileMeta; 355 } 356 357 private String getLogFileMetaData(File file) { 358 return containerId.toString() + "_" + file.getName() + "_" 359 + file.lastModified(); 360 } 361 } 362 363 /** 364 * The writer that writes out the aggregated logs. 365 */ 366 @Private 367 public static class LogWriter { 368 369 private final FSDataOutputStream fsDataOStream; 370 private final TFile.Writer writer; 371 private FileContext fc; 372 373 public LogWriter(final Configuration conf, final Path remoteAppLogFile, 374 UserGroupInformation userUgi) throws IOException { 375 try { 376 this.fsDataOStream = 377 userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() { 378 @Override 379 public FSDataOutputStream run() throws Exception { 380 fc = FileContext.getFileContext(conf); 381 fc.setUMask(APP_LOG_FILE_UMASK); 382 return fc.create( 383 remoteAppLogFile, 384 EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 385 new Options.CreateOpts[] {}); 386 } 387 }); 388 } catch (InterruptedException e) { 389 throw new IOException(e); 390 } 391 392 // Keys are not sorted: null arg 393 // 256KB minBlockSize : Expected log size for each container too 394 this.writer = 395 new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get( 396 YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, 397 YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf); 398 //Write the version string 399 writeVersion(); 400 } 401 402 @VisibleForTesting 403 public TFile.Writer getWriter() { 404 return this.writer; 405 } 406 407 private void writeVersion() throws IOException { 408 DataOutputStream out = this.writer.prepareAppendKey(-1); 409 VERSION_KEY.write(out); 410 out.close(); 411 out = this.writer.prepareAppendValue(-1); 412 out.writeInt(VERSION); 413 out.close(); 414 } 415 416 public void writeApplicationOwner(String user) throws IOException { 417 DataOutputStream out = this.writer.prepareAppendKey(-1); 418 APPLICATION_OWNER_KEY.write(out); 419 out.close(); 420 out = this.writer.prepareAppendValue(-1); 421 out.writeUTF(user); 422 out.close(); 423 } 424 425 public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls) 426 throws IOException { 427 DataOutputStream out = this.writer.prepareAppendKey(-1); 428 APPLICATION_ACL_KEY.write(out); 429 out.close(); 430 out = this.writer.prepareAppendValue(-1); 431 for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) { 432 out.writeUTF(entry.getKey().toString()); 433 out.writeUTF(entry.getValue()); 434 } 435 out.close(); 436 } 437 438 public void append(LogKey logKey, LogValue logValue) throws IOException { 439 Set<File> pendingUploadFiles = 440 logValue.getPendingLogFilesToUploadForThisContainer(); 441 if (pendingUploadFiles.size() == 0) { 442 return; 443 } 444 DataOutputStream out = this.writer.prepareAppendKey(-1); 445 logKey.write(out); 446 out.close(); 447 out = this.writer.prepareAppendValue(-1); 448 logValue.write(out, pendingUploadFiles); 449 out.close(); 450 } 451 452 public void close() { 453 try { 454 this.writer.close(); 455 } catch (IOException e) { 456 LOG.warn("Exception closing writer", e); 457 } 458 IOUtils.closeStream(fsDataOStream); 459 } 460 } 461 462 @Public 463 @Evolving 464 public static class LogReader { 465 466 private final FSDataInputStream fsDataIStream; 467 private final TFile.Reader.Scanner scanner; 468 private final TFile.Reader reader; 469 470 public LogReader(Configuration conf, Path remoteAppLogFile) 471 throws IOException { 472 FileContext fileContext = FileContext.getFileContext(conf); 473 this.fsDataIStream = fileContext.open(remoteAppLogFile); 474 reader = 475 new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus( 476 remoteAppLogFile).getLen(), conf); 477 this.scanner = reader.createScanner(); 478 } 479 480 private boolean atBeginning = true; 481 482 /** 483 * Returns the owner of the application. 484 * 485 * @return the application owner. 486 * @throws IOException 487 */ 488 public String getApplicationOwner() throws IOException { 489 TFile.Reader.Scanner ownerScanner = reader.createScanner(); 490 LogKey key = new LogKey(); 491 while (!ownerScanner.atEnd()) { 492 TFile.Reader.Scanner.Entry entry = ownerScanner.entry(); 493 key.readFields(entry.getKeyStream()); 494 if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) { 495 DataInputStream valueStream = entry.getValueStream(); 496 return valueStream.readUTF(); 497 } 498 ownerScanner.advance(); 499 } 500 return null; 501 } 502 503 /** 504 * Returns ACLs for the application. An empty map is returned if no ACLs are 505 * found. 506 * 507 * @return a map of the Application ACLs. 508 * @throws IOException 509 */ 510 public Map<ApplicationAccessType, String> getApplicationAcls() 511 throws IOException { 512 // TODO Seek directly to the key once a comparator is specified. 513 TFile.Reader.Scanner aclScanner = reader.createScanner(); 514 LogKey key = new LogKey(); 515 Map<ApplicationAccessType, String> acls = 516 new HashMap<ApplicationAccessType, String>(); 517 while (!aclScanner.atEnd()) { 518 TFile.Reader.Scanner.Entry entry = aclScanner.entry(); 519 key.readFields(entry.getKeyStream()); 520 if (key.toString().equals(APPLICATION_ACL_KEY.toString())) { 521 DataInputStream valueStream = entry.getValueStream(); 522 while (true) { 523 String appAccessOp = null; 524 String aclString = null; 525 try { 526 appAccessOp = valueStream.readUTF(); 527 } catch (EOFException e) { 528 // Valid end of stream. 529 break; 530 } 531 try { 532 aclString = valueStream.readUTF(); 533 } catch (EOFException e) { 534 throw new YarnRuntimeException("Error reading ACLs", e); 535 } 536 acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString); 537 } 538 539 } 540 aclScanner.advance(); 541 } 542 return acls; 543 } 544 545 /** 546 * Read the next key and return the value-stream. 547 * 548 * @param key 549 * @return the valueStream if there are more keys or null otherwise. 550 * @throws IOException 551 */ 552 public DataInputStream next(LogKey key) throws IOException { 553 if (!this.atBeginning) { 554 this.scanner.advance(); 555 } else { 556 this.atBeginning = false; 557 } 558 if (this.scanner.atEnd()) { 559 return null; 560 } 561 TFile.Reader.Scanner.Entry entry = this.scanner.entry(); 562 key.readFields(entry.getKeyStream()); 563 // Skip META keys 564 if (RESERVED_KEYS.containsKey(key.toString())) { 565 return next(key); 566 } 567 DataInputStream valueStream = entry.getValueStream(); 568 return valueStream; 569 } 570 571 /** 572 * Get a ContainerLogsReader to read the logs for 573 * the specified container. 574 * 575 * @param containerId 576 * @return object to read the container's logs or null if the 577 * logs could not be found 578 * @throws IOException 579 */ 580 @Private 581 public ContainerLogsReader getContainerLogsReader( 582 ContainerId containerId) throws IOException { 583 ContainerLogsReader logReader = null; 584 585 final LogKey containerKey = new LogKey(containerId); 586 LogKey key = new LogKey(); 587 DataInputStream valueStream = next(key); 588 while (valueStream != null && !key.equals(containerKey)) { 589 valueStream = next(key); 590 } 591 592 if (valueStream != null) { 593 logReader = new ContainerLogsReader(valueStream); 594 } 595 596 return logReader; 597 } 598 599 //TODO Change Log format and interfaces to be containerId specific. 600 // Avoid returning completeValueStreams. 601// public List<String> getTypesForContainer(DataInputStream valueStream){} 602// 603// /** 604// * @param valueStream 605// * The Log stream for the container. 606// * @param fileType 607// * the log type required. 608// * @return An InputStreamReader for the required log type or null if the 609// * type is not found. 610// * @throws IOException 611// */ 612// public InputStreamReader getLogStreamForType(DataInputStream valueStream, 613// String fileType) throws IOException { 614// valueStream.reset(); 615// try { 616// while (true) { 617// String ft = valueStream.readUTF(); 618// String fileLengthStr = valueStream.readUTF(); 619// long fileLength = Long.parseLong(fileLengthStr); 620// if (ft.equals(fileType)) { 621// BoundedInputStream bis = 622// new BoundedInputStream(valueStream, fileLength); 623// return new InputStreamReader(bis); 624// } else { 625// long totalSkipped = 0; 626// long currSkipped = 0; 627// while (currSkipped != -1 && totalSkipped < fileLength) { 628// currSkipped = valueStream.skip(fileLength - totalSkipped); 629// totalSkipped += currSkipped; 630// } 631// // TODO Verify skip behaviour. 632// if (currSkipped == -1) { 633// return null; 634// } 635// } 636// } 637// } catch (EOFException e) { 638// return null; 639// } 640// } 641 642 /** 643 * Writes all logs for a single container to the provided writer. 644 * @param valueStream 645 * @param writer 646 * @param logUploadedTime 647 * @throws IOException 648 */ 649 public static void readAcontainerLogs(DataInputStream valueStream, 650 Writer writer, long logUploadedTime) throws IOException { 651 OutputStream os = null; 652 PrintStream ps = null; 653 try { 654 os = new WriterOutputStream(writer); 655 ps = new PrintStream(os); 656 while (true) { 657 try { 658 readContainerLogs(valueStream, ps, logUploadedTime); 659 } catch (EOFException e) { 660 // EndOfFile 661 return; 662 } 663 } 664 } finally { 665 IOUtils.cleanup(LOG, ps); 666 IOUtils.cleanup(LOG, os); 667 } 668 } 669 670 /** 671 * Writes all logs for a single container to the provided writer. 672 * @param valueStream 673 * @param writer 674 * @throws IOException 675 */ 676 public static void readAcontainerLogs(DataInputStream valueStream, 677 Writer writer) throws IOException { 678 readAcontainerLogs(valueStream, writer, -1); 679 } 680 681 private static void readContainerLogs(DataInputStream valueStream, 682 PrintStream out, long logUploadedTime) throws IOException { 683 byte[] buf = new byte[65535]; 684 685 String fileType = valueStream.readUTF(); 686 String fileLengthStr = valueStream.readUTF(); 687 long fileLength = Long.parseLong(fileLengthStr); 688 out.print("LogType:"); 689 out.println(fileType); 690 if (logUploadedTime != -1) { 691 out.print("Log Upload Time:"); 692 out.println(Times.format(logUploadedTime)); 693 } 694 out.print("LogLength:"); 695 out.println(fileLengthStr); 696 out.println("Log Contents:"); 697 698 long curRead = 0; 699 long pendingRead = fileLength - curRead; 700 int toRead = 701 pendingRead > buf.length ? buf.length : (int) pendingRead; 702 int len = valueStream.read(buf, 0, toRead); 703 while (len != -1 && curRead < fileLength) { 704 out.write(buf, 0, len); 705 curRead += len; 706 707 pendingRead = fileLength - curRead; 708 toRead = 709 pendingRead > buf.length ? buf.length : (int) pendingRead; 710 len = valueStream.read(buf, 0, toRead); 711 } 712 out.println(""); 713 } 714 715 /** 716 * Keep calling this till you get a {@link EOFException} for getting logs of 717 * all types for a single container. 718 * 719 * @param valueStream 720 * @param out 721 * @param logUploadedTime 722 * @throws IOException 723 */ 724 public static void readAContainerLogsForALogType( 725 DataInputStream valueStream, PrintStream out, long logUploadedTime) 726 throws IOException { 727 readContainerLogs(valueStream, out, logUploadedTime); 728 } 729 730 /** 731 * Keep calling this till you get a {@link EOFException} for getting logs of 732 * all types for a single container. 733 * 734 * @param valueStream 735 * @param out 736 * @throws IOException 737 */ 738 public static void readAContainerLogsForALogType( 739 DataInputStream valueStream, PrintStream out) 740 throws IOException { 741 readAContainerLogsForALogType(valueStream, out, -1); 742 } 743 744 public void close() { 745 IOUtils.cleanup(LOG, scanner, reader, fsDataIStream); 746 } 747 } 748 749 @Private 750 public static class ContainerLogsReader { 751 private DataInputStream valueStream; 752 private String currentLogType = null; 753 private long currentLogLength = 0; 754 private BoundedInputStream currentLogData = null; 755 private InputStreamReader currentLogISR; 756 757 public ContainerLogsReader(DataInputStream stream) { 758 valueStream = stream; 759 } 760 761 public String nextLog() throws IOException { 762 if (currentLogData != null && currentLogLength > 0) { 763 // seek to the end of the current log, relying on BoundedInputStream 764 // to prevent seeking past the end of the current log 765 do { 766 if (currentLogData.skip(currentLogLength) < 0) { 767 break; 768 } 769 } while (currentLogData.read() != -1); 770 } 771 772 currentLogType = null; 773 currentLogLength = 0; 774 currentLogData = null; 775 currentLogISR = null; 776 777 try { 778 String logType = valueStream.readUTF(); 779 String logLengthStr = valueStream.readUTF(); 780 currentLogLength = Long.parseLong(logLengthStr); 781 currentLogData = 782 new BoundedInputStream(valueStream, currentLogLength); 783 currentLogData.setPropagateClose(false); 784 currentLogISR = new InputStreamReader(currentLogData); 785 currentLogType = logType; 786 } catch (EOFException e) { 787 } 788 789 return currentLogType; 790 } 791 792 public String getCurrentLogType() { 793 return currentLogType; 794 } 795 796 public long getCurrentLogLength() { 797 return currentLogLength; 798 } 799 800 public long skip(long n) throws IOException { 801 return currentLogData.skip(n); 802 } 803 804 public int read() throws IOException { 805 return currentLogData.read(); 806 } 807 808 public int read(byte[] buf, int off, int len) throws IOException { 809 return currentLogData.read(buf, off, len); 810 } 811 812 public int read(char[] buf, int off, int len) throws IOException { 813 return currentLogISR.read(buf, off, len); 814 } 815 } 816}