001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.logaggregation;
020    
021    import java.io.DataInput;
022    import java.io.DataInputStream;
023    import java.io.DataOutput;
024    import java.io.DataOutputStream;
025    import java.io.EOFException;
026    import java.io.File;
027    import java.io.FileInputStream;
028    import java.io.IOException;
029    import java.io.InputStreamReader;
030    import java.io.PrintStream;
031    import java.io.Writer;
032    import java.security.PrivilegedExceptionAction;
033    import java.util.ArrayList;
034    import java.util.Arrays;
035    import java.util.Collections;
036    import java.util.EnumSet;
037    import java.util.HashMap;
038    import java.util.List;
039    import java.util.Map;
040    import java.util.Map.Entry;
041    
042    import org.apache.commons.io.input.BoundedInputStream;
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    import org.apache.hadoop.classification.InterfaceAudience.Private;
046    import org.apache.hadoop.classification.InterfaceAudience.Public;
047    import org.apache.hadoop.classification.InterfaceStability.Evolving;
048    import org.apache.hadoop.conf.Configuration;
049    import org.apache.hadoop.fs.CreateFlag;
050    import org.apache.hadoop.fs.FSDataInputStream;
051    import org.apache.hadoop.fs.FSDataOutputStream;
052    import org.apache.hadoop.fs.FileContext;
053    import org.apache.hadoop.fs.Options;
054    import org.apache.hadoop.fs.Path;
055    import org.apache.hadoop.fs.permission.FsPermission;
056    import org.apache.hadoop.io.IOUtils;
057    import org.apache.hadoop.io.SecureIOUtils;
058    import org.apache.hadoop.io.Writable;
059    import org.apache.hadoop.io.file.tfile.TFile;
060    import org.apache.hadoop.security.UserGroupInformation;
061    import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
062    import org.apache.hadoop.yarn.api.records.ContainerId;
063    import org.apache.hadoop.yarn.conf.YarnConfiguration;
064    import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
065    import org.apache.hadoop.yarn.util.ConverterUtils;
066    
067    @Public
068    @Evolving
069    public class AggregatedLogFormat {
070    
071      private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
072      private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
073      private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
074      private static final LogKey VERSION_KEY = new LogKey("VERSION");
075      private static final Map<String, LogKey> RESERVED_KEYS;
076      //Maybe write out the retention policy.
077      //Maybe write out a list of containerLogs skipped by the retention policy.
078      private static final int VERSION = 1;
079    
080      /**
081       * Umask for the log file.
082       */
083      private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
084          .createImmutable((short) (0640 ^ 0777));
085    
086    
087      static {
088        RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
089        RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
090        RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
091        RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
092      }
093    
094      @Public
095      public static class LogKey implements Writable {
096    
097        private String keyString;
098    
099        public LogKey() {
100    
101        }
102    
103        public LogKey(ContainerId containerId) {
104          this.keyString = containerId.toString();
105        }
106    
107        public LogKey(String keyString) {
108          this.keyString = keyString;
109        }
110        
111        @Override
112        public int hashCode() {
113          return keyString == null ? 0 : keyString.hashCode();
114        }
115    
116        @Override
117        public boolean equals(Object obj) {
118          if (obj instanceof LogKey) {
119            LogKey other = (LogKey) obj;
120            if (this.keyString == null) {
121              return other.keyString == null;
122            }
123            return this.keyString.equals(other.keyString);
124          }
125          return false;
126        }
127    
128        @Private
129        @Override
130        public void write(DataOutput out) throws IOException {
131          out.writeUTF(this.keyString);
132        }
133    
134        @Private
135        @Override
136        public void readFields(DataInput in) throws IOException {
137          this.keyString = in.readUTF();
138        }
139    
140        @Override
141        public String toString() {
142          return this.keyString;
143        }
144      }
145    
146      @Private
147      public static class LogValue {
148    
149        private final List<String> rootLogDirs;
150        private final ContainerId containerId;
151        private final String user;
152        // TODO Maybe add a version string here. Instead of changing the version of
153        // the entire k-v format
154    
155        public LogValue(List<String> rootLogDirs, ContainerId containerId,
156            String user) {
157          this.rootLogDirs = new ArrayList<String>(rootLogDirs);
158          this.containerId = containerId;
159          this.user = user;
160    
161          // Ensure logs are processed in lexical order
162          Collections.sort(this.rootLogDirs);
163        }
164    
165        public void write(DataOutputStream out) throws IOException {
166          for (String rootLogDir : this.rootLogDirs) {
167            File appLogDir =
168                new File(rootLogDir, 
169                    ConverterUtils.toString(
170                        this.containerId.getApplicationAttemptId().
171                            getApplicationId())
172                    );
173            File containerLogDir =
174                new File(appLogDir, ConverterUtils.toString(this.containerId));
175    
176            if (!containerLogDir.isDirectory()) {
177              continue; // ContainerDir may have been deleted by the user.
178            }
179    
180            // Write out log files in lexical order
181            File[] logFiles = containerLogDir.listFiles();
182            Arrays.sort(logFiles);
183            for (File logFile : logFiles) {
184    
185              final long fileLength = logFile.length();
186    
187              // Write the logFile Type
188              out.writeUTF(logFile.getName());
189    
190              // Write the log length as UTF so that it is printable
191              out.writeUTF(String.valueOf(fileLength));
192    
193              // Write the log itself
194              FileInputStream in = null;
195              try {
196                in = SecureIOUtils.openForRead(logFile, getUser(), null);
197                byte[] buf = new byte[65535];
198                int len = 0;
199                long bytesLeft = fileLength;
200                while ((len = in.read(buf)) != -1) {
201                  //If buffer contents within fileLength, write
202                  if (len < bytesLeft) {
203                    out.write(buf, 0, len);
204                    bytesLeft-=len;
205                  }
206                  //else only write contents within fileLength, then exit early
207                  else {
208                    out.write(buf, 0, (int)bytesLeft);
209                    break;
210                  }
211                }
212                long newLength = logFile.length();
213                if(fileLength < newLength) {
214                  LOG.warn("Aggregated logs truncated by approximately "+
215                      (newLength-fileLength) +" bytes.");
216                }
217              } catch (IOException e) {
218                String message = "Error aggregating log file. Log file : "
219                    + logFile.getAbsolutePath() + e.getMessage(); 
220                LOG.error(message, e);
221                out.write(message.getBytes());
222              } finally {
223                if (in != null) {
224                  in.close();
225                }
226              }
227            }
228          }
229        }
230        
231        // Added for testing purpose.
232        public String getUser() {
233          return user;
234        }
235      }
236    
237      /**
238       * The writer that writes out the aggregated logs.
239       */
240      @Private
241      public static class LogWriter {
242    
243        private final FSDataOutputStream fsDataOStream;
244        private final TFile.Writer writer;
245    
246        public LogWriter(final Configuration conf, final Path remoteAppLogFile,
247            UserGroupInformation userUgi) throws IOException {
248          try {
249            this.fsDataOStream =
250                userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
251                  @Override
252                  public FSDataOutputStream run() throws Exception {
253                    FileContext fc = FileContext.getFileContext(conf);
254                    fc.setUMask(APP_LOG_FILE_UMASK);
255                    return fc.create(
256                        remoteAppLogFile,
257                        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
258                        new Options.CreateOpts[] {});
259                  }
260                });
261          } catch (InterruptedException e) {
262            throw new IOException(e);
263          }
264    
265          // Keys are not sorted: null arg
266          // 256KB minBlockSize : Expected log size for each container too
267          this.writer =
268              new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
269                  YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
270                  YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
271          //Write the version string
272          writeVersion();
273        }
274    
275        private void writeVersion() throws IOException {
276          DataOutputStream out = this.writer.prepareAppendKey(-1);
277          VERSION_KEY.write(out);
278          out.close();
279          out = this.writer.prepareAppendValue(-1);
280          out.writeInt(VERSION);
281          out.close();
282        }
283    
284        public void writeApplicationOwner(String user) throws IOException {
285          DataOutputStream out = this.writer.prepareAppendKey(-1);
286          APPLICATION_OWNER_KEY.write(out);
287          out.close();
288          out = this.writer.prepareAppendValue(-1);
289          out.writeUTF(user);
290          out.close();
291        }
292    
293        public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
294            throws IOException {
295          DataOutputStream out = this.writer.prepareAppendKey(-1);
296          APPLICATION_ACL_KEY.write(out);
297          out.close();
298          out = this.writer.prepareAppendValue(-1);
299          for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
300            out.writeUTF(entry.getKey().toString());
301            out.writeUTF(entry.getValue());
302          }
303          out.close();
304        }
305    
306        public void append(LogKey logKey, LogValue logValue) throws IOException {
307          DataOutputStream out = this.writer.prepareAppendKey(-1);
308          logKey.write(out);
309          out.close();
310          out = this.writer.prepareAppendValue(-1);
311          logValue.write(out);
312          out.close();
313        }
314    
315        public void close() {
316          try {
317            this.writer.close();
318          } catch (IOException e) {
319            LOG.warn("Exception closing writer", e);
320          }
321          try {
322            this.fsDataOStream.close();
323          } catch (IOException e) {
324            LOG.warn("Exception closing output-stream", e);
325          }
326        }
327      }
328    
329      @Public
330      @Evolving
331      public static class LogReader {
332    
333        private final FSDataInputStream fsDataIStream;
334        private final TFile.Reader.Scanner scanner;
335        private final TFile.Reader reader;
336    
337        public LogReader(Configuration conf, Path remoteAppLogFile)
338            throws IOException {
339          FileContext fileContext = FileContext.getFileContext(conf);
340          this.fsDataIStream = fileContext.open(remoteAppLogFile);
341          reader =
342              new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
343                  remoteAppLogFile).getLen(), conf);
344          this.scanner = reader.createScanner();
345        }
346    
347        private boolean atBeginning = true;
348    
349        /**
350         * Returns the owner of the application.
351         * 
352         * @return the application owner.
353         * @throws IOException
354         */
355        public String getApplicationOwner() throws IOException {
356          TFile.Reader.Scanner ownerScanner = reader.createScanner();
357          LogKey key = new LogKey();
358          while (!ownerScanner.atEnd()) {
359            TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
360            key.readFields(entry.getKeyStream());
361            if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
362              DataInputStream valueStream = entry.getValueStream();
363              return valueStream.readUTF();
364            }
365            ownerScanner.advance();
366          }
367          return null;
368        }
369    
370        /**
371         * Returns ACLs for the application. An empty map is returned if no ACLs are
372         * found.
373         * 
374         * @return a map of the Application ACLs.
375         * @throws IOException
376         */
377        public Map<ApplicationAccessType, String> getApplicationAcls()
378            throws IOException {
379          // TODO Seek directly to the key once a comparator is specified.
380          TFile.Reader.Scanner aclScanner = reader.createScanner();
381          LogKey key = new LogKey();
382          Map<ApplicationAccessType, String> acls =
383              new HashMap<ApplicationAccessType, String>();
384          while (!aclScanner.atEnd()) {
385            TFile.Reader.Scanner.Entry entry = aclScanner.entry();
386            key.readFields(entry.getKeyStream());
387            if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
388              DataInputStream valueStream = entry.getValueStream();
389              while (true) {
390                String appAccessOp = null;
391                String aclString = null;
392                try {
393                  appAccessOp = valueStream.readUTF();
394                } catch (EOFException e) {
395                  // Valid end of stream.
396                  break;
397                }
398                try {
399                  aclString = valueStream.readUTF();
400                } catch (EOFException e) {
401                  throw new YarnRuntimeException("Error reading ACLs", e);
402                }
403                acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
404              }
405    
406            }
407            aclScanner.advance();
408          }
409          return acls;
410        }
411        
412        /**
413         * Read the next key and return the value-stream.
414         * 
415         * @param key
416         * @return the valueStream if there are more keys or null otherwise.
417         * @throws IOException
418         */
419        public DataInputStream next(LogKey key) throws IOException {
420          if (!this.atBeginning) {
421            this.scanner.advance();
422          } else {
423            this.atBeginning = false;
424          }
425          if (this.scanner.atEnd()) {
426            return null;
427          }
428          TFile.Reader.Scanner.Entry entry = this.scanner.entry();
429          key.readFields(entry.getKeyStream());
430          // Skip META keys
431          if (RESERVED_KEYS.containsKey(key.toString())) {
432            return next(key);
433          }
434          DataInputStream valueStream = entry.getValueStream();
435          return valueStream;
436        }
437    
438        /**
439         * Get a ContainerLogsReader to read the logs for
440         * the specified container.
441         *
442         * @param containerId
443         * @return object to read the container's logs or null if the
444         *         logs could not be found
445         * @throws IOException
446         */
447        @Private
448        public ContainerLogsReader getContainerLogsReader(
449            ContainerId containerId) throws IOException {
450          ContainerLogsReader logReader = null;
451    
452          final LogKey containerKey = new LogKey(containerId);
453          LogKey key = new LogKey();
454          DataInputStream valueStream = next(key);
455          while (valueStream != null && !key.equals(containerKey)) {
456            valueStream = next(key);
457          }
458    
459          if (valueStream != null) {
460            logReader = new ContainerLogsReader(valueStream);
461          }
462    
463          return logReader;
464        }
465    
466        //TODO  Change Log format and interfaces to be containerId specific.
467        // Avoid returning completeValueStreams.
468    //    public List<String> getTypesForContainer(DataInputStream valueStream){}
469    //    
470    //    /**
471    //     * @param valueStream
472    //     *          The Log stream for the container.
473    //     * @param fileType
474    //     *          the log type required.
475    //     * @return An InputStreamReader for the required log type or null if the
476    //     *         type is not found.
477    //     * @throws IOException
478    //     */
479    //    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
480    //        String fileType) throws IOException {
481    //      valueStream.reset();
482    //      try {
483    //        while (true) {
484    //          String ft = valueStream.readUTF();
485    //          String fileLengthStr = valueStream.readUTF();
486    //          long fileLength = Long.parseLong(fileLengthStr);
487    //          if (ft.equals(fileType)) {
488    //            BoundedInputStream bis =
489    //                new BoundedInputStream(valueStream, fileLength);
490    //            return new InputStreamReader(bis);
491    //          } else {
492    //            long totalSkipped = 0;
493    //            long currSkipped = 0;
494    //            while (currSkipped != -1 && totalSkipped < fileLength) {
495    //              currSkipped = valueStream.skip(fileLength - totalSkipped);
496    //              totalSkipped += currSkipped;
497    //            }
498    //            // TODO Verify skip behaviour.
499    //            if (currSkipped == -1) {
500    //              return null;
501    //            }
502    //          }
503    //        }
504    //      } catch (EOFException e) {
505    //        return null;
506    //      }
507    //    }
508    
509        /**
510         * Writes all logs for a single container to the provided writer.
511         * @param valueStream
512         * @param writer
513         * @throws IOException
514         */
515        public static void readAcontainerLogs(DataInputStream valueStream,
516            Writer writer) throws IOException {
517          int bufferSize = 65536;
518          char[] cbuf = new char[bufferSize];
519          String fileType;
520          String fileLengthStr;
521          long fileLength;
522    
523          while (true) {
524            try {
525              fileType = valueStream.readUTF();
526            } catch (EOFException e) {
527              // EndOfFile
528              return;
529            }
530            fileLengthStr = valueStream.readUTF();
531            fileLength = Long.parseLong(fileLengthStr);
532            writer.write("\n\nLogType:");
533            writer.write(fileType);
534            writer.write("\nLogLength:");
535            writer.write(fileLengthStr);
536            writer.write("\nLog Contents:\n");
537            // ByteLevel
538            BoundedInputStream bis =
539                new BoundedInputStream(valueStream, fileLength);
540            InputStreamReader reader = new InputStreamReader(bis);
541            int currentRead = 0;
542            int totalRead = 0;
543            while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
544              writer.write(cbuf, 0, currentRead);
545              totalRead += currentRead;
546            }
547          }
548        }
549    
550        /**
551         * Keep calling this till you get a {@link EOFException} for getting logs of
552         * all types for a single container.
553         * 
554         * @param valueStream
555         * @param out
556         * @throws IOException
557         */
558        public static void readAContainerLogsForALogType(
559            DataInputStream valueStream, PrintStream out)
560              throws IOException {
561    
562          byte[] buf = new byte[65535];
563    
564          String fileType = valueStream.readUTF();
565          String fileLengthStr = valueStream.readUTF();
566          long fileLength = Long.parseLong(fileLengthStr);
567          out.print("LogType: ");
568          out.println(fileType);
569          out.print("LogLength: ");
570          out.println(fileLengthStr);
571          out.println("Log Contents:");
572    
573          long curRead = 0;
574          long pendingRead = fileLength - curRead;
575          int toRead =
576                    pendingRead > buf.length ? buf.length : (int) pendingRead;
577          int len = valueStream.read(buf, 0, toRead);
578          while (len != -1 && curRead < fileLength) {
579            out.write(buf, 0, len);
580            curRead += len;
581    
582            pendingRead = fileLength - curRead;
583            toRead =
584                      pendingRead > buf.length ? buf.length : (int) pendingRead;
585            len = valueStream.read(buf, 0, toRead);
586          }
587          out.println("");
588        }
589    
590        public void close() {
591          IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
592        }
593      }
594    
595      @Private
596      public static class ContainerLogsReader {
597        private DataInputStream valueStream;
598        private String currentLogType = null;
599        private long currentLogLength = 0;
600        private BoundedInputStream currentLogData = null;
601        private InputStreamReader currentLogISR;
602    
603        public ContainerLogsReader(DataInputStream stream) {
604          valueStream = stream;
605        }
606    
607        public String nextLog() throws IOException {
608          if (currentLogData != null && currentLogLength > 0) {
609            // seek to the end of the current log, relying on BoundedInputStream
610            // to prevent seeking past the end of the current log
611            do {
612              if (currentLogData.skip(currentLogLength) < 0) {
613                break;
614              }
615            } while (currentLogData.read() != -1);
616          }
617    
618          currentLogType = null;
619          currentLogLength = 0;
620          currentLogData = null;
621          currentLogISR = null;
622    
623          try {
624            String logType = valueStream.readUTF();
625            String logLengthStr = valueStream.readUTF();
626            currentLogLength = Long.parseLong(logLengthStr);
627            currentLogData =
628                new BoundedInputStream(valueStream, currentLogLength);
629            currentLogData.setPropagateClose(false);
630            currentLogISR = new InputStreamReader(currentLogData);
631            currentLogType = logType;
632          } catch (EOFException e) {
633          }
634    
635          return currentLogType;
636        }
637    
638        public String getCurrentLogType() {
639          return currentLogType;
640        }
641    
642        public long getCurrentLogLength() {
643          return currentLogLength;
644        }
645    
646        public long skip(long n) throws IOException {
647          return currentLogData.skip(n);
648        }
649    
650        public int read(byte[] buf, int off, int len) throws IOException {
651          return currentLogData.read(buf, off, len);
652        }
653    
654        public int read(char[] buf, int off, int len) throws IOException {
655          return currentLogISR.read(buf, off, len);
656        }
657      }
658    }