/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.join;

import java.io.CharArrayReader;
import java.io.IOException;
import java.io.StreamTokenizer;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Stack;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Very simple shift-reduce parser for join expressions.
 *
 * This should be sufficient for the user extension permitted now, but ought
 * to be replaced with a parser generator if more complex grammars are
 * supported. In particular, this "shift-reduce" parser has no states: each
 * set of formals requires a different internal node type, which is
 * responsible for interpreting the list of tokens it receives. This is
 * sufficient for the current grammar, but it has several annoying properties
 * that might inhibit extension. Notably, parentheses are always function
 * calls; an algebraic or filter grammar would not only require a new node
 * type, but would also have to work around the internals of this parser.
 *
 * For most other cases, adding classes to the hierarchy (particularly by
 * extending {@link JoinRecordReader} and {@link MultiFilterRecordReader}) is
 * fairly straightforward. One need only override the relevant method(s)
 * (usually only {@link CompositeRecordReader#combine}) and include a property
 * to map its value to an identifier in the parser.
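 *
 * As an illustration, here is an expression of the kind this parser accepts,
 * assuming the default identifiers ("inner", "tbl", ...) that
 * CompositeInputFormat registers; the paths below are hypothetical:
 * <pre>
 *   inner(tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat,
 *             "hdfs://host/src/a"),
 *         tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat,
 *             "hdfs://host/src/b"))
 * </pre>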
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Parser {
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }

  /**
   * Tagged-union type for tokens from the join expression.
   * @see Parser.TType
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class Token {

    private TType type;

    Token(TType type) {
      this.type = type;
    }

    public TType getType() { return type; }

    public Node getNode() throws IOException {
      throw new IOException("Expected nodetype");
    }

    public double getNum() throws IOException {
      throw new IOException("Expected numtype");
    }

    public String getStr() throws IOException {
      throw new IOException("Expected strtype");
    }
  }

  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class NumToken extends Token {
    private double num;
    public NumToken(double num) {
      super(TType.NUM);
      this.num = num;
    }
    public double getNum() { return num; }
  }

  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class NodeToken extends Token {
    private Node node;
    NodeToken(Node node) {
      super(TType.CIF);
      this.node = node;
    }
    public Node getNode() {
      return node;
    }
  }

  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class StrToken extends Token {
    private String str;
    public StrToken(TType type, String str) {
      super(type);
      this.str = str;
    }
    public String getStr() {
      return str;
    }
  }

  /**
   * Simple lexer wrapping a StreamTokenizer.
   * This encapsulates the creation of tagged-union Tokens and initializes
   * the StreamTokenizer.
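   * For example, the (hypothetical) expression <tt>inner(tbl(Fmt,"dir"))</tt>
   * is emitted as the token sequence IDENT, LPAREN, IDENT, LPAREN, IDENT,
   * COMMA, QUOT, RPAREN, RPAREN.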
   */
  private static class Lexer {

    private StreamTokenizer tok;

    Lexer(String s) {
      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
      tok.quoteChar('"');
      tok.parseNumbers();
      tok.ordinaryChar(',');
      tok.ordinaryChar('(');
      tok.ordinaryChar(')');
      tok.wordChars('$', '$');
      tok.wordChars('_', '_');
    }

    Token next() throws IOException {
      int type = tok.nextToken();
      switch (type) {
      case StreamTokenizer.TT_EOF:
      case StreamTokenizer.TT_EOL:
        return null;
      case StreamTokenizer.TT_NUMBER:
        return new NumToken(tok.nval);
      case StreamTokenizer.TT_WORD:
        return new StrToken(TType.IDENT, tok.sval);
      case '"':
        return new StrToken(TType.QUOT, tok.sval);
      case ',':
        return new Token(TType.COMMA);
      case '(':
        return new Token(TType.LPAREN);
      case ')':
        return new Token(TType.RPAREN);
      default:
        throw new IOException("Unexpected: " + type);
      }
    }
  }

  @SuppressWarnings("unchecked")
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public abstract static class Node extends ComposableInputFormat {
    /**
     * Return the node type registered for the particular identifier.
     * By default, this is a CNode for any composite node and a WNode
     * for "wrapped" nodes. User nodes will likely be composite
     * nodes.
     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
     * @see CompositeInputFormat#setFormat(org.apache.hadoop.conf.Configuration)
     */
    static Node forIdent(String ident) throws IOException {
      try {
        if (!nodeCstrMap.containsKey(ident)) {
          throw new IOException("No nodetype for " + ident);
        }
        return nodeCstrMap.get(ident).newInstance(ident);
      } catch (IllegalAccessException e) {
        throw new IOException(e);
      } catch (InstantiationException e) {
        throw new IOException(e);
      } catch (InvocationTargetException e) {
        throw new IOException(e);
      }
    }

    private static final Class<?>[] ncstrSig = { String.class };
    private static final
        Map<String, Constructor<? extends Node>> nodeCstrMap =
          new HashMap<String, Constructor<? extends Node>>();
    protected static final
        Map<String, Constructor<? extends ComposableRecordReader>> rrCstrMap =
          new HashMap<String, Constructor<? extends ComposableRecordReader>>();

    /**
     * For a given identifier, add a mapping to the nodetype for the parse
     * tree and to the ComposableRecordReader to be created, including the
     * formals required to invoke the constructor.
     * The nodetype and constructor signature should be filled in from the
     * child node.
     */
    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
        Class<? extends Node> nodetype,
        Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Constructor<? extends Node> ncstr =
          nodetype.getDeclaredConstructor(ncstrSig);
      ncstr.setAccessible(true);
      nodeCstrMap.put(ident, ncstr);
      Constructor<? extends ComposableRecordReader> mcstr =
          cl.getDeclaredConstructor(mcstrSig);
      mcstr.setAccessible(true);
      rrCstrMap.put(ident, mcstr);
    }
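
    // For reference, CompositeInputFormat registers the built-in identifiers
    // along these lines (a hedged sketch of its defaults, not verbatim):
    //   CNode.addIdentifier("inner", InnerJoinRecordReader.class);
    //   CNode.addIdentifier("outer", OuterJoinRecordReader.class);
    //   CNode.addIdentifier("override", OverrideRecordReader.class);
    //   WNode.addIdentifier("tbl", WrappedRecordReader.class);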

    // inst
    protected int id = -1;
    protected String ident;
    protected Class<? extends WritableComparator> cmpcl;

    protected Node(String ident) {
      this.ident = ident;
    }

    protected void setID(int id) {
      this.id = id;
    }

    protected void setKeyComparator(
        Class<? extends WritableComparator> cmpcl) {
      this.cmpcl = cmpcl;
    }

    abstract void parse(List<Token> args, Configuration conf)
        throws IOException;
  }

  /**
   * Nodetype in the parse tree for "wrapped" InputFormats.
   */
  static class WNode extends Node {
    private static final Class<?>[] cstrSig =
        { Integer.TYPE, RecordReader.class, Class.class };

    @SuppressWarnings("unchecked")
    static void addIdentifier(String ident,
        Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
    }

    private String indir;
    private InputFormat<?, ?> inf;

    public WNode(String ident) {
      super(ident);
    }

    /**
     * Let the first actual define the InputFormat and the second define
     * the <tt>mapreduce.input.fileinputformat.inputdir</tt> property,
     * i.e. the input paths (applied via FileInputFormat.setInputPaths).
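     * For example, the (hypothetical) actuals
     * <tt>org.example.MyInputFormat,"hdfs://host/src/a"</tt> instantiate
     * MyInputFormat reflectively and record the quoted path as this node's
     * input directory.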
     */
    @Override
    public void parse(List<Token> ll, Configuration conf) throws IOException {
      StringBuilder sb = new StringBuilder();
      Iterator<Token> i = ll.iterator();
      while (i.hasNext()) {
        Token t = i.next();
        if (TType.COMMA.equals(t.getType())) {
          try {
            inf = (InputFormat<?, ?>) ReflectionUtils.newInstance(
                conf.getClassByName(sb.toString()), conf);
          } catch (ClassNotFoundException e) {
            throw new IOException(e);
          } catch (IllegalArgumentException e) {
            throw new IOException(e);
          }
          break;
        }
        sb.append(t.getStr());
      }
      if (!i.hasNext()) {
        throw new IOException("Parse error");
      }
      Token t = i.next();
      if (!TType.QUOT.equals(t.getType())) {
        throw new IOException("Expected quoted string");
      }
      indir = t.getStr();
      // no check for ll.isEmpty() to permit extension
    }

    private Configuration getConf(Configuration jconf) throws IOException {
      Job job = Job.getInstance(jconf);
      FileInputFormat.setInputPaths(job, indir);
      return job.getConfiguration();
    }

    public List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException {
      return inf.getSplits(
          new JobContextImpl(getConf(context.getConfiguration()),
              context.getJobID()));
    }

    public ComposableRecordReader<?, ?> createRecordReader(InputSplit split,
        TaskAttemptContext taskContext)
        throws IOException, InterruptedException {
      try {
        if (!rrCstrMap.containsKey(ident)) {
          throw new IOException("No RecordReader for " + ident);
        }
        Configuration conf = getConf(taskContext.getConfiguration());
        TaskAttemptContext context =
            new TaskAttemptContextImpl(conf,
                TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID)),
                new WrappedStatusReporter(taskContext));
        return rrCstrMap.get(ident).newInstance(id,
            inf.createRecordReader(split, context), cmpcl);
      } catch (IllegalAccessException e) {
        throw new IOException(e);
      } catch (InstantiationException e) {
        throw new IOException(e);
      } catch (InvocationTargetException e) {
        throw new IOException(e);
      }
    }

    public String toString() {
      return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
    }
  }

  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Internal nodetype for "composite" InputFormats.
   */
  static class CNode extends Node {

    private static final Class<?>[] cstrSig =
        { Integer.TYPE, Configuration.class, Integer.TYPE, Class.class };

    @SuppressWarnings("unchecked")
    static void addIdentifier(String ident,
        Class<? extends ComposableRecordReader> cl)
        throws NoSuchMethodException {
      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
    }

    // inst
    private ArrayList<Node> kids = new ArrayList<Node>();

    public CNode(String ident) {
      super(ident);
    }

    @Override
    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
      super.setKeyComparator(cmpcl);
      for (Node n : kids) {
        n.setKeyComparator(cmpcl);
      }
    }

    /**
     * Combine InputSplits from child InputFormats into a
     * {@link CompositeInputSplit}.
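     * Every child must contribute the same number of splits; with n splits
     * per child, this produces n composite splits, the i-th holding the
     * i-th split from each child.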
     */
    @SuppressWarnings("unchecked")
    public List<InputSplit> getSplits(JobContext job)
        throws IOException, InterruptedException {
      List<List<InputSplit>> splits =
          new ArrayList<List<InputSplit>>(kids.size());
      for (int i = 0; i < kids.size(); ++i) {
        List<InputSplit> tmp = kids.get(i).getSplits(job);
        if (null == tmp) {
          throw new IOException("Error gathering splits from child RReader");
        }
        if (i > 0 && splits.get(i - 1).size() != tmp.size()) {
          throw new IOException("Inconsistent split cardinality from child " +
              i + " (" + splits.get(i - 1).size() + "/" + tmp.size() + ")");
        }
        splits.add(i, tmp);
      }
      final int size = splits.get(0).size();
      List<InputSplit> ret = new ArrayList<InputSplit>();
      for (int i = 0; i < size; ++i) {
        CompositeInputSplit split = new CompositeInputSplit(splits.size());
        for (int j = 0; j < splits.size(); ++j) {
          split.add(splits.get(j).get(i));
        }
        ret.add(split);
      }
      return ret;
    }

    @SuppressWarnings("unchecked") // child types unknowable
    public ComposableRecordReader
        createRecordReader(InputSplit split, TaskAttemptContext taskContext)
        throws IOException, InterruptedException {
      if (!(split instanceof CompositeInputSplit)) {
        throw new IOException("Invalid split type: " +
            split.getClass().getName());
      }
      final CompositeInputSplit spl = (CompositeInputSplit) split;
      final int capacity = kids.size();
      CompositeRecordReader ret = null;
      try {
        if (!rrCstrMap.containsKey(ident)) {
          throw new IOException("No RecordReader for " + ident);
        }
        ret = (CompositeRecordReader) rrCstrMap.get(ident).
            newInstance(id, taskContext.getConfiguration(), capacity, cmpcl);
      } catch (IllegalAccessException e) {
        throw new IOException(e);
      } catch (InstantiationException e) {
        throw new IOException(e);
      } catch (InvocationTargetException e) {
        throw new IOException(e);
      }
      for (int i = 0; i < capacity; ++i) {
        ret.add(kids.get(i).createRecordReader(spl.get(i), taskContext));
      }
      return (ComposableRecordReader) ret;
    }

    /**
     * Parse a list of comma-separated nodes.
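     * Child nodes occupy the even positions of the argument list (the odd
     * positions hold the separating commas), so <tt>previousIndex() >> 1</tt>
     * assigns each child its ordinal position as its id.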
     */
    public void parse(List<Token> args, Configuration conf)
        throws IOException {
      ListIterator<Token> i = args.listIterator();
      while (i.hasNext()) {
        Token t = i.next();
        t.getNode().setID(i.previousIndex() >> 1);
        kids.add(t.getNode());
        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
          throw new IOException("Expected ','");
        }
      }
    }

    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append(ident + "(");
      for (Node n : kids) {
        sb.append(n.toString() + ",");
      }
      sb.setCharAt(sb.length() - 1, ')');
      return sb.toString();
    }
  }

  private static Token reduce(Stack<Token> st, Configuration conf)
      throws IOException {
    LinkedList<Token> args = new LinkedList<Token>();
    while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
      args.addFirst(st.pop());
    }
    if (st.isEmpty()) {
      throw new IOException("Unmatched ')'");
    }
    st.pop();
    if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
      throw new IOException("Identifier expected");
    }
    Node n = Node.forIdent(st.pop().getStr());
    n.parse(args, conf);
    return new NodeToken(n);
  }
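
  // An informal example of a reduce step, assuming the default identifiers:
  // with the stack holding IDENT("tbl") LPAREN IDENT("Fmt") COMMA QUOT("dir")
  // when ')' is seen, reduce() pops the argument tokens back to the '(',
  // builds a WNode from [IDENT("Fmt"), COMMA, QUOT("dir")], and returns it
  // wrapped in a CIF-typed NodeToken for the caller to push.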

  /**
   * Given an expression and an optional comparator, build a tree of
   * InputFormats using the comparator to sort keys.
   */
  static Node parse(String expr, Configuration conf) throws IOException {
    if (null == expr) {
      throw new IOException("Expression is null");
    }
    Class<? extends WritableComparator> cmpcl = conf.getClass(
        CompositeInputFormat.JOIN_COMPARATOR, null, WritableComparator.class);
    Lexer lex = new Lexer(expr);
    Stack<Token> st = new Stack<Token>();
    Token tok;
    while ((tok = lex.next()) != null) {
      if (TType.RPAREN.equals(tok.getType())) {
        st.push(reduce(st, conf));
      } else {
        st.push(tok);
      }
    }
    if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
      Node ret = st.pop().getNode();
      if (cmpcl != null) {
        ret.setKeyComparator(cmpcl);
      }
      return ret;
    }
    throw new IOException("Missing ')'");
  }
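
  // A hedged usage sketch: Parser.parse is normally driven by
  // CompositeInputFormat, which registers the identifiers and stores the
  // expression under CompositeInputFormat.JOIN_EXPR, roughly:
  //
  //   conf.set(CompositeInputFormat.JOIN_EXPR,
  //       CompositeInputFormat.compose("inner", SequenceFileInputFormat.class,
  //           new Path("/data/a"), new Path("/data/b")));
  //   Node root = Parser.parse(conf.get(CompositeInputFormat.JOIN_EXPR), conf);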

}