/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.sql.Connection;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

/**
 * A RecordReader that reads records from a SQL table,
 * using data-driven WHERE clause splits.
 * Emits LongWritables containing the record number as
 * key and DBWritables as value.
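 *
 * <p>A minimal usage sketch; {@code MyRecord} is a hypothetical user class
 * implementing {@link DBWritable}, and the connection parameters are
 * placeholders. The reader itself is constructed by
 * {@link DataDrivenDBInputFormat}, not instantiated directly:
 * <pre>{@code
 * Job job = Job.getInstance(new Configuration());
 * job.setInputFormatClass(DataDrivenDBInputFormat.class);
 * DBConfiguration.configureDB(job.getConfiguration(),
 *     "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb");
 * DataDrivenDBInputFormat.setInput(job, MyRecord.class,
 *     "employees",       // table to read
 *     null,              // additional WHERE conditions, if any
 *     "id",              // column used to generate the split clauses
 *     "id", "name");     // columns to select
 * }</pre>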
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DataDrivenDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {

  private static final Log LOG = LogFactory.getLog(DataDrivenDBRecordReader.class);

  private String dbProductName; // database product name, e.g. "ORACLE" (upper-cased).

  /**
   * @param split The InputSplit to read data for
   * @param inputClass The class of the value records to emit
   * @param conf The job configuration
   * @param conn An open database connection
   * @param dbConfig The database configuration for this job
   * @param cond Additional WHERE conditions supplied by the user, or null
   * @param fields The names of the columns to select
   * @param table The name of the table to read from
   * @param dbProduct The database product name, used to adjust the SQL syntax
   * @throws SQLException if a database access error occurs
   */
  public DataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
      String cond, String [] fields, String table, String dbProduct)
      throws SQLException {
    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
    this.dbProductName = dbProduct;
  }

  /** Returns the query for selecting the records;
   * subclasses can override this for custom behaviour. */
  @SuppressWarnings("unchecked")
  protected String getSelectQuery() {
    StringBuilder query = new StringBuilder();
    DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
        (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
    DBConfiguration dbConf = getDBConf();
    String [] fieldNames = getFieldNames();
    String tableName = getTableName();
    String conditions = getConditions();

    // Build the WHERE clauses associated with the data split first.
    // We need them in both branches of this function.
    StringBuilder conditionClauses = new StringBuilder();
    conditionClauses.append("( ").append(dataSplit.getLowerClause());
    conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause());
    conditionClauses.append(" )");
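    // e.g. "( id >= 0 ) AND ( id < 5000 )" for a hypothetical split over
    // an integer "id" column.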

    if (dbConf.getInputQuery() == null) {
      // We need to generate the entire query.
      query.append("SELECT ");

      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length - 1) {
          query.append(", ");
        }
      }

      query.append(" FROM ").append(tableName);
      if (!dbProductName.startsWith("ORACLE")) {
        // Alias the table to itself; some databases (e.g. hsqldb) seem to
        // require this. Oracle rejects the AS keyword for table aliases,
        // so skip it there.
        query.append(" AS ").append(tableName);
      }
      query.append(" WHERE ");
      if (conditions != null && conditions.length() > 0) {
        // Put the user's conditions first.
        query.append("( ").append(conditions).append(" ) AND ");
      }

      // Now append the conditions associated with our split.
      query.append(conditionClauses.toString());
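
      // The generated query now looks something like (hypothetical table,
      // columns, and bounds):
      //   SELECT id, name FROM employees AS employees
      //     WHERE ( salary > 0 ) AND ( id >= 0 ) AND ( id < 5000 )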

    } else {
      // User provided the query. We replace the special token with our WHERE clause.
      String inputQuery = dbConf.getInputQuery();
      if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) {
        LOG.error("Could not find the clause substitution token "
            + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: ["
            + inputQuery + "]. Parallel splits may not work correctly.");
      }

      query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN,
          conditionClauses.toString()));
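      // e.g. a user query "SELECT * FROM employees WHERE $CONDITIONS" becomes
      //   "SELECT * FROM employees WHERE ( id >= 0 ) AND ( id < 5000 )"
      // (hypothetical bounds).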
    }

    LOG.debug("Using query: " + query.toString());

    return query.toString();
  }
}