001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.db;
020
021import java.sql.Connection;
022import java.sql.DriverManager;
023import java.sql.SQLException;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.mapreduce.Job;
029import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
030
031/**
032 * A container for configuration property names for jobs with DB input/output.
033 *  
034 * The job can be configured using the static methods in this class, 
035 * {@link DBInputFormat}, and {@link DBOutputFormat}. 
036 * Alternatively, the properties can be set in the configuration with proper
037 * values. 
038 *   
039 * @see DBConfiguration#configureDB(Configuration, String, String, String, String)
040 * @see DBInputFormat#setInput(Job, Class, String, String)
041 * @see DBInputFormat#setInput(Job, Class, String, String, String, String...)
042 * @see DBOutputFormat#setOutput(Job, String, String...)
043 */
044@InterfaceAudience.Public
045@InterfaceStability.Stable
046public class DBConfiguration {
047
048  /** The JDBC Driver class name */
049  public static final String DRIVER_CLASS_PROPERTY = 
050    "mapreduce.jdbc.driver.class";
051  
052  /** JDBC Database access URL */
053  public static final String URL_PROPERTY = "mapreduce.jdbc.url";
054
055  /** User name to access the database */
056  public static final String USERNAME_PROPERTY = "mapreduce.jdbc.username";
057  
058  /** Password to access the database */
059  public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
060
061  /** Input table name */
062  public static final String INPUT_TABLE_NAME_PROPERTY = 
063    "mapreduce.jdbc.input.table.name";
064
065  /** Field names in the Input table */
066  public static final String INPUT_FIELD_NAMES_PROPERTY = 
067    "mapreduce.jdbc.input.field.names";
068
069  /** WHERE clause in the input SELECT statement */
070  public static final String INPUT_CONDITIONS_PROPERTY = 
071    "mapreduce.jdbc.input.conditions";
072  
073  /** ORDER BY clause in the input SELECT statement */
074  public static final String INPUT_ORDER_BY_PROPERTY = 
075    "mapreduce.jdbc.input.orderby";
076  
077  /** Whole input query, exluding LIMIT...OFFSET */
078  public static final String INPUT_QUERY = "mapreduce.jdbc.input.query";
079  
080  /** Input query to get the count of records */
081  public static final String INPUT_COUNT_QUERY = 
082    "mapreduce.jdbc.input.count.query";
083  
084  /** Input query to get the max and min values of the jdbc.input.query */
085  public static final String INPUT_BOUNDING_QUERY =
086      "mapred.jdbc.input.bounding.query";
087  
088  /** Class name implementing DBWritable which will hold input tuples */
089  public static final String INPUT_CLASS_PROPERTY = 
090    "mapreduce.jdbc.input.class";
091
092  /** Output table name */
093  public static final String OUTPUT_TABLE_NAME_PROPERTY = 
094    "mapreduce.jdbc.output.table.name";
095
096  /** Field names in the Output table */
097  public static final String OUTPUT_FIELD_NAMES_PROPERTY = 
098    "mapreduce.jdbc.output.field.names";  
099
100  /** Number of fields in the Output table */
101  public static final String OUTPUT_FIELD_COUNT_PROPERTY = 
102    "mapreduce.jdbc.output.field.count";  
103  
104  /**
105   * Sets the DB access related fields in the {@link Configuration}.  
106   * @param conf the configuration
107   * @param driverClass JDBC Driver class name
108   * @param dbUrl JDBC DB access URL. 
109   * @param userName DB access username 
110   * @param passwd DB access passwd
111   */
112  public static void configureDB(Configuration conf, String driverClass, 
113      String dbUrl, String userName, String passwd) {
114
115    conf.set(DRIVER_CLASS_PROPERTY, driverClass);
116    conf.set(URL_PROPERTY, dbUrl);
117    if (userName != null) {
118      conf.set(USERNAME_PROPERTY, userName);
119    }
120    if (passwd != null) {
121      conf.set(PASSWORD_PROPERTY, passwd);
122    }
123  }
124
125  /**
126   * Sets the DB access related fields in the JobConf.  
127   * @param job the job
128   * @param driverClass JDBC Driver class name
129   * @param dbUrl JDBC DB access URL. 
130   */
131  public static void configureDB(Configuration job, String driverClass,
132      String dbUrl) {
133    configureDB(job, driverClass, dbUrl, null, null);
134  }
135
136  private Configuration conf;
137
138  public DBConfiguration(Configuration job) {
139    this.conf = job;
140  }
141
142  /** Returns a connection object o the DB 
143   * @throws ClassNotFoundException 
144   * @throws SQLException */
145  public Connection getConnection() 
146      throws ClassNotFoundException, SQLException {
147
148    Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
149
150    if(conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
151      return DriverManager.getConnection(
152               conf.get(DBConfiguration.URL_PROPERTY));
153    } else {
154      return DriverManager.getConnection(
155          conf.get(DBConfiguration.URL_PROPERTY), 
156          conf.get(DBConfiguration.USERNAME_PROPERTY), 
157          conf.get(DBConfiguration.PASSWORD_PROPERTY));
158    }
159  }
160
161  public Configuration getConf() {
162    return conf;
163  }
164  
165  public String getInputTableName() {
166    return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
167  }
168
169  public void setInputTableName(String tableName) {
170    conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
171  }
172
173  public String[] getInputFieldNames() {
174    return conf.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
175  }
176
177  public void setInputFieldNames(String... fieldNames) {
178    conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
179  }
180
181  public String getInputConditions() {
182    return conf.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
183  }
184
185  public void setInputConditions(String conditions) {
186    if (conditions != null && conditions.length() > 0)
187      conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
188  }
189
190  public String getInputOrderBy() {
191    return conf.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
192  }
193  
194  public void setInputOrderBy(String orderby) {
195    if(orderby != null && orderby.length() >0) {
196      conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
197    }
198  }
199  
200  public String getInputQuery() {
201    return conf.get(DBConfiguration.INPUT_QUERY);
202  }
203  
204  public void setInputQuery(String query) {
205    if(query != null && query.length() >0) {
206      conf.set(DBConfiguration.INPUT_QUERY, query);
207    }
208  }
209  
210  public String getInputCountQuery() {
211    return conf.get(DBConfiguration.INPUT_COUNT_QUERY);
212  }
213  
214  public void setInputCountQuery(String query) {
215    if(query != null && query.length() > 0) {
216      conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
217    }
218  }
219
220  public void setInputBoundingQuery(String query) {
221    if (query != null && query.length() > 0) {
222      conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
223    }
224  }
225
226  public String getInputBoundingQuery() {
227    return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY);
228  }
229
230  public Class<?> getInputClass() {
231    return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
232                         NullDBWritable.class);
233  }
234
235  public void setInputClass(Class<? extends DBWritable> inputClass) {
236    conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass,
237                  DBWritable.class);
238  }
239
240  public String getOutputTableName() {
241    return conf.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
242  }
243
244  public void setOutputTableName(String tableName) {
245    conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
246  }
247
248  public String[] getOutputFieldNames() {
249    return conf.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
250  }
251
252  public void setOutputFieldNames(String... fieldNames) {
253    conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
254  }
255
256  public void setOutputFieldCount(int fieldCount) {
257    conf.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
258  }
259  
260  public int getOutputFieldCount() {
261    return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
262  }
263  
264}
265