/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

/**
 * An InputFormat that reads input data from an SQL table.
 * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to demarcate
 * splits, it tries to generate WHERE clauses which separate the data into roughly
 * equivalent shards.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DataDrivenDBInputFormat<T extends DBWritable>
    extends DBInputFormat<T> implements Configurable {

  private static final Log LOG = LogFactory.getLog(DataDrivenDBInputFormat.class);

  /** If users are providing their own query, the following string is expected to
      appear in the WHERE clause, which will be substituted with a pair of conditions
      on the input to allow input splits to parallelise the import.
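      For example, a user query such as "SELECT a, b FROM t WHERE $CONDITIONS"
      (the table and column names here are purely illustrative) is handed to each
      map task with $CONDITIONS replaced by a bounded range on the split column,
      e.g. ( id >= 0 ) AND ( id < 500 ).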
   */
  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";

  /**
   * An InputSplit that spans a set of rows.
   */
  @InterfaceStability.Evolving
  public static class DataDrivenDBInputSplit extends DBInputFormat.DBInputSplit {

    private String lowerBoundClause;
    private String upperBoundClause;

    /**
     * Default Constructor.
     */
    public DataDrivenDBInputSplit() {
    }

    /**
     * Convenience Constructor.
     * @param lower the string to be put in the WHERE clause to guard on the 'lower' end
     * @param upper the string to be put in the WHERE clause to guard on the 'upper' end
     */
    public DataDrivenDBInputSplit(final String lower, final String upper) {
      this.lowerBoundClause = lower;
      this.upperBoundClause = upper;
    }

    /**
     * @return The total row count in this split
     */
    public long getLength() throws IOException {
      return 0; // unfortunately, we don't know this.
    }

    /** {@inheritDoc} */
    public void readFields(DataInput input) throws IOException {
      this.lowerBoundClause = Text.readString(input);
      this.upperBoundClause = Text.readString(input);
    }

    /** {@inheritDoc} */
    public void write(DataOutput output) throws IOException {
      Text.writeString(output, this.lowerBoundClause);
      Text.writeString(output, this.upperBoundClause);
    }

    public String getLowerClause() {
      return lowerBoundClause;
    }

    public String getUpperClause() {
      return upperBoundClause;
    }
  }

  /**
   * @return the DBSplitter implementation to use to divide the table/query into InputSplits.
   */
  protected DBSplitter getSplitter(int sqlDataType) {
    switch (sqlDataType) {
    case Types.NUMERIC:
    case Types.DECIMAL:
      return new BigDecimalSplitter();

    case Types.BIT:
    case Types.BOOLEAN:
      return new BooleanSplitter();

    case Types.INTEGER:
    case Types.TINYINT:
    case Types.SMALLINT:
    case Types.BIGINT:
      return new IntegerSplitter();

    case Types.REAL:
    case Types.FLOAT:
    case Types.DOUBLE:
      return new FloatSplitter();

    case Types.CHAR:
    case Types.VARCHAR:
    case Types.LONGVARCHAR:
      return new TextSplitter();

    case Types.DATE:
    case Types.TIME:
    case Types.TIMESTAMP:
      return new DateSplitter();

    default:
      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY,
      // STRUCT, REF, DATALINK, and JAVA_OBJECT.
      return null;
    }
  }

  /** {@inheritDoc} */
  public List<InputSplit> getSplits(JobContext job) throws IOException {

    int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
    if (1 == targetNumTasks) {
      // There's no need to run a bounding vals query; just return a split
      // that separates nothing. This can be considerably more efficient for a
      // large table with no index.
      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
      return singletonSplit;
    }

    ResultSet results = null;
    Statement statement = null;
    Connection connection = getConnection();
    try {
      statement = connection.createStatement();

      results = statement.executeQuery(getBoundingValsQuery());
      results.next();

      // Based on the type of the results, use a different mechanism
      // for interpolating split points (i.e., numeric splits, text splits,
      // dates, etc.)
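      // Illustrative example (values are hypothetical, not derived from this
      // code): for an integer split column "id" with MIN/MAX bounds of 0 and
      // 1000 and two map tasks, the splitter would typically return clause
      // pairs along the lines of ("id >= 0", "id < 500") and
      // ("id >= 500", "id <= 1000"), each wrapped in a DataDrivenDBInputSplit.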
      int sqlDataType = results.getMetaData().getColumnType(1);
      DBSplitter splitter = getSplitter(sqlDataType);
      if (null == splitter) {
        throw new IOException("Unknown SQL data type: " + sqlDataType);
      }

      return splitter.split(job.getConfiguration(), results, getDBConf().getInputOrderBy());
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    } finally {
      // More-or-less ignore SQL exceptions here, but log in case we need it.
      try {
        if (null != results) {
          results.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing resultset: " + se.toString());
      }

      try {
        if (null != statement) {
          statement.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing statement: " + se.toString());
      }

      try {
        connection.commit();
        closeConnection();
      } catch (SQLException se) {
        LOG.debug("SQLException committing split transaction: " + se.toString());
      }
    }
  }

  /**
   * @return a query which returns the minimum and maximum values for
   * the order-by column.
   *
   * The min value should be in the first column, and the
   * max value should be in the second column of the results.
   */
  protected String getBoundingValsQuery() {
    // If the user has provided a query, use that instead.
    String userQuery = getDBConf().getInputBoundingQuery();
    if (null != userQuery) {
      return userQuery;
    }

    // Auto-generate one based on the table name we've been provided with.
    StringBuilder query = new StringBuilder();

    String splitCol = getDBConf().getInputOrderBy();
    query.append("SELECT MIN(").append(splitCol).append("), ");
    query.append("MAX(").append(splitCol).append(") FROM ");
    query.append(getDBConf().getInputTableName());
    String conditions = getDBConf().getInputConditions();
    if (null != conditions) {
      query.append(" WHERE ( " + conditions + " )");
    }

    return query.toString();
  }

  /** Set the user-defined bounding query to use with a user-defined query.
      This *must* include the substring "$CONDITIONS"
      (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause,
      so that DataDrivenDBInputFormat knows where to insert split clauses.
      e.g., "SELECT foo FROM mytable WHERE $CONDITIONS"
      This will be expanded to something like:
        SELECT foo FROM mytable WHERE (id > 100) AND (id < 250)
      inside each split.
    */
  public static void setBoundingQuery(Configuration conf, String query) {
    if (null != query) {
      // If the user is setting a query, warn if it doesn't allow conditions.
      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: " + query
            + "; splits may not partition data.");
      }
    }

    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
  }

  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
      Configuration conf) throws IOException {

    DBConfiguration dbConf = getDBConf();
    @SuppressWarnings("unchecked")
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
    String dbProductName = getDBProductName();

    LOG.debug("Creating db record reader for db product: " + dbProductName);

    try {
      // Use the database product name to determine the appropriate record reader.
      if (dbProductName.startsWith("MYSQL")) {
        // Use the MySQL-specific db reader.
        return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
            conf, getConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName());
      } else {
        // Generic reader.
        return new DataDrivenDBRecordReader<T>(split, inputClass,
            conf, getConnection(), dbConf, dbConf.getInputConditions(),
            dbConf.getInputFieldNames(), dbConf.getInputTableName(),
            dbProductName);
      }
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }

  // Configuration methods override superclass to ensure that the proper
  // DataDrivenDBInputFormat gets used.

  /** Note that the "orderBy" column is called the "splitBy" in this version.
    * We reuse the same field, but it's not strictly ordering it -- just partitioning
    * the results.
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String tableName, String conditions,
      String splitBy, String... fieldNames) {
    DBInputFormat.setInput(job, inputClass, tableName, conditions, splitBy, fieldNames);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }

  /** setInput() takes a custom query and a separate "bounding query" to use
      instead of the custom "count query" used by DBInputFormat.
    */
  public static void setInput(Job job,
      Class<? extends DBWritable> inputClass,
      String inputQuery, String inputBoundingQuery) {
    DBInputFormat.setInput(job, inputClass, inputQuery, "");
    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY, inputBoundingQuery);
    job.setInputFormatClass(DataDrivenDBInputFormat.class);
  }
}
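/*
 * A minimal usage sketch (illustrative only; not part of this class). The
 * record class, JDBC driver, URL, table, and column names below are
 * hypothetical, not values defined anywhere in this file:
 *
 *   Configuration conf = new Configuration();
 *   Job job = Job.getInstance(conf);
 *   DBConfiguration.configureDB(job.getConfiguration(),
 *       "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb", "user", "password");
 *   DataDrivenDBInputFormat.setInput(job, MyRecord.class,
 *       "employees",                       // table name
 *       null,                              // optional WHERE conditions
 *       "employee_id",                     // split-by column
 *       "employee_id", "name", "salary");  // columns to select
 *
 * setInput() also registers DataDrivenDBInputFormat as the job's input format,
 * so no separate call to job.setInputFormatClass() is needed.
 */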