/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

/**
 * An InputFormat that reads input data from an SQL table.
 * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to
 * demarcate splits, it tries to generate WHERE clauses that separate the
 * data into roughly equal-sized shards.
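 *
 * <p>A minimal usage sketch; the table, the column names, and the MyRecord
 * DBWritable class are illustrative, not part of this API:
 *
 * <pre>
 *   Job job = Job.getInstance(conf);
 *   // Partition the (hypothetical) "employees" table on its "id" column.
 *   DataDrivenDBInputFormat.setInput(job, MyRecord.class,
 *       "employees", null /* no WHERE conditions *&#47;, "id", "id", "name");
 * </pre>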
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DataDrivenDBInputFormat<T extends DBWritable>
    extends DBInputFormat<T> implements Configurable {

  private static final Log LOG = LogFactory.getLog(DataDrivenDBInputFormat.class);

  /** If users are providing their own query, the following string is expected
      to appear in the WHERE clause, where it will be substituted with a pair
      of conditions on the input to allow input splits to parallelize the
      import. */
  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";

  /**
   * An InputSplit that spans a set of rows.
   */
  @InterfaceStability.Evolving
  public static class DataDrivenDBInputSplit extends DBInputFormat.DBInputSplit {

    private String lowerBoundClause;
    private String upperBoundClause;

    /**
     * Default constructor.
     */
    public DataDrivenDBInputSplit() {
    }

    /**
     * Convenience constructor.
     * @param lower the string to be put in the WHERE clause to guard on the 'lower' end.
     * @param upper the string to be put in the WHERE clause to guard on the 'upper' end.
     */
    public DataDrivenDBInputSplit(final String lower, final String upper) {
      this.lowerBoundClause = lower;
      this.upperBoundClause = upper;
    }

    /**
     * @return 0; the total row count in this split is not known in advance.
     */
    @Override
    public long getLength() throws IOException {
      return 0; // Unfortunately, we don't know the row count up front.
    }

    /** {@inheritDoc} */
    @Override
    public void readFields(DataInput input) throws IOException {
      this.lowerBoundClause = Text.readString(input);
      this.upperBoundClause = Text.readString(input);
    }

    /** {@inheritDoc} */
    @Override
    public void write(DataOutput output) throws IOException {
      Text.writeString(output, this.lowerBoundClause);
      Text.writeString(output, this.upperBoundClause);
    }

    /** @return the WHERE clause fragment guarding the lower bound of this split. */
    public String getLowerClause() {
      return lowerBoundClause;
    }

    /** @return the WHERE clause fragment guarding the upper bound of this split. */
    public String getUpperClause() {
      return upperBoundClause;
    }
  }

  /**
   * @param sqlDataType a constant from java.sql.Types for the type of the
   * split column.
   * @return the DBSplitter implementation to use to divide the table/query
   * into InputSplits.
   */
  protected DBSplitter getSplitter(int sqlDataType) {
    switch (sqlDataType) {
    case Types.NUMERIC:
    case Types.DECIMAL:
      return new BigDecimalSplitter();

    case Types.BIT:
    case Types.BOOLEAN:
      return new BooleanSplitter();

    case Types.INTEGER:
    case Types.TINYINT:
    case Types.SMALLINT:
    case Types.BIGINT:
      return new IntegerSplitter();

    case Types.REAL:
    case Types.FLOAT:
    case Types.DOUBLE:
      return new FloatSplitter();

    case Types.CHAR:
    case Types.VARCHAR:
    case Types.LONGVARCHAR:
      return new TextSplitter();

    case Types.DATE:
    case Types.TIME:
    case Types.TIMESTAMP:
      return new DateSplitter();

    default:
      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB,
      // ARRAY, STRUCT, REF, DATALINK, and JAVA_OBJECT.
      return null;
    }
  }

  /** {@inheritDoc} */
  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {

    int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
    if (1 == targetNumTasks) {
      // There's no need to run a bounding vals query; just return a split
      // that separates nothing. This can be considerably more efficient for
      // a large table with no index.
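      // The "1=1" bounds are WHERE-clause fragments that match every row,
      // so this single split covers the whole table.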
      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
      return singletonSplit;
    }

    ResultSet results = null;
    Statement statement = null;
    Connection connection = getConnection();
    try {
      statement = connection.createStatement();

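      // Run the bounding-values query; it returns a single row whose first
      // column is MIN(splitCol) and whose second column is MAX(splitCol).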
      results = statement.executeQuery(getBoundingValsQuery());
      results.next();

      // Based on the type of the results, use a different mechanism
      // for interpolating split points (i.e., numeric splits, text splits,
      // dates, etc.)
      int sqlDataType = results.getMetaData().getColumnType(1);
      DBSplitter splitter = getSplitter(sqlDataType);
      if (null == splitter) {
        throw new IOException("Unknown SQL data type: " + sqlDataType);
      }

      return splitter.split(job.getConfiguration(), results,
          getDBConf().getInputOrderBy());
    } catch (SQLException e) {
      throw new IOException(e.getMessage(), e);
    } finally {
      // More-or-less ignore SQL exceptions here, but log them in case we
      // need the information later.
      try {
        if (null != results) {
          results.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing resultset: " + se.toString());
      }

      try {
        if (null != statement) {
          statement.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing statement: " + se.toString());
      }

      try {
        connection.commit();
        closeConnection();
      } catch (SQLException se) {
        LOG.debug("SQLException committing split transaction: " + se.toString());
      }
    }
  }

  /**
   * Returns a query that fetches the minimum and maximum values of the
   * order-by (split-by) column. The min value should be in the first
   * column, and the max value in the second column, of the result set.
   * @return the bounding-values query.
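   *
   * For example, if the split column is "id" and the input table is a
   * (hypothetical) "employees" table, the query generated below is:
   *   SELECT MIN(id), MAX(id) FROM employees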
   */
  protected String getBoundingValsQuery() {
    // If the user has provided a query, use that instead.
    String userQuery = getDBConf().getInputBoundingQuery();
    if (null != userQuery) {
      return userQuery;
    }

    // Auto-generate one based on the table name we've been provided with.
    StringBuilder query = new StringBuilder();

    String splitCol = getDBConf().getInputOrderBy();
    query.append("SELECT MIN(").append(splitCol).append("), ");
    query.append("MAX(").append(splitCol).append(") FROM ");
    query.append(getDBConf().getInputTableName());
    String conditions = getDBConf().getInputConditions();
    if (null != conditions) {
      query.append(" WHERE ( ").append(conditions).append(" )");
    }

    return query.toString();
  }

  /** Set the user-defined bounding query to use with a user-defined query.
      This *must* include the substring "$CONDITIONS"
      (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause,
      so that DataDrivenDBInputFormat knows where to insert split clauses.
      e.g., "SELECT foo FROM mytable WHERE $CONDITIONS"
      This will be expanded to something like:
        SELECT foo FROM mytable WHERE (id &gt; 100) AND (id &lt; 250)
      inside each split.
    */
  public static void setBoundingQuery(Configuration conf, String query) {
    if (null != query) {
      // If the user is setting a query, warn if it doesn't include the
      // conditions token.
      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: " + query
            + "; splits may not partition data.");
      }
    }

    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
  }

  @Override
  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
      Configuration conf) throws IOException {

    DBConfiguration dbConf = getDBConf();
    @SuppressWarnings("unchecked")
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
    String dbProductName = getDBProductName();

    LOG.debug("Creating db record reader for db product: " + dbProductName);

    try {
      // Use the database product name to determine the appropriate record reader.
      if (dbProductName.startsWith("MYSQL")) {
        // Use the MySQL-specific db reader.
        return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
            conf, getConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName());
      } else {
        // Generic reader.
        return new DataDrivenDBRecordReader<T>(split, inputClass,
            conf, getConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName(),
            dbProductName);
      }
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage(), ex);
    }
  }

  // Configuration methods override the superclass versions to ensure that
  // DataDrivenDBInputFormat, not DBInputFormat, gets used.

  /** Note that the "orderBy" column here is called the "splitBy" column.
    * We reuse the same underlying field, but it is used to partition the
    * results rather than to strictly order them.
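    *
    * For example (the table, condition, and column names are illustrative):
    *   DataDrivenDBInputFormat.setInput(job, MyRecord.class,
    *       "employees", "active = 1", "id", "id", "name");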
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String tableName, String conditions,
      String splitBy, String... fieldNames) {
    DBInputFormat.setInput(job, inputClass, tableName, conditions, splitBy, fieldNames);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }

  /** setInput() takes a custom query and a separate "bounding query" to use
      instead of the custom "count query" used by DBInputFormat.
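
      For example (the query text is illustrative; note the $CONDITIONS
      token, which marks where split conditions are inserted):
        setInput(job, MyRecord.class,
            "SELECT id, name FROM employees WHERE $CONDITIONS",
            "SELECT MIN(id), MAX(id) FROM employees");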
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String inputQuery, String inputBoundingQuery) {
    DBInputFormat.setInput(job, inputClass, inputQuery, "");
    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY, inputBoundingQuery);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }
}