001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
021    import java.io.IOException;
022    import java.sql.Connection;
023    import java.sql.SQLException;
024    import java.lang.reflect.Method;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * A RecordReader that reads records from an Oracle SQL table.
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Evolving
037    public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
038    
039      /** Configuration key to set to a timezone string. */
040      public static final String SESSION_TIMEZONE_KEY = "oracle.sessionTimeZone";
041    
042      private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
043    
044      public OracleDBRecordReader(DBInputFormat.DBInputSplit split, 
045          Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
046          String cond, String [] fields, String table) throws SQLException {
047        super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
048        setSessionTimeZone(conf, conn);
049      }
050    
051      /** Returns the query for selecting the records from an Oracle DB. */
052      protected String getSelectQuery() {
053        StringBuilder query = new StringBuilder();
054        DBConfiguration dbConf = getDBConf();
055        String conditions = getConditions();
056        String tableName = getTableName();
057        String [] fieldNames = getFieldNames();
058    
059        // Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
060        if(dbConf.getInputQuery() == null) {
061          query.append("SELECT ");
062      
063          for (int i = 0; i < fieldNames.length; i++) {
064            query.append(fieldNames[i]);
065            if (i != fieldNames.length -1) {
066              query.append(", ");
067            }
068          }
069      
070          query.append(" FROM ").append(tableName);
071          if (conditions != null && conditions.length() > 0)
072            query.append(" WHERE ").append(conditions);
073          String orderBy = dbConf.getInputOrderBy();
074          if (orderBy != null && orderBy.length() > 0) {
075            query.append(" ORDER BY ").append(orderBy);
076          }
077        } else {
078          //PREBUILT QUERY
079          query.append(dbConf.getInputQuery());
080        }
081            
082        try {
083          DBInputFormat.DBInputSplit split = getSplit();
084          if (split.getLength() > 0){
085            String querystring = query.toString();
086    
087            query = new StringBuilder();
088            query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
089            query.append(querystring);
090            query.append(" ) a WHERE rownum <= ").append(split.getEnd());
091            query.append(" ) WHERE dbif_rno > ").append(split.getStart());
092          }
093        } catch (IOException ex) {
094          // ignore, will not throw.
095        }                 
096    
097        return query.toString();
098      }
099    
100      /**
101       * Set session time zone
102       * @param conf The current configuration.
103       * We read the 'oracle.sessionTimeZone' property from here.
104       * @param conn The connection to alter the timezone properties of.
105       */
106      public static void setSessionTimeZone(Configuration conf,
107          Connection conn) throws SQLException {
108        // need to use reflection to call the method setSessionTimeZone on
109        // the OracleConnection class because oracle specific java libraries are
110        // not accessible in this context.
111        Method method;
112        try {
113          method = conn.getClass().getMethod(
114                  "setSessionTimeZone", new Class [] {String.class});
115        } catch (Exception ex) {
116          LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
117          // rethrow SQLException
118          throw new SQLException(ex);
119        }
120    
121        // Need to set the time zone in order for Java
122        // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE".
123        // We can't easily get the correct Oracle-specific timezone string
124        // from Java; just let the user set the timezone in a property.
125        String clientTimeZone = conf.get(SESSION_TIMEZONE_KEY, "GMT");
126        try {
127          method.setAccessible(true);
128          method.invoke(conn, clientTimeZone);
129          LOG.info("Time zone has been set to " + clientTimeZone);
130        } catch (Exception ex) {
131          LOG.warn("Time zone " + clientTimeZone +
132                   " could not be set on Oracle database.");
133          LOG.warn("Setting default time zone: GMT");
134          try {
135            // "GMT" timezone is guaranteed to exist.
136            method.invoke(conn, "GMT");
137          } catch (Exception ex2) {
138            LOG.error("Could not set time zone for oracle connection", ex2);
139            // rethrow SQLException
140            throw new SQLException(ex);
141          }
142        }
143      }
144    }