001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.join;
020
021 import java.io.CharArrayReader;
022 import java.io.IOException;
023 import java.io.StreamTokenizer;
024 import java.lang.reflect.Constructor;
025 import java.lang.reflect.InvocationTargetException;
026 import java.util.ArrayList;
027 import java.util.HashMap;
028 import java.util.Iterator;
029 import java.util.LinkedList;
030 import java.util.List;
031 import java.util.ListIterator;
032 import java.util.Map;
033 import java.util.Stack;
034
035 import org.apache.hadoop.classification.InterfaceAudience;
036 import org.apache.hadoop.classification.InterfaceStability;
037 import org.apache.hadoop.io.WritableComparator;
038 import org.apache.hadoop.mapred.FileInputFormat;
039 import org.apache.hadoop.mapred.InputFormat;
040 import org.apache.hadoop.mapred.InputSplit;
041 import org.apache.hadoop.mapred.JobConf;
042 import org.apache.hadoop.mapred.RecordReader;
043 import org.apache.hadoop.mapred.Reporter;
044 import org.apache.hadoop.util.ReflectionUtils;
045
046 /**
047 * Very simple shift-reduce parser for join expressions.
048 *
049 * This should be sufficient for the user extension permitted now, but ought to
050 * be replaced with a parser generator if more complex grammars are supported.
051 * In particular, this "shift-reduce" parser has no states. Each set
052 * of formals requires a different internal node type, which is responsible for
053 * interpreting the list of tokens it receives. This is sufficient for the
054 * current grammar, but it has several annoying properties that might inhibit
 * extension. In particular, parentheses are always function calls; an
 * algebraic or filter grammar would not only require a node type, but must
 * also work around the internals of this parser.
058 *
 * For most other cases, adding classes to the hierarchy, particularly by
 * extending JoinRecordReader and MultiFilterRecordReader, is fairly
 * straightforward. One need only override the relevant method(s) (usually only
062 * {@link CompositeRecordReader#combine}) and include a property to map its
063 * value to an identifier in the parser.
064 */
065 @InterfaceAudience.Public
066 @InterfaceStability.Evolving
067 public class Parser {
068 @InterfaceAudience.Public
069 @InterfaceStability.Evolving
070 public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
071
072 /**
073 * Tagged-union type for tokens from the join expression.
074 * @see Parser.TType
075 */
076 @InterfaceAudience.Public
077 @InterfaceStability.Evolving
078 public static class Token {
079
080 private TType type;
081
082 Token(TType type) {
083 this.type = type;
084 }
085
086 public TType getType() { return type; }
087 public Node getNode() throws IOException {
088 throw new IOException("Expected nodetype");
089 }
090 public double getNum() throws IOException {
091 throw new IOException("Expected numtype");
092 }
093 public String getStr() throws IOException {
094 throw new IOException("Expected strtype");
095 }
096 }
097
098 @InterfaceAudience.Public
099 @InterfaceStability.Evolving
100 public static class NumToken extends Token {
101 private double num;
102 public NumToken(double num) {
103 super(TType.NUM);
104 this.num = num;
105 }
106 public double getNum() { return num; }
107 }
108
109 @InterfaceAudience.Public
110 @InterfaceStability.Evolving
111 public static class NodeToken extends Token {
112 private Node node;
113 NodeToken(Node node) {
114 super(TType.CIF);
115 this.node = node;
116 }
117 public Node getNode() {
118 return node;
119 }
120 }
121
122 @InterfaceAudience.Public
123 @InterfaceStability.Evolving
124 public static class StrToken extends Token {
125 private String str;
126 public StrToken(TType type, String str) {
127 super(type);
128 this.str = str;
129 }
130 public String getStr() {
131 return str;
132 }
133 }
134
135 /**
136 * Simple lexer wrapping a StreamTokenizer.
137 * This encapsulates the creation of tagged-union Tokens and initializes the
138 * SteamTokenizer.
139 */
140 private static class Lexer {
141
142 private StreamTokenizer tok;
143
144 Lexer(String s) {
145 tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
146 tok.quoteChar('"');
147 tok.parseNumbers();
148 tok.ordinaryChar(',');
149 tok.ordinaryChar('(');
150 tok.ordinaryChar(')');
151 tok.wordChars('$','$');
152 tok.wordChars('_','_');
153 }
154
155 Token next() throws IOException {
156 int type = tok.nextToken();
157 switch (type) {
158 case StreamTokenizer.TT_EOF:
159 case StreamTokenizer.TT_EOL:
160 return null;
161 case StreamTokenizer.TT_NUMBER:
162 return new NumToken(tok.nval);
163 case StreamTokenizer.TT_WORD:
164 return new StrToken(TType.IDENT, tok.sval);
165 case '"':
166 return new StrToken(TType.QUOT, tok.sval);
167 default:
168 switch (type) {
169 case ',':
170 return new Token(TType.COMMA);
171 case '(':
172 return new Token(TType.LPAREN);
173 case ')':
174 return new Token(TType.RPAREN);
175 default:
176 throw new IOException("Unexpected: " + type);
177 }
178 }
179 }
180 }
181
182 @InterfaceAudience.Public
183 @InterfaceStability.Evolving
184 public abstract static class Node implements ComposableInputFormat {
185 /**
186 * Return the node type registered for the particular identifier.
187 * By default, this is a CNode for any composite node and a WNode
188 * for "wrapped" nodes. User nodes will likely be composite
189 * nodes.
190 * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
191 * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
192 */
193 static Node forIdent(String ident) throws IOException {
194 try {
195 if (!nodeCstrMap.containsKey(ident)) {
196 throw new IOException("No nodetype for " + ident);
197 }
198 return nodeCstrMap.get(ident).newInstance(ident);
199 } catch (IllegalAccessException e) {
200 throw (IOException)new IOException().initCause(e);
201 } catch (InstantiationException e) {
202 throw (IOException)new IOException().initCause(e);
203 } catch (InvocationTargetException e) {
204 throw (IOException)new IOException().initCause(e);
205 }
206 }
207
208 private static final Class<?>[] ncstrSig = { String.class };
209 private static final
210 Map<String,Constructor<? extends Node>> nodeCstrMap =
211 new HashMap<String,Constructor<? extends Node>>();
212 protected static final
213 Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
214 new HashMap<String,Constructor<? extends ComposableRecordReader>>();
215
216 /**
217 * For a given identifier, add a mapping to the nodetype for the parse
218 * tree and to the ComposableRecordReader to be created, including the
219 * formals required to invoke the constructor.
220 * The nodetype and constructor signature should be filled in from the
221 * child node.
222 */
223 protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
224 Class<? extends Node> nodetype,
225 Class<? extends ComposableRecordReader> cl)
226 throws NoSuchMethodException {
227 Constructor<? extends Node> ncstr =
228 nodetype.getDeclaredConstructor(ncstrSig);
229 ncstr.setAccessible(true);
230 nodeCstrMap.put(ident, ncstr);
231 Constructor<? extends ComposableRecordReader> mcstr =
232 cl.getDeclaredConstructor(mcstrSig);
233 mcstr.setAccessible(true);
234 rrCstrMap.put(ident, mcstr);
235 }
236
237 // inst
238 protected int id = -1;
239 protected String ident;
240 protected Class<? extends WritableComparator> cmpcl;
241
242 protected Node(String ident) {
243 this.ident = ident;
244 }
245
246 protected void setID(int id) {
247 this.id = id;
248 }
249
250 protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
251 this.cmpcl = cmpcl;
252 }
253 abstract void parse(List<Token> args, JobConf job) throws IOException;
254 }
255
256 /**
257 * Nodetype in the parse tree for "wrapped" InputFormats.
258 */
259 static class WNode extends Node {
260 private static final Class<?>[] cstrSig =
261 { Integer.TYPE, RecordReader.class, Class.class };
262
263 static void addIdentifier(String ident,
264 Class<? extends ComposableRecordReader> cl)
265 throws NoSuchMethodException {
266 Node.addIdentifier(ident, cstrSig, WNode.class, cl);
267 }
268
269 private String indir;
270 private InputFormat inf;
271
272 public WNode(String ident) {
273 super(ident);
274 }
275
276 /**
277 * Let the first actual define the InputFormat and the second define
278 * the <tt>mapred.input.dir</tt> property.
279 */
280 public void parse(List<Token> ll, JobConf job) throws IOException {
281 StringBuilder sb = new StringBuilder();
282 Iterator<Token> i = ll.iterator();
283 while (i.hasNext()) {
284 Token t = i.next();
285 if (TType.COMMA.equals(t.getType())) {
286 try {
287 inf = (InputFormat)ReflectionUtils.newInstance(
288 job.getClassByName(sb.toString()),
289 job);
290 } catch (ClassNotFoundException e) {
291 throw (IOException)new IOException().initCause(e);
292 } catch (IllegalArgumentException e) {
293 throw (IOException)new IOException().initCause(e);
294 }
295 break;
296 }
297 sb.append(t.getStr());
298 }
299 if (!i.hasNext()) {
300 throw new IOException("Parse error");
301 }
302 Token t = i.next();
303 if (!TType.QUOT.equals(t.getType())) {
304 throw new IOException("Expected quoted string");
305 }
306 indir = t.getStr();
307 // no check for ll.isEmpty() to permit extension
308 }
309
310 private JobConf getConf(JobConf job) {
311 JobConf conf = new JobConf(job);
312 FileInputFormat.setInputPaths(conf, indir);
313 conf.setClassLoader(job.getClassLoader());
314 return conf;
315 }
316
317 public InputSplit[] getSplits(JobConf job, int numSplits)
318 throws IOException {
319 return inf.getSplits(getConf(job), numSplits);
320 }
321
322 public ComposableRecordReader getRecordReader(
323 InputSplit split, JobConf job, Reporter reporter) throws IOException {
324 try {
325 if (!rrCstrMap.containsKey(ident)) {
326 throw new IOException("No RecordReader for " + ident);
327 }
328 return rrCstrMap.get(ident).newInstance(id,
329 inf.getRecordReader(split, getConf(job), reporter), cmpcl);
330 } catch (IllegalAccessException e) {
331 throw (IOException)new IOException().initCause(e);
332 } catch (InstantiationException e) {
333 throw (IOException)new IOException().initCause(e);
334 } catch (InvocationTargetException e) {
335 throw (IOException)new IOException().initCause(e);
336 }
337 }
338
339 public String toString() {
340 return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
341 }
342 }
343
344 /**
345 * Internal nodetype for "composite" InputFormats.
346 */
347 static class CNode extends Node {
348
349 private static final Class<?>[] cstrSig =
350 { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };
351
352 static void addIdentifier(String ident,
353 Class<? extends ComposableRecordReader> cl)
354 throws NoSuchMethodException {
355 Node.addIdentifier(ident, cstrSig, CNode.class, cl);
356 }
357
358 // inst
359 private ArrayList<Node> kids = new ArrayList<Node>();
360
361 public CNode(String ident) {
362 super(ident);
363 }
364
365 public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
366 super.setKeyComparator(cmpcl);
367 for (Node n : kids) {
368 n.setKeyComparator(cmpcl);
369 }
370 }
371
372 /**
373 * Combine InputSplits from child InputFormats into a
374 * {@link CompositeInputSplit}.
375 */
376 public InputSplit[] getSplits(JobConf job, int numSplits)
377 throws IOException {
378 InputSplit[][] splits = new InputSplit[kids.size()][];
379 for (int i = 0; i < kids.size(); ++i) {
380 final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
381 if (null == tmp) {
382 throw new IOException("Error gathering splits from child RReader");
383 }
384 if (i > 0 && splits[i-1].length != tmp.length) {
385 throw new IOException("Inconsistent split cardinality from child " +
386 i + " (" + splits[i-1].length + "/" + tmp.length + ")");
387 }
388 splits[i] = tmp;
389 }
390 final int size = splits[0].length;
391 CompositeInputSplit[] ret = new CompositeInputSplit[size];
392 for (int i = 0; i < size; ++i) {
393 ret[i] = new CompositeInputSplit(splits.length);
394 for (int j = 0; j < splits.length; ++j) {
395 ret[i].add(splits[j][i]);
396 }
397 }
398 return ret;
399 }
400
401 @SuppressWarnings("unchecked") // child types unknowable
402 public ComposableRecordReader getRecordReader(
403 InputSplit split, JobConf job, Reporter reporter) throws IOException {
404 if (!(split instanceof CompositeInputSplit)) {
405 throw new IOException("Invalid split type:" +
406 split.getClass().getName());
407 }
408 final CompositeInputSplit spl = (CompositeInputSplit)split;
409 final int capacity = kids.size();
410 CompositeRecordReader ret = null;
411 try {
412 if (!rrCstrMap.containsKey(ident)) {
413 throw new IOException("No RecordReader for " + ident);
414 }
415 ret = (CompositeRecordReader)
416 rrCstrMap.get(ident).newInstance(id, job, capacity, cmpcl);
417 } catch (IllegalAccessException e) {
418 throw (IOException)new IOException().initCause(e);
419 } catch (InstantiationException e) {
420 throw (IOException)new IOException().initCause(e);
421 } catch (InvocationTargetException e) {
422 throw (IOException)new IOException().initCause(e);
423 }
424 for (int i = 0; i < capacity; ++i) {
425 ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
426 }
427 return (ComposableRecordReader)ret;
428 }
429
430 /**
431 * Parse a list of comma-separated nodes.
432 */
433 public void parse(List<Token> args, JobConf job) throws IOException {
434 ListIterator<Token> i = args.listIterator();
435 while (i.hasNext()) {
436 Token t = i.next();
437 t.getNode().setID(i.previousIndex() >> 1);
438 kids.add(t.getNode());
439 if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
440 throw new IOException("Expected ','");
441 }
442 }
443 }
444
445 public String toString() {
446 StringBuilder sb = new StringBuilder();
447 sb.append(ident + "(");
448 for (Node n : kids) {
449 sb.append(n.toString() + ",");
450 }
451 sb.setCharAt(sb.length() - 1, ')');
452 return sb.toString();
453 }
454 }
455
456 private static Token reduce(Stack<Token> st, JobConf job) throws IOException {
457 LinkedList<Token> args = new LinkedList<Token>();
458 while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
459 args.addFirst(st.pop());
460 }
461 if (st.isEmpty()) {
462 throw new IOException("Unmatched ')'");
463 }
464 st.pop();
465 if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
466 throw new IOException("Identifier expected");
467 }
468 Node n = Node.forIdent(st.pop().getStr());
469 n.parse(args, job);
470 return new NodeToken(n);
471 }
472
473 /**
474 * Given an expression and an optional comparator, build a tree of
475 * InputFormats using the comparator to sort keys.
476 */
477 static Node parse(String expr, JobConf job) throws IOException {
478 if (null == expr) {
479 throw new IOException("Expression is null");
480 }
481 Class<? extends WritableComparator> cmpcl =
482 job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
483 Lexer lex = new Lexer(expr);
484 Stack<Token> st = new Stack<Token>();
485 Token tok;
486 while ((tok = lex.next()) != null) {
487 if (TType.RPAREN.equals(tok.getType())) {
488 st.push(reduce(st, job));
489 } else {
490 st.push(tok);
491 }
492 }
493 if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
494 Node ret = st.pop().getNode();
495 if (cmpcl != null) {
496 ret.setKeyComparator(cmpcl);
497 }
498 return ret;
499 }
500 throw new IOException("Missing ')'");
501 }
502
503 }