001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.logaggregation;
020
021import java.io.DataInput;
022import java.io.DataInputStream;
023import java.io.DataOutput;
024import java.io.DataOutputStream;
025import java.io.EOFException;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.IOException;
029import java.io.InputStreamReader;
030import java.io.OutputStream;
031import java.io.PrintStream;
032import java.io.Writer;
033import java.security.PrivilegedExceptionAction;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collections;
037import java.util.EnumSet;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Map;
043import java.util.Map.Entry;
044import java.util.Set;
045import java.util.regex.Pattern;
046
047import org.apache.commons.io.input.BoundedInputStream;
048import org.apache.commons.io.output.WriterOutputStream;
049import org.apache.commons.logging.Log;
050import org.apache.commons.logging.LogFactory;
051import org.apache.hadoop.classification.InterfaceAudience.Private;
052import org.apache.hadoop.classification.InterfaceAudience.Public;
053import org.apache.hadoop.classification.InterfaceStability.Evolving;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.hadoop.fs.CreateFlag;
056import org.apache.hadoop.fs.FSDataInputStream;
057import org.apache.hadoop.fs.FSDataOutputStream;
058import org.apache.hadoop.fs.FileContext;
059import org.apache.hadoop.fs.Options;
060import org.apache.hadoop.fs.Path;
061import org.apache.hadoop.fs.permission.FsPermission;
062import org.apache.hadoop.io.IOUtils;
063import org.apache.hadoop.io.SecureIOUtils;
064import org.apache.hadoop.io.Writable;
065import org.apache.hadoop.io.file.tfile.TFile;
066import org.apache.hadoop.security.UserGroupInformation;
067import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
068import org.apache.hadoop.yarn.api.records.ContainerId;
069import org.apache.hadoop.yarn.api.records.LogAggregationContext;
070import org.apache.hadoop.yarn.conf.YarnConfiguration;
071import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
072import org.apache.hadoop.yarn.util.ConverterUtils;
073import org.apache.hadoop.yarn.util.Times;
074
075import com.google.common.annotations.VisibleForTesting;
076import com.google.common.base.Predicate;
077import com.google.common.collect.Iterables;
078import com.google.common.collect.Sets;
079
080@Public
081@Evolving
082public class AggregatedLogFormat {
083
084  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
085  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
086  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
087  private static final LogKey VERSION_KEY = new LogKey("VERSION");
088  private static final Map<String, LogKey> RESERVED_KEYS;
089  //Maybe write out the retention policy.
090  //Maybe write out a list of containerLogs skipped by the retention policy.
091  private static final int VERSION = 1;
092
093  /**
094   * Umask for the log file.
095   */
096  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
097      .createImmutable((short) (0640 ^ 0777));
098
099
100  static {
101    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
102    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
103    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
104    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
105  }
106
107  @Public
108  public static class LogKey implements Writable {
109
110    private String keyString;
111
112    public LogKey() {
113
114    }
115
116    public LogKey(ContainerId containerId) {
117      this.keyString = containerId.toString();
118    }
119
120    public LogKey(String keyString) {
121      this.keyString = keyString;
122    }
123    
124    @Override
125    public int hashCode() {
126      return keyString == null ? 0 : keyString.hashCode();
127    }
128
129    @Override
130    public boolean equals(Object obj) {
131      if (obj instanceof LogKey) {
132        LogKey other = (LogKey) obj;
133        if (this.keyString == null) {
134          return other.keyString == null;
135        }
136        return this.keyString.equals(other.keyString);
137      }
138      return false;
139    }
140
141    @Private
142    @Override
143    public void write(DataOutput out) throws IOException {
144      out.writeUTF(this.keyString);
145    }
146
147    @Private
148    @Override
149    public void readFields(DataInput in) throws IOException {
150      this.keyString = in.readUTF();
151    }
152
153    @Override
154    public String toString() {
155      return this.keyString;
156    }
157  }
158
159  @Private
160  public static class LogValue {
161
    // Root log directories to scan; a sorted copy of the list given to the
    // constructor.
    private final List<String> rootLogDirs;
    // Container whose log files this value represents.
    private final ContainerId containerId;
    // User handed to SecureIOUtils when a log file is opened for reading.
    private final String user;
    // Optional source of include/exclude patterns; null disables filtering.
    private final LogAggregationContext logAggregationContext;
    // Files whose contents were written without error by write().
    private Set<File> uploadedFiles = new HashSet<File>();
    // Metadata strings (see getLogFileMetaData) of files to be skipped
    // because they were uploaded before.
    private final Set<String> alreadyUploadedLogFiles;
    // Metadata strings of every file seen in the container log directories.
    private Set<String> allExistingFileMeta = new HashSet<String>();
    // Selects the regular include/exclude patterns (true) or the rolled-logs
    // patterns (false) of the log aggregation context.
    private final boolean appFinished;
    // TODO Maybe add a version string here. Instead of changing the version of
    // the entire k-v format
172
173    public LogValue(List<String> rootLogDirs, ContainerId containerId,
174        String user) {
175      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
176    }
177
178    public LogValue(List<String> rootLogDirs, ContainerId containerId,
179        String user, LogAggregationContext logAggregationContext,
180        Set<String> alreadyUploadedLogFiles, boolean appFinished) {
181      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
182      this.containerId = containerId;
183      this.user = user;
184
185      // Ensure logs are processed in lexical order
186      Collections.sort(this.rootLogDirs);
187      this.logAggregationContext = logAggregationContext;
188      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
189      this.appFinished = appFinished;
190    }
191
192    private Set<File> getPendingLogFilesToUploadForThisContainer() {
193      Set<File> pendingUploadFiles = new HashSet<File>();
194      for (String rootLogDir : this.rootLogDirs) {
195        File appLogDir =
196            new File(rootLogDir, 
197                ConverterUtils.toString(
198                    this.containerId.getApplicationAttemptId().
199                        getApplicationId())
200                );
201        File containerLogDir =
202            new File(appLogDir, ConverterUtils.toString(this.containerId));
203
204        if (!containerLogDir.isDirectory()) {
205          continue; // ContainerDir may have been deleted by the user.
206        }
207
208        pendingUploadFiles
209          .addAll(getPendingLogFilesToUpload(containerLogDir));
210      }
211      return pendingUploadFiles;
212    }
213
214    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
215        throws IOException {
216      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
217      Collections.sort(fileList);
218
219      for (File logFile : fileList) {
220        // We only aggregate top level files.
221        // Ignore anything inside sub-folders.
222        if (logFile.isDirectory()) {
223          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
224          continue;
225        }
226
227        FileInputStream in = null;
228        try {
229          in = secureOpenFile(logFile);
230        } catch (IOException e) {
231          logErrorMessage(logFile, e);
232          IOUtils.cleanup(LOG, in);
233          continue;
234        }
235
236        final long fileLength = logFile.length();
237        // Write the logFile Type
238        out.writeUTF(logFile.getName());
239
240        // Write the log length as UTF so that it is printable
241        out.writeUTF(String.valueOf(fileLength));
242
243        // Write the log itself
244        try {
245          byte[] buf = new byte[65535];
246          int len = 0;
247          long bytesLeft = fileLength;
248          while ((len = in.read(buf)) != -1) {
249            //If buffer contents within fileLength, write
250            if (len < bytesLeft) {
251              out.write(buf, 0, len);
252              bytesLeft-=len;
253            }
254            //else only write contents within fileLength, then exit early
255            else {
256              out.write(buf, 0, (int)bytesLeft);
257              break;
258            }
259          }
260          long newLength = logFile.length();
261          if(fileLength < newLength) {
262            LOG.warn("Aggregated logs truncated by approximately "+
263                (newLength-fileLength) +" bytes.");
264          }
265          this.uploadedFiles.add(logFile);
266        } catch (IOException e) {
267          String message = logErrorMessage(logFile, e);
268          out.write(message.getBytes());
269        } finally {
270          IOUtils.cleanup(LOG, in);
271        }
272      }
273    }
274
    /**
     * Opens a log file for reading through
     * {@code SecureIOUtils.openForRead}, passing {@link #getUser()} and a
     * null third argument.
     *
     * @param logFile the file to open
     * @return an input stream over the file
     * @throws IOException if the file cannot be opened
     */
    @VisibleForTesting
    public FileInputStream secureOpenFile(File logFile) throws IOException {
      return SecureIOUtils.openForRead(logFile, getUser(), null);
    }
279
280    private static String logErrorMessage(File logFile, Exception e) {
281      String message = "Error aggregating log file. Log file : "
282          + logFile.getAbsolutePath() + ". " + e.getMessage();
283      LOG.error(message, e);
284      return message;
285    }
286
    // Added for testing purpose.
    /** @return the user passed to the constructor */
    public String getUser() {
      return user;
    }
291
292    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
293      Set<File> candidates =
294          new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
295      for (File logFile : candidates) {
296        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
297      }
298
299      if (this.logAggregationContext != null && candidates.size() > 0) {
300        filterFiles(
301          this.appFinished ? this.logAggregationContext.getIncludePattern()
302              : this.logAggregationContext.getRolledLogsIncludePattern(),
303          candidates, false);
304
305        filterFiles(
306          this.appFinished ? this.logAggregationContext.getExcludePattern()
307              : this.logAggregationContext.getRolledLogsExcludePattern(),
308          candidates, true);
309
310        Iterable<File> mask =
311            Iterables.filter(candidates, new Predicate<File>() {
312              @Override
313              public boolean apply(File next) {
314                return !alreadyUploadedLogFiles
315                  .contains(getLogFileMetaData(next));
316              }
317            });
318        candidates = Sets.newHashSet(mask);
319      }
320      return candidates;
321    }
322
323    private void filterFiles(String pattern, Set<File> candidates,
324        boolean exclusion) {
325      if (pattern != null && !pattern.isEmpty()) {
326        Pattern filterPattern = Pattern.compile(pattern);
327        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
328          .hasNext();) {
329          File candidate = candidatesItr.next();
330          boolean match = filterPattern.matcher(candidate.getName()).find();
331          if ((!match && !exclusion) || (match && exclusion)) {
332            candidatesItr.remove();
333          }
334        }
335      }
336    }
337
338    public Set<Path> getCurrentUpLoadedFilesPath() {
339      Set<Path> path = new HashSet<Path>();
340      for (File file : this.uploadedFiles) {
341        path.add(new Path(file.getAbsolutePath()));
342      }
343      return path;
344    }
345
346    public Set<String> getCurrentUpLoadedFileMeta() {
347      Set<String> info = new HashSet<String>();
348      for (File file : this.uploadedFiles) {
349        info.add(getLogFileMetaData(file));
350      }
351      return info;
352    }
353
    /**
     * @return the metadata strings of all files seen so far in the container
     *         log directories; this is the internal set, not a copy
     */
    public Set<String> getAllExistingFilesMeta() {
      return this.allExistingFileMeta;
    }
357
358    private String getLogFileMetaData(File file) {
359      return containerId.toString() + "_" + file.getName() + "_"
360          + file.lastModified();
361    }
362  }
363
364  /**
365   * The writer that writes out the aggregated logs.
366   */
367  @Private
368  public static class LogWriter {
369
    // Output stream to the remote aggregated log file, created in the
    // constructor.
    private final FSDataOutputStream fsDataOStream;
    // TFile writer layered on fsDataOStream; all entries go through it.
    private final TFile.Writer writer;
    // File context used to create the remote file; assigned inside the
    // constructor's privileged action.
    private FileContext fc;
373
374    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
375        UserGroupInformation userUgi) throws IOException {
376      try {
377        this.fsDataOStream =
378            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
379              @Override
380              public FSDataOutputStream run() throws Exception {
381                fc = FileContext.getFileContext(conf);
382                fc.setUMask(APP_LOG_FILE_UMASK);
383                return fc.create(
384                    remoteAppLogFile,
385                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
386                    new Options.CreateOpts[] {});
387              }
388            });
389      } catch (InterruptedException e) {
390        throw new IOException(e);
391      }
392
393      // Keys are not sorted: null arg
394      // 256KB minBlockSize : Expected log size for each container too
395      this.writer =
396          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
397              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
398              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
399      //Write the version string
400      writeVersion();
401    }
402
    /** @return the underlying TFile writer */
    @VisibleForTesting
    public TFile.Writer getWriter() {
      return this.writer;
    }
407
408    private void writeVersion() throws IOException {
409      DataOutputStream out = this.writer.prepareAppendKey(-1);
410      VERSION_KEY.write(out);
411      out.close();
412      out = this.writer.prepareAppendValue(-1);
413      out.writeInt(VERSION);
414      out.close();
415    }
416
417    public void writeApplicationOwner(String user) throws IOException {
418      DataOutputStream out = this.writer.prepareAppendKey(-1);
419      APPLICATION_OWNER_KEY.write(out);
420      out.close();
421      out = this.writer.prepareAppendValue(-1);
422      out.writeUTF(user);
423      out.close();
424    }
425
426    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
427        throws IOException {
428      DataOutputStream out = this.writer.prepareAppendKey(-1);
429      APPLICATION_ACL_KEY.write(out);
430      out.close();
431      out = this.writer.prepareAppendValue(-1);
432      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
433        out.writeUTF(entry.getKey().toString());
434        out.writeUTF(entry.getValue());
435      }
436      out.close();
437    }
438
439    public void append(LogKey logKey, LogValue logValue) throws IOException {
440      Set<File> pendingUploadFiles =
441          logValue.getPendingLogFilesToUploadForThisContainer();
442      if (pendingUploadFiles.size() == 0) {
443        return;
444      }
445      DataOutputStream out = this.writer.prepareAppendKey(-1);
446      logKey.write(out);
447      out.close();
448      out = this.writer.prepareAppendValue(-1);
449      logValue.write(out, pendingUploadFiles);
450      out.close();
451    }
452
453    public void close() {
454      try {
455        this.writer.close();
456      } catch (IOException e) {
457        LOG.warn("Exception closing writer", e);
458      }
459      IOUtils.closeStream(fsDataOStream);
460    }
461  }
462
463  @Public
464  @Evolving
465  public static class LogReader {
466
467    private final FSDataInputStream fsDataIStream;
468    private final TFile.Reader.Scanner scanner;
469    private final TFile.Reader reader;
470
471    public LogReader(Configuration conf, Path remoteAppLogFile)
472        throws IOException {
473      FileContext fileContext = FileContext.getFileContext(conf);
474      this.fsDataIStream = fileContext.open(remoteAppLogFile);
475      reader =
476          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
477              remoteAppLogFile).getLen(), conf);
478      this.scanner = reader.createScanner();
479    }
480
481    private boolean atBeginning = true;
482
483    /**
484     * Returns the owner of the application.
485     * 
486     * @return the application owner.
487     * @throws IOException
488     */
489    public String getApplicationOwner() throws IOException {
490      TFile.Reader.Scanner ownerScanner = reader.createScanner();
491      LogKey key = new LogKey();
492      while (!ownerScanner.atEnd()) {
493        TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
494        key.readFields(entry.getKeyStream());
495        if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
496          DataInputStream valueStream = entry.getValueStream();
497          return valueStream.readUTF();
498        }
499        ownerScanner.advance();
500      }
501      return null;
502    }
503
504    /**
505     * Returns ACLs for the application. An empty map is returned if no ACLs are
506     * found.
507     * 
508     * @return a map of the Application ACLs.
509     * @throws IOException
510     */
511    public Map<ApplicationAccessType, String> getApplicationAcls()
512        throws IOException {
513      // TODO Seek directly to the key once a comparator is specified.
514      TFile.Reader.Scanner aclScanner = reader.createScanner();
515      LogKey key = new LogKey();
516      Map<ApplicationAccessType, String> acls =
517          new HashMap<ApplicationAccessType, String>();
518      while (!aclScanner.atEnd()) {
519        TFile.Reader.Scanner.Entry entry = aclScanner.entry();
520        key.readFields(entry.getKeyStream());
521        if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
522          DataInputStream valueStream = entry.getValueStream();
523          while (true) {
524            String appAccessOp = null;
525            String aclString = null;
526            try {
527              appAccessOp = valueStream.readUTF();
528            } catch (EOFException e) {
529              // Valid end of stream.
530              break;
531            }
532            try {
533              aclString = valueStream.readUTF();
534            } catch (EOFException e) {
535              throw new YarnRuntimeException("Error reading ACLs", e);
536            }
537            acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
538          }
539
540        }
541        aclScanner.advance();
542      }
543      return acls;
544    }
545    
546    /**
547     * Read the next key and return the value-stream.
548     * 
549     * @param key
550     * @return the valueStream if there are more keys or null otherwise.
551     * @throws IOException
552     */
553    public DataInputStream next(LogKey key) throws IOException {
554      if (!this.atBeginning) {
555        this.scanner.advance();
556      } else {
557        this.atBeginning = false;
558      }
559      if (this.scanner.atEnd()) {
560        return null;
561      }
562      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
563      key.readFields(entry.getKeyStream());
564      // Skip META keys
565      if (RESERVED_KEYS.containsKey(key.toString())) {
566        return next(key);
567      }
568      DataInputStream valueStream = entry.getValueStream();
569      return valueStream;
570    }
571
572    /**
573     * Get a ContainerLogsReader to read the logs for
574     * the specified container.
575     *
576     * @param containerId
577     * @return object to read the container's logs or null if the
578     *         logs could not be found
579     * @throws IOException
580     */
581    @Private
582    public ContainerLogsReader getContainerLogsReader(
583        ContainerId containerId) throws IOException {
584      ContainerLogsReader logReader = null;
585
586      final LogKey containerKey = new LogKey(containerId);
587      LogKey key = new LogKey();
588      DataInputStream valueStream = next(key);
589      while (valueStream != null && !key.equals(containerKey)) {
590        valueStream = next(key);
591      }
592
593      if (valueStream != null) {
594        logReader = new ContainerLogsReader(valueStream);
595      }
596
597      return logReader;
598    }
599
600    //TODO  Change Log format and interfaces to be containerId specific.
601    // Avoid returning completeValueStreams.
602//    public List<String> getTypesForContainer(DataInputStream valueStream){}
603//    
604//    /**
605//     * @param valueStream
606//     *          The Log stream for the container.
607//     * @param fileType
608//     *          the log type required.
609//     * @return An InputStreamReader for the required log type or null if the
610//     *         type is not found.
611//     * @throws IOException
612//     */
613//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
614//        String fileType) throws IOException {
615//      valueStream.reset();
616//      try {
617//        while (true) {
618//          String ft = valueStream.readUTF();
619//          String fileLengthStr = valueStream.readUTF();
620//          long fileLength = Long.parseLong(fileLengthStr);
621//          if (ft.equals(fileType)) {
622//            BoundedInputStream bis =
623//                new BoundedInputStream(valueStream, fileLength);
624//            return new InputStreamReader(bis);
625//          } else {
626//            long totalSkipped = 0;
627//            long currSkipped = 0;
628//            while (currSkipped != -1 && totalSkipped < fileLength) {
629//              currSkipped = valueStream.skip(fileLength - totalSkipped);
630//              totalSkipped += currSkipped;
631//            }
632//            // TODO Verify skip behaviour.
633//            if (currSkipped == -1) {
634//              return null;
635//            }
636//          }
637//        }
638//      } catch (EOFException e) {
639//        return null;
640//      }
641//    }
642
643    /**
644     * Writes all logs for a single container to the provided writer.
645     * @param valueStream
646     * @param writer
647     * @param logUploadedTime
648     * @throws IOException
649     */
650    public static void readAcontainerLogs(DataInputStream valueStream,
651        Writer writer, long logUploadedTime) throws IOException {
652      OutputStream os = null;
653      PrintStream ps = null;
654      try {
655        os = new WriterOutputStream(writer);
656        ps = new PrintStream(os);
657        while (true) {
658          try {
659            readContainerLogs(valueStream, ps, logUploadedTime);
660          } catch (EOFException e) {
661            // EndOfFile
662            return;
663          }
664        }
665      } finally {
666        IOUtils.cleanup(LOG, ps);
667        IOUtils.cleanup(LOG, os);
668      }
669    }
670
    /**
     * Writes all logs for a single container to the provided writer, without
     * printing an upload time. Delegates to the three-argument overload with
     * a log upload time of -1.
     *
     * @param valueStream the value stream of the container's entry
     * @param writer destination of the formatted logs
     * @throws IOException if reading or writing fails
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      readAcontainerLogs(valueStream, writer, -1);
    }
681
682    private static void readContainerLogs(DataInputStream valueStream,
683        PrintStream out, long logUploadedTime) throws IOException {
684      byte[] buf = new byte[65535];
685
686      String fileType = valueStream.readUTF();
687      String fileLengthStr = valueStream.readUTF();
688      long fileLength = Long.parseLong(fileLengthStr);
689      out.print("LogType:");
690      out.println(fileType);
691      if (logUploadedTime != -1) {
692        out.print("Log Upload Time:");
693        out.println(Times.format(logUploadedTime));
694      }
695      out.print("LogLength:");
696      out.println(fileLengthStr);
697      out.println("Log Contents:");
698
699      long curRead = 0;
700      long pendingRead = fileLength - curRead;
701      int toRead =
702                pendingRead > buf.length ? buf.length : (int) pendingRead;
703      int len = valueStream.read(buf, 0, toRead);
704      while (len != -1 && curRead < fileLength) {
705        out.write(buf, 0, len);
706        curRead += len;
707
708        pendingRead = fileLength - curRead;
709        toRead =
710                  pendingRead > buf.length ? buf.length : (int) pendingRead;
711        len = valueStream.read(buf, 0, toRead);
712      }
713      out.println("");
714    }
715
    /**
     * Reads one log from the value stream and prints it to {@code out}.
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     * 
     * @param valueStream the value stream of the container's entry
     * @param out destination of the formatted log
     * @param logUploadedTime upload time to print, or -1 to omit it
     * @throws IOException if reading fails or no log is left
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime)
          throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime);
    }
730
    /**
     * Reads one log from the value stream and prints it to {@code out}
     * without an upload time (delegates with a log upload time of -1).
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     * 
     * @param valueStream the value stream of the container's entry
     * @param out destination of the formatted log
     * @throws IOException if reading fails or no log is left
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
          throws IOException {
      readAContainerLogsForALogType(valueStream, out, -1);
    }
744
    /**
     * Closes the scanner, the reader and the input stream, in that order,
     * via {@code IOUtils.cleanup}.
     */
    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
748  }
749
750  @Private
751  public static class ContainerLogsReader {
752    private DataInputStream valueStream;
753    private String currentLogType = null;
754    private long currentLogLength = 0;
755    private BoundedInputStream currentLogData = null;
756    private InputStreamReader currentLogISR;
757
758    public ContainerLogsReader(DataInputStream stream) {
759      valueStream = stream;
760    }
761
762    public String nextLog() throws IOException {
763      if (currentLogData != null && currentLogLength > 0) {
764        // seek to the end of the current log, relying on BoundedInputStream
765        // to prevent seeking past the end of the current log
766        do {
767          if (currentLogData.skip(currentLogLength) < 0) {
768            break;
769          }
770        } while (currentLogData.read() != -1);
771      }
772
773      currentLogType = null;
774      currentLogLength = 0;
775      currentLogData = null;
776      currentLogISR = null;
777
778      try {
779        String logType = valueStream.readUTF();
780        String logLengthStr = valueStream.readUTF();
781        currentLogLength = Long.parseLong(logLengthStr);
782        currentLogData =
783            new BoundedInputStream(valueStream, currentLogLength);
784        currentLogData.setPropagateClose(false);
785        currentLogISR = new InputStreamReader(currentLogData);
786        currentLogType = logType;
787      } catch (EOFException e) {
788      }
789
790      return currentLogType;
791    }
792
793    public String getCurrentLogType() {
794      return currentLogType;
795    }
796
797    public long getCurrentLogLength() {
798      return currentLogLength;
799    }
800
801    public long skip(long n) throws IOException {
802      return currentLogData.skip(n);
803    }
804
805    public int read() throws IOException {
806      return currentLogData.read();
807    }
808
809    public int read(byte[] buf, int off, int len) throws IOException {
810      return currentLogData.read(buf, off, len);
811    }
812
813    public int read(char[] buf, int off, int len) throws IOException {
814      return currentLogISR.read(buf, off, len);
815    }
816  }
817}