/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

/**
 * An InputFormat that reads input data from an SQL table.
 * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to demarcate
 * splits, it tries to generate WHERE clauses which separate the data into roughly
 * equivalent shards.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DataDrivenDBInputFormat<T extends DBWritable>
    extends DBInputFormat<T> implements Configurable {

  private static final Log LOG = LogFactory.getLog(DataDrivenDBInputFormat.class);

  /** If users are providing their own query, the following string is expected to
      appear in the WHERE clause, which will be substituted with a pair of conditions
      on the input to allow input splits to parallelise the import.
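      For example, a user query such as "SELECT a, b FROM t WHERE $CONDITIONS"
      (the table and column names here are purely illustrative) is handed to each
      map task with $CONDITIONS replaced by a bounded range on the split column,
      e.g. ( id >= 0 ) AND ( id < 500 ).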
   */
  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";

  /**
   * An InputSplit that spans a set of rows.
   */
  @InterfaceStability.Evolving
  public static class DataDrivenDBInputSplit extends DBInputFormat.DBInputSplit {

    private String lowerBoundClause;
    private String upperBoundClause;

    /**
     * Default Constructor.
     */
    public DataDrivenDBInputSplit() {
    }

    /**
     * Convenience Constructor.
     * @param lower the string to be put in the WHERE clause to guard on the 'lower' end
     * @param upper the string to be put in the WHERE clause to guard on the 'upper' end
     */
    public DataDrivenDBInputSplit(final String lower, final String upper) {
      this.lowerBoundClause = lower;
      this.upperBoundClause = upper;
    }

    /**
     * @return The total row count in this split
     */
    public long getLength() throws IOException {
      return 0; // unfortunately, we don't know this.
    }

    /** {@inheritDoc} */
    public void readFields(DataInput input) throws IOException {
      this.lowerBoundClause = Text.readString(input);
      this.upperBoundClause = Text.readString(input);
    }

    /** {@inheritDoc} */
    public void write(DataOutput output) throws IOException {
      Text.writeString(output, this.lowerBoundClause);
      Text.writeString(output, this.upperBoundClause);
    }

    public String getLowerClause() {
      return lowerBoundClause;
    }

    public String getUpperClause() {
      return upperBoundClause;
    }
  }

  /**
   * @return the DBSplitter implementation to use to divide the table/query into InputSplits.
   */
  protected DBSplitter getSplitter(int sqlDataType) {
    switch (sqlDataType) {
    case Types.NUMERIC:
    case Types.DECIMAL:
      return new BigDecimalSplitter();

    case Types.BIT:
    case Types.BOOLEAN:
      return new BooleanSplitter();

    case Types.INTEGER:
    case Types.TINYINT:
    case Types.SMALLINT:
    case Types.BIGINT:
      return new IntegerSplitter();

    case Types.REAL:
    case Types.FLOAT:
    case Types.DOUBLE:
      return new FloatSplitter();

    case Types.CHAR:
    case Types.VARCHAR:
    case Types.LONGVARCHAR:
      return new TextSplitter();

    case Types.DATE:
    case Types.TIME:
    case Types.TIMESTAMP:
      return new DateSplitter();

    default:
      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY,
      // STRUCT, REF, DATALINK, and JAVA_OBJECT.
      return null;
    }
  }

  /** {@inheritDoc} */
  public List<InputSplit> getSplits(JobContext job) throws IOException {

    int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
    if (1 == targetNumTasks) {
      // There's no need to run a bounding vals query; just return a split
      // that separates nothing. This can be considerably more efficient for a
      // large table with no index.
      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
      return singletonSplit;
    }

    ResultSet results = null;
    Statement statement = null;
    Connection connection = getConnection();
    try {
      statement = connection.createStatement();

      results = statement.executeQuery(getBoundingValsQuery());
      results.next();

      // Based on the type of the results, use a different mechanism
      // for interpolating split points (i.e., numeric splits, text splits,
      // dates, etc.)
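      // Illustrative example (values are hypothetical, not derived from this
      // code): for an integer split column "id" with MIN/MAX bounds of 0 and
      // 1000 and two map tasks, the splitter would typically return clause
      // pairs along the lines of ("id >= 0", "id < 500") and
      // ("id >= 500", "id <= 1000"), each wrapped in a DataDrivenDBInputSplit.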
      int sqlDataType = results.getMetaData().getColumnType(1);
      DBSplitter splitter = getSplitter(sqlDataType);
      if (null == splitter) {
        throw new IOException("Unknown SQL data type: " + sqlDataType);
      }

      return splitter.split(job.getConfiguration(), results, getDBConf().getInputOrderBy());
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    } finally {
      // More-or-less ignore SQL exceptions here, but log in case we need it.
      try {
        if (null != results) {
          results.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing resultset: " + se.toString());
      }

      try {
        if (null != statement) {
          statement.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing statement: " + se.toString());
      }

      try {
        connection.commit();
        closeConnection();
      } catch (SQLException se) {
        LOG.debug("SQLException committing split transaction: " + se.toString());
      }
    }
  }

  /**
   * @return a query which returns the minimum and maximum values for
   * the order-by column.
   *
   * The min value should be in the first column, and the
   * max value should be in the second column of the results.
   */
  protected String getBoundingValsQuery() {
    // If the user has provided a query, use that instead.
    String userQuery = getDBConf().getInputBoundingQuery();
    if (null != userQuery) {
      return userQuery;
    }

    // Auto-generate one based on the table name we've been provided with.
    StringBuilder query = new StringBuilder();

    String splitCol = getDBConf().getInputOrderBy();
    query.append("SELECT MIN(").append(splitCol).append("), ");
    query.append("MAX(").append(splitCol).append(") FROM ");
    query.append(getDBConf().getInputTableName());
    String conditions = getDBConf().getInputConditions();
    if (null != conditions) {
      query.append(" WHERE ( " + conditions + " )");
    }

    return query.toString();
  }

  /** Set the user-defined bounding query to use with a user-defined query.
      This *must* include the substring "$CONDITIONS"
      (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause,
      so that DataDrivenDBInputFormat knows where to insert split clauses.
      e.g., "SELECT foo FROM mytable WHERE $CONDITIONS"
      This will be expanded to something like:
        SELECT foo FROM mytable WHERE (id > 100) AND (id < 250)
      inside each split.
    */
  public static void setBoundingQuery(Configuration conf, String query) {
    if (null != query) {
      // If the user is setting a query, warn if it doesn't allow conditions.
      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: " + query
            + "; splits may not partition data.");
      }
    }

    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
  }

  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
      Configuration conf) throws IOException {

    DBConfiguration dbConf = getDBConf();
    @SuppressWarnings("unchecked")
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
    String dbProductName = getDBProductName();

    LOG.debug("Creating db record reader for db product: " + dbProductName);

    try {
      // Use the database product name to determine the appropriate record reader.
      if (dbProductName.startsWith("MYSQL")) {
        // Use the MySQL-specific db reader.
        return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
            conf, getConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName());
      } else {
        // Generic reader.
        return new DataDrivenDBRecordReader<T>(split, inputClass,
            conf, getConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName(),
            dbProductName);
      }
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }

  // Configuration methods override superclass to ensure that the proper
  // DataDrivenDBInputFormat gets used.

  /** Note that the "orderBy" column is called the "splitBy" in this version.
    * We reuse the same field, but it's not strictly ordering it -- just partitioning
    * the results.
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String tableName, String conditions,
      String splitBy, String... fieldNames) {
    DBInputFormat.setInput(job, inputClass, tableName, conditions, splitBy, fieldNames);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }

  /** setInput() takes a custom query and a separate "bounding query" to use
      instead of the custom "count query" used by DBInputFormat.
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String inputQuery, String inputBoundingQuery) {
    DBInputFormat.setInput(job, inputClass, inputQuery, "");
    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY, inputBoundingQuery);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }
}
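/*
 * A minimal usage sketch (illustrative only; not part of this class). The
 * record class, JDBC driver, URL, table, and column names below are
 * hypothetical, not values defined anywhere in this file:
 *
 *   Configuration conf = new Configuration();
 *   Job job = Job.getInstance(conf);
 *   DBConfiguration.configureDB(job.getConfiguration(),
 *       "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb", "user", "password");
 *   DataDrivenDBInputFormat.setInput(job, MyRecord.class,
 *       "employees",                       // table name
 *       null,                              // optional WHERE conditions
 *       "employee_id",                     // split-by column
 *       "employee_id", "name", "salary");  // columns to select
 *
 * setInput() also registers DataDrivenDBInputFormat as the job's input format,
 * so no separate call to job.setInputFormatClass() is needed.
 */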