/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.join;

import java.io.CharArrayReader;
import java.io.IOException;
import java.io.StreamTokenizer;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Stack;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Very simple shift-reduce parser for join expressions.
 *
 * This should be sufficient for the user extension permitted now, but ought
 * to be replaced with a parser generator if more complex grammars are
 * supported. In particular, this "shift-reduce" parser has no states. Each
 * set of formals requires a different internal node type, which is
 * responsible for interpreting the list of tokens it receives. This is
 * sufficient for the current grammar, but it has several annoying properties
 * that might inhibit extension. In particular, parentheses are always
 * function calls; an algebraic or filter grammar would not only require a
 * node type, but must also work around the internals of this parser.
 *
 * For most other cases, adding classes to the hierarchy (particularly by
 * extending JoinRecordReader and MultiFilterRecordReader) is fairly
 * straightforward. One need only override the relevant method(s) (usually
 * only {@link CompositeRecordReader#combine}) and include a property to map
 * its value to an identifier in the parser.
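 *
 * <p>For reference, an expression in the current grammar applies a composite
 * identifier to a comma-separated list of children, with "tbl" wrapping a
 * single source. A sketch (the input format and paths here are placeholders,
 * not defaults):
 * <pre>
 *   inner(tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat,
 *             "hdfs://host:8020/foo/bar"),
 *         tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat,
 *             "hdfs://host:8020/foo/baz"))
 * </pre>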
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Parser {
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }

  /**
   * Tagged-union type for tokens from the join expression.
   * @see Parser.TType
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class Token {

    private TType type;

    Token(TType type) {
      this.type = type;
    }

    public TType getType() { return type; }

    public Node getNode() throws IOException {
      throw new IOException("Expected nodetype");
    }

    public double getNum() throws IOException {
      throw new IOException("Expected numtype");
    }

    public String getStr() throws IOException {
      throw new IOException("Expected strtype");
    }
  }

  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class NumToken extends Token {
    private double num;
    public NumToken(double num) {
      super(TType.NUM);
      this.num = num;
    }
    public double getNum() { return num; }
  }

  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class NodeToken extends Token {
    private Node node;
    NodeToken(Node node) {
      super(TType.CIF);
      this.node = node;
    }
    public Node getNode() {
      return node;
    }
  }

  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class StrToken extends Token {
    private String str;
    public StrToken(TType type, String str) {
      super(type);
      this.str = str;
    }
    public String getStr() {
      return str;
    }
  }

  /**
   * Simple lexer wrapping a StreamTokenizer.
   * This encapsulates the creation of tagged-union Tokens and initializes
   * the StreamTokenizer.
   */
  private static class Lexer {

    private StreamTokenizer tok;

    Lexer(String s) {
      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
      tok.quoteChar('"');
      tok.parseNumbers();
      tok.ordinaryChar(',');
      tok.ordinaryChar('(');
      tok.ordinaryChar(')');
      tok.wordChars('$','$');
      tok.wordChars('_','_');
    }

    Token next() throws IOException {
      int type = tok.nextToken();
      switch (type) {
        case StreamTokenizer.TT_EOF:
        case StreamTokenizer.TT_EOL:
          return null;
        case StreamTokenizer.TT_NUMBER:
          return new NumToken(tok.nval);
        case StreamTokenizer.TT_WORD:
          return new StrToken(TType.IDENT, tok.sval);
        case '"':
          return new StrToken(TType.QUOT, tok.sval);
        default:
          switch (type) {
            case ',':
              return new Token(TType.COMMA);
            case '(':
              return new Token(TType.LPAREN);
            case ')':
              return new Token(TType.RPAREN);
            default:
              throw new IOException("Unexpected: " + type);
          }
      }
    }
  }

  @SuppressWarnings("unchecked")
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public abstract static class Node extends ComposableInputFormat {
    /**
     * Return the node type registered for the particular identifier.
     * By default, this is a CNode for any composite node and a WNode
     * for "wrapped" nodes. User nodes will likely be composite
     * nodes.
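     * <p>Identifiers enter the map via {@link #addIdentifier}; for example,
     * CompositeInputFormat registers "inner", "outer", and "override" as
     * composite identifiers and "tbl" as a wrapped identifier. A new
     * composite type would be registered the same way (a sketch;
     * <tt>MyJoinRecordReader</tt> is hypothetical and must declare the
     * constructor signature CNode expects):
     * <pre>
     *   CNode.addIdentifier("myjoin", MyJoinRecordReader.class);
     * </pre>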
     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
     * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
     */
    static Node forIdent(String ident) throws IOException {
      try {
        if (!nodeCstrMap.containsKey(ident)) {
          throw new IOException("No nodetype for " + ident);
        }
        return nodeCstrMap.get(ident).newInstance(ident);
      } catch (IllegalAccessException e) {
        throw new IOException(e);
      } catch (InstantiationException e) {
        throw new IOException(e);
      } catch (InvocationTargetException e) {
        throw new IOException(e);
      }
    }

    private static final Class<?>[] ncstrSig = { String.class };
    private static final
        Map<String,Constructor<? extends Node>> nodeCstrMap =
        new HashMap<String,Constructor<? extends Node>>();
    protected static final
        Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
        new HashMap<String,Constructor<? extends ComposableRecordReader>>();

    /**
     * For a given identifier, add a mapping to the nodetype for the parse
     * tree and to the ComposableRecordReader to be created, including the
     * formals required to invoke the constructor.
     * The nodetype and constructor signature should be filled in from the
     * child node.
     */
    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
                              Class<? extends Node> nodetype,
                              Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Constructor<? extends Node> ncstr =
        nodetype.getDeclaredConstructor(ncstrSig);
      ncstr.setAccessible(true);
      nodeCstrMap.put(ident, ncstr);
      Constructor<? extends ComposableRecordReader> mcstr =
        cl.getDeclaredConstructor(mcstrSig);
      mcstr.setAccessible(true);
      rrCstrMap.put(ident, mcstr);
    }

    // inst
    protected int id = -1;
    protected String ident;
    protected Class<? extends WritableComparator> cmpcl;

    protected Node(String ident) {
      this.ident = ident;
    }

    protected void setID(int id) {
      this.id = id;
    }

    protected void setKeyComparator(
        Class<? extends WritableComparator> cmpcl) {
      this.cmpcl = cmpcl;
    }

    abstract void parse(List<Token> args, Configuration conf)
        throws IOException;
  }

  /**
   * Nodetype in the parse tree for "wrapped" InputFormats.
   */
  static class WNode extends Node {
    private static final Class<?>[] cstrSig =
      { Integer.TYPE, RecordReader.class, Class.class };

    @SuppressWarnings("unchecked")
    static void addIdentifier(String ident,
                              Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
    }

    private String indir;
    private InputFormat<?, ?> inf;

    public WNode(String ident) {
      super(ident);
    }

    /**
     * Let the first actual define the InputFormat and the second define
     * the <tt>mapred.input.dir</tt> property.
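     * <p>For example, the (illustrative) leaf expression
     * <tt>tbl(org.apache.hadoop.mapreduce.lib.input.TextInputFormat,"in/foo")</tt>
     * selects <tt>TextInputFormat</tt> via the first actual and takes
     * "in/foo" as the input path from the quoted second actual.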
     */
    @Override
    public void parse(List<Token> ll, Configuration conf) throws IOException {
      StringBuilder sb = new StringBuilder();
      Iterator<Token> i = ll.iterator();
      while (i.hasNext()) {
        Token t = i.next();
        if (TType.COMMA.equals(t.getType())) {
          try {
            inf = (InputFormat<?, ?>)ReflectionUtils.newInstance(
                conf.getClassByName(sb.toString()), conf);
          } catch (ClassNotFoundException e) {
            throw new IOException(e);
          } catch (IllegalArgumentException e) {
            throw new IOException(e);
          }
          break;
        }
        sb.append(t.getStr());
      }
      if (!i.hasNext()) {
        throw new IOException("Parse error");
      }
      Token t = i.next();
      if (!TType.QUOT.equals(t.getType())) {
        throw new IOException("Expected quoted string");
      }
      indir = t.getStr();
      // no check for ll.isEmpty() to permit extension
    }

    private Configuration getConf(Configuration jconf) throws IOException {
      Job job = Job.getInstance(jconf);
      FileInputFormat.setInputPaths(job, indir);
      return job.getConfiguration();
    }

    public List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException {
      return inf.getSplits(
          new JobContextImpl(getConf(context.getConfiguration()),
                             context.getJobID()));
    }

    public ComposableRecordReader<?, ?> createRecordReader(InputSplit split,
        TaskAttemptContext taskContext)
        throws IOException, InterruptedException {
      try {
        if (!rrCstrMap.containsKey(ident)) {
          throw new IOException("No RecordReader for " + ident);
        }
        Configuration conf = getConf(taskContext.getConfiguration());
        TaskAttemptContext context =
          new TaskAttemptContextImpl(conf,
              TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID)),
              new WrappedStatusReporter(taskContext));
        return rrCstrMap.get(ident).newInstance(id,
            inf.createRecordReader(split, context), cmpcl);
      } catch (IllegalAccessException e) {
        throw new IOException(e);
      } catch (InstantiationException e) {
        throw new IOException(e);
      } catch (InvocationTargetException e) {
        throw new IOException(e);
      }
    }

    public String toString() {
      return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
    }
  }

  /**
   * StatusReporter that forwards counter lookups, progress, and status
   * updates to the wrapping TaskAttemptContext.
   */
  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Internal nodetype for "composite" InputFormats.
   */
  static class CNode extends Node {

    private static final Class<?>[] cstrSig =
      { Integer.TYPE, Configuration.class, Integer.TYPE, Class.class };

    @SuppressWarnings("unchecked")
    static void addIdentifier(String ident,
                              Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
    }

    // inst
    private ArrayList<Node> kids = new ArrayList<Node>();

    public CNode(String ident) {
      super(ident);
    }

    @Override
    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
      super.setKeyComparator(cmpcl);
      for (Node n : kids) {
        n.setKeyComparator(cmpcl);
      }
    }

    /**
     * Combine InputSplits from child InputFormats into a
     * {@link CompositeInputSplit}.
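     * <p>The i-th CompositeInputSplit pairs the i-th split from every
     * child, which is why each child must report the same number of splits.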
     */
    @SuppressWarnings("unchecked")
    public List<InputSplit> getSplits(JobContext job)
        throws IOException, InterruptedException {
      List<List<InputSplit>> splits =
        new ArrayList<List<InputSplit>>(kids.size());
      for (int i = 0; i < kids.size(); ++i) {
        List<InputSplit> tmp = kids.get(i).getSplits(job);
        if (null == tmp) {
          throw new IOException("Error gathering splits from child RReader");
        }
        if (i > 0 && splits.get(i-1).size() != tmp.size()) {
          throw new IOException("Inconsistent split cardinality from child " +
              i + " (" + splits.get(i-1).size() + "/" + tmp.size() + ")");
        }
        splits.add(i, tmp);
      }
      final int size = splits.get(0).size();
      List<InputSplit> ret = new ArrayList<InputSplit>();
      for (int i = 0; i < size; ++i) {
        CompositeInputSplit split = new CompositeInputSplit(splits.size());
        for (int j = 0; j < splits.size(); ++j) {
          split.add(splits.get(j).get(i));
        }
        ret.add(split);
      }
      return ret;
    }

    @SuppressWarnings("unchecked") // child types unknowable
    public ComposableRecordReader
        createRecordReader(InputSplit split, TaskAttemptContext taskContext)
        throws IOException, InterruptedException {
      if (!(split instanceof CompositeInputSplit)) {
        throw new IOException("Invalid split type:" +
            split.getClass().getName());
      }
      final CompositeInputSplit spl = (CompositeInputSplit)split;
      final int capacity = kids.size();
      CompositeRecordReader ret = null;
      try {
        if (!rrCstrMap.containsKey(ident)) {
          throw new IOException("No RecordReader for " + ident);
        }
        ret = (CompositeRecordReader)rrCstrMap.get(ident).
          newInstance(id, taskContext.getConfiguration(), capacity, cmpcl);
      } catch (IllegalAccessException e) {
        throw new IOException(e);
      } catch (InstantiationException e) {
        throw new IOException(e);
      } catch (InvocationTargetException e) {
        throw new IOException(e);
      }
      for (int i = 0; i < capacity; ++i) {
        ret.add(kids.get(i).createRecordReader(spl.get(i), taskContext));
      }
      return (ComposableRecordReader)ret;
    }

    /**
     * Parse a list of comma-separated nodes.
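     * <p>Children are assigned ids by position: in the (illustrative)
     * expression <tt>outer(tbl(f,"a"),tbl(f,"b"))</tt>, where <tt>f</tt> is
     * an InputFormat class name, the two children receive ids 0 and 1.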
     */
    public void parse(List<Token> args, Configuration conf)
        throws IOException {
      ListIterator<Token> i = args.listIterator();
      while (i.hasNext()) {
        Token t = i.next();
        t.getNode().setID(i.previousIndex() >> 1);
        kids.add(t.getNode());
        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
          throw new IOException("Expected ','");
        }
      }
    }

    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append(ident + "(");
      for (Node n : kids) {
        sb.append(n.toString() + ",");
      }
      sb.setCharAt(sb.length() - 1, ')');
      return sb.toString();
    }
  }

  /**
   * Pop the argument list accumulated since the matching '(' and use the
   * identifier beneath it to build a Node that interprets those arguments.
   */
  private static Token reduce(Stack<Token> st, Configuration conf)
      throws IOException {
    LinkedList<Token> args = new LinkedList<Token>();
    while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
      args.addFirst(st.pop());
    }
    if (st.isEmpty()) {
      throw new IOException("Unmatched ')'");
    }
    st.pop();
    if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
      throw new IOException("Identifier expected");
    }
    Node n = Node.forIdent(st.pop().getStr());
    n.parse(args, conf);
    return new NodeToken(n);
  }

  /**
   * Given an expression and an optional comparator, build a tree of
   * InputFormats using the comparator to sort keys.
   */
  static Node parse(String expr, Configuration conf) throws IOException {
    if (null == expr) {
      throw new IOException("Expression is null");
    }
    Class<? extends WritableComparator> cmpcl = conf.getClass(
        CompositeInputFormat.JOIN_COMPARATOR, null, WritableComparator.class);
    Lexer lex = new Lexer(expr);
    Stack<Token> st = new Stack<Token>();
    Token tok;
    while ((tok = lex.next()) != null) {
      if (TType.RPAREN.equals(tok.getType())) {
        st.push(reduce(st, conf));
      } else {
        st.push(tok);
      }
    }
    if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
      Node ret = st.pop().getNode();
      if (cmpcl != null) {
        ret.setKeyComparator(cmpcl);
      }
      return ret;
    }
    throw new IOException("Missing ')'");
  }

}