001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.join;
020    
021    import java.io.CharArrayReader;
022    import java.io.IOException;
023    import java.io.StreamTokenizer;
024    import java.lang.reflect.Constructor;
025    import java.lang.reflect.InvocationTargetException;
026    import java.util.ArrayList;
027    import java.util.HashMap;
028    import java.util.Iterator;
029    import java.util.LinkedList;
030    import java.util.List;
031    import java.util.ListIterator;
032    import java.util.Map;
033    import java.util.Stack;
034    
035    import org.apache.hadoop.classification.InterfaceAudience;
036    import org.apache.hadoop.classification.InterfaceStability;
037    import org.apache.hadoop.conf.Configuration;
038    import org.apache.hadoop.io.WritableComparator;
039    import org.apache.hadoop.mapreduce.Counter;
040    import org.apache.hadoop.mapreduce.InputFormat;
041    import org.apache.hadoop.mapreduce.InputSplit;
042    import org.apache.hadoop.mapreduce.Job;
043    import org.apache.hadoop.mapreduce.JobContext;
044    import org.apache.hadoop.mapreduce.MRJobConfig;
045    import org.apache.hadoop.mapreduce.RecordReader;
046    import org.apache.hadoop.mapreduce.StatusReporter;
047    import org.apache.hadoop.mapreduce.TaskAttemptContext;
048    import org.apache.hadoop.mapreduce.TaskAttemptID;
049    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
050    import org.apache.hadoop.mapreduce.task.JobContextImpl;
051    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
052    import org.apache.hadoop.util.ReflectionUtils;
053    
054    /**
055     * Very simple shift-reduce parser for join expressions.
056     *
057     * This should be sufficient for the user extension permitted now, but ought to
058     * be replaced with a parser generator if more complex grammars are supported.
059     * In particular, this "shift-reduce" parser has no states. Each set
060     * of formals requires a different internal node type, which is responsible for
061     * interpreting the list of tokens it receives. This is sufficient for the
062     * current grammar, but it has several annoying properties that might inhibit
063     * extension. In particular, parentheses are always function calls; an
064     * algebraic or filter grammar would not only require a node type, but must
065     * also work around the internals of this parser.
066     *
067     * For most other cases, adding classes to the hierarchy- particularly by
068     * extending JoinRecordReader and MultiFilterRecordReader- is fairly
069     * straightforward. One need only override the relevant method(s) (usually only
070     * {@link CompositeRecordReader#combine}) and include a property to map its
071     * value to an identifier in the parser.
072     */
073    @InterfaceAudience.Public
074    @InterfaceStability.Evolving
075    public class Parser {
076      @InterfaceAudience.Public
077      @InterfaceStability.Evolving
078      public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
079    
080      /**
081       * Tagged-union type for tokens from the join expression.
082       * @see Parser.TType
083       */
084      @InterfaceAudience.Public
085      @InterfaceStability.Evolving
086      public static class Token {
087    
088        private TType type;
089    
090        Token(TType type) {
091          this.type = type;
092        }
093    
094        public TType getType() { return type; }
095        
096        public Node getNode() throws IOException {
097          throw new IOException("Expected nodetype");
098        }
099        
100        public double getNum() throws IOException {
101          throw new IOException("Expected numtype");
102        }
103        
104        public String getStr() throws IOException {
105          throw new IOException("Expected strtype");
106        }
107      }
108    
109      @InterfaceAudience.Public
110      @InterfaceStability.Evolving
111      public static class NumToken extends Token {
112        private double num;
113        public NumToken(double num) {
114          super(TType.NUM);
115          this.num = num;
116        }
117        public double getNum() { return num; }
118      }
119    
120      @InterfaceAudience.Public
121      @InterfaceStability.Evolving
122      public static class NodeToken extends Token {
123        private Node node;
124        NodeToken(Node node) {
125          super(TType.CIF);
126          this.node = node;
127        }
128        public Node getNode() {
129          return node;
130        }
131      }
132    
133      @InterfaceAudience.Public
134      @InterfaceStability.Evolving
135      public static class StrToken extends Token {
136        private String str;
137        public StrToken(TType type, String str) {
138          super(type);
139          this.str = str;
140        }
141        public String getStr() {
142          return str;
143        }
144      }
145    
146      /**
147       * Simple lexer wrapping a StreamTokenizer.
148       * This encapsulates the creation of tagged-union Tokens and initializes the
149       * SteamTokenizer.
150       */
151      private static class Lexer {
152    
153        private StreamTokenizer tok;
154    
155        Lexer(String s) {
156          tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
157          tok.quoteChar('"');
158          tok.parseNumbers();
159          tok.ordinaryChar(',');
160          tok.ordinaryChar('(');
161          tok.ordinaryChar(')');
162          tok.wordChars('$','$');
163          tok.wordChars('_','_');
164        }
165    
166        Token next() throws IOException {
167          int type = tok.nextToken();
168          switch (type) {
169            case StreamTokenizer.TT_EOF:
170            case StreamTokenizer.TT_EOL:
171              return null;
172            case StreamTokenizer.TT_NUMBER:
173              return new NumToken(tok.nval);
174            case StreamTokenizer.TT_WORD:
175              return new StrToken(TType.IDENT, tok.sval);
176            case '"':
177              return new StrToken(TType.QUOT, tok.sval);
178            default:
179              switch (type) {
180                case ',':
181                  return new Token(TType.COMMA);
182                case '(':
183                  return new Token(TType.LPAREN);
184                case ')':
185                  return new Token(TType.RPAREN);
186                default:
187                  throw new IOException("Unexpected: " + type);
188              }
189          }
190        }
191      }
192    
193    @SuppressWarnings("unchecked")
194    @InterfaceAudience.Public
195    @InterfaceStability.Evolving
196    public abstract static class Node extends ComposableInputFormat {
197        /**
198         * Return the node type registered for the particular identifier.
199         * By default, this is a CNode for any composite node and a WNode
200         * for "wrapped" nodes. User nodes will likely be composite
201         * nodes.
202         * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
203         * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
204         */
205        static Node forIdent(String ident) throws IOException {
206          try {
207            if (!nodeCstrMap.containsKey(ident)) {
208              throw new IOException("No nodetype for " + ident);
209            }
210            return nodeCstrMap.get(ident).newInstance(ident);
211          } catch (IllegalAccessException e) {
212            throw new IOException(e);
213          } catch (InstantiationException e) {
214            throw new IOException(e);
215          } catch (InvocationTargetException e) {
216            throw new IOException(e);
217          }
218        }
219    
220        private static final Class<?>[] ncstrSig = { String.class };
221        private static final
222            Map<String,Constructor<? extends Node>> nodeCstrMap =
223            new HashMap<String,Constructor<? extends Node>>();
224        protected static final Map<String,Constructor<? extends 
225            ComposableRecordReader>> rrCstrMap =
226            new HashMap<String,Constructor<? extends ComposableRecordReader>>();
227    
228        /**
229         * For a given identifier, add a mapping to the nodetype for the parse
230         * tree and to the ComposableRecordReader to be created, including the
231         * formals required to invoke the constructor.
232         * The nodetype and constructor signature should be filled in from the
233         * child node.
234         */
235        protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
236                                  Class<? extends Node> nodetype,
237                                  Class<? extends ComposableRecordReader> cl)
238            throws NoSuchMethodException {
239          Constructor<? extends Node> ncstr =
240            nodetype.getDeclaredConstructor(ncstrSig);
241          ncstr.setAccessible(true);
242          nodeCstrMap.put(ident, ncstr);
243          Constructor<? extends ComposableRecordReader> mcstr =
244            cl.getDeclaredConstructor(mcstrSig);
245          mcstr.setAccessible(true);
246          rrCstrMap.put(ident, mcstr);
247        }
248    
249        // inst
250        protected int id = -1;
251        protected String ident;
252        protected Class<? extends WritableComparator> cmpcl;
253    
254        protected Node(String ident) {
255          this.ident = ident;
256        }
257    
258        protected void setID(int id) {
259          this.id = id;
260        }
261    
262        protected void setKeyComparator(
263            Class<? extends WritableComparator> cmpcl) {
264          this.cmpcl = cmpcl;
265        }
266        abstract void parse(List<Token> args, Configuration conf) 
267            throws IOException;
268      }
269    
270      /**
271       * Nodetype in the parse tree for &quot;wrapped&quot; InputFormats.
272       */
273      static class WNode extends Node {
274        private static final Class<?>[] cstrSig =
275          { Integer.TYPE, RecordReader.class, Class.class };
276    
277        @SuppressWarnings("unchecked")
278            static void addIdentifier(String ident,
279                                  Class<? extends ComposableRecordReader> cl)
280            throws NoSuchMethodException {
281          Node.addIdentifier(ident, cstrSig, WNode.class, cl);
282        }
283    
284        private String indir;
285        private InputFormat<?, ?> inf;
286    
287        public WNode(String ident) {
288          super(ident);
289        }
290    
291        /**
292         * Let the first actual define the InputFormat and the second define
293         * the <tt>mapred.input.dir</tt> property.
294         */
295        @Override
296        public void parse(List<Token> ll, Configuration conf) throws IOException {
297          StringBuilder sb = new StringBuilder();
298          Iterator<Token> i = ll.iterator();
299          while (i.hasNext()) {
300            Token t = i.next();
301            if (TType.COMMA.equals(t.getType())) {
302              try {
303                    inf = (InputFormat<?, ?>)ReflectionUtils.newInstance(
304                                    conf.getClassByName(sb.toString()), conf);
305              } catch (ClassNotFoundException e) {
306                throw new IOException(e);
307              } catch (IllegalArgumentException e) {
308                throw new IOException(e);
309              }
310              break;
311            }
312            sb.append(t.getStr());
313          }
314          if (!i.hasNext()) {
315            throw new IOException("Parse error");
316          }
317          Token t = i.next();
318          if (!TType.QUOT.equals(t.getType())) {
319            throw new IOException("Expected quoted string");
320          }
321          indir = t.getStr();
322          // no check for ll.isEmpty() to permit extension
323        }
324    
325        private Configuration getConf(Configuration jconf) throws IOException {
326          Job job = new Job(jconf);
327          FileInputFormat.setInputPaths(job, indir);
328          return job.getConfiguration();
329        }
330        
331        public List<InputSplit> getSplits(JobContext context)
332            throws IOException, InterruptedException {
333          return inf.getSplits(
334                     new JobContextImpl(getConf(context.getConfiguration()), 
335                                        context.getJobID()));
336        }
337    
338        public ComposableRecordReader<?, ?> createRecordReader(InputSplit split, 
339            TaskAttemptContext taskContext) 
340            throws IOException, InterruptedException {
341          try {
342            if (!rrCstrMap.containsKey(ident)) {
343              throw new IOException("No RecordReader for " + ident);
344            }
345            Configuration conf = getConf(taskContext.getConfiguration());
346            TaskAttemptContext context = 
347              new TaskAttemptContextImpl(conf, 
348                  TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID)), 
349                  new WrappedStatusReporter(taskContext));
350            return rrCstrMap.get(ident).newInstance(id,
351                inf.createRecordReader(split, context), cmpcl);
352          } catch (IllegalAccessException e) {
353            throw new IOException(e);
354          } catch (InstantiationException e) {
355            throw new IOException(e);
356          } catch (InvocationTargetException e) {
357            throw new IOException(e);
358          }
359        }
360    
361        public String toString() {
362          return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
363        }
364      }
365    
366      private static class WrappedStatusReporter extends StatusReporter {
367    
368        TaskAttemptContext context;
369        
370        public WrappedStatusReporter(TaskAttemptContext context) {
371          this.context = context; 
372        }
373        @Override
374        public Counter getCounter(Enum<?> name) {
375          return context.getCounter(name);
376        }
377    
378        @Override
379        public Counter getCounter(String group, String name) {
380          return context.getCounter(group, name);
381        }
382    
383        @Override
384        public void progress() {
385          context.progress();
386        }
387    
388        @Override
389        public float getProgress() {
390          return context.getProgress();
391        }
392        
393        @Override
394        public void setStatus(String status) {
395          context.setStatus(status);
396        }
397      }
398    
399      /**
400       * Internal nodetype for &quot;composite&quot; InputFormats.
401       */
402      static class CNode extends Node {
403    
404        private static final Class<?>[] cstrSig =
405          { Integer.TYPE, Configuration.class, Integer.TYPE, Class.class };
406    
407        @SuppressWarnings("unchecked")
408            static void addIdentifier(String ident,
409                                  Class<? extends ComposableRecordReader> cl)
410            throws NoSuchMethodException {
411          Node.addIdentifier(ident, cstrSig, CNode.class, cl);
412        }
413    
414        // inst
415        private ArrayList<Node> kids = new ArrayList<Node>();
416    
417        public CNode(String ident) {
418          super(ident);
419        }
420    
421        @Override
422        public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
423          super.setKeyComparator(cmpcl);
424          for (Node n : kids) {
425            n.setKeyComparator(cmpcl);
426          }
427        }
428    
429        /**
430         * Combine InputSplits from child InputFormats into a
431         * {@link CompositeInputSplit}.
432         */
433        @SuppressWarnings("unchecked")
434            public List<InputSplit> getSplits(JobContext job)
435            throws IOException, InterruptedException {
436          List<List<InputSplit>> splits = 
437            new ArrayList<List<InputSplit>>(kids.size());
438          for (int i = 0; i < kids.size(); ++i) {
439            List<InputSplit> tmp = kids.get(i).getSplits(job);
440            if (null == tmp) {
441              throw new IOException("Error gathering splits from child RReader");
442            }
443            if (i > 0 && splits.get(i-1).size() != tmp.size()) {
444              throw new IOException("Inconsistent split cardinality from child " +
445                  i + " (" + splits.get(i-1).size() + "/" + tmp.size() + ")");
446            }
447            splits.add(i, tmp);
448          }
449          final int size = splits.get(0).size();
450          List<InputSplit> ret = new ArrayList<InputSplit>();
451          for (int i = 0; i < size; ++i) {
452            CompositeInputSplit split = new CompositeInputSplit(splits.size());
453            for (int j = 0; j < splits.size(); ++j) {
454              split.add(splits.get(j).get(i));
455            }
456            ret.add(split);
457          }
458          return ret;
459        }
460    
461        @SuppressWarnings("unchecked") // child types unknowable
462        public ComposableRecordReader 
463            createRecordReader(InputSplit split, TaskAttemptContext taskContext) 
464            throws IOException, InterruptedException {
465          if (!(split instanceof CompositeInputSplit)) {
466            throw new IOException("Invalid split type:" +
467                                  split.getClass().getName());
468          }
469          final CompositeInputSplit spl = (CompositeInputSplit)split;
470          final int capacity = kids.size();
471          CompositeRecordReader ret = null;
472          try {
473            if (!rrCstrMap.containsKey(ident)) {
474              throw new IOException("No RecordReader for " + ident);
475            }
476            ret = (CompositeRecordReader)rrCstrMap.get(ident).
477              newInstance(id, taskContext.getConfiguration(), capacity, cmpcl);
478          } catch (IllegalAccessException e) {
479            throw new IOException(e);
480          } catch (InstantiationException e) {
481            throw new IOException(e);
482          } catch (InvocationTargetException e) {
483            throw new IOException(e);
484          }
485          for (int i = 0; i < capacity; ++i) {
486            ret.add(kids.get(i).createRecordReader(spl.get(i), taskContext));
487          }
488          return (ComposableRecordReader)ret;
489        }
490    
491        /**
492         * Parse a list of comma-separated nodes.
493         */
494        public void parse(List<Token> args, Configuration conf) 
495            throws IOException {
496          ListIterator<Token> i = args.listIterator();
497          while (i.hasNext()) {
498            Token t = i.next();
499            t.getNode().setID(i.previousIndex() >> 1);
500            kids.add(t.getNode());
501            if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
502              throw new IOException("Expected ','");
503            }
504          }
505        }
506    
507        public String toString() {
508          StringBuilder sb = new StringBuilder();
509          sb.append(ident + "(");
510          for (Node n : kids) {
511            sb.append(n.toString() + ",");
512          }
513          sb.setCharAt(sb.length() - 1, ')');
514          return sb.toString();
515        }
516      }
517    
518      private static Token reduce(Stack<Token> st, Configuration conf) 
519          throws IOException {
520        LinkedList<Token> args = new LinkedList<Token>();
521        while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
522          args.addFirst(st.pop());
523        }
524        if (st.isEmpty()) {
525          throw new IOException("Unmatched ')'");
526        }
527        st.pop();
528        if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
529          throw new IOException("Identifier expected");
530        }
531        Node n = Node.forIdent(st.pop().getStr());
532        n.parse(args, conf);
533        return new NodeToken(n);
534      }
535    
536      /**
537       * Given an expression and an optional comparator, build a tree of
538       * InputFormats using the comparator to sort keys.
539       */
540      static Node parse(String expr, Configuration conf) throws IOException {
541        if (null == expr) {
542          throw new IOException("Expression is null");
543        }
544        Class<? extends WritableComparator> cmpcl = conf.getClass(
545          CompositeInputFormat.JOIN_COMPARATOR, null, WritableComparator.class);
546        Lexer lex = new Lexer(expr);
547        Stack<Token> st = new Stack<Token>();
548        Token tok;
549        while ((tok = lex.next()) != null) {
550          if (TType.RPAREN.equals(tok.getType())) {
551            st.push(reduce(st, conf));
552          } else {
553            st.push(tok);
554          }
555        }
556        if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
557          Node ret = st.pop().getNode();
558          if (cmpcl != null) {
559            ret.setKeyComparator(cmpcl);
560          }
561          return ret;
562        }
563        throw new IOException("Missing ')'");
564      }
565    
566    }