001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.logaggregation;
020
021import java.io.DataInput;
022import java.io.DataInputStream;
023import java.io.DataOutput;
024import java.io.DataOutputStream;
025import java.io.EOFException;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.IOException;
029import java.io.InputStreamReader;
030import java.io.OutputStream;
031import java.io.PrintStream;
032import java.io.Writer;
033import java.nio.charset.Charset;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collections;
038import java.util.EnumSet;
039import java.util.HashMap;
040import java.util.HashSet;
041import java.util.Iterator;
042import java.util.List;
043import java.util.Map;
044import java.util.Map.Entry;
045import java.util.Set;
046import java.util.regex.Pattern;
047
048import org.apache.commons.io.input.BoundedInputStream;
049import org.apache.commons.io.output.WriterOutputStream;
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052import org.apache.hadoop.classification.InterfaceAudience.Private;
053import org.apache.hadoop.classification.InterfaceAudience.Public;
054import org.apache.hadoop.classification.InterfaceStability.Evolving;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.CreateFlag;
057import org.apache.hadoop.fs.FSDataInputStream;
058import org.apache.hadoop.fs.FSDataOutputStream;
059import org.apache.hadoop.fs.FileContext;
060import org.apache.hadoop.fs.Options;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.fs.permission.FsPermission;
063import org.apache.hadoop.io.IOUtils;
064import org.apache.hadoop.io.SecureIOUtils;
065import org.apache.hadoop.io.Writable;
066import org.apache.hadoop.io.file.tfile.TFile;
067import org.apache.hadoop.security.UserGroupInformation;
068import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
069import org.apache.hadoop.yarn.api.records.ContainerId;
070import org.apache.hadoop.yarn.api.records.LogAggregationContext;
071import org.apache.hadoop.yarn.conf.YarnConfiguration;
072import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
073import org.apache.hadoop.yarn.util.ConverterUtils;
074import org.apache.hadoop.yarn.util.Times;
075
076import com.google.common.annotations.VisibleForTesting;
077import com.google.common.base.Predicate;
078import com.google.common.collect.Iterables;
079import com.google.common.collect.Sets;
080
081@Public
082@Evolving
083public class AggregatedLogFormat {
084
085  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
086  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
087  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
088  private static final LogKey VERSION_KEY = new LogKey("VERSION");
089  private static final Map<String, LogKey> RESERVED_KEYS;
090  //Maybe write out the retention policy.
091  //Maybe write out a list of containerLogs skipped by the retention policy.
092  private static final int VERSION = 1;
093
094  /**
095   * Umask for the log file.
096   */
097  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
098      .createImmutable((short) (0640 ^ 0777));
099
100
101  static {
102    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
103    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
104    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
105    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
106  }
107
108  @Public
109  public static class LogKey implements Writable {
110
111    private String keyString;
112
113    public LogKey() {
114
115    }
116
117    public LogKey(ContainerId containerId) {
118      this.keyString = containerId.toString();
119    }
120
121    public LogKey(String keyString) {
122      this.keyString = keyString;
123    }
124    
125    @Override
126    public int hashCode() {
127      return keyString == null ? 0 : keyString.hashCode();
128    }
129
130    @Override
131    public boolean equals(Object obj) {
132      if (obj instanceof LogKey) {
133        LogKey other = (LogKey) obj;
134        if (this.keyString == null) {
135          return other.keyString == null;
136        }
137        return this.keyString.equals(other.keyString);
138      }
139      return false;
140    }
141
142    @Private
143    @Override
144    public void write(DataOutput out) throws IOException {
145      out.writeUTF(this.keyString);
146    }
147
148    @Private
149    @Override
150    public void readFields(DataInput in) throws IOException {
151      this.keyString = in.readUTF();
152    }
153
154    @Override
155    public String toString() {
156      return this.keyString;
157    }
158  }
159
160  @Private
161  public static class LogValue {
162
163    private final List<String> rootLogDirs;
164    private final ContainerId containerId;
165    private final String user;
166    private final LogAggregationContext logAggregationContext;
167    private Set<File> uploadedFiles = new HashSet<File>();
168    private final Set<String> alreadyUploadedLogFiles;
169    private Set<String> allExistingFileMeta = new HashSet<String>();
170    private final boolean appFinished;
171    private final boolean containerFinished;
172    // TODO Maybe add a version string here. Instead of changing the version of
173    // the entire k-v format
174
175    public LogValue(List<String> rootLogDirs, ContainerId containerId,
176        String user) {
177      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true,
178          true);
179    }
180
181    public LogValue(List<String> rootLogDirs, ContainerId containerId,
182        String user, LogAggregationContext logAggregationContext,
183        Set<String> alreadyUploadedLogFiles, boolean appFinished,
184        boolean containerFinished) {
185      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
186      this.containerId = containerId;
187      this.user = user;
188
189      // Ensure logs are processed in lexical order
190      Collections.sort(this.rootLogDirs);
191      this.logAggregationContext = logAggregationContext;
192      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
193      this.appFinished = appFinished;
194      this.containerFinished = containerFinished;
195    }
196
197    private Set<File> getPendingLogFilesToUploadForThisContainer() {
198      Set<File> pendingUploadFiles = new HashSet<File>();
199      for (String rootLogDir : this.rootLogDirs) {
200        File appLogDir = new File(rootLogDir,
201            this.containerId.getApplicationAttemptId().
202                getApplicationId().toString());
203        File containerLogDir =
204            new File(appLogDir, this.containerId.toString());
205
206        if (!containerLogDir.isDirectory()) {
207          continue; // ContainerDir may have been deleted by the user.
208        }
209
210        pendingUploadFiles
211          .addAll(getPendingLogFilesToUpload(containerLogDir));
212      }
213      return pendingUploadFiles;
214    }
215
216    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
217        throws IOException {
218      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
219      Collections.sort(fileList);
220
221      for (File logFile : fileList) {
222        // We only aggregate top level files.
223        // Ignore anything inside sub-folders.
224        if (logFile.isDirectory()) {
225          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
226          continue;
227        }
228
229        FileInputStream in = null;
230        try {
231          in = secureOpenFile(logFile);
232        } catch (IOException e) {
233          logErrorMessage(logFile, e);
234          IOUtils.cleanup(LOG, in);
235          continue;
236        }
237
238        final long fileLength = logFile.length();
239        // Write the logFile Type
240        out.writeUTF(logFile.getName());
241
242        // Write the log length as UTF so that it is printable
243        out.writeUTF(String.valueOf(fileLength));
244
245        // Write the log itself
246        try {
247          byte[] buf = new byte[65535];
248          int len = 0;
249          long bytesLeft = fileLength;
250          while ((len = in.read(buf)) != -1) {
251            //If buffer contents within fileLength, write
252            if (len < bytesLeft) {
253              out.write(buf, 0, len);
254              bytesLeft-=len;
255            }
256            //else only write contents within fileLength, then exit early
257            else {
258              out.write(buf, 0, (int)bytesLeft);
259              break;
260            }
261          }
262          long newLength = logFile.length();
263          if(fileLength < newLength) {
264            LOG.warn("Aggregated logs truncated by approximately "+
265                (newLength-fileLength) +" bytes.");
266          }
267          this.uploadedFiles.add(logFile);
268        } catch (IOException e) {
269          String message = logErrorMessage(logFile, e);
270          out.write(message.getBytes(Charset.forName("UTF-8")));
271        } finally {
272          IOUtils.cleanup(LOG, in);
273        }
274      }
275    }
276
277    @VisibleForTesting
278    public FileInputStream secureOpenFile(File logFile) throws IOException {
279      return SecureIOUtils.openForRead(logFile, getUser(), null);
280    }
281
282    private static String logErrorMessage(File logFile, Exception e) {
283      String message = "Error aggregating log file. Log file : "
284          + logFile.getAbsolutePath() + ". " + e.getMessage();
285      LOG.error(message, e);
286      return message;
287    }
288
289    // Added for testing purpose.
290    public String getUser() {
291      return user;
292    }
293
294    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
295      Set<File> candidates =
296          new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
297      for (File logFile : candidates) {
298        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
299      }
300
301      Set<File> fileCandidates = new HashSet<File>(candidates);
302      if (this.logAggregationContext != null && candidates.size() > 0) {
303        fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
304        if (!this.appFinished && this.containerFinished) {
305          Set<File> addition = new HashSet<File>(candidates);
306          addition = getFileCandidates(addition, true);
307          fileCandidates.addAll(addition);
308        }
309      }
310      return fileCandidates;
311    }
312
313    private Set<File> getFileCandidates(Set<File> candidates,
314        boolean useRegularPattern) {
315      filterFiles(
316          useRegularPattern ? this.logAggregationContext.getIncludePattern()
317              : this.logAggregationContext.getRolledLogsIncludePattern(),
318          candidates, false);
319
320      filterFiles(
321          useRegularPattern ? this.logAggregationContext.getExcludePattern()
322              : this.logAggregationContext.getRolledLogsExcludePattern(),
323          candidates, true);
324
325      Iterable<File> mask =
326          Iterables.filter(candidates, new Predicate<File>() {
327            @Override
328            public boolean apply(File next) {
329              return !alreadyUploadedLogFiles
330                  .contains(getLogFileMetaData(next));
331            }
332          });
333      return Sets.newHashSet(mask);
334    }
335
336    private void filterFiles(String pattern, Set<File> candidates,
337        boolean exclusion) {
338      if (pattern != null && !pattern.isEmpty()) {
339        Pattern filterPattern = Pattern.compile(pattern);
340        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
341          .hasNext();) {
342          File candidate = candidatesItr.next();
343          boolean match = filterPattern.matcher(candidate.getName()).find();
344          if ((!match && !exclusion) || (match && exclusion)) {
345            candidatesItr.remove();
346          }
347        }
348      }
349    }
350
351    public Set<Path> getCurrentUpLoadedFilesPath() {
352      Set<Path> path = new HashSet<Path>();
353      for (File file : this.uploadedFiles) {
354        path.add(new Path(file.getAbsolutePath()));
355      }
356      return path;
357    }
358
359    public Set<String> getCurrentUpLoadedFileMeta() {
360      Set<String> info = new HashSet<String>();
361      for (File file : this.uploadedFiles) {
362        info.add(getLogFileMetaData(file));
363      }
364      return info;
365    }
366
367    public Set<String> getAllExistingFilesMeta() {
368      return this.allExistingFileMeta;
369    }
370
371    private String getLogFileMetaData(File file) {
372      return containerId.toString() + "_" + file.getName() + "_"
373          + file.lastModified();
374    }
375  }
376
377  /**
378   * The writer that writes out the aggregated logs.
379   */
380  @Private
381  public static class LogWriter {
382
383    private final FSDataOutputStream fsDataOStream;
384    private final TFile.Writer writer;
385    private FileContext fc;
386
387    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
388        UserGroupInformation userUgi) throws IOException {
389      try {
390        this.fsDataOStream =
391            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
392              @Override
393              public FSDataOutputStream run() throws Exception {
394                fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
395                fc.setUMask(APP_LOG_FILE_UMASK);
396                return fc.create(
397                    remoteAppLogFile,
398                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
399                    new Options.CreateOpts[] {});
400              }
401            });
402      } catch (InterruptedException e) {
403        throw new IOException(e);
404      }
405
406      // Keys are not sorted: null arg
407      // 256KB minBlockSize : Expected log size for each container too
408      this.writer =
409          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
410              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
411              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
412      //Write the version string
413      writeVersion();
414    }
415
416    @VisibleForTesting
417    public TFile.Writer getWriter() {
418      return this.writer;
419    }
420
421    private void writeVersion() throws IOException {
422      DataOutputStream out = this.writer.prepareAppendKey(-1);
423      VERSION_KEY.write(out);
424      out.close();
425      out = this.writer.prepareAppendValue(-1);
426      out.writeInt(VERSION);
427      out.close();
428    }
429
430    public void writeApplicationOwner(String user) throws IOException {
431      DataOutputStream out = this.writer.prepareAppendKey(-1);
432      APPLICATION_OWNER_KEY.write(out);
433      out.close();
434      out = this.writer.prepareAppendValue(-1);
435      out.writeUTF(user);
436      out.close();
437    }
438
439    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
440        throws IOException {
441      DataOutputStream out = this.writer.prepareAppendKey(-1);
442      APPLICATION_ACL_KEY.write(out);
443      out.close();
444      out = this.writer.prepareAppendValue(-1);
445      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
446        out.writeUTF(entry.getKey().toString());
447        out.writeUTF(entry.getValue());
448      }
449      out.close();
450    }
451
452    public void append(LogKey logKey, LogValue logValue) throws IOException {
453      Set<File> pendingUploadFiles =
454          logValue.getPendingLogFilesToUploadForThisContainer();
455      if (pendingUploadFiles.size() == 0) {
456        return;
457      }
458      DataOutputStream out = this.writer.prepareAppendKey(-1);
459      logKey.write(out);
460      out.close();
461      out = this.writer.prepareAppendValue(-1);
462      logValue.write(out, pendingUploadFiles);
463      out.close();
464    }
465
466    public void close() {
467      try {
468        this.writer.close();
469      } catch (IOException e) {
470        LOG.warn("Exception closing writer", e);
471      }
472      IOUtils.closeStream(fsDataOStream);
473    }
474  }
475
476  @Public
477  @Evolving
478  public static class LogReader {
479
480    private final FSDataInputStream fsDataIStream;
481    private final TFile.Reader.Scanner scanner;
482    private final TFile.Reader reader;
483
484    public LogReader(Configuration conf, Path remoteAppLogFile)
485        throws IOException {
486      FileContext fileContext =
487          FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
488      this.fsDataIStream = fileContext.open(remoteAppLogFile);
489      reader =
490          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
491              remoteAppLogFile).getLen(), conf);
492      this.scanner = reader.createScanner();
493    }
494
495    private boolean atBeginning = true;
496
497    /**
498     * Returns the owner of the application.
499     * 
500     * @return the application owner.
501     * @throws IOException
502     */
503    public String getApplicationOwner() throws IOException {
504      TFile.Reader.Scanner ownerScanner = null;
505      try {
506        ownerScanner = reader.createScanner();
507        LogKey key = new LogKey();
508        while (!ownerScanner.atEnd()) {
509          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
510          key.readFields(entry.getKeyStream());
511          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
512            DataInputStream valueStream = entry.getValueStream();
513            return valueStream.readUTF();
514          }
515          ownerScanner.advance();
516        }
517        return null;
518      } finally {
519        IOUtils.cleanup(LOG, ownerScanner);
520      }
521    }
522
523    /**
524     * Returns ACLs for the application. An empty map is returned if no ACLs are
525     * found.
526     * 
527     * @return a map of the Application ACLs.
528     * @throws IOException
529     */
530    public Map<ApplicationAccessType, String> getApplicationAcls()
531        throws IOException {
532      // TODO Seek directly to the key once a comparator is specified.
533      TFile.Reader.Scanner aclScanner = null;
534      try {
535        aclScanner = reader.createScanner();
536        LogKey key = new LogKey();
537        Map<ApplicationAccessType, String> acls =
538            new HashMap<ApplicationAccessType, String>();
539        while (!aclScanner.atEnd()) {
540          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
541          key.readFields(entry.getKeyStream());
542          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
543            DataInputStream valueStream = entry.getValueStream();
544            while (true) {
545              String appAccessOp = null;
546              String aclString = null;
547              try {
548                appAccessOp = valueStream.readUTF();
549              } catch (EOFException e) {
550                // Valid end of stream.
551                break;
552              }
553              try {
554                aclString = valueStream.readUTF();
555              } catch (EOFException e) {
556                throw new YarnRuntimeException("Error reading ACLs", e);
557              }
558              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
559            }
560          }
561          aclScanner.advance();
562        }
563        return acls;
564      } finally {
565        IOUtils.cleanup(LOG, aclScanner);
566      }
567    }
568
569    /**
570     * Read the next key and return the value-stream.
571     * 
572     * @param key
573     * @return the valueStream if there are more keys or null otherwise.
574     * @throws IOException
575     */
576    public DataInputStream next(LogKey key) throws IOException {
577      if (!this.atBeginning) {
578        this.scanner.advance();
579      } else {
580        this.atBeginning = false;
581      }
582      if (this.scanner.atEnd()) {
583        return null;
584      }
585      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
586      key.readFields(entry.getKeyStream());
587      // Skip META keys
588      if (RESERVED_KEYS.containsKey(key.toString())) {
589        return next(key);
590      }
591      DataInputStream valueStream = entry.getValueStream();
592      return valueStream;
593    }
594
595    /**
596     * Get a ContainerLogsReader to read the logs for
597     * the specified container.
598     *
599     * @param containerId
600     * @return object to read the container's logs or null if the
601     *         logs could not be found
602     * @throws IOException
603     */
604    @Private
605    public ContainerLogsReader getContainerLogsReader(
606        ContainerId containerId) throws IOException {
607      ContainerLogsReader logReader = null;
608
609      final LogKey containerKey = new LogKey(containerId);
610      LogKey key = new LogKey();
611      DataInputStream valueStream = next(key);
612      while (valueStream != null && !key.equals(containerKey)) {
613        valueStream = next(key);
614      }
615
616      if (valueStream != null) {
617        logReader = new ContainerLogsReader(valueStream);
618      }
619
620      return logReader;
621    }
622
623    //TODO  Change Log format and interfaces to be containerId specific.
624    // Avoid returning completeValueStreams.
625//    public List<String> getTypesForContainer(DataInputStream valueStream){}
626//    
627//    /**
628//     * @param valueStream
629//     *          The Log stream for the container.
630//     * @param fileType
631//     *          the log type required.
632//     * @return An InputStreamReader for the required log type or null if the
633//     *         type is not found.
634//     * @throws IOException
635//     */
636//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
637//        String fileType) throws IOException {
638//      valueStream.reset();
639//      try {
640//        while (true) {
641//          String ft = valueStream.readUTF();
642//          String fileLengthStr = valueStream.readUTF();
643//          long fileLength = Long.parseLong(fileLengthStr);
644//          if (ft.equals(fileType)) {
645//            BoundedInputStream bis =
646//                new BoundedInputStream(valueStream, fileLength);
647//            return new InputStreamReader(bis);
648//          } else {
649//            long totalSkipped = 0;
650//            long currSkipped = 0;
651//            while (currSkipped != -1 && totalSkipped < fileLength) {
652//              currSkipped = valueStream.skip(fileLength - totalSkipped);
653//              totalSkipped += currSkipped;
654//            }
655//            // TODO Verify skip behaviour.
656//            if (currSkipped == -1) {
657//              return null;
658//            }
659//          }
660//        }
661//      } catch (EOFException e) {
662//        return null;
663//      }
664//    }
665
666    /**
667     * Writes all logs for a single container to the provided writer.
668     * @param valueStream
669     * @param writer
670     * @param logUploadedTime
671     * @throws IOException
672     */
673    public static void readAcontainerLogs(DataInputStream valueStream,
674        Writer writer, long logUploadedTime) throws IOException {
675      OutputStream os = null;
676      PrintStream ps = null;
677      try {
678        os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
679        ps = new PrintStream(os);
680        while (true) {
681          try {
682            readContainerLogs(valueStream, ps, logUploadedTime);
683          } catch (EOFException e) {
684            // EndOfFile
685            return;
686          }
687        }
688      } finally {
689        IOUtils.cleanup(LOG, ps);
690        IOUtils.cleanup(LOG, os);
691      }
692    }
693
694    /**
695     * Writes all logs for a single container to the provided writer.
696     * @param valueStream
697     * @param writer
698     * @throws IOException
699     */
700    public static void readAcontainerLogs(DataInputStream valueStream,
701        Writer writer) throws IOException {
702      readAcontainerLogs(valueStream, writer, -1);
703    }
704
705    private static void readContainerLogs(DataInputStream valueStream,
706        PrintStream out, long logUploadedTime) throws IOException {
707      byte[] buf = new byte[65535];
708
709      String fileType = valueStream.readUTF();
710      String fileLengthStr = valueStream.readUTF();
711      long fileLength = Long.parseLong(fileLengthStr);
712      out.print("LogType:");
713      out.println(fileType);
714      if (logUploadedTime != -1) {
715        out.print("Log Upload Time:");
716        out.println(Times.format(logUploadedTime));
717      }
718      out.print("LogLength:");
719      out.println(fileLengthStr);
720      out.println("Log Contents:");
721
722      long curRead = 0;
723      long pendingRead = fileLength - curRead;
724      int toRead =
725                pendingRead > buf.length ? buf.length : (int) pendingRead;
726      int len = valueStream.read(buf, 0, toRead);
727      while (len != -1 && curRead < fileLength) {
728        out.write(buf, 0, len);
729        curRead += len;
730
731        pendingRead = fileLength - curRead;
732        toRead =
733                  pendingRead > buf.length ? buf.length : (int) pendingRead;
734        len = valueStream.read(buf, 0, toRead);
735      }
736      out.println("End of LogType:" + fileType);
737      out.println("");
738    }
739
740    /**
741     * Keep calling this till you get a {@link EOFException} for getting logs of
742     * all types for a single container.
743     * 
744     * @param valueStream
745     * @param out
746     * @param logUploadedTime
747     * @throws IOException
748     */
749    public static void readAContainerLogsForALogType(
750        DataInputStream valueStream, PrintStream out, long logUploadedTime)
751          throws IOException {
752      readContainerLogs(valueStream, out, logUploadedTime);
753    }
754
755    /**
756     * Keep calling this till you get a {@link EOFException} for getting logs of
757     * all types for a single container.
758     * 
759     * @param valueStream
760     * @param out
761     * @throws IOException
762     */
763    public static void readAContainerLogsForALogType(
764        DataInputStream valueStream, PrintStream out)
765          throws IOException {
766      readAContainerLogsForALogType(valueStream, out, -1);
767    }
768
769    /**
770     * Keep calling this till you get a {@link EOFException} for getting logs of
771     * the specific types for a single container.
772     * @param valueStream
773     * @param out
774     * @param logUploadedTime
775     * @param logType
776     * @throws IOException
777     */
778    public static int readContainerLogsForALogType(
779        DataInputStream valueStream, PrintStream out, long logUploadedTime,
780        List<String> logType) throws IOException {
781      byte[] buf = new byte[65535];
782
783      String fileType = valueStream.readUTF();
784      String fileLengthStr = valueStream.readUTF();
785      long fileLength = Long.parseLong(fileLengthStr);
786      if (logType.contains(fileType)) {
787        out.print("LogType:");
788        out.println(fileType);
789        if (logUploadedTime != -1) {
790          out.print("Log Upload Time:");
791          out.println(Times.format(logUploadedTime));
792        }
793        out.print("LogLength:");
794        out.println(fileLengthStr);
795        out.println("Log Contents:");
796
797        long curRead = 0;
798        long pendingRead = fileLength - curRead;
799        int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
800        int len = valueStream.read(buf, 0, toRead);
801        while (len != -1 && curRead < fileLength) {
802          out.write(buf, 0, len);
803          curRead += len;
804
805          pendingRead = fileLength - curRead;
806          toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
807          len = valueStream.read(buf, 0, toRead);
808        }
809        out.println("End of LogType:" + fileType);
810        out.println("");
811        return 0;
812      } else {
813        long totalSkipped = 0;
814        long currSkipped = 0;
815        while (currSkipped != -1 && totalSkipped < fileLength) {
816          currSkipped = valueStream.skip(fileLength - totalSkipped);
817          totalSkipped += currSkipped;
818        }
819        return -1;
820      }
821    }
822
823    public void close() {
824      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
825    }
826  }
827
828  @Private
829  public static class ContainerLogsReader {
830    private DataInputStream valueStream;
831    private String currentLogType = null;
832    private long currentLogLength = 0;
833    private BoundedInputStream currentLogData = null;
834    private InputStreamReader currentLogISR;
835
836    public ContainerLogsReader(DataInputStream stream) {
837      valueStream = stream;
838    }
839
840    public String nextLog() throws IOException {
841      if (currentLogData != null && currentLogLength > 0) {
842        // seek to the end of the current log, relying on BoundedInputStream
843        // to prevent seeking past the end of the current log
844        do {
845          if (currentLogData.skip(currentLogLength) < 0) {
846            break;
847          }
848        } while (currentLogData.read() != -1);
849      }
850
851      currentLogType = null;
852      currentLogLength = 0;
853      currentLogData = null;
854      currentLogISR = null;
855
856      try {
857        String logType = valueStream.readUTF();
858        String logLengthStr = valueStream.readUTF();
859        currentLogLength = Long.parseLong(logLengthStr);
860        currentLogData =
861            new BoundedInputStream(valueStream, currentLogLength);
862        currentLogData.setPropagateClose(false);
863        currentLogISR = new InputStreamReader(currentLogData,
864            Charset.forName("UTF-8"));
865        currentLogType = logType;
866      } catch (EOFException e) {
867      }
868
869      return currentLogType;
870    }
871
872    public String getCurrentLogType() {
873      return currentLogType;
874    }
875
876    public long getCurrentLogLength() {
877      return currentLogLength;
878    }
879
880    public long skip(long n) throws IOException {
881      return currentLogData.skip(n);
882    }
883
884    public int read() throws IOException {
885      return currentLogData.read();
886    }
887
888    public int read(byte[] buf, int off, int len) throws IOException {
889      return currentLogData.read(buf, off, len);
890    }
891
892    public int read(char[] buf, int off, int len) throws IOException {
893      return currentLogISR.read(buf, off, len);
894    }
895  }
896}