001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.db; 020 021import java.io.IOException; 022import java.sql.Connection; 023import java.sql.PreparedStatement; 024import java.sql.SQLException; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.apache.hadoop.classification.InterfaceAudience; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.mapreduce.Job; 031import org.apache.hadoop.mapreduce.JobContext; 032import org.apache.hadoop.mapreduce.OutputCommitter; 033import org.apache.hadoop.mapreduce.OutputFormat; 034import org.apache.hadoop.mapreduce.RecordWriter; 035import org.apache.hadoop.mapreduce.TaskAttemptContext; 036import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 037import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 038import org.apache.hadoop.util.StringUtils; 039 040/** 041 * A OutputFormat that sends the reduce output to a SQL table. 042 * <p> 043 * {@link DBOutputFormat} accepts <key,value> pairs, where 044 * key has a type extending DBWritable. Returned {@link RecordWriter} 045 * writes <b>only the key</b> to the database with a batch SQL query. 046 * 047 */ 048@InterfaceAudience.Public 049@InterfaceStability.Stable 050public class DBOutputFormat<K extends DBWritable, V> 051extends OutputFormat<K,V> { 052 053 private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); 054 public void checkOutputSpecs(JobContext context) 055 throws IOException, InterruptedException {} 056 057 public OutputCommitter getOutputCommitter(TaskAttemptContext context) 058 throws IOException, InterruptedException { 059 return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), 060 context); 061 } 062 063 /** 064 * A RecordWriter that writes the reduce output to a SQL table 065 */ 066 @InterfaceStability.Evolving 067 public class DBRecordWriter 068 extends RecordWriter<K, V> { 069 070 private Connection connection; 071 private PreparedStatement statement; 072 073 public DBRecordWriter() throws SQLException { 074 } 075 076 public DBRecordWriter(Connection connection 077 , PreparedStatement statement) throws SQLException { 078 this.connection = connection; 079 this.statement = statement; 080 this.connection.setAutoCommit(false); 081 } 082 083 public Connection getConnection() { 084 return connection; 085 } 086 087 public PreparedStatement getStatement() { 088 return statement; 089 } 090 091 /** {@inheritDoc} */ 092 public void close(TaskAttemptContext context) throws IOException { 093 try { 094 statement.executeBatch(); 095 connection.commit(); 096 } catch (SQLException e) { 097 try { 098 connection.rollback(); 099 } 100 catch (SQLException ex) { 101 LOG.warn(StringUtils.stringifyException(ex)); 102 } 103 throw new IOException(e.getMessage()); 104 } finally { 105 try { 106 statement.close(); 107 connection.close(); 108 } 109 catch (SQLException ex) { 110 throw new IOException(ex.getMessage()); 111 } 112 } 113 } 114 115 /** {@inheritDoc} */ 116 public void write(K key, V value) throws IOException { 117 try { 118 key.write(statement); 119 statement.addBatch(); 120 } catch (SQLException e) { 121 e.printStackTrace(); 122 } 123 } 124 } 125 126 /** 127 * Constructs the query used as the prepared statement to insert data. 128 * 129 * @param table 130 * the table to insert into 131 * @param fieldNames 132 * the fields to insert into. If field names are unknown, supply an 133 * array of nulls. 134 */ 135 public String constructQuery(String table, String[] fieldNames) { 136 if(fieldNames == null) { 137 throw new IllegalArgumentException("Field names may not be null"); 138 } 139 140 StringBuilder query = new StringBuilder(); 141 query.append("INSERT INTO ").append(table); 142 143 if (fieldNames.length > 0 && fieldNames[0] != null) { 144 query.append(" ("); 145 for (int i = 0; i < fieldNames.length; i++) { 146 query.append(fieldNames[i]); 147 if (i != fieldNames.length - 1) { 148 query.append(","); 149 } 150 } 151 query.append(")"); 152 } 153 query.append(" VALUES ("); 154 155 for (int i = 0; i < fieldNames.length; i++) { 156 query.append("?"); 157 if(i != fieldNames.length - 1) { 158 query.append(","); 159 } 160 } 161 query.append(");"); 162 163 return query.toString(); 164 } 165 166 /** {@inheritDoc} */ 167 public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 168 throws IOException { 169 DBConfiguration dbConf = new DBConfiguration(context.getConfiguration()); 170 String tableName = dbConf.getOutputTableName(); 171 String[] fieldNames = dbConf.getOutputFieldNames(); 172 173 if(fieldNames == null) { 174 fieldNames = new String[dbConf.getOutputFieldCount()]; 175 } 176 177 try { 178 Connection connection = dbConf.getConnection(); 179 PreparedStatement statement = null; 180 181 statement = connection.prepareStatement( 182 constructQuery(tableName, fieldNames)); 183 return new DBRecordWriter(connection, statement); 184 } catch (Exception ex) { 185 throw new IOException(ex.getMessage()); 186 } 187 } 188 189 /** 190 * Initializes the reduce-part of the job with 191 * the appropriate output settings 192 * 193 * @param job The job 194 * @param tableName The table to insert data into 195 * @param fieldNames The field names in the table. 196 */ 197 public static void setOutput(Job job, String tableName, 198 String... fieldNames) throws IOException { 199 if(fieldNames.length > 0 && fieldNames[0] != null) { 200 DBConfiguration dbConf = setOutput(job, tableName); 201 dbConf.setOutputFieldNames(fieldNames); 202 } else { 203 if (fieldNames.length > 0) { 204 setOutput(job, tableName, fieldNames.length); 205 } 206 else { 207 throw new IllegalArgumentException( 208 "Field names must be greater than 0"); 209 } 210 } 211 } 212 213 /** 214 * Initializes the reduce-part of the job 215 * with the appropriate output settings 216 * 217 * @param job The job 218 * @param tableName The table to insert data into 219 * @param fieldCount the number of fields in the table. 220 */ 221 public static void setOutput(Job job, String tableName, 222 int fieldCount) throws IOException { 223 DBConfiguration dbConf = setOutput(job, tableName); 224 dbConf.setOutputFieldCount(fieldCount); 225 } 226 227 private static DBConfiguration setOutput(Job job, 228 String tableName) throws IOException { 229 job.setOutputFormatClass(DBOutputFormat.class); 230 job.setReduceSpeculativeExecution(false); 231 232 DBConfiguration dbConf = new DBConfiguration(job.getConfiguration()); 233 234 dbConf.setOutputTableName(tableName); 235 return dbConf; 236 } 237}