/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
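
/**
 * An InputFormat that reads input data from an SQL table. Emits
 * LongWritables containing the record number as key and DBWritables as
 * value. This old-API class delegates to the new-API
 * {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}; the query and
 * input class are configured via one of the two {@code setInput} methods.
 */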
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings("deprecation")
public class DBInputFormat<T extends DBWritable>
    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>
    implements InputFormat<LongWritable, T>, JobConfigurable {
  /**
   * A RecordReader that reads records from a SQL table.
   * Emits LongWritables containing the record number as
   * key and DBWritables as value.
   */
  protected class DBRecordReader extends
      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
      implements RecordReader<LongWritable, T> {
    /**
     * This constructor is retained for compatibility with MapReduce 1.x.
     *
     * @param split The InputSplit to read data for
     * @param inputClass the DBWritable class holding the tuple fields
     * @param job The current JobConf
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job) throws SQLException {
      super(split, inputClass, job, connection, dbConf, conditions,
          fieldNames, tableName);
    }

    /**
     * @param split The InputSplit to read data for
     * @param inputClass the DBWritable class holding the tuple fields
     * @param job The current JobConf
     * @param conn The database connection to use
     * @param dbConfig The DBConfiguration for the job
     * @param cond The condition in the WHERE clause
     * @param fields The field names to select
     * @param table The table to read from
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
        String [] fields, String table) throws SQLException {
      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
    }

    /** {@inheritDoc} */
    public LongWritable createKey() {
      return new LongWritable();
    }

    /** {@inheritDoc} */
    public T createValue() {
      return super.createValue();
    }

    /** {@inheritDoc} */
    public long getPos() throws IOException {
      return super.getPos();
    }

    /** {@inheritDoc} */
    public boolean next(LongWritable key, T value) throws IOException {
      return super.next(key, value);
    }
  }
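
  // Consumption sketch (illustrative only; in a real job the MapReduce
  // framework drives the reader). A caller holding a
  // RecordReader<LongWritable, T> named "reader" could iterate as:
  //
  //   LongWritable key = reader.createKey();
  //   T value = reader.createValue();
  //   while (reader.next(key, value)) {
  //     // key holds the record number, value the deserialized row
  //   }
  //   reader.close();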

  /**
   * A RecordReader implementation that just passes through to a wrapped
   * RecordReader built with the new API.
   */
  private static class DBRecordReaderWrapper<T extends DBWritable>
      implements RecordReader<LongWritable, T> {

    private final org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;

    public DBRecordReaderWrapper(
        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
      this.rr = inner;
    }

    public void close() throws IOException {
      rr.close();
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public T createValue() {
      return rr.createValue();
    }

    public float getProgress() throws IOException {
      return rr.getProgress();
    }

    public long getPos() throws IOException {
      return rr.getPos();
    }

    public boolean next(LongWritable key, T value) throws IOException {
      return rr.next(key, value);
    }
  }

  /**
   * A DBWritable and Writable implementation that does nothing; useful as a
   * placeholder when no tuple fields are needed.
   */
  public static class NullDBWritable extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable
      implements DBWritable, Writable {
  }
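
  // Sketch of a concrete DBWritable (hypothetical MyRecord class and column
  // layout, assuming a table with columns id BIGINT and name VARCHAR). A
  // value class must implement both DBWritable (JDBC serialization) and
  // Writable (Hadoop serialization); imports of java.sql.ResultSet,
  // java.sql.PreparedStatement, java.io.DataInput, java.io.DataOutput, and
  // org.apache.hadoop.io.Text are assumed:
  //
  //   public static class MyRecord implements DBWritable, Writable {
  //     private long id;
  //     private String name;
  //     public void readFields(ResultSet rs) throws SQLException {
  //       id = rs.getLong(1);
  //       name = rs.getString(2);
  //     }
  //     public void write(PreparedStatement ps) throws SQLException {
  //       ps.setLong(1, id);
  //       ps.setString(2, name);
  //     }
  //     public void readFields(DataInput in) throws IOException {
  //       id = in.readLong();
  //       name = Text.readString(in);
  //     }
  //     public void write(DataOutput out) throws IOException {
  //       out.writeLong(id);
  //       Text.writeString(out, name);
  //     }
  //   }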

  /**
   * An InputSplit that spans a set of rows.
   */
  protected static class DBInputSplit extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit
      implements InputSplit {
    /**
     * Default Constructor.
     */
    public DBInputSplit() {
    }

    /**
     * Convenience Constructor.
     * @param start the index of the first row to select
     * @param end the index of the last row to select
     */
    public DBInputSplit(long start, long end) {
      super(start, end);
    }
  }

  /** {@inheritDoc} */
  public void configure(JobConf job) {
    super.setConf(job);
  }

  /** {@inheritDoc} */
  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {

    // Wrap the new-API record reader in a shim class to bridge the
    // old- and new-API RecordReader interfaces.
    return new DBRecordReaderWrapper<T>(
        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
        createDBRecordReader(
            (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split,
            job));
  }

  /** {@inheritDoc} */
  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
    List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
        super.getSplits(new Job(job));
    InputSplit[] ret = new InputSplit[newSplits.size()];
    int i = 0;
    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) s;
      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
    }
    return ret;
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param tableName The table to read data from
   * @param conditions The condition used to select data, e.g. '(updated >
   * 20070101 AND length > 0)'
   * @param orderBy the field names used in the ORDER BY clause.
   * @param fieldNames The field names in the table
   * @see #setInput(JobConf, Class, String, String)
   */
  public static void setInput(JobConf job,
      Class<? extends DBWritable> inputClass,
      String tableName, String conditions, String orderBy,
      String... fieldNames) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputTableName(tableName);
    dbConf.setInputFieldNames(fieldNames);
    dbConf.setInputConditions(conditions);
    dbConf.setInputOrderBy(orderBy);
  }
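
  // Usage sketch for the table-based overload (hypothetical driver class,
  // JDBC URL, credentials, and MyRecord value class; not part of this API).
  // DBConfiguration.configureDB supplies the connection parameters:
  //
  //   JobConf job = new JobConf(MyDriver.class);
  //   DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
  //       "jdbc:mysql://localhost/mydb", "user", "password");
  //   DBInputFormat.setInput(job, MyRecord.class, "employees",
  //       "salary > 0", "id", "id", "name", "salary");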

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param inputQuery the input query to select fields. Example:
   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
   * @param inputCountQuery the input query that returns the number of records
   * in the table. Example: "SELECT COUNT(f1) FROM Mytable"
   * @see #setInput(JobConf, Class, String, String, String, String...)
   */
  public static void setInput(JobConf job,
      Class<? extends DBWritable> inputClass,
      String inputQuery, String inputCountQuery) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputQuery(inputQuery);
    dbConf.setInputCountQuery(inputCountQuery);
  }
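
  // Usage sketch for the query-based overload (hypothetical driver class and
  // MyRecord value class; the ORDER BY clause keeps row numbering stable
  // across splits):
  //
  //   JobConf job = new JobConf(MyDriver.class);
  //   DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
  //       "jdbc:mysql://localhost/mydb", "user", "password");
  //   DBInputFormat.setInput(job, MyRecord.class,
  //       "SELECT id, name, salary FROM employees ORDER BY id",
  //       "SELECT COUNT(id) FROM employees");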
}