/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;

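/**
 * A backwards-compatible {@link InputFormat} that reads input data from a
 * SQL table using the old (mapred) API. It delegates the real work to the
 * new-API {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}, emitting
 * {@link LongWritable}s containing the record number as key and
 * {@link DBWritable}s as value. Jobs configure it through one of the static
 * {@code setInput} methods below, together with
 * {@link DBConfiguration#configureDB}.
 */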
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings("deprecation")
public class DBInputFormat<T extends DBWritable>
    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>
    implements InputFormat<LongWritable, T>, JobConfigurable {
  /**
   * A RecordReader that reads records from a SQL table.
   * Emits LongWritables containing the record number as
   * key and DBWritables as value.
   */
  protected class DBRecordReader extends
      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
      implements RecordReader<LongWritable, T> {
    /**
     * This constructor is kept for compatibility with MapReduce 1.x. It
     * reuses the connection and database configuration of the enclosing
     * DBInputFormat.
     *
     * @param split The InputSplit to read data for
     * @param inputClass the class of the DBWritable records to read
     * @param job the job configuration
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job) throws SQLException {
      super(split, inputClass, job, connection, dbConf, conditions,
          fieldNames, tableName);
    }

    /**
     * @param split The InputSplit to read data for
     * @param inputClass the class of the DBWritable records to read
     * @param job the job configuration
     * @param conn the database connection to read from
     * @param dbConfig the database configuration
     * @param cond the condition of the WHERE clause, if any
     * @param fields the field names to select
     * @param table the table to read from
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
        String [] fields, String table) throws SQLException {
      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
    }

    /** {@inheritDoc} */
    public LongWritable createKey() {
      return new LongWritable();
    }

    /** {@inheritDoc} */
    public T createValue() {
      return super.createValue();
    }

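    /** {@inheritDoc} */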
    public long getPos() throws IOException {
      return super.getPos();
    }

    /** {@inheritDoc} */
    public boolean next(LongWritable key, T value) throws IOException {
      return super.next(key, value);
    }
  }

  /**
   * A RecordReader implementation that just passes through to a wrapped
   * RecordReader built with the new API.
   */
  private static class DBRecordReaderWrapper<T extends DBWritable>
      implements RecordReader<LongWritable, T> {

    private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;

    public DBRecordReaderWrapper(
        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
      this.rr = inner;
    }

    public void close() throws IOException {
      rr.close();
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public T createValue() {
      return rr.createValue();
    }

    public float getProgress() throws IOException {
      return rr.getProgress();
    }

    public long getPos() throws IOException {
      return rr.getPos();
    }

    public boolean next(LongWritable key, T value) throws IOException {
      return rr.next(key, value);
    }
  }

  /**
   * A class that does nothing, implementing DBWritable and Writable.
   */
  public static class NullDBWritable extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable
      implements DBWritable, Writable {
  }

  /**
   * An InputSplit that spans a set of rows.
   */
  protected static class DBInputSplit extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit
      implements InputSplit {
    /**
     * Default constructor.
     */
    public DBInputSplit() {
    }

    /**
     * Convenience constructor.
     * @param start the index of the first row to select
     * @param end the index of the last row to select
     */
    public DBInputSplit(long start, long end) {
      super(start, end);
    }
  }

  /** {@inheritDoc} */
  public void configure(JobConf job) {
    super.setConf(job);
  }

  /** {@inheritDoc} */
  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {

    // Wrap the new-API DBRecordReader in a shim class to bridge the
    // differences between the old and new RecordReader interfaces.
    return new DBRecordReaderWrapper<T>(
        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
        createDBRecordReader(
          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split,
          job));
  }

  /** {@inheritDoc} */
  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
    List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
      super.getSplits(new Job(job));
    InputSplit[] ret = new InputSplit[newSplits.size()];
    int i = 0;
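    // Re-wrap each new-API split as an old-API DBInputSplit so that
    // callers of the mapred API receive the expected split type.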
    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
        (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) s;
      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
    }
    return ret;
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param tableName The table to read data from
   * @param conditions The condition used to select data, e.g. '(updated >
   * 20070101 AND length > 0)'
   * @param orderBy the field names in the ORDER BY clause
   * @param fieldNames The field names in the table
   * @see #setInput(JobConf, Class, String, String)
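   *
   * <p>A minimal usage sketch; the driver, URL, credentials, record class,
   * table, and field names below are hypothetical:
   * <pre>{@code
   * JobConf job = new JobConf(MyJob.class);
   * DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
   *     "jdbc:mysql://localhost/mydb", "user", "password");
   * // MyRecord implements DBWritable and holds the selected fields.
   * DBInputFormat.setInput(job, MyRecord.class, "employees",
   *     "salary > 0", "id", "id", "name", "salary");
   * }</pre>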
   */
  public static void setInput(JobConf job,
      Class<? extends DBWritable> inputClass, String tableName,
      String conditions, String orderBy, String... fieldNames) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputTableName(tableName);
    dbConf.setInputFieldNames(fieldNames);
    dbConf.setInputConditions(conditions);
    dbConf.setInputOrderBy(orderBy);
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param inputQuery the input query that selects the fields. Example:
   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
   * @param inputCountQuery the input query that returns the number of records
   * in the table. Example: "SELECT COUNT(f1) FROM Mytable"
   * @see #setInput(JobConf, Class, String, String, String, String...)
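   *
   * <p>A minimal usage sketch; the driver, URL, credentials, record class,
   * and queries below are hypothetical:
   * <pre>{@code
   * JobConf job = new JobConf(MyJob.class);
   * DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
   *     "jdbc:mysql://localhost/mydb", "user", "password");
   * DBInputFormat.setInput(job, MyRecord.class,
   *     "SELECT id, name FROM employees ORDER BY id",
   *     "SELECT COUNT(id) FROM employees");
   * }</pre>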
   */
  public static void setInput(JobConf job,
      Class<? extends DBWritable> inputClass, String inputQuery,
      String inputCountQuery) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputQuery(inputQuery);
    dbConf.setInputCountQuery(inputCountQuery);
  }
}