/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.lang.reflect.Method;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A RecordReader that reads records from an Oracle SQL table.
 *
 * <p>Compared to its superclass, this reader builds a ROWNUM-based query to
 * restrict each split to its row range, and sets the session time zone on
 * the connection when it is constructed.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {

  /** Configuration key to set to a timezone string. */
  public static final String SESSION_TIMEZONE_KEY = "oracle.sessionTimeZone";

  private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);

  /**
   * Creates the reader and sets the session time zone on the connection.
   *
   * <p>All arguments are passed through unchanged to the superclass
   * constructor; {@code conf} and {@code conn} are additionally handed to
   * {@link #setSessionTimeZone(Configuration, Connection)}.
   *
   * @throws SQLException if the superclass constructor fails or the session
   *         time zone cannot be set.
   */
  public OracleDBRecordReader(DBInputFormat.DBInputSplit split,
      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
      String cond, String [] fields, String table) throws SQLException {
    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
    setSessionTimeZone(conf, conn);
  }

  /**
   * Returns the query for selecting the records from an Oracle DB.
   *
   * <p>If no input query is configured, a {@code SELECT} statement is
   * assembled from the field names, the table name, the optional conditions
   * and the optional ORDER BY setting. Otherwise the configured input query
   * is used as is. When the split reports a length greater than zero, the
   * statement is wrapped in a ROWNUM-based outer query that keeps only rows
   * numbered after the split's start and up to and including its end.
   *
   * @return the SQL text to execute for this reader's split.
   */
  @Override
  protected String getSelectQuery() {
    StringBuilder query = new StringBuilder();
    DBConfiguration dbConf = getDBConf();
    String conditions = getConditions();
    String tableName = getTableName();
    String [] fieldNames = getFieldNames();

    // Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
    if (dbConf.getInputQuery() == null) {
      query.append("SELECT ");

      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length - 1) {
          query.append(", ");
        }
      }

      query.append(" FROM ").append(tableName);
      if (conditions != null && conditions.length() > 0) {
        query.append(" WHERE ").append(conditions);
      }
      String orderBy = dbConf.getInputOrderBy();
      if (orderBy != null && orderBy.length() > 0) {
        query.append(" ORDER BY ").append(orderBy);
      }
    } else {
      // PREBUILT QUERY
      query.append(dbConf.getInputQuery());
    }

    try {
      DBInputFormat.DBInputSplit split = getSplit();
      if (split.getLength() > 0) {
        String querystring = query.toString();

        // Number the rows of the inner query with ROWNUM, then keep only the
        // rows in (start, end] for this split.
        query = new StringBuilder();
        query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
        query.append(querystring);
        query.append(" ) a WHERE rownum <= ").append(split.getEnd());
        query.append(" ) WHERE dbif_rno > ").append(split.getStart());
      }
    } catch (IOException ex) {
      // Not expected to happen. Stay best-effort and return the unwrapped
      // query, but leave a trace: without the ROWNUM wrapper the query is
      // not limited to this split's row range.
      LOG.warn("Could not determine the split length; "
          + "the query will not be restricted to the split's row range", ex);
    }

    return query.toString();
  }

  /**
   * Set session time zone.
   *
   * <p>The time zone is read from the {@code oracle.sessionTimeZone}
   * property and defaults to {@code "GMT"}. If the connection rejects that
   * value, {@code "GMT"} is tried as a fallback.
   *
   * @param conf The current configuration.
   * We read the 'oracle.sessionTimeZone' property from here.
   * @param conn The connection to alter the timezone properties of.
   * @throws SQLException if the connection's class has no accessible public
   *         {@code setSessionTimeZone(String)} method, or if both the
   *         configured time zone and the GMT fallback are rejected. In the
   *         latter case the cause is the failure for the configured value.
   */
  public static void setSessionTimeZone(Configuration conf,
      Connection conn) throws SQLException {
    // need to use reflection to call the method setSessionTimeZone on
    // the OracleConnection class because oracle specific java libraries are
    // not accessible in this context.
    Method method;
    try {
      method = conn.getClass().getMethod("setSessionTimeZone", String.class);
    } catch (Exception ex) {
      LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
      // rethrow SQLException
      throw new SQLException(ex);
    }

    // Need to set the time zone in order for Java
    // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE".
    // We can't easily get the correct Oracle-specific timezone string
    // from Java; just let the user set the timezone in a property.
    String clientTimeZone = conf.get(SESSION_TIMEZONE_KEY, "GMT");
    try {
      method.setAccessible(true);
      method.invoke(conn, clientTimeZone);
      LOG.info("Time zone has been set to " + clientTimeZone);
    } catch (Exception ex) {
      // Log the cause as well: if the fallback below succeeds, this is the
      // only place where the reason for the rejection is recorded.
      LOG.warn("Time zone " + clientTimeZone +
               " could not be set on Oracle database.", ex);
      LOG.warn("Setting default time zone: GMT");
      try {
        // "GMT" timezone is guaranteed to exist.
        method.invoke(conn, "GMT");
      } catch (Exception ex2) {
        LOG.error("Could not set time zone for oracle connection", ex2);
        // rethrow SQLException; the fallback failure (ex2) is logged above,
        // the thrown exception carries the original failure.
        throw new SQLException(ex);
      }
    }
  }
}