001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.join;
020    
021    import java.io.CharArrayReader;
022    import java.io.IOException;
023    import java.io.StreamTokenizer;
024    import java.lang.reflect.Constructor;
025    import java.lang.reflect.InvocationTargetException;
026    import java.util.ArrayList;
027    import java.util.HashMap;
028    import java.util.Iterator;
029    import java.util.LinkedList;
030    import java.util.List;
031    import java.util.ListIterator;
032    import java.util.Map;
033    import java.util.Stack;
034    
035    import org.apache.hadoop.classification.InterfaceAudience;
036    import org.apache.hadoop.classification.InterfaceStability;
037    import org.apache.hadoop.io.WritableComparator;
038    import org.apache.hadoop.mapred.FileInputFormat;
039    import org.apache.hadoop.mapred.InputFormat;
040    import org.apache.hadoop.mapred.InputSplit;
041    import org.apache.hadoop.mapred.JobConf;
042    import org.apache.hadoop.mapred.RecordReader;
043    import org.apache.hadoop.mapred.Reporter;
044    import org.apache.hadoop.util.ReflectionUtils;
045    
046    /**
047     * Very simple shift-reduce parser for join expressions.
048     *
049     * This should be sufficient for the user extension permitted now, but ought to
050     * be replaced with a parser generator if more complex grammars are supported.
051     * In particular, this "shift-reduce" parser has no states. Each set
052     * of formals requires a different internal node type, which is responsible for
053     * interpreting the list of tokens it receives. This is sufficient for the
054     * current grammar, but it has several annoying properties that might inhibit
 * extension. In particular, parentheses are always function calls; an
056     * algebraic or filter grammar would not only require a node type, but must
057     * also work around the internals of this parser.
058     *
 * For most other cases, adding classes to the hierarchy (particularly by
 * extending JoinRecordReader and MultiFilterRecordReader) is fairly
061     * straightforward. One need only override the relevant method(s) (usually only
062     * {@link CompositeRecordReader#combine}) and include a property to map its
063     * value to an identifier in the parser.
064     */
065    @InterfaceAudience.Public
066    @InterfaceStability.Evolving
067    public class Parser {
068      @InterfaceAudience.Public
069      @InterfaceStability.Evolving
070      public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
071    
072      /**
073       * Tagged-union type for tokens from the join expression.
074       * @see Parser.TType
075       */
076      @InterfaceAudience.Public
077      @InterfaceStability.Evolving
078      public static class Token {
079    
080        private TType type;
081    
082        Token(TType type) {
083          this.type = type;
084        }
085    
086        public TType getType() { return type; }
087        public Node getNode() throws IOException {
088          throw new IOException("Expected nodetype");
089        }
090        public double getNum() throws IOException {
091          throw new IOException("Expected numtype");
092        }
093        public String getStr() throws IOException {
094          throw new IOException("Expected strtype");
095        }
096      }
097    
098      @InterfaceAudience.Public
099      @InterfaceStability.Evolving
100      public static class NumToken extends Token {
101        private double num;
102        public NumToken(double num) {
103          super(TType.NUM);
104          this.num = num;
105        }
106        public double getNum() { return num; }
107      }
108    
109      @InterfaceAudience.Public
110      @InterfaceStability.Evolving
111      public static class NodeToken extends Token {
112        private Node node;
113        NodeToken(Node node) {
114          super(TType.CIF);
115          this.node = node;
116        }
117        public Node getNode() {
118          return node;
119        }
120      }
121    
122      @InterfaceAudience.Public
123      @InterfaceStability.Evolving
124      public static class StrToken extends Token {
125        private String str;
126        public StrToken(TType type, String str) {
127          super(type);
128          this.str = str;
129        }
130        public String getStr() {
131          return str;
132        }
133      }
134    
135      /**
136       * Simple lexer wrapping a StreamTokenizer.
137       * This encapsulates the creation of tagged-union Tokens and initializes the
138       * SteamTokenizer.
139       */
140      private static class Lexer {
141    
142        private StreamTokenizer tok;
143    
144        Lexer(String s) {
145          tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
146          tok.quoteChar('"');
147          tok.parseNumbers();
148          tok.ordinaryChar(',');
149          tok.ordinaryChar('(');
150          tok.ordinaryChar(')');
151          tok.wordChars('$','$');
152          tok.wordChars('_','_');
153        }
154    
155        Token next() throws IOException {
156          int type = tok.nextToken();
157          switch (type) {
158            case StreamTokenizer.TT_EOF:
159            case StreamTokenizer.TT_EOL:
160              return null;
161            case StreamTokenizer.TT_NUMBER:
162              return new NumToken(tok.nval);
163            case StreamTokenizer.TT_WORD:
164              return new StrToken(TType.IDENT, tok.sval);
165            case '"':
166              return new StrToken(TType.QUOT, tok.sval);
167            default:
168              switch (type) {
169                case ',':
170                  return new Token(TType.COMMA);
171                case '(':
172                  return new Token(TType.LPAREN);
173                case ')':
174                  return new Token(TType.RPAREN);
175                default:
176                  throw new IOException("Unexpected: " + type);
177              }
178          }
179        }
180      }
181    
182      @InterfaceAudience.Public
183      @InterfaceStability.Evolving
184      public abstract static class Node implements ComposableInputFormat {
185        /**
186         * Return the node type registered for the particular identifier.
187         * By default, this is a CNode for any composite node and a WNode
188         * for "wrapped" nodes. User nodes will likely be composite
189         * nodes.
190         * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
191         * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
192         */
193        static Node forIdent(String ident) throws IOException {
194          try {
195            if (!nodeCstrMap.containsKey(ident)) {
196              throw new IOException("No nodetype for " + ident);
197            }
198            return nodeCstrMap.get(ident).newInstance(ident);
199          } catch (IllegalAccessException e) {
200            throw (IOException)new IOException().initCause(e);
201          } catch (InstantiationException e) {
202            throw (IOException)new IOException().initCause(e);
203          } catch (InvocationTargetException e) {
204            throw (IOException)new IOException().initCause(e);
205          }
206        }
207    
208        private static final Class<?>[] ncstrSig = { String.class };
209        private static final
210            Map<String,Constructor<? extends Node>> nodeCstrMap =
211            new HashMap<String,Constructor<? extends Node>>();
212        protected static final
213            Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
214            new HashMap<String,Constructor<? extends ComposableRecordReader>>();
215    
216        /**
217         * For a given identifier, add a mapping to the nodetype for the parse
218         * tree and to the ComposableRecordReader to be created, including the
219         * formals required to invoke the constructor.
220         * The nodetype and constructor signature should be filled in from the
221         * child node.
222         */
223        protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
224                                  Class<? extends Node> nodetype,
225                                  Class<? extends ComposableRecordReader> cl)
226            throws NoSuchMethodException {
227          Constructor<? extends Node> ncstr =
228            nodetype.getDeclaredConstructor(ncstrSig);
229          ncstr.setAccessible(true);
230          nodeCstrMap.put(ident, ncstr);
231          Constructor<? extends ComposableRecordReader> mcstr =
232            cl.getDeclaredConstructor(mcstrSig);
233          mcstr.setAccessible(true);
234          rrCstrMap.put(ident, mcstr);
235        }
236    
237        // inst
238        protected int id = -1;
239        protected String ident;
240        protected Class<? extends WritableComparator> cmpcl;
241    
242        protected Node(String ident) {
243          this.ident = ident;
244        }
245    
246        protected void setID(int id) {
247          this.id = id;
248        }
249    
250        protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
251          this.cmpcl = cmpcl;
252        }
253        abstract void parse(List<Token> args, JobConf job) throws IOException;
254      }
255    
256      /**
257       * Nodetype in the parse tree for &quot;wrapped&quot; InputFormats.
258       */
259      static class WNode extends Node {
260        private static final Class<?>[] cstrSig =
261          { Integer.TYPE, RecordReader.class, Class.class };
262    
263        static void addIdentifier(String ident,
264                                  Class<? extends ComposableRecordReader> cl)
265            throws NoSuchMethodException {
266          Node.addIdentifier(ident, cstrSig, WNode.class, cl);
267        }
268    
269        private String indir;
270        private InputFormat inf;
271    
272        public WNode(String ident) {
273          super(ident);
274        }
275    
276        /**
277         * Let the first actual define the InputFormat and the second define
278         * the <tt>mapred.input.dir</tt> property.
279         */
280        public void parse(List<Token> ll, JobConf job) throws IOException {
281          StringBuilder sb = new StringBuilder();
282          Iterator<Token> i = ll.iterator();
283          while (i.hasNext()) {
284            Token t = i.next();
285            if (TType.COMMA.equals(t.getType())) {
286              try {
287                    inf = (InputFormat)ReflectionUtils.newInstance(
288                                    job.getClassByName(sb.toString()),
289                    job);
290              } catch (ClassNotFoundException e) {
291                throw (IOException)new IOException().initCause(e);
292              } catch (IllegalArgumentException e) {
293                throw (IOException)new IOException().initCause(e);
294              }
295              break;
296            }
297            sb.append(t.getStr());
298          }
299          if (!i.hasNext()) {
300            throw new IOException("Parse error");
301          }
302          Token t = i.next();
303          if (!TType.QUOT.equals(t.getType())) {
304            throw new IOException("Expected quoted string");
305          }
306          indir = t.getStr();
307          // no check for ll.isEmpty() to permit extension
308        }
309    
310        private JobConf getConf(JobConf job) {
311          JobConf conf = new JobConf(job);
312          FileInputFormat.setInputPaths(conf, indir);
313          conf.setClassLoader(job.getClassLoader());
314          return conf;
315        }
316    
317        public InputSplit[] getSplits(JobConf job, int numSplits)
318            throws IOException {
319          return inf.getSplits(getConf(job), numSplits);
320        }
321    
322        public ComposableRecordReader getRecordReader(
323            InputSplit split, JobConf job, Reporter reporter) throws IOException {
324          try {
325            if (!rrCstrMap.containsKey(ident)) {
326              throw new IOException("No RecordReader for " + ident);
327            }
328            return rrCstrMap.get(ident).newInstance(id,
329                inf.getRecordReader(split, getConf(job), reporter), cmpcl);
330          } catch (IllegalAccessException e) {
331            throw (IOException)new IOException().initCause(e);
332          } catch (InstantiationException e) {
333            throw (IOException)new IOException().initCause(e);
334          } catch (InvocationTargetException e) {
335            throw (IOException)new IOException().initCause(e);
336          }
337        }
338    
339        public String toString() {
340          return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
341        }
342      }
343    
344      /**
345       * Internal nodetype for &quot;composite&quot; InputFormats.
346       */
347      static class CNode extends Node {
348    
349        private static final Class<?>[] cstrSig =
350          { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };
351    
352        static void addIdentifier(String ident,
353                                  Class<? extends ComposableRecordReader> cl)
354            throws NoSuchMethodException {
355          Node.addIdentifier(ident, cstrSig, CNode.class, cl);
356        }
357    
358        // inst
359        private ArrayList<Node> kids = new ArrayList<Node>();
360    
361        public CNode(String ident) {
362          super(ident);
363        }
364    
365        public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
366          super.setKeyComparator(cmpcl);
367          for (Node n : kids) {
368            n.setKeyComparator(cmpcl);
369          }
370        }
371    
372        /**
373         * Combine InputSplits from child InputFormats into a
374         * {@link CompositeInputSplit}.
375         */
376        public InputSplit[] getSplits(JobConf job, int numSplits)
377            throws IOException {
378          InputSplit[][] splits = new InputSplit[kids.size()][];
379          for (int i = 0; i < kids.size(); ++i) {
380            final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
381            if (null == tmp) {
382              throw new IOException("Error gathering splits from child RReader");
383            }
384            if (i > 0 && splits[i-1].length != tmp.length) {
385              throw new IOException("Inconsistent split cardinality from child " +
386                  i + " (" + splits[i-1].length + "/" + tmp.length + ")");
387            }
388            splits[i] = tmp;
389          }
390          final int size = splits[0].length;
391          CompositeInputSplit[] ret = new CompositeInputSplit[size];
392          for (int i = 0; i < size; ++i) {
393            ret[i] = new CompositeInputSplit(splits.length);
394            for (int j = 0; j < splits.length; ++j) {
395              ret[i].add(splits[j][i]);
396            }
397          }
398          return ret;
399        }
400    
401        @SuppressWarnings("unchecked") // child types unknowable
402        public ComposableRecordReader getRecordReader(
403            InputSplit split, JobConf job, Reporter reporter) throws IOException {
404          if (!(split instanceof CompositeInputSplit)) {
405            throw new IOException("Invalid split type:" +
406                                  split.getClass().getName());
407          }
408          final CompositeInputSplit spl = (CompositeInputSplit)split;
409          final int capacity = kids.size();
410          CompositeRecordReader ret = null;
411          try {
412            if (!rrCstrMap.containsKey(ident)) {
413              throw new IOException("No RecordReader for " + ident);
414            }
415            ret = (CompositeRecordReader)
416              rrCstrMap.get(ident).newInstance(id, job, capacity, cmpcl);
417          } catch (IllegalAccessException e) {
418            throw (IOException)new IOException().initCause(e);
419          } catch (InstantiationException e) {
420            throw (IOException)new IOException().initCause(e);
421          } catch (InvocationTargetException e) {
422            throw (IOException)new IOException().initCause(e);
423          }
424          for (int i = 0; i < capacity; ++i) {
425            ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
426          }
427          return (ComposableRecordReader)ret;
428        }
429    
430        /**
431         * Parse a list of comma-separated nodes.
432         */
433        public void parse(List<Token> args, JobConf job) throws IOException {
434          ListIterator<Token> i = args.listIterator();
435          while (i.hasNext()) {
436            Token t = i.next();
437            t.getNode().setID(i.previousIndex() >> 1);
438            kids.add(t.getNode());
439            if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
440              throw new IOException("Expected ','");
441            }
442          }
443        }
444    
445        public String toString() {
446          StringBuilder sb = new StringBuilder();
447          sb.append(ident + "(");
448          for (Node n : kids) {
449            sb.append(n.toString() + ",");
450          }
451          sb.setCharAt(sb.length() - 1, ')');
452          return sb.toString();
453        }
454      }
455    
456      private static Token reduce(Stack<Token> st, JobConf job) throws IOException {
457        LinkedList<Token> args = new LinkedList<Token>();
458        while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
459          args.addFirst(st.pop());
460        }
461        if (st.isEmpty()) {
462          throw new IOException("Unmatched ')'");
463        }
464        st.pop();
465        if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
466          throw new IOException("Identifier expected");
467        }
468        Node n = Node.forIdent(st.pop().getStr());
469        n.parse(args, job);
470        return new NodeToken(n);
471      }
472    
473      /**
474       * Given an expression and an optional comparator, build a tree of
475       * InputFormats using the comparator to sort keys.
476       */
477      static Node parse(String expr, JobConf job) throws IOException {
478        if (null == expr) {
479          throw new IOException("Expression is null");
480        }
481        Class<? extends WritableComparator> cmpcl =
482          job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
483        Lexer lex = new Lexer(expr);
484        Stack<Token> st = new Stack<Token>();
485        Token tok;
486        while ((tok = lex.next()) != null) {
487          if (TType.RPAREN.equals(tok.getType())) {
488            st.push(reduce(st, job));
489          } else {
490            st.push(tok);
491          }
492        }
493        if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
494          Node ret = st.pop().getNode();
495          if (cmpcl != null) {
496            ret.setKeyComparator(cmpcl);
497          }
498          return ret;
499        }
500        throw new IOException("Missing ')'");
501      }
502    
503    }