/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * A RecordReader that reads records from a SQL table.
 * Emits LongWritables containing the record number as the key
 * and DBWritables as the value.
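 *
 * <p>For illustration only: a minimal job-setup sketch. The {@code MyRecord}
 * class, driver, table, and column names below are hypothetical, not part of
 * this class; the reader itself is normally created by {@code DBInputFormat}
 * rather than instantiated directly.
 * <pre>{@code
 * Configuration conf = new Configuration();
 * DBConfiguration.configureDB(conf,
 *     "com.mysql.jdbc.Driver",            // JDBC driver class
 *     "jdbc:mysql://localhost/mydb");     // connection URL
 * Job job = Job.getInstance(conf);
 * // MyRecord implements DBWritable and maps the selected columns.
 * DBInputFormat.setInput(job, MyRecord.class,
 *     "employees",                        // table name
 *     null,                               // WHERE conditions
 *     "id",                               // ORDER BY clause
 *     "id", "name");                      // field names
 * }</pre>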
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DBRecordReader<T extends DBWritable> extends
    RecordReader<LongWritable, T> {

  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);

  private ResultSet results = null;

  private Class<T> inputClass;

  private Configuration conf;

  private DBInputFormat.DBInputSplit split;

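  // Number of records read so far in this split; also used, offset by the
  // split start, to produce the output key.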
  private long pos = 0;

  private LongWritable key = null;

  private T value = null;

  private Connection connection;

  protected PreparedStatement statement;

  private DBConfiguration dbConf;

  private String conditions;

  private String[] fieldNames;

  private String tableName;

  /**
   * @param split The InputSplit to read data for
   * @param inputClass the class of the value objects to produce
   * @param conf the job configuration
   * @param conn an open database connection
   * @param dbConfig the database configuration for this job
   * @param cond the WHERE conditions to apply, or null
   * @param fields the names of the columns to select
   * @param table the name of the table to read from
   * @throws SQLException if a database error occurs
   */
  public DBRecordReader(DBInputFormat.DBInputSplit split,
      Class<T> inputClass, Configuration conf, Connection conn,
      DBConfiguration dbConfig, String cond, String[] fields, String table)
      throws SQLException {
    this.inputClass = inputClass;
    this.split = split;
    this.conf = conf;
    this.connection = conn;
    this.dbConf = dbConfig;
    this.conditions = cond;
    this.fieldNames = fields;
    this.tableName = table;
  }

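  /**
   * Prepares and executes the given SQL query on the shared connection.
   * The statement is created as forward-only and read-only, which allows
   * some JDBC drivers to stream results rather than buffer them.
   *
   * @param query the SQL SELECT statement to run
   * @return the open ResultSet; closed by {@link #close()}
   * @throws SQLException if the statement cannot be prepared or executed
   */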
  protected ResultSet executeQuery(String query) throws SQLException {
    this.statement = connection.prepareStatement(query,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    return statement.executeQuery();
  }

  /**
   * Returns the query for selecting the records; subclasses can override
   * this for custom behaviour. The default implementation pages through
   * the table with LIMIT/OFFSET, which not every database supports; see
   * {@code OracleDBRecordReader} for an example that overrides this.
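   *
   * <p>For illustration only: with the hypothetical fields {@code id} and
   * {@code name}, table {@code employees}, no conditions, ORDER BY
   * {@code id}, and a split of 50 rows starting at row 100, the default
   * implementation produces:
   * <pre>{@code
   * SELECT id, name FROM employees AS employees ORDER BY id
   *     LIMIT 50 OFFSET 100
   * }</pre>
   *
   * @return the SQL SELECT statement for this split
   */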
  protected String getSelectQuery() {
    StringBuilder query = new StringBuilder();

    // Default code path for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET
    // for splits.
    if (dbConf.getInputQuery() == null) {
      query.append("SELECT ");

      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length - 1) {
          query.append(", ");
        }
      }

      query.append(" FROM ").append(tableName);
      query.append(" AS ").append(tableName); // required by HSQLDB
      if (conditions != null && conditions.length() > 0) {
        query.append(" WHERE (").append(conditions).append(")");
      }

      String orderBy = dbConf.getInputOrderBy();
      if (orderBy != null && orderBy.length() > 0) {
        query.append(" ORDER BY ").append(orderBy);
      }
    } else {
      // Use the user-supplied input query verbatim.
      query.append(dbConf.getInputQuery());
    }

    try {
      query.append(" LIMIT ").append(split.getLength());
      query.append(" OFFSET ").append(split.getStart());
    } catch (IOException ex) {
      // Ignore; split.getLength() declares IOException but never throws it.
    }

    return query.toString();
  }

  /** {@inheritDoc} */
  @Override
  public void close() throws IOException {
    try {
      if (null != results) {
        results.close();
      }
      if (null != statement) {
        statement.close();
      }
      if (null != connection) {
        // The connection is typically opened with auto-commit disabled
        // (see DBInputFormat), so commit before closing.
        connection.commit();
        connection.close();
      }
    } catch (SQLException e) {
      // Wrap the cause so the original stack trace is preserved.
      throw new IOException(e);
    }
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // Nothing to do: the query is executed lazily on the first call to
    // nextKeyValue().
  }

  /** {@inheritDoc} */
  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  /** {@inheritDoc} */
  @Override
  public T getCurrentValue() {
    return value;
  }

  /**
   * @deprecated Held over from the old {@code mapred} API; values are now
   * created internally by {@link #nextKeyValue()}.
   */
  @Deprecated
  public T createValue() {
    return ReflectionUtils.newInstance(inputClass, conf);
  }

  /**
   * @deprecated Held over from the old {@code mapred} API; there is no
   * direct replacement in the new API.
   */
  @Deprecated
  public long getPos() throws IOException {
    return pos;
  }

  /**
   * @deprecated Use {@link #nextKeyValue()}
   */
  @Deprecated
  public boolean next(LongWritable key, T value) throws IOException {
    this.key = key;
    this.value = value;
    return nextKeyValue();
  }

  /** {@inheritDoc} */
  @Override
  public float getProgress() throws IOException {
    return pos / (float)split.getLength();
  }

  /** {@inheritDoc} */
  @Override
  public boolean nextKeyValue() throws IOException {
    try {
      if (key == null) {
        key = new LongWritable();
      }
      if (value == null) {
        value = createValue();
      }
      if (null == this.results) {
        // First time into this method: run the query.
        this.results = executeQuery(getSelectQuery());
      }
      if (!results.next()) {
        return false;
      }

      // The key is the record's absolute position in the full result set,
      // i.e. its position within this split plus the split start.
      key.set(pos + split.getStart());

      value.readFields(results);

      pos++;
    } catch (SQLException e) {
      throw new IOException("SQLException in nextKeyValue", e);
    }
    return true;
  }

  protected DBInputFormat.DBInputSplit getSplit() {
    return split;
  }

  protected String[] getFieldNames() {
    return fieldNames;
  }

  protected String getTableName() {
    return tableName;
  }

  protected String getConditions() {
    return conditions;
  }

  protected DBConfiguration getDBConf() {
    return dbConf;
  }

  protected Connection getConnection() {
    return connection;
  }

  protected PreparedStatement getStatement() {
    return statement;
  }

  protected void setStatement(PreparedStatement stmt) {
    this.statement = stmt;
  }
}