001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.db; 020 021 import java.io.DataInput; 022 import java.io.DataOutput; 023 import java.io.IOException; 024 import java.sql.Connection; 025 import java.sql.DatabaseMetaData; 026 import java.sql.PreparedStatement; 027 import java.sql.ResultSet; 028 import java.sql.SQLException; 029 import java.sql.Statement; 030 import java.util.ArrayList; 031 import java.util.List; 032 033 import org.apache.commons.logging.Log; 034 import org.apache.commons.logging.LogFactory; 035 import org.apache.hadoop.io.LongWritable; 036 import org.apache.hadoop.io.Writable; 037 import org.apache.hadoop.mapreduce.InputFormat; 038 import org.apache.hadoop.mapreduce.InputSplit; 039 import org.apache.hadoop.mapreduce.Job; 040 import org.apache.hadoop.mapreduce.JobContext; 041 import org.apache.hadoop.mapreduce.RecordReader; 042 import org.apache.hadoop.mapreduce.TaskAttemptContext; 043 import org.apache.hadoop.util.ReflectionUtils; 044 import org.apache.hadoop.classification.InterfaceAudience; 045 import org.apache.hadoop.classification.InterfaceStability; 046 import org.apache.hadoop.conf.Configurable; 047 import org.apache.hadoop.conf.Configuration; 048 049 /** 050 * A RecordReader that reads records from a SQL table, 051 * using data-driven WHERE clause splits. 052 * Emits LongWritables containing the record number as 053 * key and DBWritables as value. 054 */ 055 @InterfaceAudience.Public 056 @InterfaceStability.Evolving 057 public class DataDrivenDBRecordReader<T extends DBWritable> extends DBRecordReader<T> { 058 059 private static final Log LOG = LogFactory.getLog(DataDrivenDBRecordReader.class); 060 061 private String dbProductName; // database manufacturer string. 062 063 /** 064 * @param split The InputSplit to read data for 065 * @throws SQLException 066 */ 067 public DataDrivenDBRecordReader(DBInputFormat.DBInputSplit split, 068 Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig, 069 String cond, String [] fields, String table, String dbProduct) 070 throws SQLException { 071 super(split, inputClass, conf, conn, dbConfig, cond, fields, table); 072 this.dbProductName = dbProduct; 073 } 074 075 /** Returns the query for selecting the records, 076 * subclasses can override this for custom behaviour.*/ 077 @SuppressWarnings("unchecked") 078 protected String getSelectQuery() { 079 StringBuilder query = new StringBuilder(); 080 DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit = 081 (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit(); 082 DBConfiguration dbConf = getDBConf(); 083 String [] fieldNames = getFieldNames(); 084 String tableName = getTableName(); 085 String conditions = getConditions(); 086 087 // Build the WHERE clauses associated with the data split first. 088 // We need them in both branches of this function. 089 StringBuilder conditionClauses = new StringBuilder(); 090 conditionClauses.append("( ").append(dataSplit.getLowerClause()); 091 conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause()); 092 conditionClauses.append(" )"); 093 094 if(dbConf.getInputQuery() == null) { 095 // We need to generate the entire query. 096 query.append("SELECT "); 097 098 for (int i = 0; i < fieldNames.length; i++) { 099 query.append(fieldNames[i]); 100 if (i != fieldNames.length -1) { 101 query.append(", "); 102 } 103 } 104 105 query.append(" FROM ").append(tableName); 106 if (!dbProductName.startsWith("ORACLE")) { 107 // Seems to be necessary for hsqldb? Oracle explicitly does *not* 108 // use this clause. 109 query.append(" AS ").append(tableName); 110 } 111 query.append(" WHERE "); 112 if (conditions != null && conditions.length() > 0) { 113 // Put the user's conditions first. 114 query.append("( ").append(conditions).append(" ) AND "); 115 } 116 117 // Now append the conditions associated with our split. 118 query.append(conditionClauses.toString()); 119 120 } else { 121 // User provided the query. We replace the special token with our WHERE clause. 122 String inputQuery = dbConf.getInputQuery(); 123 if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) { 124 LOG.error("Could not find the clause substitution token " 125 + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: [" 126 + inputQuery + "]. Parallel splits may not work correctly."); 127 } 128 129 query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, 130 conditionClauses.toString())); 131 } 132 133 LOG.debug("Using query: " + query.toString()); 134 135 return query.toString(); 136 } 137 }