/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

/**
 * An OutputFormat that sends the reduce output to a SQL table.
 * <p>
 * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where the key
 * has a type extending {@link DBWritable}. The returned {@link RecordWriter}
 * writes <b>only the key</b> to the database with a batch SQL query.
 *
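 * <p>
 * A minimal usage sketch (the {@code PageviewRecord} writable and the
 * driver, connection, table and field names below are illustrative, not
 * part of this API):
 * <pre>
 * Job job = Job.getInstance(conf, "db-export");
 * job.setOutputKeyClass(PageviewRecord.class);    // implements DBWritable
 * job.setOutputValueClass(NullWritable.class);
 * DBConfiguration.configureDB(job.getConfiguration(),
 *     "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb",
 *     "user", "password");
 * DBOutputFormat.setOutput(job, "pageview", "url", "pageview_count");
 * </pre>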
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DBOutputFormat<K extends DBWritable, V>
    extends OutputFormat<K, V> {

  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);

  public void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException {}

  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
                                   context);
  }

  /**
   * A RecordWriter that writes the reduce output to a SQL table. Records
   * are buffered with {@link PreparedStatement#addBatch()} and executed
   * and committed as a single batch when the writer is closed.
   */
  @InterfaceStability.Evolving
  public class DBRecordWriter
      extends RecordWriter<K, V> {

    private Connection connection;
    private PreparedStatement statement;

    public DBRecordWriter() throws SQLException {
    }

    public DBRecordWriter(Connection connection,
        PreparedStatement statement) throws SQLException {
      this.connection = connection;
      this.statement = statement;
      // Batched writes are committed explicitly in close().
      this.connection.setAutoCommit(false);
    }

    public Connection getConnection() {
      return connection;
    }

    public PreparedStatement getStatement() {
      return statement;
    }

    /** {@inheritDoc} */
    public void close(TaskAttemptContext context) throws IOException {
      try {
        statement.executeBatch();
        connection.commit();
      } catch (SQLException e) {
        try {
          connection.rollback();
        } catch (SQLException ex) {
          LOG.warn(StringUtils.stringifyException(ex));
        }
        throw new IOException(e.getMessage());
      } finally {
        try {
          statement.close();
          connection.close();
        } catch (SQLException ex) {
          throw new IOException(ex.getMessage());
        }
      }
    }

    /** {@inheritDoc} */
    public void write(K key, V value) throws IOException {
      try {
        key.write(statement);
        statement.addBatch();
      } catch (SQLException e) {
        // Surface the failure instead of silently dropping the record.
        throw new IOException(e.getMessage());
      }
    }
  }

  /**
   * Constructs the query used as the prepared statement to insert data.
   *
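   * For example (the table and field names are purely illustrative):
   * <pre>
   * constructQuery("pageview", new String[] { "url", "pageview_count" })
   *   // returns "INSERT INTO pageview (url,pageview_count) VALUES (?,?);"
   * </pre>
   *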
   * @param table
   *          the table to insert into
   * @param fieldNames
   *          the fields to insert into. If field names are unknown, supply an
   *          array of nulls.
   * @return the SQL INSERT statement, with one parameter placeholder per field
   */
  public String constructQuery(String table, String[] fieldNames) {
    if (fieldNames == null) {
      throw new IllegalArgumentException("Field names may not be null");
    }

    StringBuilder query = new StringBuilder();
    query.append("INSERT INTO ").append(table);

    if (fieldNames.length > 0 && fieldNames[0] != null) {
      query.append(" (");
      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length - 1) {
          query.append(",");
        }
      }
      query.append(")");
    }
    query.append(" VALUES (");

    for (int i = 0; i < fieldNames.length; i++) {
      query.append("?");
      if (i != fieldNames.length - 1) {
        query.append(",");
      }
    }
    query.append(");");

    return query.toString();
  }

  /** {@inheritDoc} */
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
    String tableName = dbConf.getOutputTableName();
    String[] fieldNames = dbConf.getOutputFieldNames();

    if (fieldNames == null) {
      fieldNames = new String[dbConf.getOutputFieldCount()];
    }

    try {
      Connection connection = dbConf.getConnection();
      PreparedStatement statement = connection.prepareStatement(
          constructQuery(tableName, fieldNames));
      return new DBRecordWriter(connection, statement);
    } catch (Exception ex) {
      throw new IOException(ex.getMessage());
    }
  }

  /**
   * Initializes the reduce-part of the job with the appropriate output
   * settings.
   *
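   * For example (the table and field names are illustrative):
   * <pre>
   * DBOutputFormat.setOutput(job, "pageview", "url", "pageview_count");
   * </pre>
   *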
   * @param job The job
   * @param tableName The table to insert data into
   * @param fieldNames The field names in the table.
   */
  public static void setOutput(Job job, String tableName,
      String... fieldNames) throws IOException {
    if (fieldNames.length > 0 && fieldNames[0] != null) {
      DBConfiguration dbConf = setOutput(job, tableName);
      dbConf.setOutputFieldNames(fieldNames);
    } else {
      if (fieldNames.length > 0) {
        setOutput(job, tableName, fieldNames.length);
      } else {
        throw new IllegalArgumentException(
            "At least one field name must be provided");
      }
    }
  }

  /**
   * Initializes the reduce-part of the job with the appropriate output
   * settings.
   *
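   * For example, when only the number of columns is known (the table name is
   * illustrative):
   * <pre>
   * DBOutputFormat.setOutput(job, "pageview", 2);
   * </pre>
   *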
   * @param job The job
   * @param tableName The table to insert data into
   * @param fieldCount the number of fields in the table.
   */
  public static void setOutput(Job job, String tableName,
      int fieldCount) throws IOException {
    DBConfiguration dbConf = setOutput(job, tableName);
    dbConf.setOutputFieldCount(fieldCount);
  }

  private static DBConfiguration setOutput(Job job,
      String tableName) throws IOException {
    job.setOutputFormatClass(DBOutputFormat.class);
    // Speculative reduce attempts could write duplicate rows to the
    // database, so speculative execution is disabled for this job.
    job.setReduceSpeculativeExecution(false);

    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());

    dbConf.setOutputTableName(tableName);
    return dbConf;
  }
}