001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.db;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024import java.sql.Connection;
025import java.sql.DatabaseMetaData;
026import java.sql.PreparedStatement;
027import java.sql.ResultSet;
028import java.sql.SQLException;
029import java.sql.Statement;
030import java.util.ArrayList;
031import java.util.List;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.apache.hadoop.io.LongWritable;
036import org.apache.hadoop.io.Writable;
037import org.apache.hadoop.mapreduce.InputFormat;
038import org.apache.hadoop.mapreduce.InputSplit;
039import org.apache.hadoop.mapreduce.Job;
040import org.apache.hadoop.mapreduce.JobContext;
041import org.apache.hadoop.mapreduce.RecordReader;
042import org.apache.hadoop.mapreduce.TaskAttemptContext;
043import org.apache.hadoop.util.ReflectionUtils;
044import org.apache.hadoop.classification.InterfaceAudience;
045import org.apache.hadoop.classification.InterfaceStability;
046import org.apache.hadoop.conf.Configurable;
047import org.apache.hadoop.conf.Configuration;
048
049/**
050 * A RecordReader that reads records from a SQL table.
051 * Emits LongWritables containing the record number as 
052 * key and DBWritables as value.  
053 */
054@InterfaceAudience.Public
055@InterfaceStability.Evolving
056public class DBRecordReader<T extends DBWritable> extends
057    RecordReader<LongWritable, T> {
058
059  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);
060
061  private ResultSet results = null;
062
063  private Class<T> inputClass;
064
065  private Configuration conf;
066
067  private DBInputFormat.DBInputSplit split;
068
069  private long pos = 0;
070  
071  private LongWritable key = null;
072  
073  private T value = null;
074
075  private Connection connection;
076
077  protected PreparedStatement statement;
078
079  private DBConfiguration dbConf;
080
081  private String conditions;
082
083  private String [] fieldNames;
084
085  private String tableName;
086
087  /**
088   * @param split The InputSplit to read data for
089   * @throws SQLException 
090   */
091  public DBRecordReader(DBInputFormat.DBInputSplit split, 
092      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
093      String cond, String [] fields, String table)
094      throws SQLException {
095    this.inputClass = inputClass;
096    this.split = split;
097    this.conf = conf;
098    this.connection = conn;
099    this.dbConf = dbConfig;
100    this.conditions = cond;
101    this.fieldNames = fields;
102    this.tableName = table;
103  }
104
105  protected ResultSet executeQuery(String query) throws SQLException {
106    this.statement = connection.prepareStatement(query,
107        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
108    return statement.executeQuery();
109  }
110
111  /** Returns the query for selecting the records, 
112   * subclasses can override this for custom behaviour.*/
113  protected String getSelectQuery() {
114    StringBuilder query = new StringBuilder();
115
116    // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
117    if(dbConf.getInputQuery() == null) {
118      query.append("SELECT ");
119  
120      for (int i = 0; i < fieldNames.length; i++) {
121        query.append(fieldNames[i]);
122        if (i != fieldNames.length -1) {
123          query.append(", ");
124        }
125      }
126
127      query.append(" FROM ").append(tableName);
128      query.append(" AS ").append(tableName); //in hsqldb this is necessary
129      if (conditions != null && conditions.length() > 0) {
130        query.append(" WHERE (").append(conditions).append(")");
131      }
132
133      String orderBy = dbConf.getInputOrderBy();
134      if (orderBy != null && orderBy.length() > 0) {
135        query.append(" ORDER BY ").append(orderBy);
136      }
137    } else {
138      //PREBUILT QUERY
139      query.append(dbConf.getInputQuery());
140    }
141        
142    try {
143      query.append(" LIMIT ").append(split.getLength());
144      query.append(" OFFSET ").append(split.getStart());
145    } catch (IOException ex) {
146      // Ignore, will not throw.
147    }           
148
149    return query.toString();
150  }
151
152  /** {@inheritDoc} */
153  public void close() throws IOException {
154    try {
155      if (null != results) {
156        results.close();
157      }
158      if (null != statement) {
159        statement.close();
160      }
161      if (null != connection) {
162        connection.commit();
163        connection.close();
164      }
165    } catch (SQLException e) {
166      throw new IOException(e.getMessage());
167    }
168  }
169
170  public void initialize(InputSplit split, TaskAttemptContext context) 
171      throws IOException, InterruptedException {
172    //do nothing
173  }
174
175  /** {@inheritDoc} */
176  public LongWritable getCurrentKey() {
177    return key;  
178  }
179
180  /** {@inheritDoc} */
181  public T getCurrentValue() {
182    return value;
183  }
184
185  /**
186   * @deprecated 
187   */
188  @Deprecated
189  public T createValue() {
190    return ReflectionUtils.newInstance(inputClass, conf);
191  }
192
193  /**
194   * @deprecated 
195   */
196  @Deprecated
197  public long getPos() throws IOException {
198    return pos;
199  }
200
201  /**
202   * @deprecated Use {@link #nextKeyValue()}
203   */
204  @Deprecated
205  public boolean next(LongWritable key, T value) throws IOException {
206    this.key = key;
207    this.value = value;
208    return nextKeyValue();
209  }
210
211  /** {@inheritDoc} */
212  public float getProgress() throws IOException {
213    return pos / (float)split.getLength();
214  }
215
216  /** {@inheritDoc} */
217  public boolean nextKeyValue() throws IOException {
218    try {
219      if (key == null) {
220        key = new LongWritable();
221      }
222      if (value == null) {
223        value = createValue();
224      }
225      if (null == this.results) {
226        // First time into this method, run the query.
227        this.results = executeQuery(getSelectQuery());
228      }
229      if (!results.next())
230        return false;
231
232      // Set the key field value as the output key value
233      key.set(pos + split.getStart());
234
235      value.readFields(results);
236
237      pos ++;
238    } catch (SQLException e) {
239      throw new IOException("SQLException in nextKeyValue", e);
240    }
241    return true;
242  }
243
244  protected DBInputFormat.DBInputSplit getSplit() {
245    return split;
246  }
247
248  protected String [] getFieldNames() {
249    return fieldNames;
250  }
251
252  protected String getTableName() {
253    return tableName;
254  }
255
256  protected String getConditions() {
257    return conditions;
258  }
259
260  protected DBConfiguration getDBConf() {
261    return dbConf;
262  }
263
264  protected Connection getConnection() {
265    return connection;
266  }
267
268  protected PreparedStatement getStatement() {
269    return statement;
270  }
271
272  protected void setStatement(PreparedStatement stmt) {
273    this.statement = stmt;
274  }
275}