/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  // Maybe write out the retention policy.
  // Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;
  /**
   * Umask for the log file. The umask 0137 (0640 ^ 0777) results in the
   * aggregated log file being created with 0640 (rw-r-----) permissions.
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));


  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }

  @Public
  public static class LogKey implements Writable {

    private String keyString;

    public LogKey() {

    }

    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }

  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    // TODO Maybe add a version string here, instead of changing the version
    // of the entire k-v format

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
    }

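    /**
     * Writes the container's logs to the given stream. For every log file in
     * the container's log directories, the file name and the file length are
     * written as UTF strings, followed by at most {@code fileLength} bytes of
     * the file's contents.
     *
     * @param out the stream to which the aggregated logs are written.
     * @throws IOException
     */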
    public void write(DataOutputStream out) throws IOException {
      for (String rootLogDir : this.rootLogDirs) {
        File appLogDir =
            new File(rootLogDir,
                ConverterUtils.toString(
                    this.containerId.getApplicationAttemptId().
                        getApplicationId())
                );
        File containerLogDir =
            new File(appLogDir, ConverterUtils.toString(this.containerId));

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        // Write out log files in lexical order
        File[] logFiles = containerLogDir.listFiles();
        Arrays.sort(logFiles);
        for (File logFile : logFiles) {

          final long fileLength = logFile.length();

          // Write the logFile Type
          out.writeUTF(logFile.getName());

          // Write the log length as UTF so that it is printable
          out.writeUTF(String.valueOf(fileLength));

          // Write the log itself
          FileInputStream in = null;
          try {
            in = SecureIOUtils.openForRead(logFile, getUser(), null);
            byte[] buf = new byte[65535];
            int len = 0;
            long bytesLeft = fileLength;
            while ((len = in.read(buf)) != -1) {
              // If the buffer contents fit within the remaining fileLength,
              // write them all.
              if (len < bytesLeft) {
                out.write(buf, 0, len);
                bytesLeft -= len;
              }
              // Otherwise only write the contents up to fileLength, then
              // exit early.
              else {
                out.write(buf, 0, (int) bytesLeft);
                break;
              }
            }
            long newLength = logFile.length();
            if (fileLength < newLength) {
              LOG.warn("Aggregated logs truncated by approximately "
                  + (newLength - fileLength) + " bytes.");
            }
          } catch (IOException e) {
            String message = "Error aggregating log file. Log file : "
                + logFile.getAbsolutePath() + ". " + e.getMessage();
            LOG.error(message, e);
            out.write(message.getBytes());
          } finally {
            if (in != null) {
              in.close();
            }
          }
        }
      }
    }

    // Added for testing purposes.
    public String getUser() {
      return user;
    }
  }

  /**
   * The writer that writes out the aggregated logs.
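   *
   * <p>A minimal usage sketch (illustrative only; the configuration, paths,
   * user and ACL map are placeholders):
   * <pre>
   *   LogWriter writer = new LogWriter(conf, remoteAppLogFile, userUgi);
   *   writer.writeApplicationOwner("someUser");
   *   writer.writeApplicationACLs(appAcls);
   *   writer.append(new LogKey(containerId),
   *       new LogValue(rootLogDirs, containerId, "someUser"));
   *   writer.close();
   * </pre>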
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;

    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                FileContext fc = FileContext.getFileContext(conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        throw new IOException(e);
      }

      // Keys are not sorted: null arg
      // 256KB minBlockSize: roughly the expected log size for each container
      this.writer =
          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
      // Write the version string
      writeVersion();
    }

    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }

    public void append(LogKey logKey, LogValue logValue) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out);
      out.close();
    }

    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      try {
        this.fsDataOStream.close();
      } catch (IOException e) {
        LOG.warn("Exception closing output-stream", e);
      }
    }
  }

  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext = FileContext.getFileContext(conf);
      this.fsDataIStream = fileContext.open(remoteAppLogFile);
      reader =
          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
              remoteAppLogFile).getLen(), conf);
      this.scanner = reader.createScanner();
    }

    private boolean atBeginning = true;

    /**
     * Returns the owner of the application.
     *
     * @return the application owner.
     * @throws IOException
     */
    public String getApplicationOwner() throws IOException {
      TFile.Reader.Scanner ownerScanner = reader.createScanner();
      LogKey key = new LogKey();
      while (!ownerScanner.atEnd()) {
        TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
        key.readFields(entry.getKeyStream());
        if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
          DataInputStream valueStream = entry.getValueStream();
          return valueStream.readUTF();
        }
        ownerScanner.advance();
      }
      return null;
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs
     * are found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      TFile.Reader.Scanner aclScanner = reader.createScanner();
      LogKey key = new LogKey();
      Map<ApplicationAccessType, String> acls =
          new HashMap<ApplicationAccessType, String>();
      while (!aclScanner.atEnd()) {
        TFile.Reader.Scanner.Entry entry = aclScanner.entry();
        key.readFields(entry.getKeyStream());
        if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
          DataInputStream valueStream = entry.getValueStream();
          while (true) {
            String appAccessOp = null;
            String aclString = null;
            try {
              appAccessOp = valueStream.readUTF();
            } catch (EOFException e) {
              // Valid end of stream.
              break;
            }
            try {
              aclString = valueStream.readUTF();
            } catch (EOFException e) {
              throw new YarnRuntimeException("Error reading ACLs", e);
            }
            acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
          }

        }
        aclScanner.advance();
      }
      return acls;
    }

    /**
     * Read the next key and return the value-stream.
     *
     * @param key the {@link LogKey} instance to populate with the key that
     *          was read.
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException
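     *
     * <p>A typical iteration sketch (illustrative only):
     * <pre>
     *   LogKey key = new LogKey();
     *   DataInputStream valueStream = logReader.next(key);
     *   while (valueStream != null) {
     *     // process the logs for the container identified by key
     *     valueStream = logReader.next(key);
     *   }
     * </pre>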
     */
    public DataInputStream next(LogKey key) throws IOException {
      if (!this.atBeginning) {
        this.scanner.advance();
      } else {
        this.atBeginning = false;
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
      key.readFields(entry.getKeyStream());
      // Skip META keys
      if (RESERVED_KEYS.containsKey(key.toString())) {
        return next(key);
      }
      DataInputStream valueStream = entry.getValueStream();
      return valueStream;
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container.
     *
     * @param containerId the id of the container whose logs should be read.
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    // TODO Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams.
    // public List<String> getTypesForContainer(DataInputStream valueStream){}
    //
    // /**
    //  * @param valueStream
    //  *          The Log stream for the container.
    //  * @param fileType
    //  *          the log type required.
    //  * @return An InputStreamReader for the required log type or null if the
    //  *         type is not found.
    //  * @throws IOException
    //  */
    // public InputStreamReader getLogStreamForType(DataInputStream valueStream,
    //     String fileType) throws IOException {
    //   valueStream.reset();
    //   try {
    //     while (true) {
    //       String ft = valueStream.readUTF();
    //       String fileLengthStr = valueStream.readUTF();
    //       long fileLength = Long.parseLong(fileLengthStr);
    //       if (ft.equals(fileType)) {
    //         BoundedInputStream bis =
    //             new BoundedInputStream(valueStream, fileLength);
    //         return new InputStreamReader(bis);
    //       } else {
    //         long totalSkipped = 0;
    //         long currSkipped = 0;
    //         while (currSkipped != -1 && totalSkipped < fileLength) {
    //           currSkipped = valueStream.skip(fileLength - totalSkipped);
    //           totalSkipped += currSkipped;
    //         }
    //         // TODO Verify skip behaviour.
    //         if (currSkipped == -1) {
    //           return null;
    //         }
    //       }
    //     }
    //   } catch (EOFException e) {
    //     return null;
    //   }
    // }

    /**
     * Writes all logs for a single container to the provided writer.
     *
     * @param valueStream the value stream for the container, as returned by
     *          {@link #next(LogKey)}.
     * @param writer the writer to which the logs are written.
     * @throws IOException
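     *
     * <p>For each log file, the output contains a "LogType:" line, a
     * "LogLength:" line, and a "Log Contents:" section holding the raw log
     * data.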
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      int bufferSize = 65536;
      char[] cbuf = new char[bufferSize];
      String fileType;
      String fileLengthStr;
      long fileLength;

      while (true) {
        try {
          fileType = valueStream.readUTF();
        } catch (EOFException e) {
          // EndOfFile
          return;
        }
        fileLengthStr = valueStream.readUTF();
        fileLength = Long.parseLong(fileLengthStr);
        writer.write("\n\nLogType:");
        writer.write(fileType);
        writer.write("\nLogLength:");
        writer.write(fileLengthStr);
        writer.write("\nLog Contents:\n");
        // ByteLevel
        BoundedInputStream bis =
            new BoundedInputStream(valueStream, fileLength);
        InputStreamReader reader = new InputStreamReader(bis);
        int currentRead = 0;
        int totalRead = 0;
        while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
          writer.write(cbuf, 0, currentRead);
          totalRead += currentRead;
        }
      }
    }

    /**
     * Keep calling this, once per log type, until an {@link EOFException} is
     * thrown, in order to read the logs of all types for a single container.
     *
     * @param valueStream the value stream for the container.
     * @param out the print stream to which the log is written.
     * @throws IOException
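     *
     * <p>A typical calling sketch (illustrative only):
     * <pre>
     *   try {
     *     while (true) {
     *       LogReader.readAContainerLogsForALogType(valueStream, System.out);
     *     }
     *   } catch (EOFException e) {
     *     // no more log types for this container
     *   }
     * </pre>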
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
        throws IOException {

      byte[] buf = new byte[65535];

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      out.print("LogType: ");
      out.println(fileType);
      out.print("LogLength: ");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      long curRead = 0;
      long pendingRead = fileLength - curRead;
      int toRead =
          pendingRead > buf.length ? buf.length : (int) pendingRead;
      int len = valueStream.read(buf, 0, toRead);
      while (len != -1 && curRead < fileLength) {
        out.write(buf, 0, len);
        curRead += len;

        pendingRead = fileLength - curRead;
        toRead =
            pendingRead > buf.length ? buf.length : (int) pendingRead;
        len = valueStream.read(buf, 0, toRead);
      }
      out.println("");
    }

    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }

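  /**
   * Reads the logs of a single container, one log type at a time. Obtain an
   * instance via {@link LogReader#getContainerLogsReader(ContainerId)}.
   *
   * <p>A typical reading sketch (illustrative only; the buffer size is a
   * placeholder):
   * <pre>
   *   String logType = logsReader.nextLog();
   *   while (logType != null) {
   *     char[] cbuf = new char[4096];
   *     int len;
   *     while ((len = logsReader.read(cbuf, 0, cbuf.length)) != -1) {
   *       // consume cbuf[0..len)
   *     }
   *     logType = logsReader.nextLog();
   *   }
   * </pre>
   */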
  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }

    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        currentLogISR = new InputStreamReader(currentLogData);
        currentLogType = logType;
      } catch (EOFException e) {
        // End of stream: no more logs for this container.
      }

      return currentLogType;
    }

    public String getCurrentLogType() {
      return currentLogType;
    }

    public long getCurrentLogLength() {
      return currentLogLength;
    }

    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
}