/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  // Maybe write out the retention policy.
  // Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;

  /**
   * Umask for the log file.
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));

  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }
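
  // A rough sketch (not normative) of the aggregated log file layout produced
  // by the LogWriter below: the TFile holds the reserved VERSION key (written
  // in the LogWriter constructor), the APPLICATION_OWNER and APPLICATION_ACL
  // keys (written by callers via writeApplicationOwner/writeApplicationACLs),
  // and then one entry per container whose key is the ContainerId string and
  // whose value is a repeated sequence of
  // (file name, file length as a UTF string, raw log bytes) records:
  //
  //   VERSION           -> 1
  //   APPLICATION_OWNER -> <user>
  //   APPLICATION_ACL   -> (<accessType>, <aclString>)*
  //   <containerId>     -> (<fileName>, <fileLength>, <bytes>)*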

  @Public
  public static class LogKey implements Writable {

    private String keyString;

    public LogKey() {

    }

    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }

  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    private final LogAggregationContext logAggregationContext;
    private Set<File> uploadedFiles = new HashSet<File>();
    private final Set<String> alreadyUploadedLogFiles;
    private Set<String> allExistingFileMeta = new HashSet<String>();
    // TODO Maybe add a version string here. Instead of changing the version of
    // the entire k-v format

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this(rootLogDirs, containerId, user, null, new HashSet<String>());
    }

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user, LogAggregationContext logAggregationContext,
        Set<String> alreadyUploadedLogFiles) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
      this.logAggregationContext = logAggregationContext;
      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
    }

    private Set<File> getPendingLogFilesToUploadForThisContainer() {
      Set<File> pendingUploadFiles = new HashSet<File>();
      for (String rootLogDir : this.rootLogDirs) {
        File appLogDir = new File(rootLogDir,
            ConverterUtils.toString(
                this.containerId.getApplicationAttemptId().getApplicationId()));
        File containerLogDir =
            new File(appLogDir, ConverterUtils.toString(this.containerId));

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        pendingUploadFiles
            .addAll(getPendingLogFilesToUpload(containerLogDir));
      }
      return pendingUploadFiles;
    }
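
    // Illustrative local directory layout scanned above (the root log dir is
    // hypothetical; it comes from the NodeManager's configured log dirs, and
    // the file names are just typical examples):
    //
    //   <rootLogDir>/<applicationId>/<containerId>/stdout
    //   <rootLogDir>/<applicationId>/<containerId>/stderr
    //   <rootLogDir>/<applicationId>/<containerId>/syslog
    //
    // Only top-level files directly under the container directory are
    // candidates for aggregation.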

    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
        throws IOException {
      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
      Collections.sort(fileList);

      for (File logFile : fileList) {
        // We only aggregate top level files.
        // Ignore anything inside sub-folders.
        if (logFile.isDirectory()) {
          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
          continue;
        }

        FileInputStream in = null;
        try {
          in = secureOpenFile(logFile);
        } catch (IOException e) {
          logErrorMessage(logFile, e);
          IOUtils.cleanup(LOG, in);
          continue;
        }

        final long fileLength = logFile.length();
        // Write the log file name (the log type)
        out.writeUTF(logFile.getName());

        // Write the log length as UTF so that it is printable
        out.writeUTF(String.valueOf(fileLength));

        // Write the log itself
        try {
          byte[] buf = new byte[65535];
          int len = 0;
          long bytesLeft = fileLength;
          while ((len = in.read(buf)) != -1) {
            // If the buffer contents fit within the remaining fileLength,
            // write them all
            if (len < bytesLeft) {
              out.write(buf, 0, len);
              bytesLeft -= len;
            }
            // else write only the bytes within fileLength, then exit early
            else {
              out.write(buf, 0, (int) bytesLeft);
              break;
            }
          }
          long newLength = logFile.length();
          if (fileLength < newLength) {
            LOG.warn("Aggregated logs truncated by approximately " +
                (newLength - fileLength) + " bytes.");
          }
          this.uploadedFiles.add(logFile);
        } catch (IOException e) {
          String message = logErrorMessage(logFile, e);
          out.write(message.getBytes());
        } finally {
          IOUtils.cleanup(LOG, in);
        }
      }
    }

    @VisibleForTesting
    public FileInputStream secureOpenFile(File logFile) throws IOException {
      return SecureIOUtils.openForRead(logFile, getUser(), null);
    }

    private static String logErrorMessage(File logFile, Exception e) {
      String message = "Error aggregating log file. Log file : "
          + logFile.getAbsolutePath() + ". " + e.getMessage();
      LOG.error(message, e);
      return message;
    }

    // Added for testing purposes.
    public String getUser() {
      return user;
    }

    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
      Set<File> candidates =
          new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
      for (File logFile : candidates) {
        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
      }

      if (this.logAggregationContext != null && candidates.size() > 0) {
        if (this.logAggregationContext.getIncludePattern() != null
            && !this.logAggregationContext.getIncludePattern().isEmpty()) {
          filterFiles(this.logAggregationContext.getIncludePattern(),
              candidates, false);
        }

        if (this.logAggregationContext.getExcludePattern() != null
            && !this.logAggregationContext.getExcludePattern().isEmpty()) {
          filterFiles(this.logAggregationContext.getExcludePattern(),
              candidates, true);
        }

        Iterable<File> mask =
            Iterables.filter(candidates, new Predicate<File>() {
              @Override
              public boolean apply(File next) {
                return !alreadyUploadedLogFiles
                    .contains(getLogFileMetaData(next));
              }
            });
        candidates = Sets.newHashSet(mask);
      }
      return candidates;
    }

    private void filterFiles(String pattern, Set<File> candidates,
        boolean exclusion) {
      Pattern filterPattern = Pattern.compile(pattern);
      for (Iterator<File> candidatesItr = candidates.iterator();
          candidatesItr.hasNext();) {
        File candidate = candidatesItr.next();
        boolean match = filterPattern.matcher(candidate.getName()).find();
        if ((!match && !exclusion) || (match && exclusion)) {
          candidatesItr.remove();
        }
      }
    }
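
    // A hypothetical example of the include/exclude filtering above: with an
    // include pattern of "std.*" and an exclude pattern of "stderr", a
    // container directory holding stdout, stderr and syslog would keep only
    // stdout: the include pattern drops syslog and the exclude pattern drops
    // stderr. Files whose (containerId, name, mtime) metadata is already in
    // alreadyUploadedLogFiles are removed from the candidate set as well.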

    public Set<Path> getCurrentUpLoadedFilesPath() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.uploadedFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    public Set<String> getCurrentUpLoadedFileMeta() {
      Set<String> info = new HashSet<String>();
      for (File file : this.uploadedFiles) {
        info.add(getLogFileMetaData(file));
      }
      return info;
    }

    public Set<String> getAllExistingFilesMeta() {
      return this.allExistingFileMeta;
    }

    private String getLogFileMetaData(File file) {
      return containerId.toString() + "_" + file.getName() + "_"
          + file.lastModified();
    }
  }

  /**
   * The writer that writes out the aggregated logs.
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;
    private FileContext fc;

    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                fc = FileContext.getFileContext(conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        throw new IOException(e);
      }

      // Keys are not sorted: null arg
      // 256 KB minBlockSize: the expected log size for each container
      this.writer =
          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
      // Write the version string
      writeVersion();
    }

    @VisibleForTesting
    public TFile.Writer getWriter() {
      return this.writer;
    }

    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }

    public void append(LogKey logKey, LogValue logValue) throws IOException {
      Set<File> pendingUploadFiles =
          logValue.getPendingLogFilesToUploadForThisContainer();
      if (pendingUploadFiles.size() == 0) {
        return;
      }
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out, pendingUploadFiles);
      out.close();
    }

    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      IOUtils.closeStream(fsDataOStream);
    }
  }
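
  // A minimal write-side usage sketch for the LogWriter above. This is
  // illustrative only: conf, ugi, remoteAppLogFile, containerId, rootLogDirs
  // and appAcls are assumed to be set up by the caller, and real callers
  // (e.g. the NodeManager's log aggregation service) add their own error
  // handling.
  //
  //   LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
  //   logWriter.writeApplicationOwner("someuser");
  //   logWriter.writeApplicationACLs(appAcls);
  //   logWriter.append(new LogKey(containerId),
  //       new LogValue(rootLogDirs, containerId, "someuser"));
  //   logWriter.close();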

  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext = FileContext.getFileContext(conf);
      this.fsDataIStream = fileContext.open(remoteAppLogFile);
      reader =
          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
              remoteAppLogFile).getLen(), conf);
      this.scanner = reader.createScanner();
    }

    private boolean atBeginning = true;

    /**
     * Returns the owner of the application.
     *
     * @return the application owner.
     * @throws IOException
     */
    public String getApplicationOwner() throws IOException {
      TFile.Reader.Scanner ownerScanner = reader.createScanner();
      LogKey key = new LogKey();
      while (!ownerScanner.atEnd()) {
        TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
        key.readFields(entry.getKeyStream());
        if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
          DataInputStream valueStream = entry.getValueStream();
          return valueStream.readUTF();
        }
        ownerScanner.advance();
      }
      return null;
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs
     * are found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      TFile.Reader.Scanner aclScanner = reader.createScanner();
      LogKey key = new LogKey();
      Map<ApplicationAccessType, String> acls =
          new HashMap<ApplicationAccessType, String>();
      while (!aclScanner.atEnd()) {
        TFile.Reader.Scanner.Entry entry = aclScanner.entry();
        key.readFields(entry.getKeyStream());
        if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
          DataInputStream valueStream = entry.getValueStream();
          while (true) {
            String appAccessOp = null;
            String aclString = null;
            try {
              appAccessOp = valueStream.readUTF();
            } catch (EOFException e) {
              // Valid end of stream.
              break;
            }
            try {
              aclString = valueStream.readUTF();
            } catch (EOFException e) {
              throw new YarnRuntimeException("Error reading ACLs", e);
            }
            acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
          }
        }
        aclScanner.advance();
      }
      return acls;
    }

    /**
     * Read the next key and return the value-stream.
     *
     * @param key
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException
     */
    public DataInputStream next(LogKey key) throws IOException {
      if (!this.atBeginning) {
        this.scanner.advance();
      } else {
        this.atBeginning = false;
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
      key.readFields(entry.getKeyStream());
      // Skip META keys
      if (RESERVED_KEYS.containsKey(key.toString())) {
        return next(key);
      }
      DataInputStream valueStream = entry.getValueStream();
      return valueStream;
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container.
     *
     * @param containerId
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    // TODO Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams.
    // public List<String> getTypesForContainer(DataInputStream valueStream){}
    //
    // /**
    //  * @param valueStream
    //  *          The Log stream for the container.
    //  * @param fileType
    //  *          the log type required.
    //  * @return An InputStreamReader for the required log type or null if the
    //  *         type is not found.
    //  * @throws IOException
    //  */
    // public InputStreamReader getLogStreamForType(DataInputStream valueStream,
    //     String fileType) throws IOException {
    //   valueStream.reset();
    //   try {
    //     while (true) {
    //       String ft = valueStream.readUTF();
    //       String fileLengthStr = valueStream.readUTF();
    //       long fileLength = Long.parseLong(fileLengthStr);
    //       if (ft.equals(fileType)) {
    //         BoundedInputStream bis =
    //             new BoundedInputStream(valueStream, fileLength);
    //         return new InputStreamReader(bis);
    //       } else {
    //         long totalSkipped = 0;
    //         long currSkipped = 0;
    //         while (currSkipped != -1 && totalSkipped < fileLength) {
    //           currSkipped = valueStream.skip(fileLength - totalSkipped);
    //           totalSkipped += currSkipped;
    //         }
    //         // TODO Verify skip behaviour.
    //         if (currSkipped == -1) {
    //           return null;
    //         }
    //       }
    //     }
    //   } catch (EOFException e) {
    //     return null;
    //   }
    // }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream
     * @param writer
     * @param logUploadedTime
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer, long logUploadedTime) throws IOException {
      OutputStream os = null;
      PrintStream ps = null;
      try {
        os = new WriterOutputStream(writer);
        ps = new PrintStream(os);
        while (true) {
          try {
            readContainerLogs(valueStream, ps, logUploadedTime);
          } catch (EOFException e) {
            // EndOfFile
            return;
          }
        }
      } finally {
        IOUtils.cleanup(LOG, ps);
        IOUtils.cleanup(LOG, os);
      }
    }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream
     * @param writer
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      readAcontainerLogs(valueStream, writer, -1);
    }

    private static void readContainerLogs(DataInputStream valueStream,
        PrintStream out, long logUploadedTime) throws IOException {
      byte[] buf = new byte[65535];

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      out.print("LogType:");
      out.println(fileType);
      if (logUploadedTime != -1) {
        out.print("Log Upload Time:");
        out.println(Times.format(logUploadedTime));
      }
      out.print("LogLength:");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      long curRead = 0;
      long pendingRead = fileLength - curRead;
      int toRead =
          pendingRead > buf.length ? buf.length : (int) pendingRead;
      int len = valueStream.read(buf, 0, toRead);
      while (len != -1 && curRead < fileLength) {
        out.write(buf, 0, len);
        curRead += len;

        pendingRead = fileLength - curRead;
        toRead =
            pendingRead > buf.length ? buf.length : (int) pendingRead;
        len = valueStream.read(buf, 0, toRead);
      }
      out.println("");
    }
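
    // For reference, each call to readContainerLogs above renders one log file
    // in roughly this form (values are illustrative; the upload time line only
    // appears when logUploadedTime != -1):
    //
    //   LogType:stderr
    //   Log Upload Time:<formatted upload time>
    //   LogLength:1024
    //   Log Contents:
    //   <raw log bytes>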

    /**
     * Keep calling this till you get an {@link EOFException} to get the logs
     * of all types for a single container.
     *
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime)
        throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime);
    }

    /**
     * Keep calling this till you get an {@link EOFException} to get the logs
     * of all types for a single container.
     *
     * @param valueStream
     * @param out
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
        throws IOException {
      readAContainerLogsForALogType(valueStream, out, -1);
    }

    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }
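
  // A minimal read-side sketch for the LogReader above (illustrative only;
  // conf and remoteAppLogFile are assumed to be supplied by the caller and
  // error handling is omitted). It walks every container key and prints every
  // log type until the per-container value stream is exhausted.
  //
  //   LogReader reader = new LogReader(conf, remoteAppLogFile);
  //   LogKey key = new LogKey();
  //   DataInputStream valueStream = reader.next(key);
  //   while (valueStream != null) {
  //     try {
  //       while (true) {
  //         LogReader.readAContainerLogsForALogType(valueStream, System.out);
  //       }
  //     } catch (EOFException eof) {
  //       // no more log types for this container
  //     }
  //     valueStream = reader.next(key);
  //   }
  //   reader.close();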

  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }

    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        currentLogISR = new InputStreamReader(currentLogData);
        currentLogType = logType;
      } catch (EOFException e) {
        // Reached the end of the value stream: no more logs for this
        // container, so currentLogType stays null.
      }

      return currentLogType;
    }

    public String getCurrentLogType() {
      return currentLogType;
    }

    public long getCurrentLogLength() {
      return currentLogLength;
    }

    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
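
  // Illustrative loop over a ContainerLogsReader obtained from
  // LogReader#getContainerLogsReader (reader and containerId are hypothetical
  // names assumed to be set up by the caller):
  //
  //   ContainerLogsReader logReader = reader.getContainerLogsReader(containerId);
  //   char[] cbuf = new char[4096];
  //   String logType;
  //   while ((logType = logReader.nextLog()) != null) {
  //     int n;
  //     while ((n = logReader.read(cbuf, 0, cbuf.length)) != -1) {
  //       // consume n chars of the current log of type "logType"
  //     }
  //   }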
}