/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;

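/**
 * An {@link InputFormat} that reads records from a SQL table, emitting
 * {@link LongWritable} keys (the record number) and {@link DBWritable}
 * values holding the tuple fields. It is a thin compatibility wrapper around
 * the new-API {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat} for
 * jobs written against the old (mapred) API.
 */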
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings("deprecation")
public class DBInputFormat<T extends DBWritable>
    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>
    implements InputFormat<LongWritable, T>, JobConfigurable {
  /**
   * A RecordReader that reads records from a SQL table.
   * Emits LongWritables containing the record number as
   * key and DBWritables as value.
   */
  protected class DBRecordReader extends
      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
      implements RecordReader<LongWritable, T> {
    /**
     * This constructor is kept for compatibility with M/R 1.x.
     *
     * @param split The InputSplit to read data for
     * @param inputClass the class holding the tuple fields, implementing DBWritable
     * @param job The current JobConf
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job) throws SQLException {
      super(split, inputClass, job, connection, dbConf, conditions,
          fieldNames, tableName);
    }

    /**
     * @param split The InputSplit to read data for
     * @param inputClass the class holding the tuple fields, implementing DBWritable
     * @param job The current JobConf
     * @param conn The database connection to use
     * @param dbConfig The DB configuration for the job
     * @param cond The condition used to select data from the table
     * @param fields The field names to read
     * @param table The table to read data from
     * @throws SQLException if a database access error occurs
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
        String[] fields, String table) throws SQLException {
      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
    }

    /** {@inheritDoc} */
    public LongWritable createKey() {
      return new LongWritable();
    }

    /** {@inheritDoc} */
    public T createValue() {
      return super.createValue();
    }

    /** {@inheritDoc} */
    public long getPos() throws IOException {
      return super.getPos();
    }

    /** {@inheritDoc} */
    public boolean next(LongWritable key, T value) throws IOException {
      return super.next(key, value);
    }
  }

  /**
   * A RecordReader implementation that just passes through to a wrapped
   * RecordReader built with the new API.
   */
  private static class DBRecordReaderWrapper<T extends DBWritable>
      implements RecordReader<LongWritable, T> {

    private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;

    public DBRecordReaderWrapper(
        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
      this.rr = inner;
    }

    public void close() throws IOException {
      rr.close();
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public T createValue() {
      return rr.createValue();
    }

    public float getProgress() throws IOException {
      return rr.getProgress();
    }

    public long getPos() throws IOException {
      return rr.getPos();
    }

    public boolean next(LongWritable key, T value) throws IOException {
      return rr.next(key, value);
    }
  }

  /**
   * A class that does nothing, implementing DBWritable.
   */
  public static class NullDBWritable extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable
      implements DBWritable, Writable {
  }
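
  // Illustrative sketch (not part of the Hadoop API): a typical user-supplied
  // DBWritable/Writable holding the tuple fields of one row. The MyRecord
  // class and its fields are hypothetical.
  //
  //   public static class MyRecord implements Writable, DBWritable {
  //     private long id;
  //     private String name;
  //
  //     public void readFields(ResultSet rs) throws SQLException {
  //       id = rs.getLong(1);
  //       name = rs.getString(2);
  //     }
  //     public void write(PreparedStatement ps) throws SQLException {
  //       ps.setLong(1, id);
  //       ps.setString(2, name);
  //     }
  //     public void readFields(DataInput in) throws IOException {
  //       id = in.readLong();
  //       name = Text.readString(in);
  //     }
  //     public void write(DataOutput out) throws IOException {
  //       out.writeLong(id);
  //       Text.writeString(out, name);
  //     }
  //   }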

  /**
   * An InputSplit that spans a set of rows.
   */
  protected static class DBInputSplit extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit
      implements InputSplit {
    /**
     * Default Constructor.
     */
    public DBInputSplit() {
    }

    /**
     * Convenience Constructor.
     * @param start the index of the first row to select
     * @param end the index of the last row to select
     */
    public DBInputSplit(long start, long end) {
      super(start, end);
    }
  }

  /** {@inheritDoc} */
  public void configure(JobConf job) {
    super.setConf(job);
  }

  /** {@inheritDoc} */
  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {

    // Wrap the new-API DBRecordReader in a shim class to bridge the
    // differences between the mapred and mapreduce RecordReader APIs.
    return new DBRecordReaderWrapper<T>(
        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
        createDBRecordReader(
          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
  }

  /** {@inheritDoc} */
  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
    // Delegate to the new-API getSplits() and convert each returned split
    // into the old-API DBInputSplit type.
    List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
      super.getSplits(new Job(job));
    InputSplit[] ret = new InputSplit[newSplits.size()];
    int i = 0;
    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
        (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit)s;
      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
    }
    return ret;
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param tableName The table to read data from
   * @param conditions The condition used to select data, e.g.
   * '(updated > 20070101 AND length > 0)'
   * @param orderBy the field names in the ORDER BY clause.
   * @param fieldNames The field names in the table
   * @see #setInput(JobConf, Class, String, String)
   */
  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
      String tableName, String conditions, String orderBy, String... fieldNames) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputTableName(tableName);
    dbConf.setInputFieldNames(fieldNames);
    dbConf.setInputConditions(conditions);
    dbConf.setInputOrderBy(orderBy);
  }
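
  // Minimal usage sketch (illustrative only, not part of this class): wiring
  // a job to read a hypothetical "employees" table. The JDBC driver class,
  // connection URL, credentials, table, field names, and MyRecord class are
  // assumptions made up for the example.
  //
  //   JobConf job = new JobConf(MyDriver.class);
  //   DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
  //       "jdbc:mysql://localhost/mydb", "user", "password");
  //   DBInputFormat.setInput(job, MyRecord.class, "employees",
  //       "salary > 0", "id", "id", "name", "salary");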

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param inputQuery the input query to select fields. Example:
   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
   * @param inputCountQuery the input query that returns the number of records
   * in the table. Example: "SELECT COUNT(f1) FROM Mytable"
   * @see #setInput(JobConf, Class, String, String, String, String...)
   */
  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
      String inputQuery, String inputCountQuery) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputQuery(inputQuery);
    dbConf.setInputCountQuery(inputCountQuery);
  }
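
  // Minimal usage sketch of the query-based overload (illustrative only; the
  // queries reuse the hypothetical "employees" table and MyRecord class from
  // the examples above).
  //
  //   DBInputFormat.setInput(job, MyRecord.class,
  //       "SELECT id, name FROM employees WHERE salary > 0 ORDER BY id",
  //       "SELECT COUNT(id) FROM employees WHERE salary > 0");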
}