001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
021    import java.io.IOException;
022    import java.sql.Connection;
023    import java.sql.SQLException;
024    import java.lang.reflect.Method;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * A RecordReader that reads records from an Oracle SQL table.
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Evolving
037    public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
038    
039      /** Configuration key to set to a timezone string. */
040      public static final String SESSION_TIMEZONE_KEY = "oracle.sessionTimeZone";
041    
042      private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
043    
044      public OracleDBRecordReader(DBInputFormat.DBInputSplit split, 
045          Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
046          String cond, String [] fields, String table) throws SQLException {
047        super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
048        setSessionTimeZone(conf, conn);
049      }
050    
051      /** Returns the query for selecting the records from an Oracle DB. */
052      protected String getSelectQuery() {
053        StringBuilder query = new StringBuilder();
054        DBConfiguration dbConf = getDBConf();
055        String conditions = getConditions();
056        String tableName = getTableName();
057        String [] fieldNames = getFieldNames();
058    
059        // Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
060        if(dbConf.getInputQuery() == null) {
061          query.append("SELECT ");
062      
063          for (int i = 0; i < fieldNames.length; i++) {
064            query.append(fieldNames[i]);
065            if (i != fieldNames.length -1) {
066              query.append(", ");
067            }
068          }
069      
070          query.append(" FROM ").append(tableName);
071          if (conditions != null && conditions.length() > 0)
072            query.append(" WHERE ").append(conditions);
073          String orderBy = dbConf.getInputOrderBy();
074          if (orderBy != null && orderBy.length() > 0) {
075            query.append(" ORDER BY ").append(orderBy);
076          }
077        } else {
078          //PREBUILT QUERY
079          query.append(dbConf.getInputQuery());
080        }
081            
082        try {
083          DBInputFormat.DBInputSplit split = getSplit();
084          if (split.getLength() > 0 && split.getStart() > 0){
085            String querystring = query.toString();
086    
087            query = new StringBuilder();
088            query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
089            query.append(querystring);
090            query.append(" ) a WHERE rownum <= ").append(split.getStart());
091            query.append(" + ").append(split.getLength());
092            query.append(" ) WHERE dbif_rno >= ").append(split.getStart());
093          }
094        } catch (IOException ex) {
095          // ignore, will not throw.
096        }                 
097    
098        return query.toString();
099      }
100    
101      /**
102       * Set session time zone
103       * @param conf The current configuration.
104       * We read the 'oracle.sessionTimeZone' property from here.
105       * @param conn The connection to alter the timezone properties of.
106       */
107      public static void setSessionTimeZone(Configuration conf,
108          Connection conn) throws SQLException {
109        // need to use reflection to call the method setSessionTimeZone on
110        // the OracleConnection class because oracle specific java libraries are
111        // not accessible in this context.
112        Method method;
113        try {
114          method = conn.getClass().getMethod(
115                  "setSessionTimeZone", new Class [] {String.class});
116        } catch (Exception ex) {
117          LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
118          // rethrow SQLException
119          throw new SQLException(ex);
120        }
121    
122        // Need to set the time zone in order for Java
123        // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE".
124        // We can't easily get the correct Oracle-specific timezone string
125        // from Java; just let the user set the timezone in a property.
126        String clientTimeZone = conf.get(SESSION_TIMEZONE_KEY, "GMT");
127        try {
128          method.setAccessible(true);
129          method.invoke(conn, clientTimeZone);
130          LOG.info("Time zone has been set to " + clientTimeZone);
131        } catch (Exception ex) {
132          LOG.warn("Time zone " + clientTimeZone +
133                   " could not be set on Oracle database.");
134          LOG.warn("Setting default time zone: GMT");
135          try {
136            // "GMT" timezone is guaranteed to exist.
137            method.invoke(conn, "GMT");
138          } catch (Exception ex2) {
139            LOG.error("Could not set time zone for oracle connection", ex2);
140            // rethrow SQLException
141            throw new SQLException(ex);
142          }
143        }
144      }
145    }