/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

/**
 * An InputFormat that reads input data from an SQL table.
 * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to demarcate
 * splits, it tries to generate WHERE clauses which separate the data into roughly
 * equivalent shards.
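 *
 * <p>A minimal driver sketch; the record class, driver, connection string, table,
 * and column names below are purely illustrative, and MyRecord is assumed to be a
 * user-supplied DBWritable implementation:
 * <pre>{@code
 * Job job = Job.getInstance(new Configuration());
 * DBConfiguration.configureDB(job.getConfiguration(),
 *     "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb", "user", "passwd");
 * DataDrivenDBInputFormat.setInput(job, MyRecord.class,
 *     "employees", "active = 1", "id", "id", "name");
 * }</pre>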
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DataDrivenDBInputFormat<T extends DBWritable>
    extends DBInputFormat<T> implements Configurable {

  private static final Log LOG = LogFactory.getLog(DataDrivenDBInputFormat.class);

  /** If users are providing their own query, the following string is expected to
      appear in the WHERE clause, which will be substituted with a pair of conditions
      on the input to allow input splits to parallelise the import. */
  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";

  /**
   * An InputSplit that spans a set of rows.
   */
  @InterfaceStability.Evolving
  public static class DataDrivenDBInputSplit extends DBInputFormat.DBInputSplit {

    private String lowerBoundClause;
    private String upperBoundClause;

    /**
     * Default Constructor
     */
    public DataDrivenDBInputSplit() {
    }

    /**
     * Convenience Constructor
     * @param lower the string to be put in the WHERE clause to guard on the 'lower' end
     * @param upper the string to be put in the WHERE clause to guard on the 'upper' end
     */
    public DataDrivenDBInputSplit(final String lower, final String upper) {
      this.lowerBoundClause = lower;
      this.upperBoundClause = upper;
    }

    /**
     * @return The total row count in this split; this is not known,
     * so this implementation always returns 0.
     */
    public long getLength() throws IOException {
      return 0; // unfortunately, we don't know this.
    }

    /** {@inheritDoc} */
    public void readFields(DataInput input) throws IOException {
      this.lowerBoundClause = Text.readString(input);
      this.upperBoundClause = Text.readString(input);
    }

    /** {@inheritDoc} */
    public void write(DataOutput output) throws IOException {
      Text.writeString(output, this.lowerBoundClause);
      Text.writeString(output, this.upperBoundClause);
    }

    public String getLowerClause() {
      return lowerBoundClause;
    }

    public String getUpperClause() {
      return upperBoundClause;
    }
  }

  /**
   * @return the DBSplitter implementation to use to divide the table/query into InputSplits.
   */
  protected DBSplitter getSplitter(int sqlDataType) {
    switch (sqlDataType) {
    case Types.NUMERIC:
    case Types.DECIMAL:
      return new BigDecimalSplitter();

    case Types.BIT:
    case Types.BOOLEAN:
      return new BooleanSplitter();

    case Types.INTEGER:
    case Types.TINYINT:
    case Types.SMALLINT:
    case Types.BIGINT:
      return new IntegerSplitter();

    case Types.REAL:
    case Types.FLOAT:
    case Types.DOUBLE:
      return new FloatSplitter();

    case Types.CHAR:
    case Types.VARCHAR:
    case Types.LONGVARCHAR:
      return new TextSplitter();

    case Types.DATE:
    case Types.TIME:
    case Types.TIMESTAMP:
      return new DateSplitter();

    default:
      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY
      // STRUCT, REF, DATALINK, and JAVA_OBJECT.
      return null;
    }
  }
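
  // SQL types not covered above currently fall through to the null default, which
  // makes getSplits() fail with "Unknown SQL data type". A subclass could plug in
  // its own partitioning strategy for such a column; a hypothetical sketch
  // (assuming text-style MIN/MAX bounds are usable for the type in question):
  //
  //   @Override
  //   protected DBSplitter getSplitter(int sqlDataType) {
  //     if (sqlDataType == Types.NCHAR) {
  //       return new TextSplitter();
  //     }
  //     return super.getSplitter(sqlDataType);
  //   }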

  /** {@inheritDoc} */
  public List<InputSplit> getSplits(JobContext job) throws IOException {

    int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
    if (1 == targetNumTasks) {
      // There's no need to run a bounding vals query; just return a split
      // that separates nothing. This can be considerably more efficient for a
      // large table with no index.
      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
      return singletonSplit;
    }

    ResultSet results = null;
    Statement statement = null;
    Connection connection = getConnection();
    try {
      statement = connection.createStatement();

      results = statement.executeQuery(getBoundingValsQuery());
      results.next();

      // Based on the type of the results, use a different mechanism
      // for interpolating split points (i.e., numeric splits, text splits,
      // dates, etc.)
      int sqlDataType = results.getMetaData().getColumnType(1);
      DBSplitter splitter = getSplitter(sqlDataType);
      if (null == splitter) {
        throw new IOException("Unknown SQL data type: " + sqlDataType);
      }

      return splitter.split(job.getConfiguration(), results, getDBConf().getInputOrderBy());
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    } finally {
      // More-or-less ignore SQL exceptions here, but log in case we need it.
      try {
        if (null != results) {
          results.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing resultset: " + se.toString());
      }

      try {
        if (null != statement) {
          statement.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing statement: " + se.toString());
      }

      try {
        connection.commit();
        closeConnection();
      } catch (SQLException se) {
        LOG.debug("SQLException committing split transaction: " + se.toString());
      }
    }
  }
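
  // As an illustration (the numbers here are hypothetical): for a split column
  // "id" with bounding values MIN(id)=0 and MAX(id)=1000 and mapreduce.job.maps
  // set to 4, the IntegerSplitter would return splits whose bound clauses look
  // roughly like "id >= 0" / "id < 250", "id >= 250" / "id < 500", and so on,
  // with the final split using an inclusive upper bound.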

  /**
   * @return a query which returns the minimum and maximum values for
   * the order-by column.
   *
   * The min value should be in the first column, and the
   * max value should be in the second column of the results.
   */
  protected String getBoundingValsQuery() {
    // If the user has provided a query, use that instead.
    String userQuery = getDBConf().getInputBoundingQuery();
    if (null != userQuery) {
      return userQuery;
    }

    // Auto-generate one based on the table name we've been provided with.
    StringBuilder query = new StringBuilder();

    String splitCol = getDBConf().getInputOrderBy();
    query.append("SELECT MIN(").append(splitCol).append("), ");
    query.append("MAX(").append(splitCol).append(") FROM ");
    query.append(getDBConf().getInputTableName());
    String conditions = getDBConf().getInputConditions();
    if (null != conditions) {
      query.append(" WHERE ( " + conditions + " )");
    }

    return query.toString();
  }
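
  // For instance, with input table "employees", split column "id", and input
  // conditions "active = 1" (all names illustrative), the auto-generated query
  // would be:
  //   SELECT MIN(id), MAX(id) FROM employees WHERE ( active = 1 )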

  /** Set the user-defined bounding query to use with a user-defined input query.
      This *must* include the substring "$CONDITIONS"
      (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause,
      so that DataDrivenDBInputFormat knows where to insert split clauses.
      e.g., "SELECT foo FROM mytable WHERE $CONDITIONS"
      This will be expanded to something like:
        SELECT foo FROM mytable WHERE (id > 100) AND (id < 250)
      inside each split.
    */
  public static void setBoundingQuery(Configuration conf, String query) {
    if (null != query) {
      // If the user's setting a query, warn if they don't allow conditions.
      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: " + query
            + "; splits may not partition data.");
      }
    }

    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
  }

  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
      Configuration conf) throws IOException {

    DBConfiguration dbConf = getDBConf();
    @SuppressWarnings("unchecked")
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
    String dbProductName = getDBProductName();

    LOG.debug("Creating db record reader for db product: " + dbProductName);

    try {
      // Use the database product name to determine the appropriate record reader.
      if (dbProductName.startsWith("MYSQL")) {
        // Use the MySQL-specific db reader.
        return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
            conf, getConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName());
      } else {
        // Generic reader.
        return new DataDrivenDBRecordReader<T>(split, inputClass,
            conf, getConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName(),
            dbProductName);
      }
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }

  // Configuration methods override superclass to ensure that the proper
  // DataDrivenDBInputFormat gets used.

  /** Note that the "orderBy" column is called the "splitBy" in this version.
   * We reuse the same field, but it's not strictly used for ordering --
   * just for partitioning the results.
   */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String tableName, String conditions,
      String splitBy, String... fieldNames) {
    DBInputFormat.setInput(job, inputClass, tableName, conditions, splitBy, fieldNames);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }

  /** setInput() takes a custom query and a separate "bounding query" to use
      instead of the custom "count query" used by DBInputFormat.
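
      For example, with a hypothetical record class and table (MyRecord and
      "employees" are illustrative), a free-form query import might be
      configured as:
      <pre>{@code
      DataDrivenDBInputFormat.setInput(job, MyRecord.class,
          "SELECT id, name FROM employees WHERE " + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN,
          "SELECT MIN(id), MAX(id) FROM employees");
      }</pre>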
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String inputQuery, String inputBoundingQuery) {
    DBInputFormat.setInput(job, inputClass, inputQuery, "");
    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY, inputBoundingQuery);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }
}