001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.logaggregation;
020    
021    import java.io.DataInput;
022    import java.io.DataInputStream;
023    import java.io.DataOutput;
024    import java.io.DataOutputStream;
025    import java.io.EOFException;
026    import java.io.File;
027    import java.io.FileInputStream;
028    import java.io.IOException;
029    import java.io.InputStreamReader;
030    import java.io.PrintStream;
031    import java.io.Writer;
032    import java.security.PrivilegedExceptionAction;
033    import java.util.ArrayList;
034    import java.util.Arrays;
035    import java.util.Collections;
036    import java.util.EnumSet;
037    import java.util.HashMap;
038    import java.util.List;
039    import java.util.Map;
040    import java.util.Map.Entry;
041    
042    import org.apache.commons.io.input.BoundedInputStream;
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    import org.apache.hadoop.classification.InterfaceAudience.Private;
046    import org.apache.hadoop.classification.InterfaceAudience.Public;
047    import org.apache.hadoop.classification.InterfaceStability.Evolving;
048    import org.apache.hadoop.conf.Configuration;
049    import org.apache.hadoop.fs.CreateFlag;
050    import org.apache.hadoop.fs.FSDataInputStream;
051    import org.apache.hadoop.fs.FSDataOutputStream;
052    import org.apache.hadoop.fs.FileContext;
053    import org.apache.hadoop.fs.Options;
054    import org.apache.hadoop.fs.Path;
055    import org.apache.hadoop.fs.permission.FsPermission;
056    import org.apache.hadoop.io.IOUtils;
057    import org.apache.hadoop.io.SecureIOUtils;
058    import org.apache.hadoop.io.Writable;
059    import org.apache.hadoop.io.file.tfile.TFile;
060    import org.apache.hadoop.security.UserGroupInformation;
061    import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
062    import org.apache.hadoop.yarn.api.records.ContainerId;
063    import org.apache.hadoop.yarn.conf.YarnConfiguration;
064    import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
065    import org.apache.hadoop.yarn.util.ConverterUtils;
066    
067    @Public
068    @Evolving
069    public class AggregatedLogFormat {
070    
/** Logger used by the nested value, writer and reader classes. */
private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);

// Keys of the metadata entries that are stored in the same file as the
// per-container log entries.
private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
private static final LogKey VERSION_KEY = new LogKey("VERSION");

/** The metadata keys above, indexed by their string form. */
private static final Map<String, LogKey> RESERVED_KEYS;

//Maybe write out the retention policy.
//Maybe write out a list of containerLogs skipped by the retention policy.
/** Format version written as the value of the {@code VERSION} entry. */
private static final int VERSION = 1;

/**
 * Umask for the log file. {@code 0640 ^ 0777} evaluates to {@code 0137},
 * i.e. every permission bit outside {@code rw-r-----}.
 */
private static final FsPermission APP_LOG_FILE_UMASK =
    FsPermission.createImmutable((short) (0640 ^ 0777));


static {
  // Build the lookup table locally, then publish it through the final field.
  final Map<String, LogKey> reserved = new HashMap<String, LogKey>();
  final LogKey[] metaKeys =
      {APPLICATION_ACL_KEY, APPLICATION_OWNER_KEY, VERSION_KEY};
  for (LogKey metaKey : metaKeys) {
    reserved.put(metaKey.toString(), metaKey);
  }
  RESERVED_KEYS = reserved;
}
093    
094      @Public
095      public static class LogKey implements Writable {
096    
097        private String keyString;
098    
099        public LogKey() {
100    
101        }
102    
103        public LogKey(ContainerId containerId) {
104          this.keyString = containerId.toString();
105        }
106    
107        public LogKey(String keyString) {
108          this.keyString = keyString;
109        }
110        
111        @Override
112        public int hashCode() {
113          return keyString == null ? 0 : keyString.hashCode();
114        }
115    
116        @Override
117        public boolean equals(Object obj) {
118          if (obj instanceof LogKey) {
119            LogKey other = (LogKey) obj;
120            if (this.keyString == null) {
121              return other.keyString == null;
122            }
123            return this.keyString.equals(other.keyString);
124          }
125          return false;
126        }
127    
128        @Private
129        @Override
130        public void write(DataOutput out) throws IOException {
131          out.writeUTF(this.keyString);
132        }
133    
134        @Private
135        @Override
136        public void readFields(DataInput in) throws IOException {
137          this.keyString = in.readUTF();
138        }
139    
140        @Override
141        public String toString() {
142          return this.keyString;
143        }
144      }
145    
146      @Private
147      public static class LogValue {
148    
149        private final List<String> rootLogDirs;
150        private final ContainerId containerId;
151        private final String user;
152        // TODO Maybe add a version string here. Instead of changing the version of
153        // the entire k-v format
154    
155        public LogValue(List<String> rootLogDirs, ContainerId containerId,
156            String user) {
157          this.rootLogDirs = new ArrayList<String>(rootLogDirs);
158          this.containerId = containerId;
159          this.user = user;
160    
161          // Ensure logs are processed in lexical order
162          Collections.sort(this.rootLogDirs);
163        }
164    
165        public void write(DataOutputStream out) throws IOException {
166          for (String rootLogDir : this.rootLogDirs) {
167            File appLogDir =
168                new File(rootLogDir, 
169                    ConverterUtils.toString(
170                        this.containerId.getApplicationAttemptId().
171                            getApplicationId())
172                    );
173            File containerLogDir =
174                new File(appLogDir, ConverterUtils.toString(this.containerId));
175    
176            if (!containerLogDir.isDirectory()) {
177              continue; // ContainerDir may have been deleted by the user.
178            }
179    
180            // Write out log files in lexical order
181            File[] logFiles = containerLogDir.listFiles();
182            Arrays.sort(logFiles);
183            for (File logFile : logFiles) {
184    
185              // Write the logFile Type
186              out.writeUTF(logFile.getName());
187    
188              // Write the log length as UTF so that it is printable
189              out.writeUTF(String.valueOf(logFile.length()));
190    
191              // Write the log itself
192              FileInputStream in = null;
193              try {
194                in = SecureIOUtils.openForRead(logFile, getUser(), null);
195                byte[] buf = new byte[65535];
196                int len = 0;
197                while ((len = in.read(buf)) != -1) {
198                  out.write(buf, 0, len);
199                }
200              } catch (IOException e) {
201                String message = "Error aggregating log file. Log file : "
202                    + logFile.getAbsolutePath() + e.getMessage(); 
203                LOG.error(message, e);
204                out.write(message.getBytes());
205              } finally {
206                if (in != null) {
207                  in.close();
208                }
209              }
210            }
211          }
212        }
213        
214        // Added for testing purpose.
215        public String getUser() {
216          return user;
217        }
218      }
219    
220      /**
221       * The writer that writes out the aggregated logs.
222       */
223      @Private
224      public static class LogWriter {
225    
226        private final FSDataOutputStream fsDataOStream;
227        private final TFile.Writer writer;
228    
229        public LogWriter(final Configuration conf, final Path remoteAppLogFile,
230            UserGroupInformation userUgi) throws IOException {
231          try {
232            this.fsDataOStream =
233                userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
234                  @Override
235                  public FSDataOutputStream run() throws Exception {
236                    FileContext fc = FileContext.getFileContext(conf);
237                    fc.setUMask(APP_LOG_FILE_UMASK);
238                    return fc.create(
239                        remoteAppLogFile,
240                        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
241                        new Options.CreateOpts[] {});
242                  }
243                });
244          } catch (InterruptedException e) {
245            throw new IOException(e);
246          }
247    
248          // Keys are not sorted: null arg
249          // 256KB minBlockSize : Expected log size for each container too
250          this.writer =
251              new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
252                  YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
253                  YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
254          //Write the version string
255          writeVersion();
256        }
257    
258        private void writeVersion() throws IOException {
259          DataOutputStream out = this.writer.prepareAppendKey(-1);
260          VERSION_KEY.write(out);
261          out.close();
262          out = this.writer.prepareAppendValue(-1);
263          out.writeInt(VERSION);
264          out.close();
265        }
266    
267        public void writeApplicationOwner(String user) throws IOException {
268          DataOutputStream out = this.writer.prepareAppendKey(-1);
269          APPLICATION_OWNER_KEY.write(out);
270          out.close();
271          out = this.writer.prepareAppendValue(-1);
272          out.writeUTF(user);
273          out.close();
274        }
275    
276        public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
277            throws IOException {
278          DataOutputStream out = this.writer.prepareAppendKey(-1);
279          APPLICATION_ACL_KEY.write(out);
280          out.close();
281          out = this.writer.prepareAppendValue(-1);
282          for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
283            out.writeUTF(entry.getKey().toString());
284            out.writeUTF(entry.getValue());
285          }
286          out.close();
287        }
288    
289        public void append(LogKey logKey, LogValue logValue) throws IOException {
290          DataOutputStream out = this.writer.prepareAppendKey(-1);
291          logKey.write(out);
292          out.close();
293          out = this.writer.prepareAppendValue(-1);
294          logValue.write(out);
295          out.close();
296        }
297    
298        public void close() {
299          try {
300            this.writer.close();
301          } catch (IOException e) {
302            LOG.warn("Exception closing writer", e);
303          }
304          try {
305            this.fsDataOStream.close();
306          } catch (IOException e) {
307            LOG.warn("Exception closing output-stream", e);
308          }
309        }
310      }
311    
312      @Public
313      @Evolving
314      public static class LogReader {
315    
private final FSDataInputStream fsDataIStream;
private final TFile.Reader.Scanner scanner;
private final TFile.Reader reader;

// True until next() has been called once: the scanner already points at the
// first entry, so the first call must not advance it.
private boolean atBeginning = true;

/**
 * Opens the aggregated log file and positions a scanner on its first entry.
 *
 * @param conf configuration used to obtain the file context and passed to
 *          the TFile reader
 * @param remoteAppLogFile path of the aggregated log file
 * @throws IOException if the file cannot be opened or read; anything opened
 *           so far is closed before the exception propagates
 */
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext = FileContext.getFileContext(conf);
  FSDataInputStream in = null;
  TFile.Reader tfileReader = null;
  TFile.Reader.Scanner tfileScanner = null;
  boolean opened = false;
  try {
    in = fileContext.open(remoteAppLogFile);
    tfileReader =
        new TFile.Reader(in, fileContext.getFileStatus(
            remoteAppLogFile).getLen(), conf);
    tfileScanner = tfileReader.createScanner();
    opened = true;
  } finally {
    if (!opened) {
      // The constructor is about to fail, so close() can never be called on
      // this instance; release whatever was opened (nulls are passed for the
      // parts that were not reached).
      IOUtils.cleanup(LOG, tfileScanner, tfileReader, in);
    }
  }
  this.fsDataIStream = in;
  this.reader = tfileReader;
  this.scanner = tfileScanner;
}
331    
332        /**
333         * Returns the owner of the application.
334         * 
335         * @return the application owner.
336         * @throws IOException
337         */
338        public String getApplicationOwner() throws IOException {
339          TFile.Reader.Scanner ownerScanner = reader.createScanner();
340          LogKey key = new LogKey();
341          while (!ownerScanner.atEnd()) {
342            TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
343            key.readFields(entry.getKeyStream());
344            if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
345              DataInputStream valueStream = entry.getValueStream();
346              return valueStream.readUTF();
347            }
348            ownerScanner.advance();
349          }
350          return null;
351        }
352    
353        /**
354         * Returns ACLs for the application. An empty map is returned if no ACLs are
355         * found.
356         * 
357         * @return a map of the Application ACLs.
358         * @throws IOException
359         */
360        public Map<ApplicationAccessType, String> getApplicationAcls()
361            throws IOException {
362          // TODO Seek directly to the key once a comparator is specified.
363          TFile.Reader.Scanner aclScanner = reader.createScanner();
364          LogKey key = new LogKey();
365          Map<ApplicationAccessType, String> acls =
366              new HashMap<ApplicationAccessType, String>();
367          while (!aclScanner.atEnd()) {
368            TFile.Reader.Scanner.Entry entry = aclScanner.entry();
369            key.readFields(entry.getKeyStream());
370            if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
371              DataInputStream valueStream = entry.getValueStream();
372              while (true) {
373                String appAccessOp = null;
374                String aclString = null;
375                try {
376                  appAccessOp = valueStream.readUTF();
377                } catch (EOFException e) {
378                  // Valid end of stream.
379                  break;
380                }
381                try {
382                  aclString = valueStream.readUTF();
383                } catch (EOFException e) {
384                  throw new YarnRuntimeException("Error reading ACLs", e);
385                }
386                acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
387              }
388    
389            }
390            aclScanner.advance();
391          }
392          return acls;
393        }
394        
395        /**
396         * Read the next key and return the value-stream.
397         * 
398         * @param key
399         * @return the valueStream if there are more keys or null otherwise.
400         * @throws IOException
401         */
402        public DataInputStream next(LogKey key) throws IOException {
403          if (!this.atBeginning) {
404            this.scanner.advance();
405          } else {
406            this.atBeginning = false;
407          }
408          if (this.scanner.atEnd()) {
409            return null;
410          }
411          TFile.Reader.Scanner.Entry entry = this.scanner.entry();
412          key.readFields(entry.getKeyStream());
413          // Skip META keys
414          if (RESERVED_KEYS.containsKey(key.toString())) {
415            return next(key);
416          }
417          DataInputStream valueStream = entry.getValueStream();
418          return valueStream;
419        }
420    
421        /**
422         * Get a ContainerLogsReader to read the logs for
423         * the specified container.
424         *
425         * @param containerId
426         * @return object to read the container's logs or null if the
427         *         logs could not be found
428         * @throws IOException
429         */
430        @Private
431        public ContainerLogsReader getContainerLogsReader(
432            ContainerId containerId) throws IOException {
433          ContainerLogsReader logReader = null;
434    
435          final LogKey containerKey = new LogKey(containerId);
436          LogKey key = new LogKey();
437          DataInputStream valueStream = next(key);
438          while (valueStream != null && !key.equals(containerKey)) {
439            valueStream = next(key);
440          }
441    
442          if (valueStream != null) {
443            logReader = new ContainerLogsReader(valueStream);
444          }
445    
446          return logReader;
447        }
448    
449        //TODO  Change Log format and interfaces to be containerId specific.
450        // Avoid returning completeValueStreams.
451    //    public List<String> getTypesForContainer(DataInputStream valueStream){}
452    //    
453    //    /**
454    //     * @param valueStream
455    //     *          The Log stream for the container.
456    //     * @param fileType
457    //     *          the log type required.
458    //     * @return An InputStreamReader for the required log type or null if the
459    //     *         type is not found.
460    //     * @throws IOException
461    //     */
462    //    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
463    //        String fileType) throws IOException {
464    //      valueStream.reset();
465    //      try {
466    //        while (true) {
467    //          String ft = valueStream.readUTF();
468    //          String fileLengthStr = valueStream.readUTF();
469    //          long fileLength = Long.parseLong(fileLengthStr);
470    //          if (ft.equals(fileType)) {
471    //            BoundedInputStream bis =
472    //                new BoundedInputStream(valueStream, fileLength);
473    //            return new InputStreamReader(bis);
474    //          } else {
475    //            long totalSkipped = 0;
476    //            long currSkipped = 0;
477    //            while (currSkipped != -1 && totalSkipped < fileLength) {
478    //              currSkipped = valueStream.skip(fileLength - totalSkipped);
479    //              totalSkipped += currSkipped;
480    //            }
481    //            // TODO Verify skip behaviour.
482    //            if (currSkipped == -1) {
483    //              return null;
484    //            }
485    //          }
486    //        }
487    //      } catch (EOFException e) {
488    //        return null;
489    //      }
490    //    }
491    
492        /**
493         * Writes all logs for a single container to the provided writer.
494         * @param valueStream
495         * @param writer
496         * @throws IOException
497         */
498        public static void readAcontainerLogs(DataInputStream valueStream,
499            Writer writer) throws IOException {
500          int bufferSize = 65536;
501          char[] cbuf = new char[bufferSize];
502          String fileType;
503          String fileLengthStr;
504          long fileLength;
505    
506          while (true) {
507            try {
508              fileType = valueStream.readUTF();
509            } catch (EOFException e) {
510              // EndOfFile
511              return;
512            }
513            fileLengthStr = valueStream.readUTF();
514            fileLength = Long.parseLong(fileLengthStr);
515            writer.write("\n\nLogType:");
516            writer.write(fileType);
517            writer.write("\nLogLength:");
518            writer.write(fileLengthStr);
519            writer.write("\nLog Contents:\n");
520            // ByteLevel
521            BoundedInputStream bis =
522                new BoundedInputStream(valueStream, fileLength);
523            InputStreamReader reader = new InputStreamReader(bis);
524            int currentRead = 0;
525            int totalRead = 0;
526            while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
527              writer.write(cbuf, 0, currentRead);
528              totalRead += currentRead;
529            }
530          }
531        }
532    
533        /**
534         * Keep calling this till you get a {@link EOFException} for getting logs of
535         * all types for a single container.
536         * 
537         * @param valueStream
538         * @param out
539         * @throws IOException
540         */
541        public static void readAContainerLogsForALogType(
542            DataInputStream valueStream, PrintStream out)
543              throws IOException {
544    
545          byte[] buf = new byte[65535];
546    
547          String fileType = valueStream.readUTF();
548          String fileLengthStr = valueStream.readUTF();
549          long fileLength = Long.parseLong(fileLengthStr);
550          out.print("LogType: ");
551          out.println(fileType);
552          out.print("LogLength: ");
553          out.println(fileLengthStr);
554          out.println("Log Contents:");
555    
556          int curRead = 0;
557          long pendingRead = fileLength - curRead;
558          int toRead =
559                    pendingRead > buf.length ? buf.length : (int) pendingRead;
560          int len = valueStream.read(buf, 0, toRead);
561          while (len != -1 && curRead < fileLength) {
562            out.write(buf, 0, len);
563            curRead += len;
564    
565            pendingRead = fileLength - curRead;
566            toRead =
567                      pendingRead > buf.length ? buf.length : (int) pendingRead;
568            len = valueStream.read(buf, 0, toRead);
569          }
570          out.println("");
571        }
572    
573        public void close() {
574          IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
575        }
576      }
577    
578      @Private
579      public static class ContainerLogsReader {
580        private DataInputStream valueStream;
581        private String currentLogType = null;
582        private long currentLogLength = 0;
583        private BoundedInputStream currentLogData = null;
584        private InputStreamReader currentLogISR;
585    
586        public ContainerLogsReader(DataInputStream stream) {
587          valueStream = stream;
588        }
589    
590        public String nextLog() throws IOException {
591          if (currentLogData != null && currentLogLength > 0) {
592            // seek to the end of the current log, relying on BoundedInputStream
593            // to prevent seeking past the end of the current log
594            do {
595              if (currentLogData.skip(currentLogLength) < 0) {
596                break;
597              }
598            } while (currentLogData.read() != -1);
599          }
600    
601          currentLogType = null;
602          currentLogLength = 0;
603          currentLogData = null;
604          currentLogISR = null;
605    
606          try {
607            String logType = valueStream.readUTF();
608            String logLengthStr = valueStream.readUTF();
609            currentLogLength = Long.parseLong(logLengthStr);
610            currentLogData =
611                new BoundedInputStream(valueStream, currentLogLength);
612            currentLogData.setPropagateClose(false);
613            currentLogISR = new InputStreamReader(currentLogData);
614            currentLogType = logType;
615          } catch (EOFException e) {
616          }
617    
618          return currentLogType;
619        }
620    
621        public String getCurrentLogType() {
622          return currentLogType;
623        }
624    
625        public long getCurrentLogLength() {
626          return currentLogLength;
627        }
628    
629        public long skip(long n) throws IOException {
630          return currentLogData.skip(n);
631        }
632    
633        public int read(byte[] buf, int off, int len) throws IOException {
634          return currentLogData.read(buf, off, len);
635        }
636    
637        public int read(char[] buf, int off, int len) throws IOException {
638          return currentLogISR.read(buf, off, len);
639        }
640      }
641    }