/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

/**
 * An InputFormat capable of performing joins over a set of data sources sorted
 * and partitioned the same way.
 *
 * <p>A user may define new join types by setting the property
 * {@code mapred.join.define.<ident>} to a classname. In the expression
 * {@code mapred.join.expr}, the identifier will be assumed to be a
 * ComposableRecordReader.
 * {@code mapred.join.keycomparator} can be a classname used to compare keys
 * in the join.
 *
 * @see #setFormat
 * @see JoinRecordReader
 * @see MultiFilterRecordReader
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CompositeInputFormat<K extends WritableComparable>
    implements ComposableInputFormat<K,TupleWritable> {

  /**
   * Matches configuration keys that declare a user-defined join type.
   * Group 1 captures the identifier; group 0 is the full property name.
   * Compiled once because the expression is constant.
   */
  private static final Pattern USER_DEFINE_KEY =
      Pattern.compile("^mapred\\.join\\.define\\.(\\w+)$");

  // expression parse tree to which IF requests are proxied
  private Parser.Node root;

  public CompositeInputFormat() { }

  /**
   * Interpret a given string as a composite expression.
   * <pre>{@code
   *   func  ::= <ident>([<func>,]*<func>)
   *   func  ::= tbl(<class>,"<path>")
   *   class ::= see java.lang.Class#forName(java.lang.String)
   *   path  ::= see org.apache.hadoop.fs.Path#Path(java.lang.String)
   * }</pre>
   * Reads the expression from the {@code mapred.join.expr} property and
   * user-supplied join types from {@code mapred.join.define.<ident>}
   * properties. Paths supplied to {@code tbl} are given as input paths to the
   * InputFormat class listed.
   *
   * @param job configuration holding the join expression and any
   *            user-defined join types
   * @throws IOException if a user-defined join type is invalid or the
   *                     expression cannot be parsed
   * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
   */
  public void setFormat(JobConf job) throws IOException {
    // Identifiers must be registered before parsing so the expression can
    // refer to them.
    addDefaults();
    addUserIdentifiers(job);
    root = Parser.parse(job.get("mapred.join.expr", null), job);
  }

  /**
   * Adds the default set of identifiers to the parser:
   * {@code inner}, {@code outer}, {@code override} and {@code tbl}.
   *
   * @throws RuntimeException if a default identifier cannot be registered
   */
  protected void addDefaults() {
    try {
      Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
      Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
      Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
      Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
    } catch (NoSuchMethodException e) {
      throw new RuntimeException("FATAL: Failed to init defaults", e);
    }
  }

  /**
   * Inform the parser of user-defined types.
   * Every configuration entry whose key has the form
   * {@code mapred.join.define.<ident>} registers {@code <ident>} with the
   * class named by that property.
   *
   * @param job configuration to scan for user-defined join types
   * @throws IOException if the parser rejects a defined class
   */
  private void addUserIdentifiers(JobConf job) throws IOException {
    for (Map.Entry<String,String> kv : job) {
      final Matcher m = USER_DEFINE_KEY.matcher(kv.getKey());
      if (!m.matches()) {
        continue;
      }
      final String ident = m.group(1);
      try {
        // group(0) is the complete property name, used to look up the class.
        Parser.CNode.addIdentifier(ident,
            job.getClass(m.group(0), null, ComposableRecordReader.class));
      } catch (NoSuchMethodException e) {
        throw (IOException) new IOException(
            "Invalid define for " + ident).initCause(e);
      }
    }
  }

  /**
   * Build a CompositeInputSplit from the child InputFormats by assigning the
   * ith split from each child to the ith composite split.
   *
   * @param job job configuration; note that this method overwrites
   *            {@code mapred.min.split.size} in it
   * @param numSplits hint for the number of splits, passed to the parse tree
   * @return the splits produced by the root of the parse tree
   * @throws IOException if the expression is invalid or a child fails
   */
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    setFormat(job);
    // NOTE(review): presumably forces child formats to emit one split per
    // file so the ith splits of every source line up - confirm.
    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
    return root.getSplits(job, numSplits);
  }

  /**
   * Construct a CompositeRecordReader for the children of this InputFormat
   * as defined in the init expression.
   * The outermost join need only be composable, not necessarily a composite.
   * Mandating TupleWritable isn't strictly correct.
   *
   * @param split the split to read
   * @param job job configuration containing the join expression
   * @param reporter progress reporter handed to the parse tree
   * @return the record reader produced by the root of the parse tree
   * @throws IOException if the expression is invalid or a child fails
   */
  @SuppressWarnings("unchecked") // child types unknown
  public ComposableRecordReader<K,TupleWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    setFormat(job);
    return root.getRecordReader(split, job, reporter);
  }

  /**
   * Convenience method for constructing composite formats.
   * Given InputFormat class (inf), path (p) return:
   * {@code tbl(<inf>,"<p>")}
   *
   * @param inf InputFormat class used to read the path
   * @param path input path
   * @return the {@code tbl} expression for the given class and path
   */
  public static String compose(Class<? extends InputFormat> inf, String path) {
    return compose(inf.getName(), path, new StringBuilder()).toString();
  }

  /**
   * Convenience method for constructing composite formats.
   * Given operation (op), Object class (inf), set of paths (p) return:
   * {@code <op>(tbl(<inf>,"<p1>"),tbl(<inf>,"<p2>"),...,tbl(<inf>,"<pn>"))}
   * An empty set of paths yields {@code <op>()}.
   *
   * @param op join operation identifier
   * @param inf InputFormat class used to read every path
   * @param path input paths, one {@code tbl} term each
   * @return the composed join expression
   */
  public static String compose(String op, Class<? extends InputFormat> inf,
      String... path) {
    final String infname = inf.getName();
    // append(String) renders a null op as "null", as string concatenation did.
    final StringBuilder ret = new StringBuilder().append(op).append('(');
    String separator = "";
    for (String p : path) {
      ret.append(separator);
      compose(infname, p, ret);
      separator = ",";
    }
    // Always close explicitly; rewriting the last character would clobber
    // the opening parenthesis when there are no paths.
    ret.append(')');
    return ret.toString();
  }

  /**
   * Convenience method for constructing composite formats.
   * Given operation (op), Object class (inf), set of paths (p) return:
   * {@code <op>(tbl(<inf>,"<p1>"),tbl(<inf>,"<p2>"),...,tbl(<inf>,"<pn>"))}
   * Each path is converted with {@link Path#toString()}.
   *
   * @param op join operation identifier
   * @param inf InputFormat class used to read every path
   * @param path input paths, one {@code tbl} term each
   * @return the composed join expression
   */
  public static String compose(String op, Class<? extends InputFormat> inf,
      Path... path) {
    final ArrayList<String> names = new ArrayList<String>(path.length);
    for (Path p : path) {
      names.add(p.toString());
    }
    return compose(op, inf, names.toArray(new String[0]));
  }

  /**
   * Appends {@code tbl(<inf>,"<path>")} to the given builder.
   *
   * @param inf name of the InputFormat class
   * @param path input path, emitted between double quotes without escaping
   * @param sb builder to append to
   * @return {@code sb}, to allow chaining
   */
  private static StringBuilder compose(String inf, String path,
      StringBuilder sb) {
    sb.append("tbl(").append(inf).append(",\"");
    sb.append(path);
    sb.append("\")");
    return sb;
  }

}