001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.db;
020
021import java.io.IOException;
022import java.sql.Connection;
023import java.sql.PreparedStatement;
024import java.sql.SQLException;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.mapreduce.Job;
031import org.apache.hadoop.mapreduce.JobContext;
032import org.apache.hadoop.mapreduce.OutputCommitter;
033import org.apache.hadoop.mapreduce.OutputFormat;
034import org.apache.hadoop.mapreduce.RecordWriter;
035import org.apache.hadoop.mapreduce.TaskAttemptContext;
036import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
037import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
038import org.apache.hadoop.util.StringUtils;
039
040/**
041 * A OutputFormat that sends the reduce output to a SQL table.
042 * <p> 
043 * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where 
044 * key has a type extending DBWritable. Returned {@link RecordWriter} 
045 * writes <b>only the key</b> to the database with a batch SQL query.  
046 * 
047 */
048@InterfaceAudience.Public
049@InterfaceStability.Stable
050public class DBOutputFormat<K  extends DBWritable, V> 
051extends OutputFormat<K,V> {
052
053  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
054  public void checkOutputSpecs(JobContext context) 
055      throws IOException, InterruptedException {}
056
057  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
058      throws IOException, InterruptedException {
059    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
060                                   context);
061  }
062
063  /**
064   * A RecordWriter that writes the reduce output to a SQL table
065   */
066  @InterfaceStability.Evolving
067  public class DBRecordWriter 
068      extends RecordWriter<K, V> {
069
070    private Connection connection;
071    private PreparedStatement statement;
072
073    public DBRecordWriter() throws SQLException {
074    }
075
076    public DBRecordWriter(Connection connection
077        , PreparedStatement statement) throws SQLException {
078      this.connection = connection;
079      this.statement = statement;
080      this.connection.setAutoCommit(false);
081    }
082
083    public Connection getConnection() {
084      return connection;
085    }
086    
087    public PreparedStatement getStatement() {
088      return statement;
089    }
090    
091    /** {@inheritDoc} */
092    public void close(TaskAttemptContext context) throws IOException {
093      try {
094        statement.executeBatch();
095        connection.commit();
096      } catch (SQLException e) {
097        try {
098          connection.rollback();
099        }
100        catch (SQLException ex) {
101          LOG.warn(StringUtils.stringifyException(ex));
102        }
103        throw new IOException(e.getMessage());
104      } finally {
105        try {
106          statement.close();
107          connection.close();
108        }
109        catch (SQLException ex) {
110          throw new IOException(ex.getMessage());
111        }
112      }
113    }
114
115    /** {@inheritDoc} */
116    public void write(K key, V value) throws IOException {
117      try {
118        key.write(statement);
119        statement.addBatch();
120      } catch (SQLException e) {
121        e.printStackTrace();
122      }
123    }
124  }
125
126  /**
127   * Constructs the query used as the prepared statement to insert data.
128   * 
129   * @param table
130   *          the table to insert into
131   * @param fieldNames
132   *          the fields to insert into. If field names are unknown, supply an
133   *          array of nulls.
134   */
135  public String constructQuery(String table, String[] fieldNames) {
136    if(fieldNames == null) {
137      throw new IllegalArgumentException("Field names may not be null");
138    }
139
140    StringBuilder query = new StringBuilder();
141    query.append("INSERT INTO ").append(table);
142
143    if (fieldNames.length > 0 && fieldNames[0] != null) {
144      query.append(" (");
145      for (int i = 0; i < fieldNames.length; i++) {
146        query.append(fieldNames[i]);
147        if (i != fieldNames.length - 1) {
148          query.append(",");
149        }
150      }
151      query.append(")");
152    }
153    query.append(" VALUES (");
154
155    for (int i = 0; i < fieldNames.length; i++) {
156      query.append("?");
157      if(i != fieldNames.length - 1) {
158        query.append(",");
159      }
160    }
161    query.append(");");
162
163    return query.toString();
164  }
165
166  /** {@inheritDoc} */
167  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 
168      throws IOException {
169    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
170    String tableName = dbConf.getOutputTableName();
171    String[] fieldNames = dbConf.getOutputFieldNames();
172    
173    if(fieldNames == null) {
174      fieldNames = new String[dbConf.getOutputFieldCount()];
175    }
176    
177    try {
178      Connection connection = dbConf.getConnection();
179      PreparedStatement statement = null;
180  
181      statement = connection.prepareStatement(
182                    constructQuery(tableName, fieldNames));
183      return new DBRecordWriter(connection, statement);
184    } catch (Exception ex) {
185      throw new IOException(ex.getMessage());
186    }
187  }
188
189  /**
190   * Initializes the reduce-part of the job with 
191   * the appropriate output settings
192   * 
193   * @param job The job
194   * @param tableName The table to insert data into
195   * @param fieldNames The field names in the table.
196   */
197  public static void setOutput(Job job, String tableName, 
198      String... fieldNames) throws IOException {
199    if(fieldNames.length > 0 && fieldNames[0] != null) {
200      DBConfiguration dbConf = setOutput(job, tableName);
201      dbConf.setOutputFieldNames(fieldNames);
202    } else {
203      if (fieldNames.length > 0) {
204        setOutput(job, tableName, fieldNames.length);
205      }
206      else { 
207        throw new IllegalArgumentException(
208          "Field names must be greater than 0");
209      }
210    }
211  }
212  
213  /**
214   * Initializes the reduce-part of the job 
215   * with the appropriate output settings
216   * 
217   * @param job The job
218   * @param tableName The table to insert data into
219   * @param fieldCount the number of fields in the table.
220   */
221  public static void setOutput(Job job, String tableName, 
222      int fieldCount) throws IOException {
223    DBConfiguration dbConf = setOutput(job, tableName);
224    dbConf.setOutputFieldCount(fieldCount);
225  }
226  
227  private static DBConfiguration setOutput(Job job,
228      String tableName) throws IOException {
229    job.setOutputFormatClass(DBOutputFormat.class);
230    job.setReduceSpeculativeExecution(false);
231
232    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
233    
234    dbConf.setOutputTableName(tableName);
235    return dbConf;
236  }
237}