/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;

@InterfaceAudience.Public
@InterfaceStability.Stable
public class DBInputFormat<T extends DBWritable>
    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>
    implements InputFormat<LongWritable, T>, JobConfigurable {
  /**
   * A RecordReader that reads records from a SQL table.
   * Emits LongWritables containing the record number as
   * key and DBWritables as value.
   */
  protected class DBRecordReader extends
      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
      implements RecordReader<LongWritable, T> {
    /**
     * @param split the InputSplit to read data for
     * @param inputClass the DBWritable class holding the tuple fields
     * @param job the job configuration
     * @param conn an open JDBC connection
     * @param dbConfig the database configuration
     * @param cond the condition used to select rows, if any
     * @param fields the field names to read
     * @param table the table to read from
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
        String[] fields, String table) throws SQLException {
      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
    }

    /** {@inheritDoc} */
    public LongWritable createKey() {
      return new LongWritable();
    }

    /** {@inheritDoc} */
    public T createValue() {
      return super.createValue();
    }

    /** {@inheritDoc} */
    public long getPos() throws IOException {
      return super.getPos();
    }

    /** {@inheritDoc} */
    public boolean next(LongWritable key, T value) throws IOException {
      return super.next(key, value);
    }
  }

  /**
   * A RecordReader implementation that just passes through to a wrapped
   * RecordReader built with the new API.
   */
  private static class DBRecordReaderWrapper<T extends DBWritable>
      implements RecordReader<LongWritable, T> {

    /** The wrapped new-API RecordReader that does the actual work. */
    private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;

    public DBRecordReaderWrapper(
        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
      this.rr = inner;
    }

    public void close() throws IOException {
      rr.close();
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public T createValue() {
      return rr.createValue();
    }

    public float getProgress() throws IOException {
      return rr.getProgress();
    }

    public long getPos() throws IOException {
      return rr.getPos();
    }

    public boolean next(LongWritable key, T value) throws IOException {
      return rr.next(key, value);
    }
  }

  /**
   * A class that does nothing, implementing DBWritable and Writable.
   */
  public static class NullDBWritable extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable
      implements DBWritable, Writable {
  }

  /**
   * An InputSplit that spans a set of rows.
   */
  protected static class DBInputSplit extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit
      implements InputSplit {
    /**
     * Default constructor.
     */
    public DBInputSplit() {
    }

    /**
     * Convenience constructor.
     * @param start the index of the first row to select
     * @param end the index of the last row to select
     */
    public DBInputSplit(long start, long end) {
      super(start, end);
    }
  }

  /** {@inheritDoc} */
  public void configure(JobConf job) {
    super.setConf(job);
  }

  /** {@inheritDoc} */
  @SuppressWarnings("unchecked")
  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {

    // Wrap the new-API DBRecordReader in a shim class to deal with API
    // differences between the mapred and mapreduce packages.
    return new DBRecordReaderWrapper<T>(
        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
        createDBRecordReader(
            (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split,
            job));
  }

  /** {@inheritDoc} */
  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
    List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
        super.getSplits(new Job(job));
    InputSplit[] ret = new InputSplit[newSplits.size()];
    int i = 0;
    // Convert each new-API split into an old-API DBInputSplit.
    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) s;
      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
    }
    return ret;
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param tableName The table to read data from
   * @param conditions The condition used to select data, e.g.
   * '(updated > 20070101 AND length > 0)'
   * @param orderBy the field names used in the ORDER BY clause
   * @param fieldNames The field names in the table
   * @see #setInput(JobConf, Class, String, String)
   */
  public static void setInput(JobConf job,
      Class<? extends DBWritable> inputClass, String tableName,
      String conditions, String orderBy,
      String... fieldNames) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputTableName(tableName);
    dbConf.setInputFieldNames(fieldNames);
    dbConf.setInputConditions(conditions);
    dbConf.setInputOrderBy(orderBy);
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param inputQuery the input query to select fields. Example:
   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
   * @param inputCountQuery the input query that returns the number of records
   * in the table. Example: "SELECT COUNT(f1) FROM Mytable"
   * @see #setInput(JobConf, Class, String, String, String, String...)
   */
  public static void setInput(JobConf job,
      Class<? extends DBWritable> inputClass,
      String inputQuery, String inputCountQuery) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputQuery(inputQuery);
    dbConf.setInputCountQuery(inputCountQuery);
  }
}
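
// Usage sketch (illustrative only, not part of the original class): how a
// driver might configure a job with the two setInput() overloads above. The
// JDBC driver, connection URL, table, fields, queries, and the MyDriver and
// MyRecord classes below are hypothetical placeholders.
//
//   JobConf job = new JobConf(MyDriver.class);
//   DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
//       "jdbc:mysql://localhost/mydb", "user", "password");
//
//   // Table-based input: read selected fields from one table.
//   DBInputFormat.setInput(job, MyRecord.class, "employees",
//       "salary > 0", "id", "id", "name", "salary");
//
//   // Or query-based input: supply the row query and a count query.
//   DBInputFormat.setInput(job, MyRecord.class,
//       "SELECT id, name, salary FROM employees ORDER BY id",
//       "SELECT COUNT(id) FROM employees");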