/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

/**
 * An InputFormat that reads input data from an SQL table.
 * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to
 * demarcate splits, it tries to generate WHERE clauses which separate the
 * data into roughly equivalent shards.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DataDrivenDBInputFormat<T extends DBWritable>
    extends DBInputFormat<T> implements Configurable {

  private static final Log LOG =
      LogFactory.getLog(DataDrivenDBInputFormat.class);

  /** If users are providing their own query, the following string is
      expected to appear in the WHERE clause, which will be substituted
      with a pair of conditions on the input to allow input splits to
      parallelise the import. */
  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";
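  // Illustrative sketch of how the token is used; the query, table, and
  // bound values below are hypothetical examples, not taken from this file.
  // Given the user query:
  //   SELECT id, name FROM employees WHERE $CONDITIONS
  // one input split might execute:
  //   SELECT id, name FROM employees WHERE ( id >= 0 ) AND ( id < 5000 )
  // while the final split executes:
  //   SELECT id, name FROM employees WHERE ( id >= 5000 ) AND ( id <= 9999 )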
  /**
   * An InputSplit that spans a set of rows.
   */
  @InterfaceStability.Evolving
  public static class DataDrivenDBInputSplit
      extends DBInputFormat.DBInputSplit {

    private String lowerBoundClause;
    private String upperBoundClause;

    /**
     * Default constructor.
     */
    public DataDrivenDBInputSplit() {
    }

    /**
     * Convenience constructor.
     * @param lower the string to be put in the WHERE clause to guard
     * on the 'lower' end.
     * @param upper the string to be put in the WHERE clause to guard
     * on the 'upper' end.
     */
    public DataDrivenDBInputSplit(final String lower, final String upper) {
      this.lowerBoundClause = lower;
      this.upperBoundClause = upper;
    }

    /**
     * @return The total row count in this split.
     */
    public long getLength() throws IOException {
      return 0; // unfortunately, we don't know this.
    }

    /** {@inheritDoc} */
    public void readFields(DataInput input) throws IOException {
      this.lowerBoundClause = Text.readString(input);
      this.upperBoundClause = Text.readString(input);
    }

    /** {@inheritDoc} */
    public void write(DataOutput output) throws IOException {
      Text.writeString(output, this.lowerBoundClause);
      Text.writeString(output, this.upperBoundClause);
    }

    public String getLowerClause() {
      return lowerBoundClause;
    }

    public String getUpperClause() {
      return upperBoundClause;
    }
  }

  /**
   * @return the DBSplitter implementation to use to divide the
   * table/query into InputSplits.
   */
  protected DBSplitter getSplitter(int sqlDataType) {
    switch (sqlDataType) {
    case Types.NUMERIC:
    case Types.DECIMAL:
      return new BigDecimalSplitter();

    case Types.BIT:
    case Types.BOOLEAN:
      return new BooleanSplitter();

    case Types.INTEGER:
    case Types.TINYINT:
    case Types.SMALLINT:
    case Types.BIGINT:
      return new IntegerSplitter();

    case Types.REAL:
    case Types.FLOAT:
    case Types.DOUBLE:
      return new FloatSplitter();

    case Types.CHAR:
    case Types.VARCHAR:
    case Types.LONGVARCHAR:
      return new TextSplitter();

    case Types.DATE:
    case Types.TIME:
    case Types.TIMESTAMP:
      return new DateSplitter();

    default:
      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB,
      // BLOB, ARRAY, STRUCT, REF, DATALINK, and JAVA_OBJECT.
      return null;
    }
  }

  /** {@inheritDoc} */
  public List<InputSplit> getSplits(JobContext job) throws IOException {

    int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
    if (1 == targetNumTasks) {
      // There's no need to run a bounding vals query; just return a split
      // that separates nothing. This can be considerably more efficient
      // for a large table with no index.
      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
      return singletonSplit;
    }

    ResultSet results = null;
    Statement statement = null;
    try {
      statement = connection.createStatement();

      results = statement.executeQuery(getBoundingValsQuery());
      results.next();

      // Based on the type of the results, use a different mechanism
      // for interpolating split points (i.e., numeric splits, text splits,
      // dates, etc.)
      int sqlDataType = results.getMetaData().getColumnType(1);
      DBSplitter splitter = getSplitter(sqlDataType);
      if (null == splitter) {
        throw new IOException("Unknown SQL data type: " + sqlDataType);
      }

      return splitter.split(job.getConfiguration(), results,
          getDBConf().getInputOrderBy());
    } catch (SQLException e) {
      // Wrap the SQLException so the cause and stack trace are preserved.
      throw new IOException(e);
    } finally {
      // More-or-less ignore SQL exceptions here, but log in case we need it.
      try {
        if (null != results) {
          results.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing resultset: " + se.toString());
      }

      try {
        if (null != statement) {
          statement.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing statement: " + se.toString());
      }

      try {
        connection.commit();
        closeConnection();
      } catch (SQLException se) {
        LOG.debug("SQLException committing split transaction: "
            + se.toString());
      }
    }
  }

  /**
   * @return a query which returns the minimum and maximum values for
   * the order-by column. The min value should be in the first column,
   * and the max value should be in the second column of the results.
   */
  protected String getBoundingValsQuery() {
    // If the user has provided a query, use that instead.
    String userQuery = getDBConf().getInputBoundingQuery();
    if (null != userQuery) {
      return userQuery;
    }

    // Auto-generate one based on the table name we've been provided with.
    StringBuilder query = new StringBuilder();

    String splitCol = getDBConf().getInputOrderBy();
    query.append("SELECT MIN(").append(splitCol).append("), ");
    query.append("MAX(").append(splitCol).append(") FROM ");
    query.append(getDBConf().getInputTableName());
    String conditions = getDBConf().getInputConditions();
    if (null != conditions) {
      query.append(" WHERE ( " + conditions + " )");
    }

    return query.toString();
  }
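  // Illustrative sketch of the auto-generated case above; the table, split
  // column, and conditions are hypothetical examples. With input table
  // "employees", splitBy column "id", and conditions "active = 1", the
  // generated bounding values query would be:
  //   SELECT MIN(id), MAX(id) FROM employees WHERE ( active = 1 )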
  /** Set the user-defined bounding query to use with a user-defined query.
      This *must* include the substring "$CONDITIONS"
      (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause,
      so that DataDrivenDBInputFormat knows where to insert split clauses.
      e.g., "SELECT foo FROM mytable WHERE $CONDITIONS"
      This will be expanded to something like:
        SELECT foo FROM mytable WHERE (id > 100) AND (id < 250)
      inside each split.
    */
  public static void setBoundingQuery(Configuration conf, String query) {
    if (null != query) {
      // If the user's setting a query, warn if they don't allow conditions.
      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: "
            + query + "; splits may not partition data.");
      }
    }

    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
  }

  protected RecordReader<LongWritable, T> createDBRecordReader(
      DBInputSplit split, Configuration conf) throws IOException {

    DBConfiguration dbConf = getDBConf();
    @SuppressWarnings("unchecked")
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
    String dbProductName = getDBProductName();

    LOG.debug("Creating db record reader for db product: " + dbProductName);

    try {
      // Use database product name to determine appropriate record reader.
      if (dbProductName.startsWith("MYSQL")) {
        // Use MySQL-specific db reader.
        return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
            conf, createConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName());
      } else {
        // Generic reader.
        return new DataDrivenDBRecordReader<T>(split, inputClass,
            conf, createConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName(),
            dbProductName);
      }
    } catch (SQLException ex) {
      // Wrap the SQLException so the cause and stack trace are preserved.
      throw new IOException(ex);
    }
  }

  // Configuration methods override superclass to ensure that the proper
  // DataDrivenDBInputFormat gets used.

  /** Note that the "orderBy" column is called the "splitBy" in this version.
    * We reuse the same field, but it's not strictly ordering it
    * -- just partitioning the results.
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String tableName, String conditions,
      String splitBy, String... fieldNames) {
    DBInputFormat.setInput(job, inputClass, tableName, conditions,
        splitBy, fieldNames);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }
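  // Illustrative driver-side sketch for the table-based setInput() overload
  // above; the DBWritable class, table, and column names are hypothetical,
  // and the connection is assumed to have been configured already via
  // DBConfiguration.configureDB(conf, driverClass, dbUrl):
  //
  //   Job job = Job.getInstance(conf, "db-import");
  //   DataDrivenDBInputFormat.setInput(job, EmployeeRecord.class,
  //       "employees",        // input table
  //       "active = 1",       // conditions; may be null
  //       "id",               // splitBy column
  //       "id", "name");      // fields to retrieve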
  /** setInput() takes a custom query and a separate "bounding query" to use
      instead of the custom "count query" used by DBInputFormat.
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String inputQuery, String inputBoundingQuery) {
    DBInputFormat.setInput(job, inputClass, inputQuery, "");
    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY,
        inputBoundingQuery);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }
}
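// Illustrative sketch for the query-based setInput() overload above; the
// DBWritable class and queries are hypothetical examples. The input query
// must contain the $CONDITIONS token, and the bounding query must return
// the split column's minimum and maximum in its first two columns:
//
//   DataDrivenDBInputFormat.setInput(job, EventRecord.class,
//       "SELECT id, payload FROM events WHERE $CONDITIONS",
//       "SELECT MIN(id), MAX(id) FROM events");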