/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;

@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  // Maybe write out the retention policy.
  // Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;

  /**
   * Umask for the log file.
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));

  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }
  @Public
  public static class LogKey implements Writable {

    private String keyString;

    public LogKey() {

    }

    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }
  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    // TODO Maybe add a version string here, instead of changing the version
    // of the entire k-v format.

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
    }

    public void write(DataOutputStream out) throws IOException {
      for (String rootLogDir : this.rootLogDirs) {
        File appLogDir =
            new File(rootLogDir,
                ConverterUtils.toString(
                    this.containerId.getApplicationAttemptId().
                        getApplicationId())
            );
        File containerLogDir =
            new File(appLogDir, ConverterUtils.toString(this.containerId));

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        // Write out log files in lexical order
        File[] logFiles = containerLogDir.listFiles();
        Arrays.sort(logFiles);
        for (File logFile : logFiles) {

          final long fileLength = logFile.length();

          // Write the logFile Type
          out.writeUTF(logFile.getName());

          // Write the log length as UTF so that it is printable
          out.writeUTF(String.valueOf(fileLength));

          // Write the log itself
          FileInputStream in = null;
          try {
            in = SecureIOUtils.openForRead(logFile, getUser(), null);
            byte[] buf = new byte[65535];
            int len = 0;
            long bytesLeft = fileLength;
            while ((len = in.read(buf)) != -1) {
              // If the buffer still fits within fileLength, write it all
              if (len < bytesLeft) {
                out.write(buf, 0, len);
                bytesLeft -= len;
              }
              // else write only the contents within fileLength, then exit early
              else {
                out.write(buf, 0, (int) bytesLeft);
                break;
              }
            }
            long newLength = logFile.length();
            if (fileLength < newLength) {
              LOG.warn("Aggregated logs truncated by approximately " +
                  (newLength - fileLength) + " bytes.");
            }
          } catch (IOException e) {
            String message = "Error aggregating log file. Log file : "
                + logFile.getAbsolutePath() + ". " + e.getMessage();
            LOG.error(message, e);
            out.write(message.getBytes());
          } finally {
            if (in != null) {
              in.close();
            }
          }
        }
      }
    }

    // Added for testing purposes.
    public String getUser() {
      return user;
    }
  }
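
  // For reference, a sketch of the value layout that LogValue.write() above
  // produces (derived from the code, not a separate format spec): for every
  // log file found under the container's log directories, in lexical order,
  // it emits
  //
  //   writeUTF(fileName)                     the log type, e.g. "stderr"
  //   writeUTF(String.valueOf(fileLength))   the length, kept printable
  //   <fileLength raw bytes of log data>
  //
  // Readers such as readAContainerLogsForALogType() below consume exactly
  // this sequence until the value stream is exhausted.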
  /**
   * The writer that writes out the aggregated logs.
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;

    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                FileContext fc = FileContext.getFileContext(conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        throw new IOException(e);
      }

      // Keys are not sorted: null arg
      // 256KB minBlockSize: the expected log size for each container
      this.writer =
          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
      // Write the version
      writeVersion();
    }

    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }

    public void append(LogKey logKey, LogValue logValue) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out);
      out.close();
    }

    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      try {
        this.fsDataOStream.close();
      } catch (IOException e) {
        LOG.warn("Exception closing output-stream", e);
      }
    }
  }
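
  // A minimal LogWriter usage sketch (illustrative only; in practice the
  // NodeManager's aggregation service supplies the configuration, remote
  // path, UGI, ACLs and container list shown here as placeholders):
  //
  //   LogWriter writer = new LogWriter(conf, remoteAppLogFile, userUgi);
  //   try {
  //     writer.writeApplicationOwner(userUgi.getShortUserName());
  //     writer.writeApplicationACLs(appAcls);
  //     for (ContainerId containerId : finishedContainers) {
  //       writer.append(new LogKey(containerId),
  //           new LogValue(rootLogDirs, containerId,
  //               userUgi.getShortUserName()));
  //     }
  //   } finally {
  //     writer.close();
  //   }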
  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext = FileContext.getFileContext(conf);
      this.fsDataIStream = fileContext.open(remoteAppLogFile);
      reader =
          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
              remoteAppLogFile).getLen(), conf);
      this.scanner = reader.createScanner();
    }

    private boolean atBeginning = true;

    /**
     * Returns the owner of the application.
     *
     * @return the application owner.
     * @throws IOException
     */
    public String getApplicationOwner() throws IOException {
      TFile.Reader.Scanner ownerScanner = reader.createScanner();
      LogKey key = new LogKey();
      while (!ownerScanner.atEnd()) {
        TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
        key.readFields(entry.getKeyStream());
        if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
          DataInputStream valueStream = entry.getValueStream();
          return valueStream.readUTF();
        }
        ownerScanner.advance();
      }
      return null;
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs
     * are found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      TFile.Reader.Scanner aclScanner = reader.createScanner();
      LogKey key = new LogKey();
      Map<ApplicationAccessType, String> acls =
          new HashMap<ApplicationAccessType, String>();
      while (!aclScanner.atEnd()) {
        TFile.Reader.Scanner.Entry entry = aclScanner.entry();
        key.readFields(entry.getKeyStream());
        if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
          DataInputStream valueStream = entry.getValueStream();
          while (true) {
            String appAccessOp = null;
            String aclString = null;
            try {
              appAccessOp = valueStream.readUTF();
            } catch (EOFException e) {
              // Valid end of stream.
              break;
            }
            try {
              aclString = valueStream.readUTF();
            } catch (EOFException e) {
              throw new YarnRuntimeException("Error reading ACLs", e);
            }
            acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
          }
        }
        aclScanner.advance();
      }
      return acls;
    }

    /**
     * Read the next key and return the value-stream.
     *
     * @param key
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException
     */
    public DataInputStream next(LogKey key) throws IOException {
      if (!this.atBeginning) {
        this.scanner.advance();
      } else {
        this.atBeginning = false;
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
      key.readFields(entry.getKeyStream());
      // Skip META keys
      if (RESERVED_KEYS.containsKey(key.toString())) {
        return next(key);
      }
      DataInputStream valueStream = entry.getValueStream();
      return valueStream;
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container.
     *
     * @param containerId
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    // TODO Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams.
    // public List<String> getTypesForContainer(DataInputStream valueStream){}
    //
    // /**
    //  * @param valueStream
    //  *          The Log stream for the container.
    //  * @param fileType
    //  *          the log type required.
    //  * @return An InputStreamReader for the required log type or null if the
    //  *         type is not found.
    //  * @throws IOException
    //  */
    // public InputStreamReader getLogStreamForType(DataInputStream valueStream,
    //     String fileType) throws IOException {
    //   valueStream.reset();
    //   try {
    //     while (true) {
    //       String ft = valueStream.readUTF();
    //       String fileLengthStr = valueStream.readUTF();
    //       long fileLength = Long.parseLong(fileLengthStr);
    //       if (ft.equals(fileType)) {
    //         BoundedInputStream bis =
    //             new BoundedInputStream(valueStream, fileLength);
    //         return new InputStreamReader(bis);
    //       } else {
    //         long totalSkipped = 0;
    //         long currSkipped = 0;
    //         while (currSkipped != -1 && totalSkipped < fileLength) {
    //           currSkipped = valueStream.skip(fileLength - totalSkipped);
    //           totalSkipped += currSkipped;
    //         }
    //         // TODO Verify skip behaviour.
    //         if (currSkipped == -1) {
    //           return null;
    //         }
    //       }
    //     }
    //   } catch (EOFException e) {
    //     return null;
    //   }
    // }
    /**
     * Writes all logs for a single container to the provided writer.
     *
     * @param valueStream
     * @param writer
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      int bufferSize = 65536;
      char[] cbuf = new char[bufferSize];
      String fileType;
      String fileLengthStr;
      long fileLength;

      while (true) {
        try {
          fileType = valueStream.readUTF();
        } catch (EOFException e) {
          // EndOfFile
          return;
        }
        fileLengthStr = valueStream.readUTF();
        fileLength = Long.parseLong(fileLengthStr);
        writer.write("\n\nLogType:");
        writer.write(fileType);
        writer.write("\nLogLength:");
        writer.write(fileLengthStr);
        writer.write("\nLog Contents:\n");
        // ByteLevel
        BoundedInputStream bis =
            new BoundedInputStream(valueStream, fileLength);
        InputStreamReader reader = new InputStreamReader(bis);
        int currentRead = 0;
        int totalRead = 0;
        while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
          writer.write(cbuf, 0, currentRead);
          totalRead += currentRead;
        }
      }
    }

    /**
     * Keep calling this till you get an {@link EOFException} to get the logs
     * of all types for a single container.
     *
     * @param valueStream
     * @param out
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
        throws IOException {

      byte[] buf = new byte[65535];

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      out.print("LogType: ");
      out.println(fileType);
      out.print("LogLength: ");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      long curRead = 0;
      long pendingRead = fileLength - curRead;
      int toRead =
          pendingRead > buf.length ? buf.length : (int) pendingRead;
      int len = valueStream.read(buf, 0, toRead);
      while (len != -1 && curRead < fileLength) {
        out.write(buf, 0, len);
        curRead += len;

        pendingRead = fileLength - curRead;
        toRead =
            pendingRead > buf.length ? buf.length : (int) pendingRead;
        len = valueStream.read(buf, 0, toRead);
      }
      out.println("");
    }

    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }
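
  // A minimal LogReader usage sketch (illustrative only): dump every
  // container's logs from one aggregated log file to System.out. conf and
  // remoteAppLogFile are assumed to be supplied by the caller.
  //
  //   LogReader reader = new LogReader(conf, remoteAppLogFile);
  //   try {
  //     LogKey key = new LogKey();
  //     DataInputStream valueStream = reader.next(key);
  //     while (valueStream != null) {
  //       System.out.println("Container: " + key);
  //       try {
  //         while (true) {
  //           LogReader.readAContainerLogsForALogType(valueStream, System.out);
  //         }
  //       } catch (EOFException eof) {
  //         // no more log types for this container
  //       }
  //       valueStream = reader.next(key);
  //     }
  //   } finally {
  //     reader.close();
  //   }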
  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }

    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        currentLogISR = new InputStreamReader(currentLogData);
        currentLogType = logType;
      } catch (EOFException e) {
        // The value stream is exhausted: no more logs for this container.
      }

      return currentLogType;
    }

    public String getCurrentLogType() {
      return currentLogType;
    }

    public long getCurrentLogLength() {
      return currentLogLength;
    }

    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
}
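
// A minimal ContainerLogsReader usage sketch (illustrative only): read the
// logs of a single container via LogReader.getContainerLogsReader(). The
// reader, containerId and a java.io.PrintWriter named out are assumed to be
// supplied by the caller.
//
//   AggregatedLogFormat.ContainerLogsReader logReader =
//       reader.getContainerLogsReader(containerId);
//   if (logReader != null) {
//     char[] cbuf = new char[4096];
//     String logType = logReader.nextLog();
//     while (logType != null) {
//       out.println("LogType: " + logType);
//       out.println("LogLength: " + logReader.getCurrentLogLength());
//       int len;
//       while ((len = logReader.read(cbuf, 0, cbuf.length)) != -1) {
//         out.write(cbuf, 0, len);
//       }
//       logType = logReader.nextLog();
//     }
//   }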