/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
/**
 * An InputFormat that reads input data from an SQL table.
 * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to demarcate
 * splits, it tries to generate WHERE clauses that separate the data into roughly
 * equivalent shards.
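 *
 * <p>A minimal usage sketch (the driver class, JDBC URL, DBWritable class, table,
 * and column names below are illustrative, not part of this API):</p>
 * <pre>
 *   Job job = Job.getInstance(conf);
 *   DBConfiguration.configureDB(job.getConfiguration(),
 *       "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb");
 *   DataDrivenDBInputFormat.setInput(job, MyRecord.class,
 *       "employees", null, "id", "id", "name");
 * </pre>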
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DataDrivenDBInputFormat<T extends DBWritable>
    extends DBInputFormat<T> implements Configurable {

  private static final Log LOG = LogFactory.getLog(DataDrivenDBInputFormat.class);

  /** If users are providing their own query, the following token is expected to
      appear in its WHERE clause. It will be replaced with a pair of conditions
      on the split column so that the input splits can parallelize the import. */
  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";

  /**
   * An InputSplit that spans a set of rows.
   */
  @InterfaceStability.Evolving
  public static class DataDrivenDBInputSplit extends DBInputFormat.DBInputSplit {

    private String lowerBoundClause;
    private String upperBoundClause;

    /**
     * Default constructor.
     */
    public DataDrivenDBInputSplit() {
    }

    /**
     * Convenience constructor.
     * @param lower the string to be put in the WHERE clause to guard on the 'lower' end
     * @param upper the string to be put in the WHERE clause to guard on the 'upper' end
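     *
     * <p>For example, a split covering rows whose ids fall in [1, 1000) could be
     * built as (column name illustrative):
     * {@code new DataDrivenDBInputSplit("id >= 1", "id < 1000")}.</p>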
     */
    public DataDrivenDBInputSplit(final String lower, final String upper) {
      this.lowerBoundClause = lower;
      this.upperBoundClause = upper;
    }


    /**
     * @return The total row count in this split
     */
    public long getLength() throws IOException {
      return 0; // unfortunately, we don't know this.
    }

    /** {@inheritDoc} */
    public void readFields(DataInput input) throws IOException {
      this.lowerBoundClause = Text.readString(input);
      this.upperBoundClause = Text.readString(input);
    }

    /** {@inheritDoc} */
    public void write(DataOutput output) throws IOException {
      Text.writeString(output, this.lowerBoundClause);
      Text.writeString(output, this.upperBoundClause);
    }

    public String getLowerClause() {
      return lowerBoundClause;
    }

    public String getUpperClause() {
      return upperBoundClause;
    }
  }

  /**
   * @return the DBSplitter implementation to use to divide the table/query
   * into InputSplits, or null if the column's SQL type is not supported.
   */
  protected DBSplitter getSplitter(int sqlDataType) {
    switch (sqlDataType) {
    case Types.NUMERIC:
    case Types.DECIMAL:
      return new BigDecimalSplitter();

    case Types.BIT:
    case Types.BOOLEAN:
      return new BooleanSplitter();

    case Types.INTEGER:
    case Types.TINYINT:
    case Types.SMALLINT:
    case Types.BIGINT:
      return new IntegerSplitter();

    case Types.REAL:
    case Types.FLOAT:
    case Types.DOUBLE:
      return new FloatSplitter();

    case Types.CHAR:
    case Types.VARCHAR:
    case Types.LONGVARCHAR:
      return new TextSplitter();

    case Types.DATE:
    case Types.TIME:
    case Types.TIMESTAMP:
      return new DateSplitter();

    default:
      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY
      // STRUCT, REF, DATALINK, and JAVA_OBJECT.
      return null;
    }
  }

  /** {@inheritDoc} */
  public List<InputSplit> getSplits(JobContext job) throws IOException {

    int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
    if (1 == targetNumTasks) {
      // There's no need to run a bounding vals query; just return a split
      // that separates nothing. This can be considerably faster for a
      // large table with no index.
      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
      return singletonSplit;
    }

    ResultSet results = null;
    Statement statement = null;
    try {
      statement = connection.createStatement();

      results = statement.executeQuery(getBoundingValsQuery());
      results.next();

      // Based on the type of the results, use a different mechanism
      // for interpolating split points (i.e., numeric splits, text splits,
      // dates, etc.)
      int sqlDataType = results.getMetaData().getColumnType(1);
      DBSplitter splitter = getSplitter(sqlDataType);
      if (null == splitter) {
        throw new IOException("Unknown SQL data type: " + sqlDataType);
      }

      return splitter.split(job.getConfiguration(), results, getDBConf().getInputOrderBy());
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    } finally {
      // More-or-less ignore SQL exceptions here, but log in case we need it.
      try {
        if (null != results) {
          results.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing resultset: " + se.toString());
      }

      try {
        if (null != statement) {
          statement.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing statement: " + se.toString());
      }

      try {
        connection.commit();
        closeConnection();
      } catch (SQLException se) {
        LOG.debug("SQLException committing split transaction: " + se.toString());
      }
    }
  }

  /**
   * @return a query which returns the minimum and maximum values for
   * the order-by column.
   *
   * The min value should be in the first column, and the
   * max value should be in the second column of the results.
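   *
   * For example, with split column {@code id}, table {@code employees}, and no
   * input conditions (names illustrative), the auto-generated query would be:
   * <pre>SELECT MIN(id), MAX(id) FROM employees</pre>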
   */
  protected String getBoundingValsQuery() {
    // If the user has provided a query, use that instead.
    String userQuery = getDBConf().getInputBoundingQuery();
    if (null != userQuery) {
      return userQuery;
    }

    // Auto-generate one based on the table name we've been provided with.
    StringBuilder query = new StringBuilder();

    String splitCol = getDBConf().getInputOrderBy();
    query.append("SELECT MIN(").append(splitCol).append("), ");
    query.append("MAX(").append(splitCol).append(") FROM ");
    query.append(getDBConf().getInputTableName());
    String conditions = getDBConf().getInputConditions();
    if (null != conditions) {
      query.append(" WHERE ( " + conditions + " )");
    }

    return query.toString();
  }

  /** Set the user-defined bounding query to use with a user-defined query.
      This *must* include the substring "$CONDITIONS"
      (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause,
      so that DataDrivenDBInputFormat knows where to insert split clauses.
      e.g., "SELECT foo FROM mytable WHERE $CONDITIONS"
      This will be expanded to something like:
        SELECT foo FROM mytable WHERE (id &gt; 100) AND (id &lt; 250)
      inside each split.
    */
  public static void setBoundingQuery(Configuration conf, String query) {
    if (null != query) {
      // If the user is setting a query, warn if it doesn't allow conditions.
      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: " + query
            + "; splits may not partition data.");
      }
    }

    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
  }

  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
      Configuration conf) throws IOException {

    DBConfiguration dbConf = getDBConf();
    @SuppressWarnings("unchecked")
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
    String dbProductName = getDBProductName();

    LOG.debug("Creating db record reader for db product: " + dbProductName);

    try {
      // use database product name to determine appropriate record reader.
      if (dbProductName.startsWith("MYSQL")) {
        // use MySQL-specific db reader.
        return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
            conf, createConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName());
      } else {
        // Generic reader.
        return new DataDrivenDBRecordReader<T>(split, inputClass,
            conf, createConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName(),
            dbProductName);
      }
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }

  // Configuration methods override superclass to ensure that the proper
  // DataDrivenDBInputFormat gets used.

  /** Note that the "orderBy" column is called the "splitBy" in this version.
    * We reuse the same field, but it is not used strictly for ordering; it just
    * partitions the results.
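    *
    * <p>A usage sketch (the DBWritable class, table, and column names are
    * illustrative, not part of this API):</p>
    * <pre>
    *   DataDrivenDBInputFormat.setInput(job, MyRecord.class,
    *       "employees", null, "id", "id", "name", "salary");
    * </pre>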
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String tableName, String conditions,
      String splitBy, String... fieldNames) {
    DBInputFormat.setInput(job, inputClass, tableName, conditions, splitBy, fieldNames);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }

  /** setInput() takes a custom query and a separate "bounding query" to use
      instead of the custom "count query" used by DBInputFormat.
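
      A usage sketch (the DBWritable class and query text are illustrative):
      the input query must contain the $CONDITIONS token, and the bounding query
      should return the split column's minimum and maximum values, e.g.

        DataDrivenDBInputFormat.setInput(job, MyRecord.class,
            "SELECT id, name FROM employees WHERE $CONDITIONS",
            "SELECT MIN(id), MAX(id) FROM employees");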
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String inputQuery, String inputBoundingQuery) {
    DBInputFormat.setInput(job, inputClass, inputQuery, "");
    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY, inputBoundingQuery);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }
}