001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.db; 020 021 import java.sql.Connection; 022 import java.sql.DriverManager; 023 import java.sql.SQLException; 024 025 import org.apache.hadoop.classification.InterfaceAudience; 026 import org.apache.hadoop.classification.InterfaceStability; 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.hadoop.mapreduce.Job; 029 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable; 030 031 /** 032 * A container for configuration property names for jobs with DB input/output. 033 * 034 * The job can be configured using the static methods in this class, 035 * {@link DBInputFormat}, and {@link DBOutputFormat}. 036 * Alternatively, the properties can be set in the configuration with proper 037 * values. 038 * 039 * @see DBConfiguration#configureDB(Configuration, String, String, String, String) 040 * @see DBInputFormat#setInput(Job, Class, String, String) 041 * @see DBInputFormat#setInput(Job, Class, String, String, String, String...) 042 * @see DBOutputFormat#setOutput(Job, String, String...) 043 */ 044 @InterfaceAudience.Public 045 @InterfaceStability.Stable 046 public class DBConfiguration { 047 048 /** The JDBC Driver class name */ 049 public static final String DRIVER_CLASS_PROPERTY = 050 "mapreduce.jdbc.driver.class"; 051 052 /** JDBC Database access URL */ 053 public static final String URL_PROPERTY = "mapreduce.jdbc.url"; 054 055 /** User name to access the database */ 056 public static final String USERNAME_PROPERTY = "mapreduce.jdbc.username"; 057 058 /** Password to access the database */ 059 public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password"; 060 061 /** Input table name */ 062 public static final String INPUT_TABLE_NAME_PROPERTY = 063 "mapreduce.jdbc.input.table.name"; 064 065 /** Field names in the Input table */ 066 public static final String INPUT_FIELD_NAMES_PROPERTY = 067 "mapreduce.jdbc.input.field.names"; 068 069 /** WHERE clause in the input SELECT statement */ 070 public static final String INPUT_CONDITIONS_PROPERTY = 071 "mapreduce.jdbc.input.conditions"; 072 073 /** ORDER BY clause in the input SELECT statement */ 074 public static final String INPUT_ORDER_BY_PROPERTY = 075 "mapreduce.jdbc.input.orderby"; 076 077 /** Whole input query, exluding LIMIT...OFFSET */ 078 public static final String INPUT_QUERY = "mapreduce.jdbc.input.query"; 079 080 /** Input query to get the count of records */ 081 public static final String INPUT_COUNT_QUERY = 082 "mapreduce.jdbc.input.count.query"; 083 084 /** Input query to get the max and min values of the jdbc.input.query */ 085 public static final String INPUT_BOUNDING_QUERY = 086 "mapred.jdbc.input.bounding.query"; 087 088 /** Class name implementing DBWritable which will hold input tuples */ 089 public static final String INPUT_CLASS_PROPERTY = 090 "mapreduce.jdbc.input.class"; 091 092 /** Output table name */ 093 public static final String OUTPUT_TABLE_NAME_PROPERTY = 094 "mapreduce.jdbc.output.table.name"; 095 096 /** Field names in the Output table */ 097 public static final String OUTPUT_FIELD_NAMES_PROPERTY = 098 "mapreduce.jdbc.output.field.names"; 099 100 /** Number of fields in the Output table */ 101 public static final String OUTPUT_FIELD_COUNT_PROPERTY = 102 "mapreduce.jdbc.output.field.count"; 103 104 /** 105 * Sets the DB access related fields in the {@link Configuration}. 106 * @param conf the configuration 107 * @param driverClass JDBC Driver class name 108 * @param dbUrl JDBC DB access URL. 109 * @param userName DB access username 110 * @param passwd DB access passwd 111 */ 112 public static void configureDB(Configuration conf, String driverClass, 113 String dbUrl, String userName, String passwd) { 114 115 conf.set(DRIVER_CLASS_PROPERTY, driverClass); 116 conf.set(URL_PROPERTY, dbUrl); 117 if (userName != null) { 118 conf.set(USERNAME_PROPERTY, userName); 119 } 120 if (passwd != null) { 121 conf.set(PASSWORD_PROPERTY, passwd); 122 } 123 } 124 125 /** 126 * Sets the DB access related fields in the JobConf. 127 * @param job the job 128 * @param driverClass JDBC Driver class name 129 * @param dbUrl JDBC DB access URL. 130 */ 131 public static void configureDB(Configuration job, String driverClass, 132 String dbUrl) { 133 configureDB(job, driverClass, dbUrl, null, null); 134 } 135 136 private Configuration conf; 137 138 public DBConfiguration(Configuration job) { 139 this.conf = job; 140 } 141 142 /** Returns a connection object o the DB 143 * @throws ClassNotFoundException 144 * @throws SQLException */ 145 public Connection getConnection() 146 throws ClassNotFoundException, SQLException { 147 148 Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY)); 149 150 if(conf.get(DBConfiguration.USERNAME_PROPERTY) == null) { 151 return DriverManager.getConnection( 152 conf.get(DBConfiguration.URL_PROPERTY)); 153 } else { 154 return DriverManager.getConnection( 155 conf.get(DBConfiguration.URL_PROPERTY), 156 conf.get(DBConfiguration.USERNAME_PROPERTY), 157 conf.get(DBConfiguration.PASSWORD_PROPERTY)); 158 } 159 } 160 161 public Configuration getConf() { 162 return conf; 163 } 164 165 public String getInputTableName() { 166 return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY); 167 } 168 169 public void setInputTableName(String tableName) { 170 conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName); 171 } 172 173 public String[] getInputFieldNames() { 174 return conf.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY); 175 } 176 177 public void setInputFieldNames(String... fieldNames) { 178 conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames); 179 } 180 181 public String getInputConditions() { 182 return conf.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY); 183 } 184 185 public void setInputConditions(String conditions) { 186 if (conditions != null && conditions.length() > 0) 187 conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions); 188 } 189 190 public String getInputOrderBy() { 191 return conf.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY); 192 } 193 194 public void setInputOrderBy(String orderby) { 195 if(orderby != null && orderby.length() >0) { 196 conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby); 197 } 198 } 199 200 public String getInputQuery() { 201 return conf.get(DBConfiguration.INPUT_QUERY); 202 } 203 204 public void setInputQuery(String query) { 205 if(query != null && query.length() >0) { 206 conf.set(DBConfiguration.INPUT_QUERY, query); 207 } 208 } 209 210 public String getInputCountQuery() { 211 return conf.get(DBConfiguration.INPUT_COUNT_QUERY); 212 } 213 214 public void setInputCountQuery(String query) { 215 if(query != null && query.length() > 0) { 216 conf.set(DBConfiguration.INPUT_COUNT_QUERY, query); 217 } 218 } 219 220 public void setInputBoundingQuery(String query) { 221 if (query != null && query.length() > 0) { 222 conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query); 223 } 224 } 225 226 public String getInputBoundingQuery() { 227 return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY); 228 } 229 230 public Class<?> getInputClass() { 231 return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY, 232 NullDBWritable.class); 233 } 234 235 public void setInputClass(Class<? extends DBWritable> inputClass) { 236 conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, 237 DBWritable.class); 238 } 239 240 public String getOutputTableName() { 241 return conf.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY); 242 } 243 244 public void setOutputTableName(String tableName) { 245 conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName); 246 } 247 248 public String[] getOutputFieldNames() { 249 return conf.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY); 250 } 251 252 public void setOutputFieldNames(String... fieldNames) { 253 conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames); 254 } 255 256 public void setOutputFieldCount(int fieldCount) { 257 conf.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount); 258 } 259 260 public int getOutputFieldCount() { 261 return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0); 262 } 263 264 } 265