001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
021    import java.sql.Connection;
022    import java.sql.DriverManager;
023    import java.sql.SQLException;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.hadoop.mapreduce.Job;
029    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
030    
031    /**
032     * A container for configuration property names for jobs with DB input/output.
033     *  
034     * The job can be configured using the static methods in this class, 
035     * {@link DBInputFormat}, and {@link DBOutputFormat}. 
036     * Alternatively, the properties can be set in the configuration with proper
037     * values. 
038     *   
039     * @see DBConfiguration#configureDB(Configuration, String, String, String, String)
040     * @see DBInputFormat#setInput(Job, Class, String, String)
041     * @see DBInputFormat#setInput(Job, Class, String, String, String, String...)
042     * @see DBOutputFormat#setOutput(Job, String, String...)
043     */
044    @InterfaceAudience.Public
045    @InterfaceStability.Stable
046    public class DBConfiguration {
047    
048      /** The JDBC Driver class name */
049      public static final String DRIVER_CLASS_PROPERTY = 
050        "mapreduce.jdbc.driver.class";
051      
052      /** JDBC Database access URL */
053      public static final String URL_PROPERTY = "mapreduce.jdbc.url";
054    
055      /** User name to access the database */
056      public static final String USERNAME_PROPERTY = "mapreduce.jdbc.username";
057      
058      /** Password to access the database */
059      public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
060    
061      /** Input table name */
062      public static final String INPUT_TABLE_NAME_PROPERTY = 
063        "mapreduce.jdbc.input.table.name";
064    
065      /** Field names in the Input table */
066      public static final String INPUT_FIELD_NAMES_PROPERTY = 
067        "mapreduce.jdbc.input.field.names";
068    
069      /** WHERE clause in the input SELECT statement */
070      public static final String INPUT_CONDITIONS_PROPERTY = 
071        "mapreduce.jdbc.input.conditions";
072      
073      /** ORDER BY clause in the input SELECT statement */
074      public static final String INPUT_ORDER_BY_PROPERTY = 
075        "mapreduce.jdbc.input.orderby";
076      
077      /** Whole input query, exluding LIMIT...OFFSET */
078      public static final String INPUT_QUERY = "mapreduce.jdbc.input.query";
079      
080      /** Input query to get the count of records */
081      public static final String INPUT_COUNT_QUERY = 
082        "mapreduce.jdbc.input.count.query";
083      
084      /** Input query to get the max and min values of the jdbc.input.query */
085      public static final String INPUT_BOUNDING_QUERY =
086          "mapred.jdbc.input.bounding.query";
087      
088      /** Class name implementing DBWritable which will hold input tuples */
089      public static final String INPUT_CLASS_PROPERTY = 
090        "mapreduce.jdbc.input.class";
091    
092      /** Output table name */
093      public static final String OUTPUT_TABLE_NAME_PROPERTY = 
094        "mapreduce.jdbc.output.table.name";
095    
096      /** Field names in the Output table */
097      public static final String OUTPUT_FIELD_NAMES_PROPERTY = 
098        "mapreduce.jdbc.output.field.names";  
099    
100      /** Number of fields in the Output table */
101      public static final String OUTPUT_FIELD_COUNT_PROPERTY = 
102        "mapreduce.jdbc.output.field.count";  
103      
104      /**
105       * Sets the DB access related fields in the {@link Configuration}.  
106       * @param conf the configuration
107       * @param driverClass JDBC Driver class name
108       * @param dbUrl JDBC DB access URL. 
109       * @param userName DB access username 
110       * @param passwd DB access passwd
111       */
112      public static void configureDB(Configuration conf, String driverClass, 
113          String dbUrl, String userName, String passwd) {
114    
115        conf.set(DRIVER_CLASS_PROPERTY, driverClass);
116        conf.set(URL_PROPERTY, dbUrl);
117        if (userName != null) {
118          conf.set(USERNAME_PROPERTY, userName);
119        }
120        if (passwd != null) {
121          conf.set(PASSWORD_PROPERTY, passwd);
122        }
123      }
124    
125      /**
126       * Sets the DB access related fields in the JobConf.  
127       * @param job the job
128       * @param driverClass JDBC Driver class name
129       * @param dbUrl JDBC DB access URL. 
130       */
131      public static void configureDB(Configuration job, String driverClass,
132          String dbUrl) {
133        configureDB(job, driverClass, dbUrl, null, null);
134      }
135    
136      private Configuration conf;
137    
138      public DBConfiguration(Configuration job) {
139        this.conf = job;
140      }
141    
142      /** Returns a connection object o the DB 
143       * @throws ClassNotFoundException 
144       * @throws SQLException */
145      public Connection getConnection() 
146          throws ClassNotFoundException, SQLException {
147    
148        Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
149    
150        if(conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
151          return DriverManager.getConnection(
152                   conf.get(DBConfiguration.URL_PROPERTY));
153        } else {
154          return DriverManager.getConnection(
155              conf.get(DBConfiguration.URL_PROPERTY), 
156              conf.get(DBConfiguration.USERNAME_PROPERTY), 
157              conf.get(DBConfiguration.PASSWORD_PROPERTY));
158        }
159      }
160    
161      public Configuration getConf() {
162        return conf;
163      }
164      
165      public String getInputTableName() {
166        return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
167      }
168    
169      public void setInputTableName(String tableName) {
170        conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
171      }
172    
173      public String[] getInputFieldNames() {
174        return conf.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
175      }
176    
177      public void setInputFieldNames(String... fieldNames) {
178        conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
179      }
180    
181      public String getInputConditions() {
182        return conf.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
183      }
184    
185      public void setInputConditions(String conditions) {
186        if (conditions != null && conditions.length() > 0)
187          conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
188      }
189    
190      public String getInputOrderBy() {
191        return conf.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
192      }
193      
194      public void setInputOrderBy(String orderby) {
195        if(orderby != null && orderby.length() >0) {
196          conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
197        }
198      }
199      
200      public String getInputQuery() {
201        return conf.get(DBConfiguration.INPUT_QUERY);
202      }
203      
204      public void setInputQuery(String query) {
205        if(query != null && query.length() >0) {
206          conf.set(DBConfiguration.INPUT_QUERY, query);
207        }
208      }
209      
210      public String getInputCountQuery() {
211        return conf.get(DBConfiguration.INPUT_COUNT_QUERY);
212      }
213      
214      public void setInputCountQuery(String query) {
215        if(query != null && query.length() > 0) {
216          conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
217        }
218      }
219    
220      public void setInputBoundingQuery(String query) {
221        if (query != null && query.length() > 0) {
222          conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
223        }
224      }
225    
226      public String getInputBoundingQuery() {
227        return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY);
228      }
229    
230      public Class<?> getInputClass() {
231        return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
232                             NullDBWritable.class);
233      }
234    
235      public void setInputClass(Class<? extends DBWritable> inputClass) {
236        conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass,
237                      DBWritable.class);
238      }
239    
240      public String getOutputTableName() {
241        return conf.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
242      }
243    
244      public void setOutputTableName(String tableName) {
245        conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
246      }
247    
248      public String[] getOutputFieldNames() {
249        return conf.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
250      }
251    
252      public void setOutputFieldNames(String... fieldNames) {
253        conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
254      }
255    
256      public void setOutputFieldCount(int fieldCount) {
257        conf.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
258      }
259      
260      public int getOutputFieldCount() {
261        return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
262      }
263      
264    }
265