/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;

@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings("deprecation")
public class DBInputFormat<T extends DBWritable>
    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>
    implements InputFormat<LongWritable, T>, JobConfigurable {
  /**
   * A RecordReader that reads records from a SQL table.
   * Emits LongWritables containing the record number as
   * key and DBWritables as value.
   */
  protected class DBRecordReader extends
      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
      implements RecordReader<LongWritable, T> {
    /**
     * This constructor is kept for compatibility with M/R 1.x.
     *
     * @param split The InputSplit to read data for
     * @param inputClass the class of the DBWritable to deserialize rows into
     * @param job the job configuration
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job) throws SQLException {
      super(split, inputClass, job, connection, dbConf, conditions,
          fieldNames, tableName);
    }

    /**
     * @param split The InputSplit to read data for
     * @param inputClass the class of the DBWritable to deserialize rows into
     * @param job the job configuration
     * @param conn an open database connection
     * @param dbConfig the database configuration for the job
     * @param cond the condition for the SELECT's WHERE clause
     * @param fields the names of the fields to select
     * @param table the name of the table to read from
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
        String [] fields, String table) throws SQLException {
      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
    }

    /** {@inheritDoc} */
    public LongWritable createKey() {
      return new LongWritable();
    }

    /** {@inheritDoc} */
    public T createValue() {
      return super.createValue();
    }

    /** {@inheritDoc} */
    public long getPos() throws IOException {
      return super.getPos();
    }

    /** {@inheritDoc} */
    public boolean next(LongWritable key, T value) throws IOException {
      return super.next(key, value);
    }
  }

  /**
   * A RecordReader implementation that just passes through to a wrapped
   * RecordReader built with the new API.
   */
  private static class DBRecordReaderWrapper<T extends DBWritable>
      implements RecordReader<LongWritable, T> {

    private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;

    public DBRecordReaderWrapper(
        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
      this.rr = inner;
    }

    public void close() throws IOException {
      rr.close();
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public T createValue() {
      return rr.createValue();
    }

    public float getProgress() throws IOException {
      return rr.getProgress();
    }

    public long getPos() throws IOException {
      return rr.getPos();
    }

    public boolean next(LongWritable key, T value) throws IOException {
      return rr.next(key, value);
    }
  }

  /**
   * A DBWritable implementation that does nothing.
   */
  public static class NullDBWritable extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable
      implements DBWritable, Writable {
  }

  /**
   * An InputSplit that spans a set of rows.
   */
  protected static class DBInputSplit extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit
      implements InputSplit {
    /**
     * Default Constructor.
     */
    public DBInputSplit() {
    }

    /**
     * Convenience Constructor.
     * @param start the index of the first row to select
     * @param end the index of the last row to select
     */
    public DBInputSplit(long start, long end) {
      super(start, end);
    }
  }

  /** {@inheritDoc} */
  public void configure(JobConf job) {
    super.setConf(job);
  }

  /** {@inheritDoc} */
  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {

    // Wrap the new-API DBRecordReader in a shim class to bridge the
    // differences between the old and new RecordReader APIs.
    return new DBRecordReaderWrapper<T>(
        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
        createDBRecordReader(
            (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split,
            job));
  }

  /** {@inheritDoc} */
  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
    List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
        super.getSplits(new Job(job));
    InputSplit[] ret = new InputSplit[newSplits.size()];
    int i = 0;
    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) s;
      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
    }
    return ret;
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param tableName The table to read data from
   * @param conditions The condition with which to select data,
   * e.g. "(updated > 20070101 AND length > 0)"
   * @param orderBy the field names used in the ORDER BY clause
   * @param fieldNames The field names in the table
   * @see #setInput(JobConf, Class, String, String)
   */
  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
      String tableName, String conditions, String orderBy, String... fieldNames) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputTableName(tableName);
    dbConf.setInputFieldNames(fieldNames);
    dbConf.setInputConditions(conditions);
    dbConf.setInputOrderBy(orderBy);
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param inputQuery the input query to select fields. Example:
   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
   * @param inputCountQuery the input query that returns the number of records
   * in the table. Example: "SELECT COUNT(f1) FROM Mytable"
   * @see #setInput(JobConf, Class, String, String, String, String...)
   */
  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
      String inputQuery, String inputCountQuery) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputQuery(inputQuery);
    dbConf.setInputCountQuery(inputCountQuery);
  }
}
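
// ---------------------------------------------------------------------------
// A minimal usage sketch for the two setInput() overloads above. This class
// is illustrative only: the class name, JDBC driver, connection URL,
// credentials, and the "employees" table with its "id"/"name"/"salary"
// columns are all assumptions made for the example, not part of Hadoop.
// ---------------------------------------------------------------------------
class DBInputFormatUsageSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf(DBInputFormatUsageSketch.class);

    // Store the JDBC driver class, connection string, and credentials in the
    // job configuration so the input format can open connections on workers.
    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost/mydb", "user", "password");

    // Table-based variant: select the listed columns from "employees" rows
    // matching the condition, ordered by "id". NullDBWritable stands in for
    // a real DBWritable implementation that would deserialize the columns.
    DBInputFormat.setInput(job, DBInputFormat.NullDBWritable.class,
        "employees", "salary > 0", "id", "id", "name", "salary");

    // Query-based variant (alternative to the call above): supply the
    // row-selecting query plus a count query used to size the input splits.
    // DBInputFormat.setInput(job, DBInputFormat.NullDBWritable.class,
    //     "SELECT id, name, salary FROM employees ORDER BY id",
    //     "SELECT COUNT(id) FROM employees");
  }
}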