001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.join;
020
021import java.io.CharArrayReader;
022import java.io.IOException;
023import java.io.StreamTokenizer;
024import java.lang.reflect.Constructor;
025import java.lang.reflect.InvocationTargetException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.Iterator;
029import java.util.LinkedList;
030import java.util.List;
031import java.util.ListIterator;
032import java.util.Map;
033import java.util.Stack;
034
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceStability;
037import org.apache.hadoop.io.WritableComparator;
038import org.apache.hadoop.mapred.FileInputFormat;
039import org.apache.hadoop.mapred.InputFormat;
040import org.apache.hadoop.mapred.InputSplit;
041import org.apache.hadoop.mapred.JobConf;
042import org.apache.hadoop.mapred.RecordReader;
043import org.apache.hadoop.mapred.Reporter;
044import org.apache.hadoop.util.ReflectionUtils;
045
046/**
047 * Very simple shift-reduce parser for join expressions.
048 *
 * This should be sufficient for the user extension permitted now, but ought to
 * be replaced with a parser generator if more complex grammars are supported.
 * In particular, this "shift-reduce" parser has no states. Each set
 * of formals requires a different internal node type, which is responsible for
 * interpreting the list of tokens it receives. This is sufficient for the
 * current grammar, but it has several annoying properties that might inhibit
 * extension. Notably, parentheses are always function calls; an
 * algebraic or filter grammar would not only require a node type, but must
 * also work around the internals of this parser.
058 *
 * For most other cases, adding classes to the hierarchy, particularly by
 * extending JoinRecordReader and MultiFilterRecordReader, is fairly
 * straightforward. One need only override the relevant method(s) (usually only
 * {@link CompositeRecordReader#combine}) and include a property to map its
 * value to an identifier in the parser.
064 */
065@InterfaceAudience.Public
066@InterfaceStability.Evolving
067public class Parser {
068  @InterfaceAudience.Public
069  @InterfaceStability.Evolving
070  public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
071
072  /**
073   * Tagged-union type for tokens from the join expression.
074   * @see Parser.TType
075   */
076  @InterfaceAudience.Public
077  @InterfaceStability.Evolving
078  public static class Token {
079
080    private TType type;
081
082    Token(TType type) {
083      this.type = type;
084    }
085
086    public TType getType() { return type; }
087    public Node getNode() throws IOException {
088      throw new IOException("Expected nodetype");
089    }
090    public double getNum() throws IOException {
091      throw new IOException("Expected numtype");
092    }
093    public String getStr() throws IOException {
094      throw new IOException("Expected strtype");
095    }
096  }
097
098  @InterfaceAudience.Public
099  @InterfaceStability.Evolving
100  public static class NumToken extends Token {
101    private double num;
102    public NumToken(double num) {
103      super(TType.NUM);
104      this.num = num;
105    }
106    public double getNum() { return num; }
107  }
108
109  @InterfaceAudience.Public
110  @InterfaceStability.Evolving
111  public static class NodeToken extends Token {
112    private Node node;
113    NodeToken(Node node) {
114      super(TType.CIF);
115      this.node = node;
116    }
117    public Node getNode() {
118      return node;
119    }
120  }
121
122  @InterfaceAudience.Public
123  @InterfaceStability.Evolving
124  public static class StrToken extends Token {
125    private String str;
126    public StrToken(TType type, String str) {
127      super(type);
128      this.str = str;
129    }
130    public String getStr() {
131      return str;
132    }
133  }
134
135  /**
136   * Simple lexer wrapping a StreamTokenizer.
137   * This encapsulates the creation of tagged-union Tokens and initializes the
138   * SteamTokenizer.
139   */
140  private static class Lexer {
141
142    private StreamTokenizer tok;
143
144    Lexer(String s) {
145      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
146      tok.quoteChar('"');
147      tok.parseNumbers();
148      tok.ordinaryChar(',');
149      tok.ordinaryChar('(');
150      tok.ordinaryChar(')');
151      tok.wordChars('$','$');
152      tok.wordChars('_','_');
153    }
154
155    Token next() throws IOException {
156      int type = tok.nextToken();
157      switch (type) {
158        case StreamTokenizer.TT_EOF:
159        case StreamTokenizer.TT_EOL:
160          return null;
161        case StreamTokenizer.TT_NUMBER:
162          return new NumToken(tok.nval);
163        case StreamTokenizer.TT_WORD:
164          return new StrToken(TType.IDENT, tok.sval);
165        case '"':
166          return new StrToken(TType.QUOT, tok.sval);
167        default:
168          switch (type) {
169            case ',':
170              return new Token(TType.COMMA);
171            case '(':
172              return new Token(TType.LPAREN);
173            case ')':
174              return new Token(TType.RPAREN);
175            default:
176              throw new IOException("Unexpected: " + type);
177          }
178      }
179    }
180  }
181
182  @InterfaceAudience.Public
183  @InterfaceStability.Evolving
184  public abstract static class Node implements ComposableInputFormat {
185    /**
186     * Return the node type registered for the particular identifier.
187     * By default, this is a CNode for any composite node and a WNode
188     * for "wrapped" nodes. User nodes will likely be composite
189     * nodes.
190     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
191     * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
192     */
193    static Node forIdent(String ident) throws IOException {
194      try {
195        if (!nodeCstrMap.containsKey(ident)) {
196          throw new IOException("No nodetype for " + ident);
197        }
198        return nodeCstrMap.get(ident).newInstance(ident);
199      } catch (IllegalAccessException e) {
200        throw (IOException)new IOException().initCause(e);
201      } catch (InstantiationException e) {
202        throw (IOException)new IOException().initCause(e);
203      } catch (InvocationTargetException e) {
204        throw (IOException)new IOException().initCause(e);
205      }
206    }
207
208    private static final Class<?>[] ncstrSig = { String.class };
209    private static final
210        Map<String,Constructor<? extends Node>> nodeCstrMap =
211        new HashMap<String,Constructor<? extends Node>>();
212    protected static final
213        Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
214        new HashMap<String,Constructor<? extends ComposableRecordReader>>();
215
216    /**
217     * For a given identifier, add a mapping to the nodetype for the parse
218     * tree and to the ComposableRecordReader to be created, including the
219     * formals required to invoke the constructor.
220     * The nodetype and constructor signature should be filled in from the
221     * child node.
222     */
223    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
224                              Class<? extends Node> nodetype,
225                              Class<? extends ComposableRecordReader> cl)
226        throws NoSuchMethodException {
227      Constructor<? extends Node> ncstr =
228        nodetype.getDeclaredConstructor(ncstrSig);
229      ncstr.setAccessible(true);
230      nodeCstrMap.put(ident, ncstr);
231      Constructor<? extends ComposableRecordReader> mcstr =
232        cl.getDeclaredConstructor(mcstrSig);
233      mcstr.setAccessible(true);
234      rrCstrMap.put(ident, mcstr);
235    }
236
237    // inst
238    protected int id = -1;
239    protected String ident;
240    protected Class<? extends WritableComparator> cmpcl;
241
242    protected Node(String ident) {
243      this.ident = ident;
244    }
245
246    protected void setID(int id) {
247      this.id = id;
248    }
249
250    protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
251      this.cmpcl = cmpcl;
252    }
253    abstract void parse(List<Token> args, JobConf job) throws IOException;
254  }
255
256  /**
257   * Nodetype in the parse tree for &quot;wrapped&quot; InputFormats.
258   */
259  static class WNode extends Node {
260    private static final Class<?>[] cstrSig =
261      { Integer.TYPE, RecordReader.class, Class.class };
262
263    static void addIdentifier(String ident,
264                              Class<? extends ComposableRecordReader> cl)
265        throws NoSuchMethodException {
266      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
267    }
268
269    private String indir;
270    private InputFormat inf;
271
272    public WNode(String ident) {
273      super(ident);
274    }
275
276    /**
277     * Let the first actual define the InputFormat and the second define
278     * the <tt>mapred.input.dir</tt> property.
279     */
280    public void parse(List<Token> ll, JobConf job) throws IOException {
281      StringBuilder sb = new StringBuilder();
282      Iterator<Token> i = ll.iterator();
283      while (i.hasNext()) {
284        Token t = i.next();
285        if (TType.COMMA.equals(t.getType())) {
286          try {
287                inf = (InputFormat)ReflectionUtils.newInstance(
288                                job.getClassByName(sb.toString()),
289                job);
290          } catch (ClassNotFoundException e) {
291            throw (IOException)new IOException().initCause(e);
292          } catch (IllegalArgumentException e) {
293            throw (IOException)new IOException().initCause(e);
294          }
295          break;
296        }
297        sb.append(t.getStr());
298      }
299      if (!i.hasNext()) {
300        throw new IOException("Parse error");
301      }
302      Token t = i.next();
303      if (!TType.QUOT.equals(t.getType())) {
304        throw new IOException("Expected quoted string");
305      }
306      indir = t.getStr();
307      // no check for ll.isEmpty() to permit extension
308    }
309
310    private JobConf getConf(JobConf job) {
311      JobConf conf = new JobConf(job);
312      FileInputFormat.setInputPaths(conf, indir);
313      conf.setClassLoader(job.getClassLoader());
314      return conf;
315    }
316
317    public InputSplit[] getSplits(JobConf job, int numSplits)
318        throws IOException {
319      return inf.getSplits(getConf(job), numSplits);
320    }
321
322    public ComposableRecordReader getRecordReader(
323        InputSplit split, JobConf job, Reporter reporter) throws IOException {
324      try {
325        if (!rrCstrMap.containsKey(ident)) {
326          throw new IOException("No RecordReader for " + ident);
327        }
328        return rrCstrMap.get(ident).newInstance(id,
329            inf.getRecordReader(split, getConf(job), reporter), cmpcl);
330      } catch (IllegalAccessException e) {
331        throw (IOException)new IOException().initCause(e);
332      } catch (InstantiationException e) {
333        throw (IOException)new IOException().initCause(e);
334      } catch (InvocationTargetException e) {
335        throw (IOException)new IOException().initCause(e);
336      }
337    }
338
339    public String toString() {
340      return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
341    }
342  }
343
344  /**
345   * Internal nodetype for &quot;composite&quot; InputFormats.
346   */
347  static class CNode extends Node {
348
349    private static final Class<?>[] cstrSig =
350      { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };
351
352    static void addIdentifier(String ident,
353                              Class<? extends ComposableRecordReader> cl)
354        throws NoSuchMethodException {
355      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
356    }
357
358    // inst
359    private ArrayList<Node> kids = new ArrayList<Node>();
360
361    public CNode(String ident) {
362      super(ident);
363    }
364
365    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
366      super.setKeyComparator(cmpcl);
367      for (Node n : kids) {
368        n.setKeyComparator(cmpcl);
369      }
370    }
371
372    /**
373     * Combine InputSplits from child InputFormats into a
374     * {@link CompositeInputSplit}.
375     */
376    public InputSplit[] getSplits(JobConf job, int numSplits)
377        throws IOException {
378      InputSplit[][] splits = new InputSplit[kids.size()][];
379      for (int i = 0; i < kids.size(); ++i) {
380        final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
381        if (null == tmp) {
382          throw new IOException("Error gathering splits from child RReader");
383        }
384        if (i > 0 && splits[i-1].length != tmp.length) {
385          throw new IOException("Inconsistent split cardinality from child " +
386              i + " (" + splits[i-1].length + "/" + tmp.length + ")");
387        }
388        splits[i] = tmp;
389      }
390      final int size = splits[0].length;
391      CompositeInputSplit[] ret = new CompositeInputSplit[size];
392      for (int i = 0; i < size; ++i) {
393        ret[i] = new CompositeInputSplit(splits.length);
394        for (int j = 0; j < splits.length; ++j) {
395          ret[i].add(splits[j][i]);
396        }
397      }
398      return ret;
399    }
400
401    @SuppressWarnings("unchecked") // child types unknowable
402    public ComposableRecordReader getRecordReader(
403        InputSplit split, JobConf job, Reporter reporter) throws IOException {
404      if (!(split instanceof CompositeInputSplit)) {
405        throw new IOException("Invalid split type:" +
406                              split.getClass().getName());
407      }
408      final CompositeInputSplit spl = (CompositeInputSplit)split;
409      final int capacity = kids.size();
410      CompositeRecordReader ret = null;
411      try {
412        if (!rrCstrMap.containsKey(ident)) {
413          throw new IOException("No RecordReader for " + ident);
414        }
415        ret = (CompositeRecordReader)
416          rrCstrMap.get(ident).newInstance(id, job, capacity, cmpcl);
417      } catch (IllegalAccessException e) {
418        throw (IOException)new IOException().initCause(e);
419      } catch (InstantiationException e) {
420        throw (IOException)new IOException().initCause(e);
421      } catch (InvocationTargetException e) {
422        throw (IOException)new IOException().initCause(e);
423      }
424      for (int i = 0; i < capacity; ++i) {
425        ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
426      }
427      return (ComposableRecordReader)ret;
428    }
429
430    /**
431     * Parse a list of comma-separated nodes.
432     */
433    public void parse(List<Token> args, JobConf job) throws IOException {
434      ListIterator<Token> i = args.listIterator();
435      while (i.hasNext()) {
436        Token t = i.next();
437        t.getNode().setID(i.previousIndex() >> 1);
438        kids.add(t.getNode());
439        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
440          throw new IOException("Expected ','");
441        }
442      }
443    }
444
445    public String toString() {
446      StringBuilder sb = new StringBuilder();
447      sb.append(ident + "(");
448      for (Node n : kids) {
449        sb.append(n.toString() + ",");
450      }
451      sb.setCharAt(sb.length() - 1, ')');
452      return sb.toString();
453    }
454  }
455
456  private static Token reduce(Stack<Token> st, JobConf job) throws IOException {
457    LinkedList<Token> args = new LinkedList<Token>();
458    while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
459      args.addFirst(st.pop());
460    }
461    if (st.isEmpty()) {
462      throw new IOException("Unmatched ')'");
463    }
464    st.pop();
465    if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
466      throw new IOException("Identifier expected");
467    }
468    Node n = Node.forIdent(st.pop().getStr());
469    n.parse(args, job);
470    return new NodeToken(n);
471  }
472
473  /**
474   * Given an expression and an optional comparator, build a tree of
475   * InputFormats using the comparator to sort keys.
476   */
477  static Node parse(String expr, JobConf job) throws IOException {
478    if (null == expr) {
479      throw new IOException("Expression is null");
480    }
481    Class<? extends WritableComparator> cmpcl =
482      job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
483    Lexer lex = new Lexer(expr);
484    Stack<Token> st = new Stack<Token>();
485    Token tok;
486    while ((tok = lex.next()) != null) {
487      if (TType.RPAREN.equals(tok.getType())) {
488        st.push(reduce(st, job));
489      } else {
490        st.push(tok);
491      }
492    }
493    if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
494      Node ret = st.pop().getNode();
495      if (cmpcl != null) {
496        ret.setKeyComparator(cmpcl);
497      }
498      return ret;
499    }
500    throw new IOException("Missing ')'");
501  }
502
503}