001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.logaggregation;
020
021import java.io.DataInput;
022import java.io.DataInputStream;
023import java.io.DataOutput;
024import java.io.DataOutputStream;
025import java.io.EOFException;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.IOException;
029import java.io.InputStreamReader;
030import java.io.OutputStream;
031import java.io.PrintStream;
032import java.io.Writer;
033import java.security.PrivilegedExceptionAction;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collections;
037import java.util.EnumSet;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Map;
043import java.util.Map.Entry;
044import java.util.Set;
045import java.util.regex.Pattern;
046
047import org.apache.commons.io.input.BoundedInputStream;
048import org.apache.commons.io.output.WriterOutputStream;
049import org.apache.commons.logging.Log;
050import org.apache.commons.logging.LogFactory;
051import org.apache.hadoop.classification.InterfaceAudience.Private;
052import org.apache.hadoop.classification.InterfaceAudience.Public;
053import org.apache.hadoop.classification.InterfaceStability.Evolving;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.hadoop.fs.CreateFlag;
056import org.apache.hadoop.fs.FSDataInputStream;
057import org.apache.hadoop.fs.FSDataOutputStream;
058import org.apache.hadoop.fs.FileContext;
059import org.apache.hadoop.fs.Options;
060import org.apache.hadoop.fs.Path;
061import org.apache.hadoop.fs.permission.FsPermission;
062import org.apache.hadoop.io.IOUtils;
063import org.apache.hadoop.io.SecureIOUtils;
064import org.apache.hadoop.io.Writable;
065import org.apache.hadoop.io.file.tfile.TFile;
066import org.apache.hadoop.security.UserGroupInformation;
067import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
068import org.apache.hadoop.yarn.api.records.ContainerId;
069import org.apache.hadoop.yarn.api.records.LogAggregationContext;
070import org.apache.hadoop.yarn.conf.YarnConfiguration;
071import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
072import org.apache.hadoop.yarn.util.ConverterUtils;
073import org.apache.hadoop.yarn.util.Times;
074
075import com.google.common.annotations.VisibleForTesting;
076import com.google.common.base.Predicate;
077import com.google.common.collect.Iterables;
078import com.google.common.collect.Sets;
079
080@Public
081@Evolving
082public class AggregatedLogFormat {
083
084  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
085  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
086  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
087  private static final LogKey VERSION_KEY = new LogKey("VERSION");
088  private static final Map<String, LogKey> RESERVED_KEYS;
089  //Maybe write out the retention policy.
090  //Maybe write out a list of containerLogs skipped by the retention policy.
091  private static final int VERSION = 1;
092
093  /**
094   * Umask for the log file.
095   */
096  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
097      .createImmutable((short) (0640 ^ 0777));
098
099
100  static {
101    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
102    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
103    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
104    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
105  }
106
107  @Public
108  public static class LogKey implements Writable {
109
110    private String keyString;
111
112    public LogKey() {
113
114    }
115
116    public LogKey(ContainerId containerId) {
117      this.keyString = containerId.toString();
118    }
119
120    public LogKey(String keyString) {
121      this.keyString = keyString;
122    }
123    
124    @Override
125    public int hashCode() {
126      return keyString == null ? 0 : keyString.hashCode();
127    }
128
129    @Override
130    public boolean equals(Object obj) {
131      if (obj instanceof LogKey) {
132        LogKey other = (LogKey) obj;
133        if (this.keyString == null) {
134          return other.keyString == null;
135        }
136        return this.keyString.equals(other.keyString);
137      }
138      return false;
139    }
140
141    @Private
142    @Override
143    public void write(DataOutput out) throws IOException {
144      out.writeUTF(this.keyString);
145    }
146
147    @Private
148    @Override
149    public void readFields(DataInput in) throws IOException {
150      this.keyString = in.readUTF();
151    }
152
153    @Override
154    public String toString() {
155      return this.keyString;
156    }
157  }
158
159  @Private
160  public static class LogValue {
161
162    private final List<String> rootLogDirs;
163    private final ContainerId containerId;
164    private final String user;
165    private final LogAggregationContext logAggregationContext;
166    private Set<File> uploadedFiles = new HashSet<File>();
167    private final Set<String> alreadyUploadedLogFiles;
168    private Set<String> allExistingFileMeta = new HashSet<String>();
169    // TODO Maybe add a version string here. Instead of changing the version of
170    // the entire k-v format
171
172    public LogValue(List<String> rootLogDirs, ContainerId containerId,
173        String user) {
174      this(rootLogDirs, containerId, user, null, new HashSet<String>());
175    }
176
177    public LogValue(List<String> rootLogDirs, ContainerId containerId,
178        String user, LogAggregationContext logAggregationContext,
179        Set<String> alreadyUploadedLogFiles) {
180      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
181      this.containerId = containerId;
182      this.user = user;
183
184      // Ensure logs are processed in lexical order
185      Collections.sort(this.rootLogDirs);
186      this.logAggregationContext = logAggregationContext;
187      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
188    }
189
190    private Set<File> getPendingLogFilesToUploadForThisContainer() {
191      Set<File> pendingUploadFiles = new HashSet<File>();
192      for (String rootLogDir : this.rootLogDirs) {
193        File appLogDir =
194            new File(rootLogDir, 
195                ConverterUtils.toString(
196                    this.containerId.getApplicationAttemptId().
197                        getApplicationId())
198                );
199        File containerLogDir =
200            new File(appLogDir, ConverterUtils.toString(this.containerId));
201
202        if (!containerLogDir.isDirectory()) {
203          continue; // ContainerDir may have been deleted by the user.
204        }
205
206        pendingUploadFiles
207          .addAll(getPendingLogFilesToUpload(containerLogDir));
208      }
209      return pendingUploadFiles;
210    }
211
212    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
213        throws IOException {
214      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
215      Collections.sort(fileList);
216
217      for (File logFile : fileList) {
218        // We only aggregate top level files.
219        // Ignore anything inside sub-folders.
220        if (logFile.isDirectory()) {
221          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
222          continue;
223        }
224
225        FileInputStream in = null;
226        try {
227          in = secureOpenFile(logFile);
228        } catch (IOException e) {
229          logErrorMessage(logFile, e);
230          IOUtils.cleanup(LOG, in);
231          continue;
232        }
233
234        final long fileLength = logFile.length();
235        // Write the logFile Type
236        out.writeUTF(logFile.getName());
237
238        // Write the log length as UTF so that it is printable
239        out.writeUTF(String.valueOf(fileLength));
240
241        // Write the log itself
242        try {
243          byte[] buf = new byte[65535];
244          int len = 0;
245          long bytesLeft = fileLength;
246          while ((len = in.read(buf)) != -1) {
247            //If buffer contents within fileLength, write
248            if (len < bytesLeft) {
249              out.write(buf, 0, len);
250              bytesLeft-=len;
251            }
252            //else only write contents within fileLength, then exit early
253            else {
254              out.write(buf, 0, (int)bytesLeft);
255              break;
256            }
257          }
258          long newLength = logFile.length();
259          if(fileLength < newLength) {
260            LOG.warn("Aggregated logs truncated by approximately "+
261                (newLength-fileLength) +" bytes.");
262          }
263          this.uploadedFiles.add(logFile);
264        } catch (IOException e) {
265          String message = logErrorMessage(logFile, e);
266          out.write(message.getBytes());
267        } finally {
268          IOUtils.cleanup(LOG, in);
269        }
270      }
271    }
272
273    @VisibleForTesting
274    public FileInputStream secureOpenFile(File logFile) throws IOException {
275      return SecureIOUtils.openForRead(logFile, getUser(), null);
276    }
277
278    private static String logErrorMessage(File logFile, Exception e) {
279      String message = "Error aggregating log file. Log file : "
280          + logFile.getAbsolutePath() + ". " + e.getMessage();
281      LOG.error(message, e);
282      return message;
283    }
284
285    // Added for testing purpose.
286    public String getUser() {
287      return user;
288    }
289
290    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
291      Set<File> candidates =
292          new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
293      for (File logFile : candidates) {
294        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
295      }
296
297      if (this.logAggregationContext != null && candidates.size() > 0) {
298        if (this.logAggregationContext.getIncludePattern() != null
299            && !this.logAggregationContext.getIncludePattern().isEmpty()) {
300          filterFiles(this.logAggregationContext.getIncludePattern(),
301              candidates, false);
302        }
303
304        if (this.logAggregationContext.getExcludePattern() != null
305            && !this.logAggregationContext.getExcludePattern().isEmpty()) {
306          filterFiles(this.logAggregationContext.getExcludePattern(),
307              candidates, true);
308        }
309
310        Iterable<File> mask =
311            Iterables.filter(candidates, new Predicate<File>() {
312              @Override
313              public boolean apply(File next) {
314                return !alreadyUploadedLogFiles
315                  .contains(getLogFileMetaData(next));
316              }
317            });
318        candidates = Sets.newHashSet(mask);
319      }
320      return candidates;
321    }
322
323    private void filterFiles(String pattern, Set<File> candidates,
324        boolean exclusion) {
325      Pattern filterPattern =
326          Pattern.compile(pattern);
327      for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
328          .hasNext();) {
329        File candidate = candidatesItr.next();
330        boolean match = filterPattern.matcher(candidate.getName()).find();
331        if ((!match && !exclusion) || (match && exclusion)) {
332          candidatesItr.remove();
333        }
334      }
335    }
336
337    public Set<Path> getCurrentUpLoadedFilesPath() {
338      Set<Path> path = new HashSet<Path>();
339      for (File file : this.uploadedFiles) {
340        path.add(new Path(file.getAbsolutePath()));
341      }
342      return path;
343    }
344
345    public Set<String> getCurrentUpLoadedFileMeta() {
346      Set<String> info = new HashSet<String>();
347      for (File file : this.uploadedFiles) {
348        info.add(getLogFileMetaData(file));
349      }
350      return info;
351    }
352
353    public Set<String> getAllExistingFilesMeta() {
354      return this.allExistingFileMeta;
355    }
356
357    private String getLogFileMetaData(File file) {
358      return containerId.toString() + "_" + file.getName() + "_"
359          + file.lastModified();
360    }
361  }
362
363  /**
364   * The writer that writes out the aggregated logs.
365   */
366  @Private
367  public static class LogWriter {
368
369    private final FSDataOutputStream fsDataOStream;
370    private final TFile.Writer writer;
371    private FileContext fc;
372
373    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
374        UserGroupInformation userUgi) throws IOException {
375      try {
376        this.fsDataOStream =
377            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
378              @Override
379              public FSDataOutputStream run() throws Exception {
380                fc = FileContext.getFileContext(conf);
381                fc.setUMask(APP_LOG_FILE_UMASK);
382                return fc.create(
383                    remoteAppLogFile,
384                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
385                    new Options.CreateOpts[] {});
386              }
387            });
388      } catch (InterruptedException e) {
389        throw new IOException(e);
390      }
391
392      // Keys are not sorted: null arg
393      // 256KB minBlockSize : Expected log size for each container too
394      this.writer =
395          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
396              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
397              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
398      //Write the version string
399      writeVersion();
400    }
401
402    @VisibleForTesting
403    public TFile.Writer getWriter() {
404      return this.writer;
405    }
406
407    private void writeVersion() throws IOException {
408      DataOutputStream out = this.writer.prepareAppendKey(-1);
409      VERSION_KEY.write(out);
410      out.close();
411      out = this.writer.prepareAppendValue(-1);
412      out.writeInt(VERSION);
413      out.close();
414    }
415
416    public void writeApplicationOwner(String user) throws IOException {
417      DataOutputStream out = this.writer.prepareAppendKey(-1);
418      APPLICATION_OWNER_KEY.write(out);
419      out.close();
420      out = this.writer.prepareAppendValue(-1);
421      out.writeUTF(user);
422      out.close();
423    }
424
425    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
426        throws IOException {
427      DataOutputStream out = this.writer.prepareAppendKey(-1);
428      APPLICATION_ACL_KEY.write(out);
429      out.close();
430      out = this.writer.prepareAppendValue(-1);
431      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
432        out.writeUTF(entry.getKey().toString());
433        out.writeUTF(entry.getValue());
434      }
435      out.close();
436    }
437
438    public void append(LogKey logKey, LogValue logValue) throws IOException {
439      Set<File> pendingUploadFiles =
440          logValue.getPendingLogFilesToUploadForThisContainer();
441      if (pendingUploadFiles.size() == 0) {
442        return;
443      }
444      DataOutputStream out = this.writer.prepareAppendKey(-1);
445      logKey.write(out);
446      out.close();
447      out = this.writer.prepareAppendValue(-1);
448      logValue.write(out, pendingUploadFiles);
449      out.close();
450    }
451
452    public void close() {
453      try {
454        this.writer.close();
455      } catch (IOException e) {
456        LOG.warn("Exception closing writer", e);
457      }
458      IOUtils.closeStream(fsDataOStream);
459    }
460  }
461
462  @Public
463  @Evolving
464  public static class LogReader {
465
466    private final FSDataInputStream fsDataIStream;
467    private final TFile.Reader.Scanner scanner;
468    private final TFile.Reader reader;
469
470    public LogReader(Configuration conf, Path remoteAppLogFile)
471        throws IOException {
472      FileContext fileContext = FileContext.getFileContext(conf);
473      this.fsDataIStream = fileContext.open(remoteAppLogFile);
474      reader =
475          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
476              remoteAppLogFile).getLen(), conf);
477      this.scanner = reader.createScanner();
478    }
479
480    private boolean atBeginning = true;
481
482    /**
483     * Returns the owner of the application.
484     * 
485     * @return the application owner.
486     * @throws IOException
487     */
488    public String getApplicationOwner() throws IOException {
489      TFile.Reader.Scanner ownerScanner = reader.createScanner();
490      LogKey key = new LogKey();
491      while (!ownerScanner.atEnd()) {
492        TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
493        key.readFields(entry.getKeyStream());
494        if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
495          DataInputStream valueStream = entry.getValueStream();
496          return valueStream.readUTF();
497        }
498        ownerScanner.advance();
499      }
500      return null;
501    }
502
503    /**
504     * Returns ACLs for the application. An empty map is returned if no ACLs are
505     * found.
506     * 
507     * @return a map of the Application ACLs.
508     * @throws IOException
509     */
510    public Map<ApplicationAccessType, String> getApplicationAcls()
511        throws IOException {
512      // TODO Seek directly to the key once a comparator is specified.
513      TFile.Reader.Scanner aclScanner = reader.createScanner();
514      LogKey key = new LogKey();
515      Map<ApplicationAccessType, String> acls =
516          new HashMap<ApplicationAccessType, String>();
517      while (!aclScanner.atEnd()) {
518        TFile.Reader.Scanner.Entry entry = aclScanner.entry();
519        key.readFields(entry.getKeyStream());
520        if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
521          DataInputStream valueStream = entry.getValueStream();
522          while (true) {
523            String appAccessOp = null;
524            String aclString = null;
525            try {
526              appAccessOp = valueStream.readUTF();
527            } catch (EOFException e) {
528              // Valid end of stream.
529              break;
530            }
531            try {
532              aclString = valueStream.readUTF();
533            } catch (EOFException e) {
534              throw new YarnRuntimeException("Error reading ACLs", e);
535            }
536            acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
537          }
538
539        }
540        aclScanner.advance();
541      }
542      return acls;
543    }
544    
545    /**
546     * Read the next key and return the value-stream.
547     * 
548     * @param key
549     * @return the valueStream if there are more keys or null otherwise.
550     * @throws IOException
551     */
552    public DataInputStream next(LogKey key) throws IOException {
553      if (!this.atBeginning) {
554        this.scanner.advance();
555      } else {
556        this.atBeginning = false;
557      }
558      if (this.scanner.atEnd()) {
559        return null;
560      }
561      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
562      key.readFields(entry.getKeyStream());
563      // Skip META keys
564      if (RESERVED_KEYS.containsKey(key.toString())) {
565        return next(key);
566      }
567      DataInputStream valueStream = entry.getValueStream();
568      return valueStream;
569    }
570
571    /**
572     * Get a ContainerLogsReader to read the logs for
573     * the specified container.
574     *
575     * @param containerId
576     * @return object to read the container's logs or null if the
577     *         logs could not be found
578     * @throws IOException
579     */
580    @Private
581    public ContainerLogsReader getContainerLogsReader(
582        ContainerId containerId) throws IOException {
583      ContainerLogsReader logReader = null;
584
585      final LogKey containerKey = new LogKey(containerId);
586      LogKey key = new LogKey();
587      DataInputStream valueStream = next(key);
588      while (valueStream != null && !key.equals(containerKey)) {
589        valueStream = next(key);
590      }
591
592      if (valueStream != null) {
593        logReader = new ContainerLogsReader(valueStream);
594      }
595
596      return logReader;
597    }
598
599    //TODO  Change Log format and interfaces to be containerId specific.
600    // Avoid returning completeValueStreams.
601//    public List<String> getTypesForContainer(DataInputStream valueStream){}
602//    
603//    /**
604//     * @param valueStream
605//     *          The Log stream for the container.
606//     * @param fileType
607//     *          the log type required.
608//     * @return An InputStreamReader for the required log type or null if the
609//     *         type is not found.
610//     * @throws IOException
611//     */
612//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
613//        String fileType) throws IOException {
614//      valueStream.reset();
615//      try {
616//        while (true) {
617//          String ft = valueStream.readUTF();
618//          String fileLengthStr = valueStream.readUTF();
619//          long fileLength = Long.parseLong(fileLengthStr);
620//          if (ft.equals(fileType)) {
621//            BoundedInputStream bis =
622//                new BoundedInputStream(valueStream, fileLength);
623//            return new InputStreamReader(bis);
624//          } else {
625//            long totalSkipped = 0;
626//            long currSkipped = 0;
627//            while (currSkipped != -1 && totalSkipped < fileLength) {
628//              currSkipped = valueStream.skip(fileLength - totalSkipped);
629//              totalSkipped += currSkipped;
630//            }
631//            // TODO Verify skip behaviour.
632//            if (currSkipped == -1) {
633//              return null;
634//            }
635//          }
636//        }
637//      } catch (EOFException e) {
638//        return null;
639//      }
640//    }
641
642    /**
643     * Writes all logs for a single container to the provided writer.
644     * @param valueStream
645     * @param writer
646     * @param logUploadedTime
647     * @throws IOException
648     */
649    public static void readAcontainerLogs(DataInputStream valueStream,
650        Writer writer, long logUploadedTime) throws IOException {
651      OutputStream os = null;
652      PrintStream ps = null;
653      try {
654        os = new WriterOutputStream(writer);
655        ps = new PrintStream(os);
656        while (true) {
657          try {
658            readContainerLogs(valueStream, ps, logUploadedTime);
659          } catch (EOFException e) {
660            // EndOfFile
661            return;
662          }
663        }
664      } finally {
665        IOUtils.cleanup(LOG, ps);
666        IOUtils.cleanup(LOG, os);
667      }
668    }
669
670    /**
671     * Writes all logs for a single container to the provided writer.
672     * @param valueStream
673     * @param writer
674     * @throws IOException
675     */
676    public static void readAcontainerLogs(DataInputStream valueStream,
677        Writer writer) throws IOException {
678      readAcontainerLogs(valueStream, writer, -1);
679    }
680
681    private static void readContainerLogs(DataInputStream valueStream,
682        PrintStream out, long logUploadedTime) throws IOException {
683      byte[] buf = new byte[65535];
684
685      String fileType = valueStream.readUTF();
686      String fileLengthStr = valueStream.readUTF();
687      long fileLength = Long.parseLong(fileLengthStr);
688      out.print("LogType:");
689      out.println(fileType);
690      if (logUploadedTime != -1) {
691        out.print("Log Upload Time:");
692        out.println(Times.format(logUploadedTime));
693      }
694      out.print("LogLength:");
695      out.println(fileLengthStr);
696      out.println("Log Contents:");
697
698      long curRead = 0;
699      long pendingRead = fileLength - curRead;
700      int toRead =
701                pendingRead > buf.length ? buf.length : (int) pendingRead;
702      int len = valueStream.read(buf, 0, toRead);
703      while (len != -1 && curRead < fileLength) {
704        out.write(buf, 0, len);
705        curRead += len;
706
707        pendingRead = fileLength - curRead;
708        toRead =
709                  pendingRead > buf.length ? buf.length : (int) pendingRead;
710        len = valueStream.read(buf, 0, toRead);
711      }
712      out.println("");
713    }
714
715    /**
716     * Keep calling this till you get a {@link EOFException} for getting logs of
717     * all types for a single container.
718     * 
719     * @param valueStream
720     * @param out
721     * @param logUploadedTime
722     * @throws IOException
723     */
724    public static void readAContainerLogsForALogType(
725        DataInputStream valueStream, PrintStream out, long logUploadedTime)
726          throws IOException {
727      readContainerLogs(valueStream, out, logUploadedTime);
728    }
729
730    /**
731     * Keep calling this till you get a {@link EOFException} for getting logs of
732     * all types for a single container.
733     * 
734     * @param valueStream
735     * @param out
736     * @throws IOException
737     */
738    public static void readAContainerLogsForALogType(
739        DataInputStream valueStream, PrintStream out)
740          throws IOException {
741      readAContainerLogsForALogType(valueStream, out, -1);
742    }
743
744    public void close() {
745      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
746    }
747  }
748
749  @Private
750  public static class ContainerLogsReader {
751    private DataInputStream valueStream;
752    private String currentLogType = null;
753    private long currentLogLength = 0;
754    private BoundedInputStream currentLogData = null;
755    private InputStreamReader currentLogISR;
756
757    public ContainerLogsReader(DataInputStream stream) {
758      valueStream = stream;
759    }
760
761    public String nextLog() throws IOException {
762      if (currentLogData != null && currentLogLength > 0) {
763        // seek to the end of the current log, relying on BoundedInputStream
764        // to prevent seeking past the end of the current log
765        do {
766          if (currentLogData.skip(currentLogLength) < 0) {
767            break;
768          }
769        } while (currentLogData.read() != -1);
770      }
771
772      currentLogType = null;
773      currentLogLength = 0;
774      currentLogData = null;
775      currentLogISR = null;
776
777      try {
778        String logType = valueStream.readUTF();
779        String logLengthStr = valueStream.readUTF();
780        currentLogLength = Long.parseLong(logLengthStr);
781        currentLogData =
782            new BoundedInputStream(valueStream, currentLogLength);
783        currentLogData.setPropagateClose(false);
784        currentLogISR = new InputStreamReader(currentLogData);
785        currentLogType = logType;
786      } catch (EOFException e) {
787      }
788
789      return currentLogType;
790    }
791
792    public String getCurrentLogType() {
793      return currentLogType;
794    }
795
796    public long getCurrentLogLength() {
797      return currentLogLength;
798    }
799
800    public long skip(long n) throws IOException {
801      return currentLogData.skip(n);
802    }
803
804    public int read() throws IOException {
805      return currentLogData.read();
806    }
807
808    public int read(byte[] buf, int off, int len) throws IOException {
809      return currentLogData.read(buf, off, len);
810    }
811
812    public int read(char[] buf, int off, int len) throws IOException {
813      return currentLogISR.read(buf, off, len);
814    }
815  }
816}