001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.join;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Map;
024import java.util.regex.Matcher;
025import java.util.regex.Pattern;
026
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.io.WritableComparable;
031import org.apache.hadoop.mapred.InputFormat;
032import org.apache.hadoop.mapred.InputSplit;
033import org.apache.hadoop.mapred.JobConf;
034import org.apache.hadoop.mapred.Reporter;
035
036/**
037 * An InputFormat capable of performing joins over a set of data sources sorted
038 * and partitioned the same way.
039 *
040 * A user may define new join types by setting the property
041 * <tt>mapred.join.define.&lt;ident&gt;</tt> to a classname. In the expression
042 * <tt>mapred.join.expr</tt>, the identifier will be assumed to be a
043 * ComposableRecordReader.
044 * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
045 * in the join.
046 * @see #setFormat
047 * @see JoinRecordReader
048 * @see MultiFilterRecordReader
049 */
050@InterfaceAudience.Public
051@InterfaceStability.Stable
052public class CompositeInputFormat<K extends WritableComparable>
053      implements ComposableInputFormat<K,TupleWritable> {
054
055  // expression parse tree to which IF requests are proxied
056  private Parser.Node root;
057
058  public CompositeInputFormat() { }
059
060
061  /**
062   * Interpret a given string as a composite expression.
063   * {@code
064   *   func  ::= <ident>([<func>,]*<func>)
065   *   func  ::= tbl(<class>,"<path>")
066   *   class ::= @see java.lang.Class#forName(java.lang.String)
067   *   path  ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
068   * }
069   * Reads expression from the <tt>mapred.join.expr</tt> property and
070   * user-supplied join types from <tt>mapred.join.define.&lt;ident&gt;</tt>
071   *  types. Paths supplied to <tt>tbl</tt> are given as input paths to the
072   * InputFormat class listed.
073   * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
074   */
075  public void setFormat(JobConf job) throws IOException {
076    addDefaults();
077    addUserIdentifiers(job);
078    root = Parser.parse(job.get("mapred.join.expr", null), job);
079  }
080
081  /**
082   * Adds the default set of identifiers to the parser.
083   */
084  protected void addDefaults() {
085    try {
086      Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
087      Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
088      Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
089      Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
090    } catch (NoSuchMethodException e) {
091      throw new RuntimeException("FATAL: Failed to init defaults", e);
092    }
093  }
094
095  /**
096   * Inform the parser of user-defined types.
097   */
098  private void addUserIdentifiers(JobConf job) throws IOException {
099    Pattern x = Pattern.compile("^mapred\\.join\\.define\\.(\\w+)$");
100    for (Map.Entry<String,String> kv : job) {
101      Matcher m = x.matcher(kv.getKey());
102      if (m.matches()) {
103        try {
104          Parser.CNode.addIdentifier(m.group(1),
105              job.getClass(m.group(0), null, ComposableRecordReader.class));
106        } catch (NoSuchMethodException e) {
107          throw (IOException)new IOException(
108              "Invalid define for " + m.group(1)).initCause(e);
109        }
110      }
111    }
112  }
113
114  /**
115   * Build a CompositeInputSplit from the child InputFormats by assigning the
116   * ith split from each child to the ith composite split.
117   */
118  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
119    setFormat(job);
120    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
121    return root.getSplits(job, numSplits);
122  }
123
124  /**
125   * Construct a CompositeRecordReader for the children of this InputFormat
126   * as defined in the init expression.
127   * The outermost join need only be composable, not necessarily a composite.
128   * Mandating TupleWritable isn't strictly correct.
129   */
130  @SuppressWarnings("unchecked") // child types unknown
131  public ComposableRecordReader<K,TupleWritable> getRecordReader(
132      InputSplit split, JobConf job, Reporter reporter) throws IOException {
133    setFormat(job);
134    return root.getRecordReader(split, job, reporter);
135  }
136
137  /**
138   * Convenience method for constructing composite formats.
139   * Given InputFormat class (inf), path (p) return:
140   * {@code tbl(<inf>, <p>) }
141   */
142  public static String compose(Class<? extends InputFormat> inf, String path) {
143    return compose(inf.getName().intern(), path, new StringBuffer()).toString();
144  }
145
146  /**
147   * Convenience method for constructing composite formats.
148   * Given operation (op), Object class (inf), set of paths (p) return:
149   * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
150   */
151  public static String compose(String op, Class<? extends InputFormat> inf,
152      String... path) {
153    final String infname = inf.getName();
154    StringBuffer ret = new StringBuffer(op + '(');
155    for (String p : path) {
156      compose(infname, p, ret);
157      ret.append(',');
158    }
159    ret.setCharAt(ret.length() - 1, ')');
160    return ret.toString();
161  }
162
163  /**
164   * Convenience method for constructing composite formats.
165   * Given operation (op), Object class (inf), set of paths (p) return:
166   * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
167   */
168  public static String compose(String op, Class<? extends InputFormat> inf,
169      Path... path) {
170    ArrayList<String> tmp = new ArrayList<String>(path.length);
171    for (Path p : path) {
172      tmp.add(p.toString());
173    }
174    return compose(op, inf, tmp.toArray(new String[0]));
175  }
176
177  private static StringBuffer compose(String inf, String path,
178      StringBuffer sb) {
179    sb.append("tbl(" + inf + ",\"");
180    sb.append(path);
181    sb.append("\")");
182    return sb;
183  }
184
185}