001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.join;
020
021import java.io.CharArrayReader;
022import java.io.IOException;
023import java.io.StreamTokenizer;
024import java.lang.reflect.Constructor;
025import java.lang.reflect.InvocationTargetException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.Iterator;
029import java.util.LinkedList;
030import java.util.List;
031import java.util.ListIterator;
032import java.util.Map;
033import java.util.Stack;
034
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceStability;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.io.WritableComparator;
039import org.apache.hadoop.mapreduce.Counter;
040import org.apache.hadoop.mapreduce.InputFormat;
041import org.apache.hadoop.mapreduce.InputSplit;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.mapreduce.JobContext;
044import org.apache.hadoop.mapreduce.MRJobConfig;
045import org.apache.hadoop.mapreduce.RecordReader;
046import org.apache.hadoop.mapreduce.StatusReporter;
047import org.apache.hadoop.mapreduce.TaskAttemptContext;
048import org.apache.hadoop.mapreduce.TaskAttemptID;
049import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
050import org.apache.hadoop.mapreduce.task.JobContextImpl;
051import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
052import org.apache.hadoop.util.ReflectionUtils;
053
054/**
055 * Very simple shift-reduce parser for join expressions.
056 *
057 * This should be sufficient for the user extension permitted now, but ought to
058 * be replaced with a parser generator if more complex grammars are supported.
059 * In particular, this "shift-reduce" parser has no states. Each set
060 * of formals requires a different internal node type, which is responsible for
061 * interpreting the list of tokens it receives. This is sufficient for the
062 * current grammar, but it has several annoying properties that might inhibit
063 * extension. In particular, parenthesis are always function calls; an
064 * algebraic or filter grammar would not only require a node type, but must
065 * also work around the internals of this parser.
066 *
067 * For most other cases, adding classes to the hierarchy- particularly by
068 * extending JoinRecordReader and MultiFilterRecordReader- is fairly
069 * straightforward. One need only override the relevant method(s) (usually only
070 * {@link CompositeRecordReader#combine}) and include a property to map its
071 * value to an identifier in the parser.
072 */
073@InterfaceAudience.Public
074@InterfaceStability.Evolving
075public class Parser {
076  @InterfaceAudience.Public
077  @InterfaceStability.Evolving
078  public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
079
080  /**
081   * Tagged-union type for tokens from the join expression.
082   * @see Parser.TType
083   */
084  @InterfaceAudience.Public
085  @InterfaceStability.Evolving
086  public static class Token {
087
088    private TType type;
089
090    Token(TType type) {
091      this.type = type;
092    }
093
094    public TType getType() { return type; }
095    
096    public Node getNode() throws IOException {
097      throw new IOException("Expected nodetype");
098    }
099    
100    public double getNum() throws IOException {
101      throw new IOException("Expected numtype");
102    }
103    
104    public String getStr() throws IOException {
105      throw new IOException("Expected strtype");
106    }
107  }
108
109  @InterfaceAudience.Public
110  @InterfaceStability.Evolving
111  public static class NumToken extends Token {
112    private double num;
113    public NumToken(double num) {
114      super(TType.NUM);
115      this.num = num;
116    }
117    public double getNum() { return num; }
118  }
119
120  @InterfaceAudience.Public
121  @InterfaceStability.Evolving
122  public static class NodeToken extends Token {
123    private Node node;
124    NodeToken(Node node) {
125      super(TType.CIF);
126      this.node = node;
127    }
128    public Node getNode() {
129      return node;
130    }
131  }
132
133  @InterfaceAudience.Public
134  @InterfaceStability.Evolving
135  public static class StrToken extends Token {
136    private String str;
137    public StrToken(TType type, String str) {
138      super(type);
139      this.str = str;
140    }
141    public String getStr() {
142      return str;
143    }
144  }
145
146  /**
147   * Simple lexer wrapping a StreamTokenizer.
148   * This encapsulates the creation of tagged-union Tokens and initializes the
149   * SteamTokenizer.
150   */
151  private static class Lexer {
152
153    private StreamTokenizer tok;
154
155    Lexer(String s) {
156      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
157      tok.quoteChar('"');
158      tok.parseNumbers();
159      tok.ordinaryChar(',');
160      tok.ordinaryChar('(');
161      tok.ordinaryChar(')');
162      tok.wordChars('$','$');
163      tok.wordChars('_','_');
164    }
165
166    Token next() throws IOException {
167      int type = tok.nextToken();
168      switch (type) {
169        case StreamTokenizer.TT_EOF:
170        case StreamTokenizer.TT_EOL:
171          return null;
172        case StreamTokenizer.TT_NUMBER:
173          return new NumToken(tok.nval);
174        case StreamTokenizer.TT_WORD:
175          return new StrToken(TType.IDENT, tok.sval);
176        case '"':
177          return new StrToken(TType.QUOT, tok.sval);
178        default:
179          switch (type) {
180            case ',':
181              return new Token(TType.COMMA);
182            case '(':
183              return new Token(TType.LPAREN);
184            case ')':
185              return new Token(TType.RPAREN);
186            default:
187              throw new IOException("Unexpected: " + type);
188          }
189      }
190    }
191  }
192
193@SuppressWarnings("unchecked")
194@InterfaceAudience.Public
195@InterfaceStability.Evolving
196public abstract static class Node extends ComposableInputFormat {
197    /**
198     * Return the node type registered for the particular identifier.
199     * By default, this is a CNode for any composite node and a WNode
200     * for "wrapped" nodes. User nodes will likely be composite
201     * nodes.
202     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
203     * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
204     */
205    static Node forIdent(String ident) throws IOException {
206      try {
207        if (!nodeCstrMap.containsKey(ident)) {
208          throw new IOException("No nodetype for " + ident);
209        }
210        return nodeCstrMap.get(ident).newInstance(ident);
211      } catch (IllegalAccessException e) {
212        throw new IOException(e);
213      } catch (InstantiationException e) {
214        throw new IOException(e);
215      } catch (InvocationTargetException e) {
216        throw new IOException(e);
217      }
218    }
219
220    private static final Class<?>[] ncstrSig = { String.class };
221    private static final
222        Map<String,Constructor<? extends Node>> nodeCstrMap =
223        new HashMap<String,Constructor<? extends Node>>();
224    protected static final Map<String,Constructor<? extends 
225        ComposableRecordReader>> rrCstrMap =
226        new HashMap<String,Constructor<? extends ComposableRecordReader>>();
227
228    /**
229     * For a given identifier, add a mapping to the nodetype for the parse
230     * tree and to the ComposableRecordReader to be created, including the
231     * formals required to invoke the constructor.
232     * The nodetype and constructor signature should be filled in from the
233     * child node.
234     */
235    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
236                              Class<? extends Node> nodetype,
237                              Class<? extends ComposableRecordReader> cl)
238        throws NoSuchMethodException {
239      Constructor<? extends Node> ncstr =
240        nodetype.getDeclaredConstructor(ncstrSig);
241      ncstr.setAccessible(true);
242      nodeCstrMap.put(ident, ncstr);
243      Constructor<? extends ComposableRecordReader> mcstr =
244        cl.getDeclaredConstructor(mcstrSig);
245      mcstr.setAccessible(true);
246      rrCstrMap.put(ident, mcstr);
247    }
248
249    // inst
250    protected int id = -1;
251    protected String ident;
252    protected Class<? extends WritableComparator> cmpcl;
253
254    protected Node(String ident) {
255      this.ident = ident;
256    }
257
258    protected void setID(int id) {
259      this.id = id;
260    }
261
262    protected void setKeyComparator(
263        Class<? extends WritableComparator> cmpcl) {
264      this.cmpcl = cmpcl;
265    }
266    abstract void parse(List<Token> args, Configuration conf) 
267        throws IOException;
268  }
269
270  /**
271   * Nodetype in the parse tree for &quot;wrapped&quot; InputFormats.
272   */
273  static class WNode extends Node {
274    private static final Class<?>[] cstrSig =
275      { Integer.TYPE, RecordReader.class, Class.class };
276
277    @SuppressWarnings("unchecked")
278        static void addIdentifier(String ident,
279                              Class<? extends ComposableRecordReader> cl)
280        throws NoSuchMethodException {
281      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
282    }
283
284    private String indir;
285    private InputFormat<?, ?> inf;
286
287    public WNode(String ident) {
288      super(ident);
289    }
290
291    /**
292     * Let the first actual define the InputFormat and the second define
293     * the <tt>mapred.input.dir</tt> property.
294     */
295    @Override
296    public void parse(List<Token> ll, Configuration conf) throws IOException {
297      StringBuilder sb = new StringBuilder();
298      Iterator<Token> i = ll.iterator();
299      while (i.hasNext()) {
300        Token t = i.next();
301        if (TType.COMMA.equals(t.getType())) {
302          try {
303                inf = (InputFormat<?, ?>)ReflectionUtils.newInstance(
304                                conf.getClassByName(sb.toString()), conf);
305          } catch (ClassNotFoundException e) {
306            throw new IOException(e);
307          } catch (IllegalArgumentException e) {
308            throw new IOException(e);
309          }
310          break;
311        }
312        sb.append(t.getStr());
313      }
314      if (!i.hasNext()) {
315        throw new IOException("Parse error");
316      }
317      Token t = i.next();
318      if (!TType.QUOT.equals(t.getType())) {
319        throw new IOException("Expected quoted string");
320      }
321      indir = t.getStr();
322      // no check for ll.isEmpty() to permit extension
323    }
324
325    private Configuration getConf(Configuration jconf) throws IOException {
326      Job job = Job.getInstance(jconf);
327      FileInputFormat.setInputPaths(job, indir);
328      return job.getConfiguration();
329    }
330    
331    public List<InputSplit> getSplits(JobContext context)
332        throws IOException, InterruptedException {
333      return inf.getSplits(
334                 new JobContextImpl(getConf(context.getConfiguration()), 
335                                    context.getJobID()));
336    }
337
338    public ComposableRecordReader<?, ?> createRecordReader(InputSplit split, 
339        TaskAttemptContext taskContext) 
340        throws IOException, InterruptedException {
341      try {
342        if (!rrCstrMap.containsKey(ident)) {
343          throw new IOException("No RecordReader for " + ident);
344        }
345        Configuration conf = getConf(taskContext.getConfiguration());
346        TaskAttemptContext context = 
347          new TaskAttemptContextImpl(conf, 
348              TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID)), 
349              new WrappedStatusReporter(taskContext));
350        return rrCstrMap.get(ident).newInstance(id,
351            inf.createRecordReader(split, context), cmpcl);
352      } catch (IllegalAccessException e) {
353        throw new IOException(e);
354      } catch (InstantiationException e) {
355        throw new IOException(e);
356      } catch (InvocationTargetException e) {
357        throw new IOException(e);
358      }
359    }
360
361    public String toString() {
362      return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
363    }
364  }
365
366  private static class WrappedStatusReporter extends StatusReporter {
367
368    TaskAttemptContext context;
369    
370    public WrappedStatusReporter(TaskAttemptContext context) {
371      this.context = context; 
372    }
373    @Override
374    public Counter getCounter(Enum<?> name) {
375      return context.getCounter(name);
376    }
377
378    @Override
379    public Counter getCounter(String group, String name) {
380      return context.getCounter(group, name);
381    }
382
383    @Override
384    public void progress() {
385      context.progress();
386    }
387
388    @Override
389    public float getProgress() {
390      return context.getProgress();
391    }
392    
393    @Override
394    public void setStatus(String status) {
395      context.setStatus(status);
396    }
397  }
398
399  /**
400   * Internal nodetype for &quot;composite&quot; InputFormats.
401   */
402  static class CNode extends Node {
403
404    private static final Class<?>[] cstrSig =
405      { Integer.TYPE, Configuration.class, Integer.TYPE, Class.class };
406
407    @SuppressWarnings("unchecked")
408        static void addIdentifier(String ident,
409                              Class<? extends ComposableRecordReader> cl)
410        throws NoSuchMethodException {
411      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
412    }
413
414    // inst
415    private ArrayList<Node> kids = new ArrayList<Node>();
416
417    public CNode(String ident) {
418      super(ident);
419    }
420
421    @Override
422    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
423      super.setKeyComparator(cmpcl);
424      for (Node n : kids) {
425        n.setKeyComparator(cmpcl);
426      }
427    }
428
429    /**
430     * Combine InputSplits from child InputFormats into a
431     * {@link CompositeInputSplit}.
432     */
433    @SuppressWarnings("unchecked")
434        public List<InputSplit> getSplits(JobContext job)
435        throws IOException, InterruptedException {
436      List<List<InputSplit>> splits = 
437        new ArrayList<List<InputSplit>>(kids.size());
438      for (int i = 0; i < kids.size(); ++i) {
439        List<InputSplit> tmp = kids.get(i).getSplits(job);
440        if (null == tmp) {
441          throw new IOException("Error gathering splits from child RReader");
442        }
443        if (i > 0 && splits.get(i-1).size() != tmp.size()) {
444          throw new IOException("Inconsistent split cardinality from child " +
445              i + " (" + splits.get(i-1).size() + "/" + tmp.size() + ")");
446        }
447        splits.add(i, tmp);
448      }
449      final int size = splits.get(0).size();
450      List<InputSplit> ret = new ArrayList<InputSplit>();
451      for (int i = 0; i < size; ++i) {
452        CompositeInputSplit split = new CompositeInputSplit(splits.size());
453        for (int j = 0; j < splits.size(); ++j) {
454          split.add(splits.get(j).get(i));
455        }
456        ret.add(split);
457      }
458      return ret;
459    }
460
461    @SuppressWarnings("unchecked") // child types unknowable
462    public ComposableRecordReader 
463        createRecordReader(InputSplit split, TaskAttemptContext taskContext) 
464        throws IOException, InterruptedException {
465      if (!(split instanceof CompositeInputSplit)) {
466        throw new IOException("Invalid split type:" +
467                              split.getClass().getName());
468      }
469      final CompositeInputSplit spl = (CompositeInputSplit)split;
470      final int capacity = kids.size();
471      CompositeRecordReader ret = null;
472      try {
473        if (!rrCstrMap.containsKey(ident)) {
474          throw new IOException("No RecordReader for " + ident);
475        }
476        ret = (CompositeRecordReader)rrCstrMap.get(ident).
477          newInstance(id, taskContext.getConfiguration(), capacity, cmpcl);
478      } catch (IllegalAccessException e) {
479        throw new IOException(e);
480      } catch (InstantiationException e) {
481        throw new IOException(e);
482      } catch (InvocationTargetException e) {
483        throw new IOException(e);
484      }
485      for (int i = 0; i < capacity; ++i) {
486        ret.add(kids.get(i).createRecordReader(spl.get(i), taskContext));
487      }
488      return (ComposableRecordReader)ret;
489    }
490
491    /**
492     * Parse a list of comma-separated nodes.
493     */
494    public void parse(List<Token> args, Configuration conf) 
495        throws IOException {
496      ListIterator<Token> i = args.listIterator();
497      while (i.hasNext()) {
498        Token t = i.next();
499        t.getNode().setID(i.previousIndex() >> 1);
500        kids.add(t.getNode());
501        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
502          throw new IOException("Expected ','");
503        }
504      }
505    }
506
507    public String toString() {
508      StringBuilder sb = new StringBuilder();
509      sb.append(ident + "(");
510      for (Node n : kids) {
511        sb.append(n.toString() + ",");
512      }
513      sb.setCharAt(sb.length() - 1, ')');
514      return sb.toString();
515    }
516  }
517
518  private static Token reduce(Stack<Token> st, Configuration conf) 
519      throws IOException {
520    LinkedList<Token> args = new LinkedList<Token>();
521    while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
522      args.addFirst(st.pop());
523    }
524    if (st.isEmpty()) {
525      throw new IOException("Unmatched ')'");
526    }
527    st.pop();
528    if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
529      throw new IOException("Identifier expected");
530    }
531    Node n = Node.forIdent(st.pop().getStr());
532    n.parse(args, conf);
533    return new NodeToken(n);
534  }
535
536  /**
537   * Given an expression and an optional comparator, build a tree of
538   * InputFormats using the comparator to sort keys.
539   */
540  static Node parse(String expr, Configuration conf) throws IOException {
541    if (null == expr) {
542      throw new IOException("Expression is null");
543    }
544    Class<? extends WritableComparator> cmpcl = conf.getClass(
545      CompositeInputFormat.JOIN_COMPARATOR, null, WritableComparator.class);
546    Lexer lex = new Lexer(expr);
547    Stack<Token> st = new Stack<Token>();
548    Token tok;
549    while ((tok = lex.next()) != null) {
550      if (TType.RPAREN.equals(tok.getType())) {
551        st.push(reduce(st, conf));
552      } else {
553        st.push(tok);
554      }
555    }
556    if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
557      Node ret = st.pop().getNode();
558      if (cmpcl != null) {
559        ret.setKeyComparator(cmpcl);
560      }
561      return ret;
562    }
563    throw new IOException("Missing ')'");
564  }
565
566}