001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.lib.db;
020
021import java.io.IOException;
022import java.sql.Connection;
023import java.sql.SQLException;
024import java.util.List;
025
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.io.LongWritable;
029import org.apache.hadoop.io.Writable;
030import org.apache.hadoop.mapred.InputFormat;
031import org.apache.hadoop.mapred.InputSplit;
032import org.apache.hadoop.mapred.JobConf;
033import org.apache.hadoop.mapred.JobConfigurable;
034import org.apache.hadoop.mapred.RecordReader;
035import org.apache.hadoop.mapred.Reporter;
036import org.apache.hadoop.mapreduce.Job;
037
038@InterfaceAudience.Public
039@InterfaceStability.Stable
040public class DBInputFormat<T  extends DBWritable>
041    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T> 
042    implements InputFormat<LongWritable, T>, JobConfigurable {
043  /**
044   * A RecordReader that reads records from a SQL table.
045   * Emits LongWritables containing the record number as 
046   * key and DBWritables as value.  
047   */
048  protected class DBRecordReader extends
049      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
050      implements RecordReader<LongWritable, T> {
051    /**
052     * @param split The InputSplit to read data for
053     * @throws SQLException 
054     */
055    protected DBRecordReader(DBInputSplit split, Class<T> inputClass, 
056        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
057        String [] fields, String table) throws SQLException {
058      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
059    }
060
061    /** {@inheritDoc} */
062    public LongWritable createKey() {
063      return new LongWritable();  
064    }
065
066    /** {@inheritDoc} */
067    public T createValue() {
068      return super.createValue();
069    }
070
071    public long getPos() throws IOException {
072      return super.getPos();
073    }
074
075    /** {@inheritDoc} */
076    public boolean next(LongWritable key, T value) throws IOException {
077      return super.next(key, value);
078    }
079  }
080
081  /**
082   * A RecordReader implementation that just passes through to a wrapped
083   * RecordReader built with the new API.
084   */
085  private static class DBRecordReaderWrapper<T extends DBWritable>
086      implements RecordReader<LongWritable, T> {
087
088    private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;
089    
090    public DBRecordReaderWrapper(
091        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
092      this.rr = inner;
093    }
094
095    public void close() throws IOException {
096      rr.close();
097    }
098
099    public LongWritable createKey() {
100      return new LongWritable();
101    }
102
103    public T createValue() {
104      return rr.createValue();
105    }
106
107    public float getProgress() throws IOException {
108      return rr.getProgress();
109    }
110    
111    public long getPos() throws IOException {
112      return rr.getPos();
113    }
114
115    public boolean next(LongWritable key, T value) throws IOException {
116      return rr.next(key, value);
117    }
118  }
119
120  /**
121   * A Class that does nothing, implementing DBWritable
122   */
123  public static class NullDBWritable extends 
124      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable 
125      implements DBWritable, Writable {
126  }
127  /**
128   * A InputSplit that spans a set of rows
129   */
130  protected static class DBInputSplit extends 
131      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit 
132      implements InputSplit {
133    /**
134     * Default Constructor
135     */
136    public DBInputSplit() {
137    }
138
139    /**
140     * Convenience Constructor
141     * @param start the index of the first row to select
142     * @param end the index of the last row to select
143     */
144    public DBInputSplit(long start, long end) {
145      super(start, end);
146    }
147  }
148
149  /** {@inheritDoc} */
150  public void configure(JobConf job) {
151    super.setConf(job);
152  }
153
154  /** {@inheritDoc} */
155  @SuppressWarnings("unchecked")
156  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
157      JobConf job, Reporter reporter) throws IOException {
158
159    // wrap the DBRR in a shim class to deal with API differences.
160    return new DBRecordReaderWrapper<T>(
161        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>) 
162        createDBRecordReader(
163          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
164  }
165
166  /** {@inheritDoc} */
167  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
168    List<org.apache.hadoop.mapreduce.InputSplit> newSplits = 
169      super.getSplits(new Job(job));
170    InputSplit[] ret = new InputSplit[newSplits.size()];
171    int i = 0;
172    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
173      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split = 
174        (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit)s;
175      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
176    }
177    return ret;
178  }
179
180  /**
181   * Initializes the map-part of the job with the appropriate input settings.
182   * 
183   * @param job The job
184   * @param inputClass the class object implementing DBWritable, which is the 
185   * Java object holding tuple fields.
186   * @param tableName The table to read data from
187   * @param conditions The condition which to select data with, eg. '(updated >
188   * 20070101 AND length > 0)'
189   * @param orderBy the fieldNames in the orderBy clause.
190   * @param fieldNames The field names in the table
191   * @see #setInput(JobConf, Class, String, String)
192   */
193  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
194      String tableName,String conditions, String orderBy, String... fieldNames) {
195    job.setInputFormat(DBInputFormat.class);
196
197    DBConfiguration dbConf = new DBConfiguration(job);
198    dbConf.setInputClass(inputClass);
199    dbConf.setInputTableName(tableName);
200    dbConf.setInputFieldNames(fieldNames);
201    dbConf.setInputConditions(conditions);
202    dbConf.setInputOrderBy(orderBy);
203  }
204  
205  /**
206   * Initializes the map-part of the job with the appropriate input settings.
207   * 
208   * @param job The job
209   * @param inputClass the class object implementing DBWritable, which is the 
210   * Java object holding tuple fields.
211   * @param inputQuery the input query to select fields. Example : 
212   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
213   * @param inputCountQuery the input query that returns the number of records in
214   * the table. 
215   * Example : "SELECT COUNT(f1) FROM Mytable"
216   * @see #setInput(JobConf, Class, String, String, String, String...)
217   */
218  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
219      String inputQuery, String inputCountQuery) {
220    job.setInputFormat(DBInputFormat.class);
221    
222    DBConfiguration dbConf = new DBConfiguration(job);
223    dbConf.setInputClass(inputClass);
224    dbConf.setInputQuery(inputQuery);
225    dbConf.setInputCountQuery(inputCountQuery);
226    
227  }
228}