001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.logaggregation;
020    
021    import java.io.DataInput;
022    import java.io.DataInputStream;
023    import java.io.DataOutput;
024    import java.io.DataOutputStream;
025    import java.io.EOFException;
026    import java.io.File;
027    import java.io.FileInputStream;
028    import java.io.IOException;
029    import java.io.InputStreamReader;
030    import java.io.OutputStream;
031    import java.io.PrintStream;
032    import java.io.Writer;
033    import java.security.PrivilegedExceptionAction;
034    import java.util.ArrayList;
035    import java.util.Arrays;
036    import java.util.Collections;
037    import java.util.EnumSet;
038    import java.util.HashMap;
039    import java.util.HashSet;
040    import java.util.Iterator;
041    import java.util.List;
042    import java.util.Map;
043    import java.util.Map.Entry;
044    import java.util.Set;
045    import java.util.regex.Pattern;
046    
047    import org.apache.commons.io.input.BoundedInputStream;
048    import org.apache.commons.io.output.WriterOutputStream;
049    import org.apache.commons.logging.Log;
050    import org.apache.commons.logging.LogFactory;
051    import org.apache.hadoop.classification.InterfaceAudience.Private;
052    import org.apache.hadoop.classification.InterfaceAudience.Public;
053    import org.apache.hadoop.classification.InterfaceStability.Evolving;
054    import org.apache.hadoop.conf.Configuration;
055    import org.apache.hadoop.fs.CreateFlag;
056    import org.apache.hadoop.fs.FSDataInputStream;
057    import org.apache.hadoop.fs.FSDataOutputStream;
058    import org.apache.hadoop.fs.FileContext;
059    import org.apache.hadoop.fs.Options;
060    import org.apache.hadoop.fs.Path;
061    import org.apache.hadoop.fs.permission.FsPermission;
062    import org.apache.hadoop.io.IOUtils;
063    import org.apache.hadoop.io.SecureIOUtils;
064    import org.apache.hadoop.io.Writable;
065    import org.apache.hadoop.io.file.tfile.TFile;
066    import org.apache.hadoop.security.UserGroupInformation;
067    import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
068    import org.apache.hadoop.yarn.api.records.ContainerId;
069    import org.apache.hadoop.yarn.api.records.LogAggregationContext;
070    import org.apache.hadoop.yarn.conf.YarnConfiguration;
071    import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
072    import org.apache.hadoop.yarn.util.ConverterUtils;
073    import org.apache.hadoop.yarn.util.Times;
074    
075    import com.google.common.annotations.VisibleForTesting;
076    import com.google.common.base.Predicate;
077    import com.google.common.collect.Iterables;
078    import com.google.common.collect.Sets;
079    
080    @Public
081    @Evolving
082    public class AggregatedLogFormat {
083    
084      private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
085      private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
086      private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
087      private static final LogKey VERSION_KEY = new LogKey("VERSION");
088      private static final Map<String, LogKey> RESERVED_KEYS;
089      //Maybe write out the retention policy.
090      //Maybe write out a list of containerLogs skipped by the retention policy.
091      private static final int VERSION = 1;
092    
093      /**
094       * Umask for the log file.
095       */
096      private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
097          .createImmutable((short) (0640 ^ 0777));
098    
099    
100      static {
101        RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
102        RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
103        RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
104        RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
105      }
106    
107      @Public
108      public static class LogKey implements Writable {
109    
110        private String keyString;
111    
112        public LogKey() {
113    
114        }
115    
116        public LogKey(ContainerId containerId) {
117          this.keyString = containerId.toString();
118        }
119    
120        public LogKey(String keyString) {
121          this.keyString = keyString;
122        }
123        
124        @Override
125        public int hashCode() {
126          return keyString == null ? 0 : keyString.hashCode();
127        }
128    
129        @Override
130        public boolean equals(Object obj) {
131          if (obj instanceof LogKey) {
132            LogKey other = (LogKey) obj;
133            if (this.keyString == null) {
134              return other.keyString == null;
135            }
136            return this.keyString.equals(other.keyString);
137          }
138          return false;
139        }
140    
141        @Private
142        @Override
143        public void write(DataOutput out) throws IOException {
144          out.writeUTF(this.keyString);
145        }
146    
147        @Private
148        @Override
149        public void readFields(DataInput in) throws IOException {
150          this.keyString = in.readUTF();
151        }
152    
153        @Override
154        public String toString() {
155          return this.keyString;
156        }
157      }
158    
159      @Private
160      public static class LogValue {
161    
162        private final List<String> rootLogDirs;
163        private final ContainerId containerId;
164        private final String user;
165        private final LogAggregationContext logAggregationContext;
166        private Set<File> uploadedFiles = new HashSet<File>();
167        private final Set<String> alreadyUploadedLogFiles;
168        private Set<String> allExistingFileMeta = new HashSet<String>();
169        // TODO Maybe add a version string here. Instead of changing the version of
170        // the entire k-v format
171    
172        public LogValue(List<String> rootLogDirs, ContainerId containerId,
173            String user) {
174          this(rootLogDirs, containerId, user, null, new HashSet<String>());
175        }
176    
177        public LogValue(List<String> rootLogDirs, ContainerId containerId,
178            String user, LogAggregationContext logAggregationContext,
179            Set<String> alreadyUploadedLogFiles) {
180          this.rootLogDirs = new ArrayList<String>(rootLogDirs);
181          this.containerId = containerId;
182          this.user = user;
183    
184          // Ensure logs are processed in lexical order
185          Collections.sort(this.rootLogDirs);
186          this.logAggregationContext = logAggregationContext;
187          this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
188        }
189    
190        private Set<File> getPendingLogFilesToUploadForThisContainer() {
191          Set<File> pendingUploadFiles = new HashSet<File>();
192          for (String rootLogDir : this.rootLogDirs) {
193            File appLogDir =
194                new File(rootLogDir, 
195                    ConverterUtils.toString(
196                        this.containerId.getApplicationAttemptId().
197                            getApplicationId())
198                    );
199            File containerLogDir =
200                new File(appLogDir, ConverterUtils.toString(this.containerId));
201    
202            if (!containerLogDir.isDirectory()) {
203              continue; // ContainerDir may have been deleted by the user.
204            }
205    
206            pendingUploadFiles
207              .addAll(getPendingLogFilesToUpload(containerLogDir));
208          }
209          return pendingUploadFiles;
210        }
211    
212        public void write(DataOutputStream out, Set<File> pendingUploadFiles)
213            throws IOException {
214          List<File> fileList = new ArrayList<File>(pendingUploadFiles);
215          Collections.sort(fileList);
216    
217          for (File logFile : fileList) {
218            // We only aggregate top level files.
219            // Ignore anything inside sub-folders.
220            if (logFile.isDirectory()) {
221              LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
222              continue;
223            }
224    
225            FileInputStream in = null;
226            try {
227              in = secureOpenFile(logFile);
228            } catch (IOException e) {
229              logErrorMessage(logFile, e);
230              IOUtils.cleanup(LOG, in);
231              continue;
232            }
233    
234            final long fileLength = logFile.length();
235            // Write the logFile Type
236            out.writeUTF(logFile.getName());
237    
238            // Write the log length as UTF so that it is printable
239            out.writeUTF(String.valueOf(fileLength));
240    
241            // Write the log itself
242            try {
243              byte[] buf = new byte[65535];
244              int len = 0;
245              long bytesLeft = fileLength;
246              while ((len = in.read(buf)) != -1) {
247                //If buffer contents within fileLength, write
248                if (len < bytesLeft) {
249                  out.write(buf, 0, len);
250                  bytesLeft-=len;
251                }
252                //else only write contents within fileLength, then exit early
253                else {
254                  out.write(buf, 0, (int)bytesLeft);
255                  break;
256                }
257              }
258              long newLength = logFile.length();
259              if(fileLength < newLength) {
260                LOG.warn("Aggregated logs truncated by approximately "+
261                    (newLength-fileLength) +" bytes.");
262              }
263              this.uploadedFiles.add(logFile);
264            } catch (IOException e) {
265              String message = logErrorMessage(logFile, e);
266              out.write(message.getBytes());
267            } finally {
268              IOUtils.cleanup(LOG, in);
269            }
270          }
271        }
272    
273        @VisibleForTesting
274        public FileInputStream secureOpenFile(File logFile) throws IOException {
275          return SecureIOUtils.openForRead(logFile, getUser(), null);
276        }
277    
278        private static String logErrorMessage(File logFile, Exception e) {
279          String message = "Error aggregating log file. Log file : "
280              + logFile.getAbsolutePath() + ". " + e.getMessage();
281          LOG.error(message, e);
282          return message;
283        }
284    
285        // Added for testing purpose.
286        public String getUser() {
287          return user;
288        }
289    
290        private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
291          Set<File> candidates =
292              new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
293          for (File logFile : candidates) {
294            this.allExistingFileMeta.add(getLogFileMetaData(logFile));
295          }
296    
297          if (this.logAggregationContext != null && candidates.size() > 0) {
298            if (this.logAggregationContext.getIncludePattern() != null
299                && !this.logAggregationContext.getIncludePattern().isEmpty()) {
300              filterFiles(this.logAggregationContext.getIncludePattern(),
301                  candidates, false);
302            }
303    
304            if (this.logAggregationContext.getExcludePattern() != null
305                && !this.logAggregationContext.getExcludePattern().isEmpty()) {
306              filterFiles(this.logAggregationContext.getExcludePattern(),
307                  candidates, true);
308            }
309    
310            Iterable<File> mask =
311                Iterables.filter(candidates, new Predicate<File>() {
312                  @Override
313                  public boolean apply(File next) {
314                    return !alreadyUploadedLogFiles
315                      .contains(getLogFileMetaData(next));
316                  }
317                });
318            candidates = Sets.newHashSet(mask);
319          }
320          return candidates;
321        }
322    
323        private void filterFiles(String pattern, Set<File> candidates,
324            boolean exclusion) {
325          Pattern filterPattern =
326              Pattern.compile(pattern);
327          for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
328              .hasNext();) {
329            File candidate = candidatesItr.next();
330            boolean match = filterPattern.matcher(candidate.getName()).find();
331            if ((!match && !exclusion) || (match && exclusion)) {
332              candidatesItr.remove();
333            }
334          }
335        }
336    
337        public Set<Path> getCurrentUpLoadedFilesPath() {
338          Set<Path> path = new HashSet<Path>();
339          for (File file : this.uploadedFiles) {
340            path.add(new Path(file.getAbsolutePath()));
341          }
342          return path;
343        }
344    
345        public Set<String> getCurrentUpLoadedFileMeta() {
346          Set<String> info = new HashSet<String>();
347          for (File file : this.uploadedFiles) {
348            info.add(getLogFileMetaData(file));
349          }
350          return info;
351        }
352    
353        public Set<String> getAllExistingFilesMeta() {
354          return this.allExistingFileMeta;
355        }
356    
357        private String getLogFileMetaData(File file) {
358          return containerId.toString() + "_" + file.getName() + "_"
359              + file.lastModified();
360        }
361      }
362    
363      /**
364       * The writer that writes out the aggregated logs.
365       */
366      @Private
367      public static class LogWriter {
368    
369        private final FSDataOutputStream fsDataOStream;
370        private final TFile.Writer writer;
371        private FileContext fc;
372    
373        public LogWriter(final Configuration conf, final Path remoteAppLogFile,
374            UserGroupInformation userUgi) throws IOException {
375          try {
376            this.fsDataOStream =
377                userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
378                  @Override
379                  public FSDataOutputStream run() throws Exception {
380                    fc = FileContext.getFileContext(conf);
381                    fc.setUMask(APP_LOG_FILE_UMASK);
382                    return fc.create(
383                        remoteAppLogFile,
384                        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
385                        new Options.CreateOpts[] {});
386                  }
387                });
388          } catch (InterruptedException e) {
389            throw new IOException(e);
390          }
391    
392          // Keys are not sorted: null arg
393          // 256KB minBlockSize : Expected log size for each container too
394          this.writer =
395              new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
396                  YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
397                  YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
398          //Write the version string
399          writeVersion();
400        }
401    
402        @VisibleForTesting
403        public TFile.Writer getWriter() {
404          return this.writer;
405        }
406    
407        private void writeVersion() throws IOException {
408          DataOutputStream out = this.writer.prepareAppendKey(-1);
409          VERSION_KEY.write(out);
410          out.close();
411          out = this.writer.prepareAppendValue(-1);
412          out.writeInt(VERSION);
413          out.close();
414        }
415    
416        public void writeApplicationOwner(String user) throws IOException {
417          DataOutputStream out = this.writer.prepareAppendKey(-1);
418          APPLICATION_OWNER_KEY.write(out);
419          out.close();
420          out = this.writer.prepareAppendValue(-1);
421          out.writeUTF(user);
422          out.close();
423        }
424    
425        public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
426            throws IOException {
427          DataOutputStream out = this.writer.prepareAppendKey(-1);
428          APPLICATION_ACL_KEY.write(out);
429          out.close();
430          out = this.writer.prepareAppendValue(-1);
431          for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
432            out.writeUTF(entry.getKey().toString());
433            out.writeUTF(entry.getValue());
434          }
435          out.close();
436        }
437    
438        public void append(LogKey logKey, LogValue logValue) throws IOException {
439          Set<File> pendingUploadFiles =
440              logValue.getPendingLogFilesToUploadForThisContainer();
441          if (pendingUploadFiles.size() == 0) {
442            return;
443          }
444          DataOutputStream out = this.writer.prepareAppendKey(-1);
445          logKey.write(out);
446          out.close();
447          out = this.writer.prepareAppendValue(-1);
448          logValue.write(out, pendingUploadFiles);
449          out.close();
450        }
451    
452        public void close() {
453          try {
454            this.writer.close();
455          } catch (IOException e) {
456            LOG.warn("Exception closing writer", e);
457          }
458          IOUtils.closeStream(fsDataOStream);
459        }
460      }
461    
462      @Public
463      @Evolving
464      public static class LogReader {
465    
466        private final FSDataInputStream fsDataIStream;
467        private final TFile.Reader.Scanner scanner;
468        private final TFile.Reader reader;
469    
470        public LogReader(Configuration conf, Path remoteAppLogFile)
471            throws IOException {
472          FileContext fileContext = FileContext.getFileContext(conf);
473          this.fsDataIStream = fileContext.open(remoteAppLogFile);
474          reader =
475              new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
476                  remoteAppLogFile).getLen(), conf);
477          this.scanner = reader.createScanner();
478        }
479    
480        private boolean atBeginning = true;
481    
482        /**
483         * Returns the owner of the application.
484         * 
485         * @return the application owner.
486         * @throws IOException
487         */
488        public String getApplicationOwner() throws IOException {
489          TFile.Reader.Scanner ownerScanner = reader.createScanner();
490          LogKey key = new LogKey();
491          while (!ownerScanner.atEnd()) {
492            TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
493            key.readFields(entry.getKeyStream());
494            if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
495              DataInputStream valueStream = entry.getValueStream();
496              return valueStream.readUTF();
497            }
498            ownerScanner.advance();
499          }
500          return null;
501        }
502    
503        /**
504         * Returns ACLs for the application. An empty map is returned if no ACLs are
505         * found.
506         * 
507         * @return a map of the Application ACLs.
508         * @throws IOException
509         */
510        public Map<ApplicationAccessType, String> getApplicationAcls()
511            throws IOException {
512          // TODO Seek directly to the key once a comparator is specified.
513          TFile.Reader.Scanner aclScanner = reader.createScanner();
514          LogKey key = new LogKey();
515          Map<ApplicationAccessType, String> acls =
516              new HashMap<ApplicationAccessType, String>();
517          while (!aclScanner.atEnd()) {
518            TFile.Reader.Scanner.Entry entry = aclScanner.entry();
519            key.readFields(entry.getKeyStream());
520            if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
521              DataInputStream valueStream = entry.getValueStream();
522              while (true) {
523                String appAccessOp = null;
524                String aclString = null;
525                try {
526                  appAccessOp = valueStream.readUTF();
527                } catch (EOFException e) {
528                  // Valid end of stream.
529                  break;
530                }
531                try {
532                  aclString = valueStream.readUTF();
533                } catch (EOFException e) {
534                  throw new YarnRuntimeException("Error reading ACLs", e);
535                }
536                acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
537              }
538    
539            }
540            aclScanner.advance();
541          }
542          return acls;
543        }
544        
545        /**
546         * Read the next key and return the value-stream.
547         * 
548         * @param key
549         * @return the valueStream if there are more keys or null otherwise.
550         * @throws IOException
551         */
552        public DataInputStream next(LogKey key) throws IOException {
553          if (!this.atBeginning) {
554            this.scanner.advance();
555          } else {
556            this.atBeginning = false;
557          }
558          if (this.scanner.atEnd()) {
559            return null;
560          }
561          TFile.Reader.Scanner.Entry entry = this.scanner.entry();
562          key.readFields(entry.getKeyStream());
563          // Skip META keys
564          if (RESERVED_KEYS.containsKey(key.toString())) {
565            return next(key);
566          }
567          DataInputStream valueStream = entry.getValueStream();
568          return valueStream;
569        }
570    
571        /**
572         * Get a ContainerLogsReader to read the logs for
573         * the specified container.
574         *
575         * @param containerId
576         * @return object to read the container's logs or null if the
577         *         logs could not be found
578         * @throws IOException
579         */
580        @Private
581        public ContainerLogsReader getContainerLogsReader(
582            ContainerId containerId) throws IOException {
583          ContainerLogsReader logReader = null;
584    
585          final LogKey containerKey = new LogKey(containerId);
586          LogKey key = new LogKey();
587          DataInputStream valueStream = next(key);
588          while (valueStream != null && !key.equals(containerKey)) {
589            valueStream = next(key);
590          }
591    
592          if (valueStream != null) {
593            logReader = new ContainerLogsReader(valueStream);
594          }
595    
596          return logReader;
597        }
598    
599        //TODO  Change Log format and interfaces to be containerId specific.
600        // Avoid returning completeValueStreams.
601    //    public List<String> getTypesForContainer(DataInputStream valueStream){}
602    //    
603    //    /**
604    //     * @param valueStream
605    //     *          The Log stream for the container.
606    //     * @param fileType
607    //     *          the log type required.
608    //     * @return An InputStreamReader for the required log type or null if the
609    //     *         type is not found.
610    //     * @throws IOException
611    //     */
612    //    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
613    //        String fileType) throws IOException {
614    //      valueStream.reset();
615    //      try {
616    //        while (true) {
617    //          String ft = valueStream.readUTF();
618    //          String fileLengthStr = valueStream.readUTF();
619    //          long fileLength = Long.parseLong(fileLengthStr);
620    //          if (ft.equals(fileType)) {
621    //            BoundedInputStream bis =
622    //                new BoundedInputStream(valueStream, fileLength);
623    //            return new InputStreamReader(bis);
624    //          } else {
625    //            long totalSkipped = 0;
626    //            long currSkipped = 0;
627    //            while (currSkipped != -1 && totalSkipped < fileLength) {
628    //              currSkipped = valueStream.skip(fileLength - totalSkipped);
629    //              totalSkipped += currSkipped;
630    //            }
631    //            // TODO Verify skip behaviour.
632    //            if (currSkipped == -1) {
633    //              return null;
634    //            }
635    //          }
636    //        }
637    //      } catch (EOFException e) {
638    //        return null;
639    //      }
640    //    }
641    
642        /**
643         * Writes all logs for a single container to the provided writer.
644         * @param valueStream
645         * @param writer
646         * @param logUploadedTime
647         * @throws IOException
648         */
649        public static void readAcontainerLogs(DataInputStream valueStream,
650            Writer writer, long logUploadedTime) throws IOException {
651          OutputStream os = null;
652          PrintStream ps = null;
653          try {
654            os = new WriterOutputStream(writer);
655            ps = new PrintStream(os);
656            while (true) {
657              try {
658                readContainerLogs(valueStream, ps, logUploadedTime);
659              } catch (EOFException e) {
660                // EndOfFile
661                return;
662              }
663            }
664          } finally {
665            IOUtils.cleanup(LOG, ps);
666            IOUtils.cleanup(LOG, os);
667          }
668        }
669    
670        /**
671         * Writes all logs for a single container to the provided writer.
672         * @param valueStream
673         * @param writer
674         * @throws IOException
675         */
676        public static void readAcontainerLogs(DataInputStream valueStream,
677            Writer writer) throws IOException {
678          readAcontainerLogs(valueStream, writer, -1);
679        }
680    
681        private static void readContainerLogs(DataInputStream valueStream,
682            PrintStream out, long logUploadedTime) throws IOException {
683          byte[] buf = new byte[65535];
684    
685          String fileType = valueStream.readUTF();
686          String fileLengthStr = valueStream.readUTF();
687          long fileLength = Long.parseLong(fileLengthStr);
688          out.print("LogType:");
689          out.println(fileType);
690          if (logUploadedTime != -1) {
691            out.print("Log Upload Time:");
692            out.println(Times.format(logUploadedTime));
693          }
694          out.print("LogLength:");
695          out.println(fileLengthStr);
696          out.println("Log Contents:");
697    
698          long curRead = 0;
699          long pendingRead = fileLength - curRead;
700          int toRead =
701                    pendingRead > buf.length ? buf.length : (int) pendingRead;
702          int len = valueStream.read(buf, 0, toRead);
703          while (len != -1 && curRead < fileLength) {
704            out.write(buf, 0, len);
705            curRead += len;
706    
707            pendingRead = fileLength - curRead;
708            toRead =
709                      pendingRead > buf.length ? buf.length : (int) pendingRead;
710            len = valueStream.read(buf, 0, toRead);
711          }
712          out.println("");
713        }
714    
715        /**
716         * Keep calling this till you get a {@link EOFException} for getting logs of
717         * all types for a single container.
718         * 
719         * @param valueStream
720         * @param out
721         * @param logUploadedTime
722         * @throws IOException
723         */
724        public static void readAContainerLogsForALogType(
725            DataInputStream valueStream, PrintStream out, long logUploadedTime)
726              throws IOException {
727          readContainerLogs(valueStream, out, logUploadedTime);
728        }
729    
730        /**
731         * Keep calling this till you get a {@link EOFException} for getting logs of
732         * all types for a single container.
733         * 
734         * @param valueStream
735         * @param out
736         * @throws IOException
737         */
738        public static void readAContainerLogsForALogType(
739            DataInputStream valueStream, PrintStream out)
740              throws IOException {
741          readAContainerLogsForALogType(valueStream, out, -1);
742        }
743    
744        public void close() {
745          IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
746        }
747      }
748    
749      @Private
750      public static class ContainerLogsReader {
751        private DataInputStream valueStream;
752        private String currentLogType = null;
753        private long currentLogLength = 0;
754        private BoundedInputStream currentLogData = null;
755        private InputStreamReader currentLogISR;
756    
757        public ContainerLogsReader(DataInputStream stream) {
758          valueStream = stream;
759        }
760    
761        public String nextLog() throws IOException {
762          if (currentLogData != null && currentLogLength > 0) {
763            // seek to the end of the current log, relying on BoundedInputStream
764            // to prevent seeking past the end of the current log
765            do {
766              if (currentLogData.skip(currentLogLength) < 0) {
767                break;
768              }
769            } while (currentLogData.read() != -1);
770          }
771    
772          currentLogType = null;
773          currentLogLength = 0;
774          currentLogData = null;
775          currentLogISR = null;
776    
777          try {
778            String logType = valueStream.readUTF();
779            String logLengthStr = valueStream.readUTF();
780            currentLogLength = Long.parseLong(logLengthStr);
781            currentLogData =
782                new BoundedInputStream(valueStream, currentLogLength);
783            currentLogData.setPropagateClose(false);
784            currentLogISR = new InputStreamReader(currentLogData);
785            currentLogType = logType;
786          } catch (EOFException e) {
787          }
788    
789          return currentLogType;
790        }
791    
792        public String getCurrentLogType() {
793          return currentLogType;
794        }
795    
796        public long getCurrentLogLength() {
797          return currentLogLength;
798        }
799    
800        public long skip(long n) throws IOException {
801          return currentLogData.skip(n);
802        }
803    
804        public int read(byte[] buf, int off, int len) throws IOException {
805          return currentLogData.read(buf, off, len);
806        }
807    
808        public int read(char[] buf, int off, int len) throws IOException {
809          return currentLogISR.read(buf, off, len);
810        }
811      }
812    }