/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

/**
 * Reader and writer for the aggregated-log file format: a TFile whose entries
 * are keyed by {@link LogKey}. Three reserved keys (VERSION, APPLICATION_OWNER,
 * APPLICATION_ACL) carry metadata; every other key is a container id whose
 * value is a sequence of {@code (fileName, length, bytes)} records.
 */
@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  //Maybe write out the retention policy.
  //Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;

  /**
   * Charset used wherever this class converts between bytes and characters,
   * so that results do not depend on the JVM's platform default.
   */
  private static final Charset UTF_8 = Charset.forName("UTF-8");

  /**
   * Umask for the log file (0640 ^ 0777 == 0137).
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));


  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }

  /**
   * Key of an entry in the aggregated log file. It wraps a single string:
   * either a container id rendered with {@code toString()} or the name of a
   * reserved metadata key.
   */
  @Public
  public static class LogKey implements Writable {

    private String keyString;

    /** Creates an empty key, to be populated via {@link #readFields}. */
    public LogKey() {

    }

    /**
     * @param containerId container whose string form becomes the key
     */
    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    /**
     * @param keyString the raw key text
     */
    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }

  /**
   * The value side of a container entry: locates a container's log files
   * under the configured root log directories and serializes them.
   */
  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    private final LogAggregationContext logAggregationContext;
    // Files successfully written by write(); exposed through the
    // getCurrentUpLoaded* accessors.
    private final Set<File> uploadedFiles = new HashSet<File>();
    private final Set<String> alreadyUploadedLogFiles;
    // Metadata strings of every file seen in the container log directories,
    // recorded before any include/exclude filtering.
    private final Set<String> allExistingFileMeta = new HashSet<String>();
    private final boolean appFinished;
    // TODO Maybe add a version string here. Instead of changing the version of
    // the entire k-v format

    /**
     * Convenience constructor: no aggregation context, nothing previously
     * uploaded, application treated as finished.
     *
     * @param rootLogDirs root log directories to search
     * @param containerId container whose logs are aggregated
     * @param user user passed to the secure file open
     */
    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
    }

    /**
     * @param rootLogDirs root log directories to search; copied and sorted
     * @param containerId container whose logs are aggregated
     * @param user user passed to the secure file open
     * @param logAggregationContext source of include/exclude patterns; may be
     *          null, in which case no filtering is applied
     * @param alreadyUploadedLogFiles metadata strings of files to skip
     * @param appFinished selects the regular patterns when true and the
     *          rolled-logs patterns when false
     */
    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user, LogAggregationContext logAggregationContext,
        Set<String> alreadyUploadedLogFiles, boolean appFinished) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
      this.logAggregationContext = logAggregationContext;
      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
      this.appFinished = appFinished;
    }

    /**
     * Collects the files to upload from
     * {@code <rootLogDir>/<applicationId>/<containerId>} under every root log
     * directory, skipping roots where that path is not a directory.
     */
    private Set<File> getPendingLogFilesToUploadForThisContainer() {
      Set<File> pendingUploadFiles = new HashSet<File>();
      for (String rootLogDir : this.rootLogDirs) {
        File appLogDir =
            new File(rootLogDir,
                ConverterUtils.toString(
                    this.containerId.getApplicationAttemptId().
                        getApplicationId())
                );
        File containerLogDir =
            new File(appLogDir, ConverterUtils.toString(this.containerId));

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        pendingUploadFiles
            .addAll(getPendingLogFilesToUpload(containerLogDir));
      }
      return pendingUploadFiles;
    }

    /**
     * Writes each pending file, in sorted order, as: file name (UTF), file
     * length as a decimal string (UTF), then at most that many bytes of
     * content. Directories and files that cannot be opened are skipped.
     *
     * @param out stream receiving the serialized records
     * @param pendingUploadFiles files to serialize
     * @throws IOException if writing to {@code out} fails
     */
    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
        throws IOException {
      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
      Collections.sort(fileList);

      for (File logFile : fileList) {
        // We only aggregate top level files.
        // Ignore anything inside sub-folders.
        if (logFile.isDirectory()) {
          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
          continue;
        }

        FileInputStream in = null;
        try {
          in = secureOpenFile(logFile);
        } catch (IOException e) {
          // Nothing was opened, so there is nothing to close here.
          logErrorMessage(logFile, e);
          continue;
        }

        // The length is sampled once; content appended afterwards is not
        // copied (see the truncation warning below).
        final long fileLength = logFile.length();
        // Write the logFile Type
        out.writeUTF(logFile.getName());

        // Write the log length as UTF so that it is printable
        out.writeUTF(String.valueOf(fileLength));

        // Write the log itself
        // NOTE(review): if the file shrinks or a read fails part-way, fewer
        // than fileLength content bytes follow the header (plus an error
        // message in the failure case), so the record no longer matches its
        // declared length. Left as-is because fixing it changes the format.
        try {
          byte[] buf = new byte[65535];
          int len = 0;
          long bytesLeft = fileLength;
          while ((len = in.read(buf)) != -1) {
            //If buffer contents within fileLength, write
            if (len < bytesLeft) {
              out.write(buf, 0, len);
              bytesLeft-=len;
            }
            //else only write contents within fileLength, then exit early
            else {
              out.write(buf, 0, (int)bytesLeft);
              break;
            }
          }
          long newLength = logFile.length();
          if(fileLength < newLength) {
            LOG.warn("Aggregated logs truncated by approximately "+
                (newLength-fileLength) +" bytes.");
          }
          this.uploadedFiles.add(logFile);
        } catch (IOException e) {
          String message = logErrorMessage(logFile, e);
          // Explicit charset: the bytes must not depend on the platform
          // default encoding.
          out.write(message.getBytes(UTF_8));
        } finally {
          IOUtils.cleanup(LOG, in);
        }
      }
    }

    /**
     * Opens {@code logFile} for reading via {@link SecureIOUtils}, passing
     * this value's user as the expected owner and no expected group.
     *
     * @param logFile file to open
     * @return an open stream on the file
     * @throws IOException if the file cannot be opened
     */
    @VisibleForTesting
    public FileInputStream secureOpenFile(File logFile) throws IOException {
      return SecureIOUtils.openForRead(logFile, getUser(), null);
    }

    /** Logs an aggregation failure at ERROR level and returns the message. */
    private static String logErrorMessage(File logFile, Exception e) {
      String message = "Error aggregating log file. Log file : "
          + logFile.getAbsolutePath() + ". " + e.getMessage();
      LOG.error(message, e);
      return message;
    }

    // Added for testing purpose.
    public String getUser() {
      return user;
    }

    /**
     * Lists {@code containerLogDir}, records metadata for every entry, and,
     * when an aggregation context is present, applies the include pattern,
     * the exclude pattern and the already-uploaded filter.
     *
     * @return the files to upload; empty if the directory cannot be listed
     */
    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
      File[] listedFiles = containerLogDir.listFiles();
      if (listedFiles == null) {
        // listFiles() returns null on an I/O error or when the directory was
        // removed after the caller's isDirectory() check.
        return new HashSet<File>();
      }
      Set<File> candidates =
          new HashSet<File>(Arrays.asList(listedFiles));
      for (File logFile : candidates) {
        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
      }

      if (this.logAggregationContext != null && candidates.size() > 0) {
        filterFiles(
            this.appFinished ? this.logAggregationContext.getIncludePattern()
                : this.logAggregationContext.getRolledLogsIncludePattern(),
            candidates, false);

        filterFiles(
            this.appFinished ? this.logAggregationContext.getExcludePattern()
                : this.logAggregationContext.getRolledLogsExcludePattern(),
            candidates, true);

        Iterable<File> mask =
            Iterables.filter(candidates, new Predicate<File>() {
              @Override
              public boolean apply(File next) {
                return !alreadyUploadedLogFiles
                    .contains(getLogFileMetaData(next));
              }
            });
        candidates = Sets.newHashSet(mask);
      }
      return candidates;
    }

    /**
     * Removes entries from {@code candidates} in place. With
     * {@code exclusion == false} files whose name does NOT contain a match
     * are removed; with {@code exclusion == true} files whose name does
     * contain a match are removed. A null or empty pattern is a no-op.
     */
    private void filterFiles(String pattern, Set<File> candidates,
        boolean exclusion) {
      if (pattern != null && !pattern.isEmpty()) {
        Pattern filterPattern = Pattern.compile(pattern);
        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
            .hasNext();) {
          File candidate = candidatesItr.next();
          boolean match = filterPattern.matcher(candidate.getName()).find();
          if ((!match && !exclusion) || (match && exclusion)) {
            candidatesItr.remove();
          }
        }
      }
    }

    /**
     * @return absolute paths of the files written so far by {@link #write}
     */
    public Set<Path> getCurrentUpLoadedFilesPath() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.uploadedFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    /**
     * @return metadata strings of the files written so far by {@link #write}
     */
    public Set<String> getCurrentUpLoadedFileMeta() {
      Set<String> info = new HashSet<String>();
      for (File file : this.uploadedFiles) {
        info.add(getLogFileMetaData(file));
      }
      return info;
    }

    /**
     * @return metadata strings of every file seen in the container log
     *         directories (the internal set, not a copy)
     */
    public Set<String> getAllExistingFilesMeta() {
      return this.allExistingFileMeta;
    }

    /** Builds {@code <containerId>_<fileName>_<lastModified>}. */
    private String getLogFileMetaData(File file) {
      return containerId.toString() + "_" + file.getName() + "_"
          + file.lastModified();
    }
  }

  /**
   * The writer that writes out the aggregated logs.
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;
    private FileContext fc;

    /**
     * Creates (or overwrites) the remote log file as {@code userUgi}, wraps
     * it in a TFile writer and writes the VERSION entry.
     *
     * @param conf configuration used for the file context and compression
     * @param remoteAppLogFile path of the aggregated log file to create
     * @param userUgi user on whose behalf the file is created
     * @throws IOException if the file cannot be created or written, or if the
     *           calling thread is interrupted while creating it
     */
    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                fc = FileContext.getFileContext(conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up can observe it.
        Thread.currentThread().interrupt();
        throw new IOException(e);
      }

      boolean initialized = false;
      try {
        // Keys are not sorted: null arg
        // 256KB minBlockSize : Expected log size for each container too
        this.writer =
            new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
                YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
                YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
        //Write the version string
        writeVersion();
        initialized = true;
      } finally {
        if (!initialized) {
          // The caller never receives this object, so close() can never be
          // called; release the already-created stream here.
          IOUtils.closeStream(this.fsDataOStream);
        }
      }
    }

    @VisibleForTesting
    public TFile.Writer getWriter() {
      return this.writer;
    }

    /** Appends the VERSION entry (key "VERSION", int value). */
    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    /**
     * Appends the APPLICATION_OWNER entry.
     *
     * @param user the application owner, written as a UTF string
     * @throws IOException if the entry cannot be written
     */
    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    /**
     * Appends the APPLICATION_ACL entry as a sequence of
     * (access type, ACL string) UTF pairs.
     *
     * @param appAcls the ACLs to record
     * @throws IOException if the entry cannot be written
     */
    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }

    /**
     * Appends one container entry. Nothing is written when the container has
     * no pending log files.
     *
     * @param logKey key identifying the container
     * @param logValue source of the container's log files
     * @throws IOException if the entry cannot be written
     */
    public void append(LogKey logKey, LogValue logValue) throws IOException {
      Set<File> pendingUploadFiles =
          logValue.getPendingLogFilesToUploadForThisContainer();
      if (pendingUploadFiles.size() == 0) {
        return;
      }
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out, pendingUploadFiles);
      out.close();
    }

    /**
     * Closes the TFile writer, logging (not propagating) any failure, then
     * closes the underlying stream.
     */
    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      IOUtils.closeStream(fsDataOStream);
    }
  }

  /**
   * Reader for an aggregated log file.
   */
  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    // True until the first call to next(); the shared scanner already points
    // at the first entry and must not be advanced before it is read.
    private boolean atBeginning = true;

    /**
     * Opens the aggregated log file and positions a scanner on its first
     * entry.
     *
     * @param conf configuration used for the file context and TFile reader
     * @param remoteAppLogFile path of the aggregated log file
     * @throws IOException if the file cannot be opened or parsed
     */
    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext = FileContext.getFileContext(conf);
      FSDataInputStream in = fileContext.open(remoteAppLogFile);
      TFile.Reader tfileReader = null;
      TFile.Reader.Scanner tfileScanner = null;
      boolean opened = false;
      try {
        tfileReader =
            new TFile.Reader(in, fileContext.getFileStatus(
                remoteAppLogFile).getLen(), conf);
        tfileScanner = tfileReader.createScanner();
        opened = true;
      } finally {
        if (!opened) {
          // The caller never receives this object, so close() can never be
          // called; release whatever was opened.
          IOUtils.cleanup(LOG, tfileReader, in);
        }
      }
      this.fsDataIStream = in;
      this.reader = tfileReader;
      this.scanner = tfileScanner;
    }

    /**
     * Returns the owner of the application.
     *
     * @return the application owner, or null if the file has no owner entry.
     * @throws IOException if the file cannot be read
     */
    public String getApplicationOwner() throws IOException {
      // Uses its own scanner so the position of next() is not disturbed.
      TFile.Reader.Scanner ownerScanner = null;
      try {
        ownerScanner = reader.createScanner();
        LogKey key = new LogKey();
        while (!ownerScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            return valueStream.readUTF();
          }
          ownerScanner.advance();
        }
        return null;
      } finally {
        IOUtils.cleanup(LOG, ownerScanner);
      }
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs are
     * found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException if the file cannot be read
     * @throws YarnRuntimeException if an access type is present without its
     *           ACL string
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      // Uses its own scanner so the position of next() is not disturbed.
      TFile.Reader.Scanner aclScanner = null;
      try {
        aclScanner = reader.createScanner();
        LogKey key = new LogKey();
        Map<ApplicationAccessType, String> acls =
            new HashMap<ApplicationAccessType, String>();
        while (!aclScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            while (true) {
              String appAccessOp = null;
              String aclString = null;
              try {
                appAccessOp = valueStream.readUTF();
              } catch (EOFException e) {
                // Valid end of stream.
                break;
              }
              try {
                aclString = valueStream.readUTF();
              } catch (EOFException e) {
                throw new YarnRuntimeException("Error reading ACLs", e);
              }
              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
            }

          }
          aclScanner.advance();
        }
        return acls;
      } finally {
        IOUtils.cleanup(LOG, aclScanner);
      }
    }

    /**
     * Read the next key and return the value-stream. Reserved metadata keys
     * are skipped.
     *
     * @param key populated with the key of the entry that was read
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException if the file cannot be read
     */
    public DataInputStream next(LogKey key) throws IOException {
      if (!this.atBeginning) {
        this.scanner.advance();
      } else {
        this.atBeginning = false;
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
      key.readFields(entry.getKeyStream());
      // Skip META keys
      if (RESERVED_KEYS.containsKey(key.toString())) {
        return next(key);
      }
      DataInputStream valueStream = entry.getValueStream();
      return valueStream;
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container. The search is made with {@link #next}, so it
     * starts at this reader's current position and advances it.
     *
     * @param containerId the container whose logs are wanted
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException if the file cannot be read
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    //TODO Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams.
//    public List<String> getTypesForContainer(DataInputStream valueStream){}
//
//    /**
//     * @param valueStream
//     *          The Log stream for the container.
//     * @param fileType
//     *          the log type required.
//     * @return An InputStreamReader for the required log type or null if the
//     *         type is not found.
//     * @throws IOException
//     */
//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
//        String fileType) throws IOException {
//      valueStream.reset();
//      try {
//        while (true) {
//          String ft = valueStream.readUTF();
//          String fileLengthStr = valueStream.readUTF();
//          long fileLength = Long.parseLong(fileLengthStr);
//          if (ft.equals(fileType)) {
//            BoundedInputStream bis =
//                new BoundedInputStream(valueStream, fileLength);
//            return new InputStreamReader(bis);
//          } else {
//            long totalSkipped = 0;
//            long currSkipped = 0;
//            while (currSkipped != -1 && totalSkipped < fileLength) {
//              currSkipped = valueStream.skip(fileLength - totalSkipped);
//              totalSkipped += currSkipped;
//            }
//            // TODO Verify skip behaviour.
//            if (currSkipped == -1) {
//              return null;
//            }
//          }
//        }
//      } catch (EOFException e) {
//        return null;
//      }
//    }

    /**
     * Writes all logs for a single container to the provided writer. The
     * streams wrapping {@code writer} are closed before returning.
     *
     * @param valueStream the container's value stream, as returned by
     *          {@link #next}
     * @param writer destination for the formatted logs
     * @param logUploadedTime upload time to print for each log, or -1 to
     *          omit it
     * @throws IOException if reading or writing fails
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer, long logUploadedTime) throws IOException {
      OutputStream os = null;
      PrintStream ps = null;
      try {
        // Encode and decode with the same explicit charset so the text that
        // reaches the writer does not depend on the platform default.
        os = new WriterOutputStream(writer, UTF_8);
        ps = new PrintStream(os, false, UTF_8.name());
        while (true) {
          try {
            readContainerLogs(valueStream, ps, logUploadedTime);
          } catch (EOFException e) {
            // EndOfFile
            return;
          }
        }
      } finally {
        IOUtils.cleanup(LOG, ps);
        IOUtils.cleanup(LOG, os);
      }
    }

    /**
     * Writes all logs for a single container to the provided writer, without
     * printing an upload time.
     *
     * @param valueStream the container's value stream, as returned by
     *          {@link #next}
     * @param writer destination for the formatted logs
     * @throws IOException if reading or writing fails
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      readAcontainerLogs(valueStream, writer, -1);
    }

    /**
     * Reads one (type, length, content) record from {@code valueStream} and
     * prints it to {@code out} with a short header. Throws
     * {@link EOFException} from the initial {@code readUTF} when no record
     * remains.
     */
    private static void readContainerLogs(DataInputStream valueStream,
        PrintStream out, long logUploadedTime) throws IOException {
      byte[] buf = new byte[65535];

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      out.print("LogType:");
      out.println(fileType);
      if (logUploadedTime != -1) {
        out.print("Log Upload Time:");
        out.println(Times.format(logUploadedTime));
      }
      out.print("LogLength:");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      // Copy exactly fileLength bytes (or fewer if the stream ends early),
      // never reading past the end of this record.
      long curRead = 0;
      long pendingRead = fileLength - curRead;
      int toRead =
          pendingRead > buf.length ? buf.length : (int) pendingRead;
      int len = valueStream.read(buf, 0, toRead);
      while (len != -1 && curRead < fileLength) {
        out.write(buf, 0, len);
        curRead += len;

        pendingRead = fileLength - curRead;
        toRead =
            pendingRead > buf.length ? buf.length : (int) pendingRead;
        len = valueStream.read(buf, 0, toRead);
      }
      out.println("");
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     *
     * @param valueStream the container's value stream
     * @param out destination for the formatted log
     * @param logUploadedTime upload time to print, or -1 to omit it
     * @throws IOException if reading fails; {@link EOFException} when no
     *           record remains
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime)
          throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     *
     * @param valueStream the container's value stream
     * @param out destination for the formatted log
     * @throws IOException if reading fails; {@link EOFException} when no
     *           record remains
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
          throws IOException {
      readAContainerLogsForALogType(valueStream, out, -1);
    }

    /** Closes the scanner, the TFile reader and the underlying stream. */
    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }

  /**
   * Iterates over the individual log files stored in one container's value
   * stream. Call {@link #nextLog()} to position on a log before using the
   * {@code read}/{@code skip} methods; they dereference the current log and
   * will fail if none is selected.
   */
  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    /**
     * @param stream the container's value stream
     */
    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }

    /**
     * Skips whatever remains of the current log and positions on the next
     * one.
     *
     * @return the type (file name) of the next log, or null when the stream
     *         has no further logs
     * @throws IOException if the stream cannot be read
     */
    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        // Explicit charset: decoding must not depend on the platform default.
        currentLogISR = new InputStreamReader(currentLogData, UTF_8);
        currentLogType = logType;
      } catch (EOFException ignored) {
        // End of the value stream: there is no next log, which is reported
        // to the caller by returning null.
      }

      return currentLogType;
    }

    /** @return the type of the current log, or null if none is selected */
    public String getCurrentLogType() {
      return currentLogType;
    }

    /** @return the declared length in bytes of the current log */
    public long getCurrentLogLength() {
      return currentLogLength;
    }

    /**
     * Skips up to {@code n} bytes of the current log.
     *
     * @param n number of bytes to skip
     * @return the value returned by the bounded stream's {@code skip}
     * @throws IOException if the stream cannot be read
     */
    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    /**
     * Reads one byte of the current log.
     *
     * @return the byte read, or -1 at the end of the current log
     * @throws IOException if the stream cannot be read
     */
    public int read() throws IOException {
      return currentLogData.read();
    }

    /**
     * Reads bytes of the current log into {@code buf}.
     *
     * @param buf destination buffer
     * @param off offset in {@code buf} at which to start storing
     * @param len maximum number of bytes to read
     * @return the number of bytes read, or -1 at the end of the current log
     * @throws IOException if the stream cannot be read
     */
    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    /**
     * Reads characters of the current log, decoded as UTF-8, into
     * {@code buf}.
     *
     * @param buf destination buffer
     * @param off offset in {@code buf} at which to start storing
     * @param len maximum number of characters to read
     * @return the number of characters read, or -1 at the end of the current
     *         log
     * @throws IOException if the stream cannot be read
     */
    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
}