001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.db;
020
021 import java.io.DataInput;
022 import java.io.DataOutput;
023 import java.io.IOException;
024 import java.sql.Connection;
025 import java.sql.DatabaseMetaData;
026 import java.sql.PreparedStatement;
027 import java.sql.ResultSet;
028 import java.sql.SQLException;
029 import java.sql.Statement;
030 import java.util.ArrayList;
031 import java.util.List;
032
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035 import org.apache.hadoop.io.LongWritable;
036 import org.apache.hadoop.io.Writable;
037 import org.apache.hadoop.mapreduce.InputFormat;
038 import org.apache.hadoop.mapreduce.InputSplit;
039 import org.apache.hadoop.mapreduce.Job;
040 import org.apache.hadoop.mapreduce.JobContext;
041 import org.apache.hadoop.mapreduce.MRJobConfig;
042 import org.apache.hadoop.mapreduce.RecordReader;
043 import org.apache.hadoop.mapreduce.TaskAttemptContext;
044 import org.apache.hadoop.util.ReflectionUtils;
045 import org.apache.hadoop.classification.InterfaceAudience;
046 import org.apache.hadoop.classification.InterfaceStability;
047 import org.apache.hadoop.conf.Configurable;
048 import org.apache.hadoop.conf.Configuration;
/**
 * An InputFormat that reads input data from an SQL table.
 * <p>
 * DBInputFormat emits LongWritables containing the record number as
 * key and DBWritables as value.
 *
 * The SQL query and input class can be specified using one of the two
 * setInput methods.
 */
058 @InterfaceAudience.Public
059 @InterfaceStability.Stable
060 public class DBInputFormat<T extends DBWritable>
061 extends InputFormat<LongWritable, T> implements Configurable {
062
/** Logger shared by all instances of this input format. */
private static final Log LOG = LogFactory.getLog(DBInputFormat.class);

/**
 * Database product name used to pick a record reader. Holds "DEFAULT" until
 * setConf() replaces it with the upper-cased name reported by the JDBC
 * metadata.
 */
private String dbProductName = "DEFAULT";
066
067 /**
068 * A Class that does nothing, implementing DBWritable
069 */
070 @InterfaceStability.Evolving
071 public static class NullDBWritable implements DBWritable, Writable {
072 @Override
073 public void readFields(DataInput in) throws IOException { }
074 @Override
075 public void readFields(ResultSet arg0) throws SQLException { }
076 @Override
077 public void write(DataOutput out) throws IOException { }
078 @Override
079 public void write(PreparedStatement arg0) throws SQLException { }
080 }
081
082 /**
083 * A InputSplit that spans a set of rows
084 */
085 @InterfaceStability.Evolving
086 public static class DBInputSplit extends InputSplit implements Writable {
087
088 private long end = 0;
089 private long start = 0;
090
091 /**
092 * Default Constructor
093 */
094 public DBInputSplit() {
095 }
096
097 /**
098 * Convenience Constructor
099 * @param start the index of the first row to select
100 * @param end the index of the last row to select
101 */
102 public DBInputSplit(long start, long end) {
103 this.start = start;
104 this.end = end;
105 }
106
107 /** {@inheritDoc} */
108 public String[] getLocations() throws IOException {
109 // TODO Add a layer to enable SQL "sharding" and support locality
110 return new String[] {};
111 }
112
113 /**
114 * @return The index of the first row to select
115 */
116 public long getStart() {
117 return start;
118 }
119
120 /**
121 * @return The index of the last row to select
122 */
123 public long getEnd() {
124 return end;
125 }
126
127 /**
128 * @return The total row count in this split
129 */
130 public long getLength() throws IOException {
131 return end - start;
132 }
133
134 /** {@inheritDoc} */
135 public void readFields(DataInput input) throws IOException {
136 start = input.readLong();
137 end = input.readLong();
138 }
139
140 /** {@inheritDoc} */
141 public void write(DataOutput output) throws IOException {
142 output.writeLong(start);
143 output.writeLong(end);
144 }
145 }
146
/** Conditions for the WHERE clause of the count query; loaded in setConf(). */
private String conditions;

/** JDBC connection; opened by getConnection(), released by closeConnection(). */
private Connection connection;

/** Name of the input table; loaded in setConf(). */
private String tableName;

/** Names of the input fields; loaded in setConf(). */
private String[] fieldNames;

/** DBConfiguration wrapping the Configuration passed to setConf(). */
private DBConfiguration dbConf;
156
157 /** {@inheritDoc} */
158 public void setConf(Configuration conf) {
159
160 dbConf = new DBConfiguration(conf);
161
162 try {
163 getConnection();
164
165 DatabaseMetaData dbMeta = connection.getMetaData();
166 this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
167 }
168 catch (Exception ex) {
169 throw new RuntimeException(ex);
170 }
171
172 tableName = dbConf.getInputTableName();
173 fieldNames = dbConf.getInputFieldNames();
174 conditions = dbConf.getInputConditions();
175 }
176
177 public Configuration getConf() {
178 return dbConf.getConf();
179 }
180
181 public DBConfiguration getDBConf() {
182 return dbConf;
183 }
184
185 public Connection getConnection() {
186 try {
187 if (null == this.connection) {
188 // The connection was closed; reinstantiate it.
189 this.connection = dbConf.getConnection();
190 this.connection.setAutoCommit(false);
191 this.connection.setTransactionIsolation(
192 Connection.TRANSACTION_SERIALIZABLE);
193 }
194 } catch (Exception e) {
195 throw new RuntimeException(e);
196 }
197 return connection;
198 }
199
200 public String getDBProductName() {
201 return dbProductName;
202 }
203
204 protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
205 Configuration conf) throws IOException {
206
207 @SuppressWarnings("unchecked")
208 Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
209 try {
210 // use database product name to determine appropriate record reader.
211 if (dbProductName.startsWith("ORACLE")) {
212 // use Oracle-specific db reader.
213 return new OracleDBRecordReader<T>(split, inputClass,
214 conf, getConnection(), getDBConf(), conditions, fieldNames,
215 tableName);
216 } else if (dbProductName.startsWith("MYSQL")) {
217 // use MySQL-specific db reader.
218 return new MySQLDBRecordReader<T>(split, inputClass,
219 conf, getConnection(), getDBConf(), conditions, fieldNames,
220 tableName);
221 } else {
222 // Generic reader.
223 return new DBRecordReader<T>(split, inputClass,
224 conf, getConnection(), getDBConf(), conditions, fieldNames,
225 tableName);
226 }
227 } catch (SQLException ex) {
228 throw new IOException(ex.getMessage());
229 }
230 }
231
232 /** {@inheritDoc} */
233 @SuppressWarnings("unchecked")
234 public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
235 TaskAttemptContext context) throws IOException, InterruptedException {
236
237 return createDBRecordReader((DBInputSplit) split, context.getConfiguration());
238 }
239
240 /** {@inheritDoc} */
241 public List<InputSplit> getSplits(JobContext job) throws IOException {
242
243 ResultSet results = null;
244 Statement statement = null;
245 try {
246 statement = connection.createStatement();
247
248 results = statement.executeQuery(getCountQuery());
249 results.next();
250
251 long count = results.getLong(1);
252 int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
253 long chunkSize = (count / chunks);
254
255 results.close();
256 statement.close();
257
258 List<InputSplit> splits = new ArrayList<InputSplit>();
259
260 // Split the rows into n-number of chunks and adjust the last chunk
261 // accordingly
262 for (int i = 0; i < chunks; i++) {
263 DBInputSplit split;
264
265 if ((i + 1) == chunks)
266 split = new DBInputSplit(i * chunkSize, count);
267 else
268 split = new DBInputSplit(i * chunkSize, (i * chunkSize)
269 + chunkSize);
270
271 splits.add(split);
272 }
273
274 connection.commit();
275 return splits;
276 } catch (SQLException e) {
277 throw new IOException("Got SQLException", e);
278 } finally {
279 try {
280 if (results != null) { results.close(); }
281 } catch (SQLException e1) {}
282 try {
283 if (statement != null) { statement.close(); }
284 } catch (SQLException e1) {}
285
286 closeConnection();
287 }
288 }
289
290 /** Returns the query for getting the total number of rows,
291 * subclasses can override this for custom behaviour.*/
292 protected String getCountQuery() {
293
294 if(dbConf.getInputCountQuery() != null) {
295 return dbConf.getInputCountQuery();
296 }
297
298 StringBuilder query = new StringBuilder();
299 query.append("SELECT COUNT(*) FROM " + tableName);
300
301 if (conditions != null && conditions.length() > 0)
302 query.append(" WHERE " + conditions);
303 return query.toString();
304 }
305
306 /**
307 * Initializes the map-part of the job with the appropriate input settings.
308 *
309 * @param job The map-reduce job
310 * @param inputClass the class object implementing DBWritable, which is the
311 * Java object holding tuple fields.
312 * @param tableName The table to read data from
313 * @param conditions The condition which to select data with,
314 * eg. '(updated > 20070101 AND length > 0)'
315 * @param orderBy the fieldNames in the orderBy clause.
316 * @param fieldNames The field names in the table
317 * @see #setInput(Job, Class, String, String)
318 */
319 public static void setInput(Job job,
320 Class<? extends DBWritable> inputClass,
321 String tableName,String conditions,
322 String orderBy, String... fieldNames) {
323 job.setInputFormatClass(DBInputFormat.class);
324 DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
325 dbConf.setInputClass(inputClass);
326 dbConf.setInputTableName(tableName);
327 dbConf.setInputFieldNames(fieldNames);
328 dbConf.setInputConditions(conditions);
329 dbConf.setInputOrderBy(orderBy);
330 }
331
332 /**
333 * Initializes the map-part of the job with the appropriate input settings.
334 *
335 * @param job The map-reduce job
336 * @param inputClass the class object implementing DBWritable, which is the
337 * Java object holding tuple fields.
338 * @param inputQuery the input query to select fields. Example :
339 * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
340 * @param inputCountQuery the input query that returns
341 * the number of records in the table.
342 * Example : "SELECT COUNT(f1) FROM Mytable"
343 * @see #setInput(Job, Class, String, String, String, String...)
344 */
345 public static void setInput(Job job,
346 Class<? extends DBWritable> inputClass,
347 String inputQuery, String inputCountQuery) {
348 job.setInputFormatClass(DBInputFormat.class);
349 DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
350 dbConf.setInputClass(inputClass);
351 dbConf.setInputQuery(inputQuery);
352 dbConf.setInputCountQuery(inputCountQuery);
353 }
354
355 protected void closeConnection() {
356 try {
357 if (null != this.connection) {
358 this.connection.close();
359 this.connection = null;
360 }
361 } catch (SQLException sqlE) {
362 LOG.debug("Exception on close", sqlE);
363 }
364 }
365 }