001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred.lib.db; 020 021import java.io.IOException; 022import java.sql.Connection; 023import java.sql.PreparedStatement; 024import java.sql.SQLException; 025 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.classification.InterfaceStability; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.mapred.JobConf; 030import org.apache.hadoop.mapred.OutputFormat; 031import org.apache.hadoop.mapred.RecordWriter; 032import org.apache.hadoop.mapred.Reporter; 033import org.apache.hadoop.mapreduce.MRJobConfig; 034import org.apache.hadoop.mapreduce.TaskAttemptID; 035import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; 036import org.apache.hadoop.util.Progressable; 037 038@InterfaceAudience.Public 039@InterfaceStability.Stable 040public class DBOutputFormat<K extends DBWritable, V> 041 extends org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V> 042 implements OutputFormat<K, V> { 043 044 /** 045 * A RecordWriter that writes the reduce output to a SQL table 046 */ 047 protected class DBRecordWriter extends 048 org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>.DBRecordWriter 049 implements RecordWriter<K, V> { 050 051 protected DBRecordWriter(Connection connection, 052 PreparedStatement statement) throws SQLException { 053 super(connection, statement); 054 } 055 056 /** {@inheritDoc} */ 057 public void close(Reporter reporter) throws IOException { 058 super.close(null); 059 } 060 } 061 062 /** {@inheritDoc} */ 063 public void checkOutputSpecs(FileSystem filesystem, JobConf job) 064 throws IOException { 065 } 066 067 068 /** {@inheritDoc} */ 069 public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, 070 JobConf job, String name, Progressable progress) throws IOException { 071 org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter( 072 new TaskAttemptContextImpl(job, 073 TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID)))); 074 org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer = 075 (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w; 076 try { 077 return new DBRecordWriter(writer.getConnection(), writer.getStatement()); 078 } catch(SQLException se) { 079 throw new IOException(se); 080 } 081 } 082 083 /** 084 * Initializes the reduce-part of the job with the appropriate output settings 085 * 086 * @param job The job 087 * @param tableName The table to insert data into 088 * @param fieldNames The field names in the table. 089 */ 090 public static void setOutput(JobConf job, String tableName, String... fieldNames) { 091 if(fieldNames.length > 0 && fieldNames[0] != null) { 092 DBConfiguration dbConf = setOutput(job, tableName); 093 dbConf.setOutputFieldNames(fieldNames); 094 } else { 095 if(fieldNames.length > 0) 096 setOutput(job, tableName, fieldNames.length); 097 else 098 throw new IllegalArgumentException("Field names must be greater than 0"); 099 } 100 } 101 102 /** 103 * Initializes the reduce-part of the job with the appropriate output settings 104 * 105 * @param job The job 106 * @param tableName The table to insert data into 107 * @param fieldCount the number of fields in the table. 108 */ 109 public static void setOutput(JobConf job, String tableName, int fieldCount) { 110 DBConfiguration dbConf = setOutput(job, tableName); 111 dbConf.setOutputFieldCount(fieldCount); 112 } 113 114 private static DBConfiguration setOutput(JobConf job, String tableName) { 115 job.setOutputFormat(DBOutputFormat.class); 116 job.setReduceSpeculativeExecution(false); 117 118 DBConfiguration dbConf = new DBConfiguration(job); 119 120 dbConf.setOutputTableName(tableName); 121 return dbConf; 122 } 123 124}