/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.join;

import java.io.CharArrayReader;
import java.io.IOException;
import java.io.StreamTokenizer;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Stack;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Very simple shift-reduce parser for join expressions.
 *
 * This should be sufficient for the user extension permitted now, but ought to
 * be replaced with a parser generator if more complex grammars are supported.
 * In particular, this "shift-reduce" parser has no states. Each set
 * of formals requires a different internal node type, which is responsible for
 * interpreting the list of tokens it receives. This is sufficient for the
 * current grammar, but it has several annoying properties that might inhibit
 * extension. In particular, parentheses are always function calls; an
 * algebraic or filter grammar would not only require a node type, but must
 * also work around the internals of this parser.
 *
 * For most other cases, adding classes to the hierarchy- particularly by
 * extending JoinRecordReader and MultiFilterRecordReader- is fairly
 * straightforward. One need only override the relevant method(s) (usually only
 * {@link CompositeRecordReader#combine}) and include a property to map its
 * value to an identifier in the parser.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Parser {
  /**
   * Kinds of token that can appear on the parse stack. <tt>CIF</tt> tags a
   * token that already carries a parsed {@link Node}; the others come
   * straight from the lexer.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }

  /**
   * Build an IOException that carries both a descriptive message and the
   * underlying cause.
   * @param msg   description of the operation that failed
   * @param cause the exception that triggered the failure
   * @return a new IOException whose cause is <tt>cause</tt>
   */
  private static IOException wrapAsIOException(String msg, Throwable cause) {
    final IOException ioe = new IOException(msg);
    ioe.initCause(cause);
    return ioe;
  }

  /**
   * Tagged-union type for tokens from the join expression.
   * The accessors in this base class always throw; each subclass overrides
   * the single accessor matching the payload it holds.
   * @see Parser.TType
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class Token {

    private final TType type;

    Token(TType type) {
      this.type = type;
    }

    /** @return the tag identifying what kind of token this is. */
    public TType getType() { return type; }

    /**
     * @return the parse-tree node carried by this token
     * @throws IOException if this token does not carry a node
     */
    public Node getNode() throws IOException {
      throw new IOException("Expected nodetype");
    }

    /**
     * @return the numeric value carried by this token
     * @throws IOException if this token does not carry a number
     */
    public double getNum() throws IOException {
      throw new IOException("Expected numtype");
    }

    /**
     * @return the string value carried by this token
     * @throws IOException if this token does not carry a string
     */
    public String getStr() throws IOException {
      throw new IOException("Expected strtype");
    }
  }

  /**
   * Token of type {@link TType#NUM} holding a numeric literal.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class NumToken extends Token {
    private final double num;

    public NumToken(double num) {
      super(TType.NUM);
      this.num = num;
    }

    @Override
    public double getNum() { return num; }
  }

  /**
   * Token of type {@link TType#CIF} holding an already-parsed {@link Node}.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class NodeToken extends Token {
    private final Node node;

    NodeToken(Node node) {
      super(TType.CIF);
      this.node = node;
    }

    @Override
    public Node getNode() {
      return node;
    }
  }

  /**
   * Token holding a string payload; the caller supplies the tag (the lexer
   * uses {@link TType#IDENT} and {@link TType#QUOT}).
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class StrToken extends Token {
    private final String str;

    public StrToken(TType type, String str) {
      super(type);
      this.str = str;
    }

    @Override
    public String getStr() {
      return str;
    }
  }

  /**
   * Simple lexer wrapping a StreamTokenizer.
   * This encapsulates the creation of tagged-union Tokens and initializes the
   * StreamTokenizer.
   */
  private static class Lexer {

    private final StreamTokenizer tok;

    /**
     * Configure a tokenizer over <tt>s</tt>: double quotes delimit strings,
     * numbers are parsed, <tt>, ( )</tt> are returned as single-character
     * tokens, and <tt>$</tt> and <tt>_</tt> are treated as word characters.
     */
    Lexer(String s) {
      // The order of these configuration calls is kept exactly as it was;
      // later StreamTokenizer settings override earlier ones per character.
      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
      tok.quoteChar('"');
      tok.parseNumbers();
      tok.ordinaryChar(',');
      tok.ordinaryChar('(');
      tok.ordinaryChar(')');
      tok.wordChars('$','$');
      tok.wordChars('_','_');
    }

    /**
     * Read the next token from the expression.
     * @return the next token, or <tt>null</tt> once the input is exhausted
     * @throws IOException if a character outside the grammar is encountered
     */
    Token next() throws IOException {
      final int type = tok.nextToken();
      switch (type) {
        case StreamTokenizer.TT_EOF:
        case StreamTokenizer.TT_EOL:
          return null;
        case StreamTokenizer.TT_NUMBER:
          return new NumToken(tok.nval);
        case StreamTokenizer.TT_WORD:
          return new StrToken(TType.IDENT, tok.sval);
        case '"':
          return new StrToken(TType.QUOT, tok.sval);
        case ',':
          return new Token(TType.COMMA);
        case '(':
          return new Token(TType.LPAREN);
        case ')':
          return new Token(TType.RPAREN);
        default:
          // Report the offending character itself, not only its int code.
          throw new IOException(
              "Unexpected: '" + (char) type + "' (" + type + ")");
      }
    }
  }

  /**
   * Base class for nodes of the parse tree. It also owns the static
   * registries mapping an identifier to the constructor of its node type and
   * to the constructor of the ComposableRecordReader created for it.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public abstract static class Node implements ComposableInputFormat {
    /**
     * Return the node type registered for the particular identifier.
     * By default, this is a CNode for any composite node and a WNode
     * for "wrapped" nodes. User nodes will likely be composite
     * nodes.
     * @throws IOException if no node type is registered for <tt>ident</tt>
     *         or the registered constructor cannot be invoked
     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
     * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
     */
    static Node forIdent(String ident) throws IOException {
      // addIdentifier never stores null values, so a null lookup result
      // means the identifier was never registered.
      final Constructor<? extends Node> cstr = nodeCstrMap.get(ident);
      if (null == cstr) {
        throw new IOException("No nodetype for " + ident);
      }
      final String failure = "Failed to instantiate nodetype for " + ident;
      try {
        return cstr.newInstance(ident);
      } catch (IllegalAccessException e) {
        throw Parser.wrapAsIOException(failure, e);
      } catch (InstantiationException e) {
        throw Parser.wrapAsIOException(failure, e);
      } catch (InvocationTargetException e) {
        throw Parser.wrapAsIOException(failure, e);
      }
    }

    /** Signature of the single-String constructor every node type needs. */
    private static final Class<?>[] ncstrSig = { String.class };
    /** Identifier to node-type constructor. */
    private static final
        Map<String,Constructor<? extends Node>> nodeCstrMap =
        new HashMap<String,Constructor<? extends Node>>();
    /** Identifier to record-reader constructor. */
    protected static final
        Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
        new HashMap<String,Constructor<? extends ComposableRecordReader>>();

    /**
     * For a given identifier, add a mapping to the nodetype for the parse
     * tree and to the ComposableRecordReader to be created, including the
     * formals required to invoke the constructor.
     * The nodetype and constructor signature should be filled in from the
     * child node.
     * @throws NoSuchMethodException if <tt>nodetype</tt> lacks a
     *         single-String constructor or <tt>cl</tt> lacks a constructor
     *         matching <tt>mcstrSig</tt>
     */
    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
                              Class<? extends Node> nodetype,
                              Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Constructor<? extends Node> ncstr =
        nodetype.getDeclaredConstructor(ncstrSig);
      ncstr.setAccessible(true);
      nodeCstrMap.put(ident, ncstr);
      Constructor<? extends ComposableRecordReader> mcstr =
        cl.getDeclaredConstructor(mcstrSig);
      mcstr.setAccessible(true);
      rrCstrMap.put(ident, mcstr);
    }

    // inst
    protected int id = -1;
    protected String ident;
    protected Class<? extends WritableComparator> cmpcl;

    protected Node(String ident) {
      this.ident = ident;
    }

    protected void setID(int id) {
      this.id = id;
    }

    protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
      this.cmpcl = cmpcl;
    }

    /**
     * Interpret the tokens found between this node's parentheses.
     */
    abstract void parse(List<Token> args, JobConf job) throws IOException;
  }

  /**
   * Nodetype in the parse tree for "wrapped" InputFormats.
   */
  static class WNode extends Node {
    private static final Class<?>[] cstrSig =
      { Integer.TYPE, RecordReader.class, Class.class };

    static void addIdentifier(String ident,
                              Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
    }

    private String indir;
    private InputFormat inf;

    public WNode(String ident) {
      super(ident);
    }

    /**
     * Let the first actual define the InputFormat and the second define
     * the <tt>mapred.input.dir</tt> property.
     * @throws IOException if the InputFormat class cannot be loaded or
     *         instantiated, if nothing follows the first comma, or if the
     *         token after that comma is not a quoted string
     */
    @Override
    public void parse(List<Token> ll, JobConf job) throws IOException {
      // Tokens before the first comma are concatenated into the class name.
      final StringBuilder className = new StringBuilder();
      final Iterator<Token> i = ll.iterator();
      while (i.hasNext()) {
        final Token t = i.next();
        if (TType.COMMA.equals(t.getType())) {
          final String failure =
            "Cannot instantiate InputFormat " + className;
          try {
            inf = (InputFormat)ReflectionUtils.newInstance(
                job.getClassByName(className.toString()),
                job);
          } catch (ClassNotFoundException e) {
            throw Parser.wrapAsIOException(failure, e);
          } catch (IllegalArgumentException e) {
            throw Parser.wrapAsIOException(failure, e);
          }
          break;
        }
        className.append(t.getStr());
      }
      // Also reached when no comma was seen at all.
      if (!i.hasNext()) {
        throw new IOException("Parse error");
      }
      final Token path = i.next();
      if (!TType.QUOT.equals(path.getType())) {
        throw new IOException("Expected quoted string");
      }
      indir = path.getStr();
      // no check for ll.isEmpty() to permit extension
    }

    /**
     * Copy <tt>job</tt> and point the copy's input paths at this node's
     * input directory, keeping the original job's class loader.
     */
    private JobConf getConf(JobConf job) {
      JobConf conf = new JobConf(job);
      FileInputFormat.setInputPaths(conf, indir);
      conf.setClassLoader(job.getClassLoader());
      return conf;
    }

    /**
     * Delegate split computation to the wrapped InputFormat.
     */
    public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
      return inf.getSplits(getConf(job), numSplits);
    }

    /**
     * Wrap the RecordReader of the underlying InputFormat in the
     * ComposableRecordReader registered for this node's identifier.
     * @throws IOException if no reader is registered for the identifier or
     *         the registered constructor cannot be invoked
     */
    public ComposableRecordReader getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException {
      final Constructor<? extends ComposableRecordReader> cstr =
        rrCstrMap.get(ident);
      if (null == cstr) {
        throw new IOException("No RecordReader for " + ident);
      }
      final String failure = "Failed to create RecordReader for " + ident;
      try {
        return cstr.newInstance(id,
            inf.getRecordReader(split, getConf(job), reporter), cmpcl);
      } catch (IllegalAccessException e) {
        throw Parser.wrapAsIOException(failure, e);
      } catch (InstantiationException e) {
        throw Parser.wrapAsIOException(failure, e);
      } catch (InvocationTargetException e) {
        throw Parser.wrapAsIOException(failure, e);
      }
    }

    @Override
    public String toString() {
      return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
    }
  }

  /**
   * Internal nodetype for "composite" InputFormats.
   */
  static class CNode extends Node {

    private static final Class<?>[] cstrSig =
      { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };

    static void addIdentifier(String ident,
                              Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
    }

    // inst
    private final List<Node> kids = new ArrayList<Node>();

    public CNode(String ident) {
      super(ident);
    }

    /**
     * Set the key comparator on this node and on every child.
     */
    @Override
    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
      super.setKeyComparator(cmpcl);
      for (Node n : kids) {
        n.setKeyComparator(cmpcl);
      }
    }

    /**
     * Combine InputSplits from child InputFormats into a
     * {@link CompositeInputSplit}.
     * @throws IOException if this node has no children, if a child returns
     *         <tt>null</tt>, or if two adjacent children return a different
     *         number of splits
     */
    public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
      // An expression such as "ident()" yields a node with no children;
      // fail with a checked exception rather than indexing splits[0] below.
      if (kids.isEmpty()) {
        throw new IOException("No child nodes to gather splits from in " +
            ident);
      }
      InputSplit[][] splits = new InputSplit[kids.size()][];
      for (int i = 0; i < kids.size(); ++i) {
        final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
        if (null == tmp) {
          throw new IOException("Error gathering splits from child RReader");
        }
        if (i > 0 && splits[i-1].length != tmp.length) {
          throw new IOException("Inconsistent split cardinality from child " +
              i + " (" + splits[i-1].length + "/" + tmp.length + ")");
        }
        splits[i] = tmp;
      }
      // Transpose: composite split i holds the i-th split of every child.
      final int size = splits[0].length;
      CompositeInputSplit[] ret = new CompositeInputSplit[size];
      for (int i = 0; i < size; ++i) {
        ret[i] = new CompositeInputSplit(splits.length);
        for (int j = 0; j < splits.length; ++j) {
          ret[i].add(splits[j][i]);
        }
      }
      return ret;
    }

    /**
     * Create the composite reader registered for this node's identifier and
     * add to it one reader per child, each built from the matching element
     * of the composite split.
     * @throws IOException if <tt>split</tt> is not a CompositeInputSplit, if
     *         no reader is registered for the identifier, or if the
     *         registered constructor cannot be invoked
     */
    @SuppressWarnings("unchecked") // child types unknowable
    public ComposableRecordReader getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException {
      if (!(split instanceof CompositeInputSplit)) {
        // A null split also lands here; avoid dereferencing it in the message.
        throw new IOException("Invalid split type:" +
            (null == split ? "null" : split.getClass().getName()));
      }
      final CompositeInputSplit spl = (CompositeInputSplit)split;
      final int capacity = kids.size();
      final Constructor<? extends ComposableRecordReader> cstr =
        rrCstrMap.get(ident);
      if (null == cstr) {
        throw new IOException("No RecordReader for " + ident);
      }
      final String failure = "Failed to create RecordReader for " + ident;
      CompositeRecordReader ret = null;
      try {
        ret = (CompositeRecordReader)
          cstr.newInstance(id, job, capacity, cmpcl);
      } catch (IllegalAccessException e) {
        throw Parser.wrapAsIOException(failure, e);
      } catch (InstantiationException e) {
        throw Parser.wrapAsIOException(failure, e);
      } catch (InvocationTargetException e) {
        throw Parser.wrapAsIOException(failure, e);
      }
      for (int i = 0; i < capacity; ++i) {
        ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
      }
      return (ComposableRecordReader)ret;
    }

    /**
     * Parse a list of comma-separated nodes.
     * Every token at an even position must carry a node and every token at
     * an odd position must be a comma. A trailing comma is not rejected.
     * @throws IOException if a node or a comma is missing where expected
     */
    @Override
    public void parse(List<Token> args, JobConf job) throws IOException {
      ListIterator<Token> i = args.listIterator();
      while (i.hasNext()) {
        Token t = i.next();
        // Nodes alternate with commas, so list index / 2 is the child ordinal.
        t.getNode().setID(i.previousIndex() >> 1);
        kids.add(t.getNode());
        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
          throw new IOException("Expected ','");
        }
      }
    }

    /**
     * Render this node as <tt>ident(child1,child2,...)</tt>; a node with no
     * children renders as <tt>ident()</tt>.
     */
    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder();
      sb.append(ident).append('(');
      boolean first = true;
      for (Node n : kids) {
        if (!first) {
          sb.append(',');
        }
        sb.append(n.toString());
        first = false;
      }
      sb.append(')');
      return sb.toString();
    }
  }

  /**
   * Reduce the top of the stack on encountering a ')'. Everything above the
   * nearest '(' becomes the argument list of the identifier found directly
   * below that '('.
   * @return a token wrapping the newly parsed node
   * @throws IOException if there is no matching '(' or no identifier
   *         precedes it
   */
  private static Token reduce(Stack<Token> st, JobConf job) throws IOException {
    LinkedList<Token> args = new LinkedList<Token>();
    // Pop the actuals, restoring their left-to-right order as we go.
    while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
      args.addFirst(st.pop());
    }
    if (st.isEmpty()) {
      throw new IOException("Unmatched ')'");
    }
    st.pop(); // discard the '('
    if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
      throw new IOException("Identifier expected");
    }
    Node n = Node.forIdent(st.pop().getStr());
    n.parse(args, job);
    return new NodeToken(n);
  }

  /**
   * Given an expression and an optional comparator, build a tree of
   * InputFormats using the comparator to sort keys.
   * The comparator class is read from the <tt>mapred.join.keycomparator</tt>
   * property of <tt>job</tt> and is applied to the tree only when set.
   * @throws IOException if <tt>expr</tt> is null or does not reduce to
   *         exactly one node
   */
  static Node parse(String expr, JobConf job) throws IOException {
    if (null == expr) {
      throw new IOException("Expression is null");
    }
    Class<? extends WritableComparator> cmpcl =
      job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
    Lexer lex = new Lexer(expr);
    Stack<Token> st = new Stack<Token>();
    Token tok;
    // Shift every token; a ')' triggers a reduction of the enclosing call.
    while ((tok = lex.next()) != null) {
      if (TType.RPAREN.equals(tok.getType())) {
        st.push(reduce(st, job));
      } else {
        st.push(tok);
      }
    }
    if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
      Node ret = st.pop().getNode();
      if (cmpcl != null) {
        ret.setKeyComparator(cmpcl);
      }
      return ret;
    }
    throw new IOException("Missing ')'");
  }

}