001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.sql.Time;
024    import java.sql.Timestamp;
025    import java.sql.Types;
026    import java.util.ArrayList;
027    import java.util.Date;
028    import java.util.List;
029    
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    
033    import org.apache.hadoop.classification.InterfaceAudience;
034    import org.apache.hadoop.classification.InterfaceStability;
035    import org.apache.hadoop.conf.Configuration;
036    import org.apache.hadoop.mapreduce.InputSplit;
037    import org.apache.hadoop.mapreduce.MRJobConfig;
038    
039    /**
040     * Implement DBSplitter over date/time values.
041     * Make use of logic from IntegerSplitter, since date/time are just longs
042     * in Java.
043     */
044    @InterfaceAudience.Public
045    @InterfaceStability.Evolving
046    public class DateSplitter extends IntegerSplitter {
047    
048      private static final Log LOG = LogFactory.getLog(DateSplitter.class);
049    
050      public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
051          throws SQLException {
052    
053        long minVal;
054        long maxVal;
055    
056        int sqlDataType = results.getMetaData().getColumnType(1);
057        minVal = resultSetColToLong(results, 1, sqlDataType);
058        maxVal = resultSetColToLong(results, 2, sqlDataType);
059    
060        String lowClausePrefix = colName + " >= ";
061        String highClausePrefix = colName + " < ";
062    
063        int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
064        if (numSplits < 1) {
065          numSplits = 1;
066        }
067    
068        if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
069          // The range of acceptable dates is NULL to NULL. Just create a single split.
070          List<InputSplit> splits = new ArrayList<InputSplit>();
071          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
072              colName + " IS NULL", colName + " IS NULL"));
073          return splits;
074        }
075    
076        // Gather the split point integers
077        List<Long> splitPoints = split(numSplits, minVal, maxVal);
078        List<InputSplit> splits = new ArrayList<InputSplit>();
079    
080        // Turn the split points into a set of intervals.
081        long start = splitPoints.get(0);
082        Date startDate = longToDate(start, sqlDataType);
083        if (sqlDataType == Types.TIMESTAMP) {
084          // The lower bound's nanos value needs to match the actual lower-bound nanos.
085          try {
086            ((java.sql.Timestamp) startDate).setNanos(results.getTimestamp(1).getNanos());
087          } catch (NullPointerException npe) {
088            // If the lower bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
089          }
090        }
091    
092        for (int i = 1; i < splitPoints.size(); i++) {
093          long end = splitPoints.get(i);
094          Date endDate = longToDate(end, sqlDataType);
095    
096          if (i == splitPoints.size() - 1) {
097            if (sqlDataType == Types.TIMESTAMP) {
098              // The upper bound's nanos value needs to match the actual upper-bound nanos.
099              try {
100                ((java.sql.Timestamp) endDate).setNanos(results.getTimestamp(2).getNanos());
101              } catch (NullPointerException npe) {
102                // If the upper bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
103              }
104            }
105            // This is the last one; use a closed interval.
106            splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
107                lowClausePrefix + dateToString(startDate),
108                colName + " <= " + dateToString(endDate)));
109          } else {
110            // Normal open-interval case.
111            splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
112                lowClausePrefix + dateToString(startDate),
113                highClausePrefix + dateToString(endDate)));
114          }
115    
116          start = end;
117          startDate = endDate;
118        }
119    
120        if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
121          // Add an extra split to handle the null case that we saw.
122          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
123              colName + " IS NULL", colName + " IS NULL"));
124        }
125    
126        return splits;
127      }
128    
129      /** Retrieve the value from the column in a type-appropriate manner and return
130          its timestamp since the epoch. If the column is null, then return Long.MIN_VALUE.
131          This will cause a special split to be generated for the NULL case, but may also
132          cause poorly-balanced splits if most of the actual dates are positive time
133          since the epoch, etc.
134        */
135      private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType) throws SQLException {
136        try {
137          switch (sqlDataType) {
138          case Types.DATE:
139            return rs.getDate(colNum).getTime();
140          case Types.TIME:
141            return rs.getTime(colNum).getTime();
142          case Types.TIMESTAMP:
143            return rs.getTimestamp(colNum).getTime();
144          default:
145            throw new SQLException("Not a date-type field");
146          }
147        } catch (NullPointerException npe) {
148          // null column. return minimum long value.
149          LOG.warn("Encountered a NULL date in the split column. Splits may be poorly balanced.");
150          return Long.MIN_VALUE;
151        }
152      }
153    
154      /**  Parse the long-valued timestamp into the appropriate SQL date type. */
155      private Date longToDate(long val, int sqlDataType) {
156        switch (sqlDataType) {
157        case Types.DATE:
158          return new java.sql.Date(val);
159        case Types.TIME:
160          return new java.sql.Time(val);
161        case Types.TIMESTAMP:
162          return new java.sql.Timestamp(val);
163        default: // Shouldn't ever hit this case.
164          return null;
165        }
166      }
167    
168      /**
169       * Given a Date 'd', format it as a string for use in a SQL date
170       * comparison operation.
171       * @param d the date to format.
172       * @return the string representing this date in SQL with any appropriate
173       * quotation characters, etc.
174       */
175      protected String dateToString(Date d) {
176        return "'" + d.toString() + "'";
177      }
178    }