/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

/**
 * Key/value layout used to store aggregated container logs in a
 * {@link TFile}, together with the writer and readers for that layout.
 *
 * <p>Each entry is keyed by a {@link LogKey}. Three reserved keys
 * ({@code VERSION}, {@code APPLICATION_OWNER}, {@code APPLICATION_ACL}) carry
 * metadata; every other key is the string form of a container id whose value
 * is a sequence of {@code (file name, file length, file bytes)} records.
 */
@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  //Maybe write out the retention policy.
  //Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;

  /**
   * Umask for the log file.
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));

  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }

  /**
   * Key of an entry in the aggregated log file. Wraps a single string that is
   * either a reserved metadata key or the string form of a container id.
   */
  @Public
  public static class LogKey implements Writable {

    private String keyString;

    /** Creates an empty key, to be populated via {@link #readFields}. */
    public LogKey() {

    }

    /**
     * Creates a key from a container id.
     *
     * @param containerId the container whose string form becomes the key
     */
    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    /**
     * Creates a key from an arbitrary string.
     *
     * @param keyString the key text
     */
    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }

  /**
   * Value of a container entry: selects the container's local log files and
   * serializes them into the aggregated log file.
   */
  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    private final LogAggregationContext logAggregationContext;
    private Set<File> uploadedFiles = new HashSet<File>();
    private final Set<String> alreadyUploadedLogFiles;
    private Set<String> allExistingFileMeta = new HashSet<String>();
    private final boolean appFinished;
    private final boolean containerFinished;
    // TODO Maybe add a version string here. Instead of changing the version of
    // the entire k-v format

    /**
     * Creates a value with no aggregation context and no previously uploaded
     * files, treating both the application and the container as finished.
     *
     * @param rootLogDirs local root log directories to search
     * @param containerId the container whose logs are aggregated
     * @param user user name passed to the secure file open
     */
    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true,
          true);
    }

    /**
     * @param rootLogDirs local root log directories to search; copied and
     *          sorted
     * @param containerId the container whose logs are aggregated
     * @param user user name passed to the secure file open
     * @param logAggregationContext include/exclude patterns, or null to
     *          select every file
     * @param alreadyUploadedLogFiles metadata strings of files that must not
     *          be uploaded again
     * @param appFinished selects the regular patterns when true and the
     *          rolled-logs patterns when false
     * @param containerFinished when true and the application is not finished,
     *          files matching the regular patterns are selected as well
     */
    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user, LogAggregationContext logAggregationContext,
        Set<String> alreadyUploadedLogFiles, boolean appFinished,
        boolean containerFinished) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
      this.logAggregationContext = logAggregationContext;
      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
      this.appFinished = appFinished;
      this.containerFinished = containerFinished;
    }

    /**
     * Collects the pending log files from this container's directory under
     * every root log directory.
     */
    private Set<File> getPendingLogFilesToUploadForThisContainer() {
      Set<File> pendingUploadFiles = new HashSet<File>();
      for (String rootLogDir : this.rootLogDirs) {
        File appLogDir = new File(rootLogDir,
            this.containerId.getApplicationAttemptId().
                getApplicationId().toString());
        File containerLogDir =
            new File(appLogDir, this.containerId.toString());

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        pendingUploadFiles
            .addAll(getPendingLogFilesToUpload(containerLogDir));
      }
      return pendingUploadFiles;
    }

    /**
     * Serializes the given files, in sorted order, as consecutive
     * {@code (name, length, bytes)} records. Name and length are written with
     * {@code writeUTF}. Directories and files that cannot be opened are
     * skipped.
     *
     * @param out stream to write the records to
     * @param pendingUploadFiles the files to serialize
     * @throws IOException if writing to {@code out} fails
     */
    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
        throws IOException {
      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
      Collections.sort(fileList);

      for (File logFile : fileList) {
        // We only aggregate top level files.
        // Ignore anything inside sub-folders.
        if (logFile.isDirectory()) {
          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
          continue;
        }

        FileInputStream in = null;
        try {
          in = secureOpenFile(logFile);
        } catch (IOException e) {
          // Nothing was opened, so there is nothing to clean up.
          logErrorMessage(logFile, e);
          continue;
        }

        final long fileLength = logFile.length();
        // Write the logFile Type
        out.writeUTF(logFile.getName());

        // Write the log length as UTF so that it is printable
        out.writeUTF(String.valueOf(fileLength));

        // Write the log itself
        try {
          byte[] buf = new byte[65535];
          int len = 0;
          long bytesLeft = fileLength;
          while ((len = in.read(buf)) != -1) {
            if (len < bytesLeft) {
              // Buffer contents are within fileLength: write them all.
              out.write(buf, 0, len);
              bytesLeft -= len;
            } else {
              // Only write the contents within fileLength, then exit early so
              // the record never exceeds the length written above.
              out.write(buf, 0, (int) bytesLeft);
              break;
            }
          }
          long newLength = logFile.length();
          if (fileLength < newLength) {
            LOG.warn("Aggregated logs truncated by approximately "+
                (newLength-fileLength) +" bytes.");
          }
          this.uploadedFiles.add(logFile);
        } catch (IOException e) {
          String message = logErrorMessage(logFile, e);
          out.write(message.getBytes(Charset.forName("UTF-8")));
        } finally {
          IOUtils.cleanup(LOG, in);
        }
      }
    }

    /**
     * Opens a log file for reading through {@link SecureIOUtils}, passing
     * {@link #getUser()} as the expected owner.
     *
     * @param logFile the file to open
     * @return an open stream on the file
     * @throws IOException if the file cannot be opened
     */
    @VisibleForTesting
    public FileInputStream secureOpenFile(File logFile) throws IOException {
      return SecureIOUtils.openForRead(logFile, getUser(), null);
    }

    /** Logs an aggregation failure for a file and returns the message. */
    private static String logErrorMessage(File logFile, Exception e) {
      String message = "Error aggregating log file. Log file : "
          + logFile.getAbsolutePath() + ". " + e.getMessage();
      LOG.error(message, e);
      return message;
    }

    // Added for testing purpose.
    public String getUser() {
      return user;
    }

    /**
     * Records metadata for every file in the directory and returns the files
     * that should be uploaded according to the aggregation context.
     */
    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
      File[] listedFiles = containerLogDir.listFiles();
      if (listedFiles == null) {
        // listFiles() returns null on an I/O error or when the directory
        // disappeared after the caller's isDirectory() check.
        LOG.warn("Unable to list log files in "
            + containerLogDir.getAbsolutePath());
        return new HashSet<File>();
      }

      Set<File> candidates = new HashSet<File>(Arrays.asList(listedFiles));
      for (File logFile : candidates) {
        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
      }

      Set<File> fileCandidates = new HashSet<File>(candidates);
      if (this.logAggregationContext != null && candidates.size() > 0) {
        fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
        if (!this.appFinished && this.containerFinished) {
          // The container is done while the application is still running:
          // also pick up files matching the regular patterns.
          Set<File> addition = new HashSet<File>(candidates);
          addition = getFileCandidates(addition, true);
          fileCandidates.addAll(addition);
        }
      }
      return fileCandidates;
    }

    /**
     * Applies the include and exclude patterns to {@code candidates} (which
     * is modified in place) and returns a new set without the files whose
     * metadata is already in {@code alreadyUploadedLogFiles}.
     *
     * @param useRegularPattern true for the regular patterns, false for the
     *          rolled-logs patterns
     */
    private Set<File> getFileCandidates(Set<File> candidates,
        boolean useRegularPattern) {
      filterFiles(
          useRegularPattern ? this.logAggregationContext.getIncludePattern()
              : this.logAggregationContext.getRolledLogsIncludePattern(),
          candidates, false);

      filterFiles(
          useRegularPattern ? this.logAggregationContext.getExcludePattern()
              : this.logAggregationContext.getRolledLogsExcludePattern(),
          candidates, true);

      Iterable<File> mask =
          Iterables.filter(candidates, new Predicate<File>() {
            @Override
            public boolean apply(File next) {
              return !alreadyUploadedLogFiles
                  .contains(getLogFileMetaData(next));
            }
          });
      return Sets.newHashSet(mask);
    }

    /**
     * Removes files from {@code candidates} based on a regular expression
     * searched for in the file name. A null or empty pattern leaves the set
     * untouched.
     *
     * @param exclusion when true matching files are removed, when false
     *          non-matching files are removed
     */
    private void filterFiles(String pattern, Set<File> candidates,
        boolean exclusion) {
      if (pattern != null && !pattern.isEmpty()) {
        Pattern filterPattern = Pattern.compile(pattern);
        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
            .hasNext();) {
          File candidate = candidatesItr.next();
          boolean match = filterPattern.matcher(candidate.getName()).find();
          if ((!match && !exclusion) || (match && exclusion)) {
            candidatesItr.remove();
          }
        }
      }
    }

    /**
     * @return the absolute paths of the files written so far by
     *         {@link #write}
     */
    public Set<Path> getCurrentUpLoadedFilesPath() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.uploadedFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    /**
     * @return the metadata strings of the files written so far by
     *         {@link #write}
     */
    public Set<String> getCurrentUpLoadedFileMeta() {
      Set<String> info = new HashSet<String>();
      for (File file : this.uploadedFiles) {
        info.add(getLogFileMetaData(file));
      }
      return info;
    }

    /**
     * @return the metadata strings of every file seen in the container log
     *         directories; this is the internal set, not a copy
     */
    public Set<String> getAllExistingFilesMeta() {
      return this.allExistingFileMeta;
    }

    /** Builds {@code <containerId>_<fileName>_<lastModified>}. */
    private String getLogFileMetaData(File file) {
      return containerId.toString() + "_" + file.getName() + "_"
          + file.lastModified();
    }
  }

  /**
   * The writer that writes out the aggregated logs.
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;
    private FileContext fc;

    /**
     * Creates (or overwrites) the remote log file as the given user and
     * writes the version entry.
     *
     * @param conf configuration used for the file context and the TFile
     *          compression type
     * @param remoteAppLogFile the file to create
     * @param userUgi the user to create the file as
     * @throws IOException if the file cannot be created or written, or if the
     *           calling thread is interrupted while creating it
     */
    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        // Preserve the interrupt status for code further up the stack.
        Thread.currentThread().interrupt();
        throw new IOException(e);
      }

      boolean initialized = false;
      try {
        // Keys are not sorted: null arg
        // 256KB minBlockSize : Expected log size for each container too
        this.writer =
            new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
                YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
                YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
        //Write the version string
        writeVersion();
        initialized = true;
      } finally {
        if (!initialized) {
          // The caller never receives this object on failure, so the
          // underlying stream has to be released here.
          IOUtils.closeStream(this.fsDataOStream);
        }
      }
    }

    @VisibleForTesting
    public TFile.Writer getWriter() {
      return this.writer;
    }

    /** Appends the {@code VERSION} entry. */
    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    /**
     * Appends the {@code APPLICATION_OWNER} entry.
     *
     * @param user the application owner
     * @throws IOException if the entry cannot be written
     */
    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    /**
     * Appends the {@code APPLICATION_ACL} entry as a sequence of
     * (access type, ACL string) UTF pairs.
     *
     * @param appAcls the ACLs to write
     * @throws IOException if the entry cannot be written
     */
    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }

    /**
     * Appends one container entry. Nothing is written when the value has no
     * pending files.
     *
     * @param logKey the entry key
     * @param logValue the value that selects and serializes the log files
     * @throws IOException if the entry cannot be written
     */
    public void append(LogKey logKey, LogValue logValue) throws IOException {
      Set<File> pendingUploadFiles =
          logValue.getPendingLogFilesToUploadForThisContainer();
      if (pendingUploadFiles.size() == 0) {
        return;
      }
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out, pendingUploadFiles);
      out.close();
    }

    /**
     * Closes the TFile writer and then the underlying stream. A failure to
     * close the writer is logged, not thrown.
     */
    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      IOUtils.closeStream(fsDataOStream);
    }
  }

  /**
   * Reader for an aggregated log file.
   */
  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    // True until the first call to next(); the scanner is only advanced on
    // later calls.
    private boolean atBeginning = true;

    /**
     * Opens the aggregated log file and creates a scanner over it.
     *
     * @param conf configuration used for the file context and the TFile
     *          reader
     * @param remoteAppLogFile the file to read
     * @throws IOException if the file cannot be opened or read
     */
    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext =
          FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
      this.fsDataIStream = fileContext.open(remoteAppLogFile);

      boolean initialized = false;
      try {
        reader =
            new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
                remoteAppLogFile).getLen(), conf);
        this.scanner = reader.createScanner();
        initialized = true;
      } finally {
        if (!initialized) {
          // The caller never receives this object on failure, so the
          // underlying stream has to be released here.
          IOUtils.closeStream(this.fsDataIStream);
        }
      }
    }

    /**
     * Returns the owner of the application.
     *
     * @return the application owner, or null if the file has no owner entry.
     * @throws IOException
     */
    public String getApplicationOwner() throws IOException {
      TFile.Reader.Scanner ownerScanner = null;
      try {
        ownerScanner = reader.createScanner();
        LogKey key = new LogKey();
        while (!ownerScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            return valueStream.readUTF();
          }
          ownerScanner.advance();
        }
        return null;
      } finally {
        IOUtils.cleanup(LOG, ownerScanner);
      }
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs are
     * found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      TFile.Reader.Scanner aclScanner = null;
      try {
        aclScanner = reader.createScanner();
        LogKey key = new LogKey();
        Map<ApplicationAccessType, String> acls =
            new HashMap<ApplicationAccessType, String>();
        while (!aclScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            while (true) {
              String appAccessOp = null;
              String aclString = null;
              try {
                appAccessOp = valueStream.readUTF();
              } catch (EOFException e) {
                // Valid end of stream.
                break;
              }
              try {
                aclString = valueStream.readUTF();
              } catch (EOFException e) {
                // An access type without its ACL string is a malformed entry.
                throw new YarnRuntimeException("Error reading ACLs", e);
              }
              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
            }
          }
          aclScanner.advance();
        }
        return acls;
      } finally {
        IOUtils.cleanup(LOG, aclScanner);
      }
    }

    /**
     * Read the next key and return the value-stream.
     *
     * @param key populated with the key of the entry that is returned
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException
     */
    public DataInputStream next(LogKey key) throws IOException {
      if (!this.atBeginning) {
        this.scanner.advance();
      } else {
        this.atBeginning = false;
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
      key.readFields(entry.getKeyStream());
      // Skip META keys
      if (RESERVED_KEYS.containsKey(key.toString())) {
        return next(key);
      }
      DataInputStream valueStream = entry.getValueStream();
      return valueStream;
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container.
     *
     * @param containerId
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    //TODO Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams.

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream
     * @param writer
     * @param logUploadedTime upload time to print for each log, or -1 to omit
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer, long logUploadedTime) throws IOException {
      OutputStream os = null;
      PrintStream ps = null;
      try {
        os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
        ps = new PrintStream(os);
        while (true) {
          try {
            readContainerLogs(valueStream, ps, logUploadedTime);
          } catch (EOFException e) {
            // EndOfFile
            return;
          }
        }
      } finally {
        IOUtils.cleanup(LOG, ps);
        IOUtils.cleanup(LOG, os);
      }
    }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream
     * @param writer
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      readAcontainerLogs(valueStream, writer, -1);
    }

    /**
     * Reads one {@code (name, length, bytes)} record from the stream and
     * prints it.
     */
    private static void readContainerLogs(DataInputStream valueStream,
        PrintStream out, long logUploadedTime) throws IOException {
      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      printLogEntry(valueStream, out, logUploadedTime, fileType,
          fileLengthStr, fileLength);
    }

    /**
     * Prints the header lines for one log, copies up to {@code fileLength}
     * bytes of its contents from the stream, and prints the trailer lines.
     */
    private static void printLogEntry(DataInputStream valueStream,
        PrintStream out, long logUploadedTime, String fileType,
        String fileLengthStr, long fileLength) throws IOException {
      byte[] buf = new byte[65535];

      out.print("LogType:");
      out.println(fileType);
      if (logUploadedTime != -1) {
        out.print("Log Upload Time:");
        out.println(Times.format(logUploadedTime));
      }
      out.print("LogLength:");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      long curRead = 0;
      long pendingRead = fileLength - curRead;
      // Never request more than the bytes remaining in this record.
      int toRead =
          pendingRead > buf.length ? buf.length : (int) pendingRead;
      int len = valueStream.read(buf, 0, toRead);
      while (len != -1 && curRead < fileLength) {
        out.write(buf, 0, len);
        curRead += len;

        pendingRead = fileLength - curRead;
        toRead =
            pendingRead > buf.length ? buf.length : (int) pendingRead;
        len = valueStream.read(buf, 0, toRead);
      }
      out.println("End of LogType:" + fileType);
      out.println("");
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     *
     * @param valueStream
     * @param out
     * @param logUploadedTime upload time to print, or -1 to omit
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime)
        throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     *
     * @param valueStream
     * @param out
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
        throws IOException {
      readAContainerLogsForALogType(valueStream, out, -1);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * the specific types for a single container.
     * @param valueStream
     * @param out
     * @param logUploadedTime upload time to print, or -1 to omit
     * @param logType the log file names to print
     * @return 0 if the record was printed, -1 if it was skipped
     * @throws IOException
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType) throws IOException {
      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      if (logType.contains(fileType)) {
        printLogEntry(valueStream, out, logUploadedTime, fileType,
            fileLengthStr, fileLength);
        return 0;
      } else {
        long totalSkipped = 0;
        while (totalSkipped < fileLength) {
          long currSkipped = valueStream.skip(fileLength - totalSkipped);
          if (currSkipped > 0) {
            totalSkipped += currSkipped;
          } else if (valueStream.read() == -1) {
            // skip() reports no progress at end of stream instead of -1;
            // without this check a truncated record would loop forever.
            break;
          } else {
            // The probing read consumed one byte of the record.
            totalSkipped++;
          }
        }
        return -1;
      }
    }

    /** Closes the scanner, the TFile reader and the underlying stream. */
    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }

  /**
   * Iterates over the logs stored in one container's value stream.
   * {@link #nextLog()} must return a non-null type before the current-log
   * accessors and read methods are used.
   */
  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    /**
     * @param stream the value stream of a container entry
     */
    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }

    /**
     * Advances to the next log, discarding whatever is left of the current
     * one.
     *
     * @return the type (file name) of the next log, or null when the stream
     *         has no more logs
     * @throws IOException if reading the stream fails
     */
    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        currentLogISR = new InputStreamReader(currentLogData,
            Charset.forName("UTF-8"));
        currentLogType = logType;
      } catch (EOFException ignored) {
        // End of the value stream: there are no more logs, which is reported
        // to the caller by the null currentLogType returned below.
      }

      return currentLogType;
    }

    public String getCurrentLogType() {
      return currentLogType;
    }

    public long getCurrentLogLength() {
      return currentLogLength;
    }

    /** Skips bytes within the current log. */
    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    /** Reads one byte of the current log. */
    public int read() throws IOException {
      return currentLogData.read();
    }

    /** Reads bytes of the current log. */
    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    /** Reads the current log as UTF-8 decoded characters. */
    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
}