001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.db;
020
021import java.sql.ResultSet;
022import java.sql.SQLException;
023import java.sql.Time;
024import java.sql.Timestamp;
025import java.sql.Types;
026import java.util.ArrayList;
027import java.util.Date;
028import java.util.List;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.mapreduce.InputSplit;
037import org.apache.hadoop.mapreduce.MRJobConfig;
038
039/**
040 * Implement DBSplitter over date/time values.
041 * Make use of logic from IntegerSplitter, since date/time are just longs
042 * in Java.
043 */
044@InterfaceAudience.Public
045@InterfaceStability.Evolving
046public class DateSplitter extends IntegerSplitter {
047
048  private static final Log LOG = LogFactory.getLog(DateSplitter.class);
049
050  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
051      throws SQLException {
052
053    long minVal;
054    long maxVal;
055
056    int sqlDataType = results.getMetaData().getColumnType(1);
057    minVal = resultSetColToLong(results, 1, sqlDataType);
058    maxVal = resultSetColToLong(results, 2, sqlDataType);
059
060    String lowClausePrefix = colName + " >= ";
061    String highClausePrefix = colName + " < ";
062
063    int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
064    if (numSplits < 1) {
065      numSplits = 1;
066    }
067
068    if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
069      // The range of acceptable dates is NULL to NULL. Just create a single split.
070      List<InputSplit> splits = new ArrayList<InputSplit>();
071      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
072          colName + " IS NULL", colName + " IS NULL"));
073      return splits;
074    }
075
076    // Gather the split point integers
077    List<Long> splitPoints = split(numSplits, minVal, maxVal);
078    List<InputSplit> splits = new ArrayList<InputSplit>();
079
080    // Turn the split points into a set of intervals.
081    long start = splitPoints.get(0);
082    Date startDate = longToDate(start, sqlDataType);
083    if (sqlDataType == Types.TIMESTAMP) {
084      // The lower bound's nanos value needs to match the actual lower-bound nanos.
085      try {
086        ((java.sql.Timestamp) startDate).setNanos(results.getTimestamp(1).getNanos());
087      } catch (NullPointerException npe) {
088        // If the lower bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
089      }
090    }
091
092    for (int i = 1; i < splitPoints.size(); i++) {
093      long end = splitPoints.get(i);
094      Date endDate = longToDate(end, sqlDataType);
095
096      if (i == splitPoints.size() - 1) {
097        if (sqlDataType == Types.TIMESTAMP) {
098          // The upper bound's nanos value needs to match the actual upper-bound nanos.
099          try {
100            ((java.sql.Timestamp) endDate).setNanos(results.getTimestamp(2).getNanos());
101          } catch (NullPointerException npe) {
102            // If the upper bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
103          }
104        }
105        // This is the last one; use a closed interval.
106        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
107            lowClausePrefix + dateToString(startDate),
108            colName + " <= " + dateToString(endDate)));
109      } else {
110        // Normal open-interval case.
111        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
112            lowClausePrefix + dateToString(startDate),
113            highClausePrefix + dateToString(endDate)));
114      }
115
116      start = end;
117      startDate = endDate;
118    }
119
120    if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
121      // Add an extra split to handle the null case that we saw.
122      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
123          colName + " IS NULL", colName + " IS NULL"));
124    }
125
126    return splits;
127  }
128
129  /** Retrieve the value from the column in a type-appropriate manner and return
130      its timestamp since the epoch. If the column is null, then return Long.MIN_VALUE.
131      This will cause a special split to be generated for the NULL case, but may also
132      cause poorly-balanced splits if most of the actual dates are positive time
133      since the epoch, etc.
134    */
135  private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType) throws SQLException {
136    try {
137      switch (sqlDataType) {
138      case Types.DATE:
139        return rs.getDate(colNum).getTime();
140      case Types.TIME:
141        return rs.getTime(colNum).getTime();
142      case Types.TIMESTAMP:
143        return rs.getTimestamp(colNum).getTime();
144      default:
145        throw new SQLException("Not a date-type field");
146      }
147    } catch (NullPointerException npe) {
148      // null column. return minimum long value.
149      LOG.warn("Encountered a NULL date in the split column. Splits may be poorly balanced.");
150      return Long.MIN_VALUE;
151    }
152  }
153
154  /**  Parse the long-valued timestamp into the appropriate SQL date type. */
155  private Date longToDate(long val, int sqlDataType) {
156    switch (sqlDataType) {
157    case Types.DATE:
158      return new java.sql.Date(val);
159    case Types.TIME:
160      return new java.sql.Time(val);
161    case Types.TIMESTAMP:
162      return new java.sql.Timestamp(val);
163    default: // Shouldn't ever hit this case.
164      return null;
165    }
166  }
167
168  /**
169   * Given a Date 'd', format it as a string for use in a SQL date
170   * comparison operation.
171   * @param d the date to format.
172   * @return the string representing this date in SQL with any appropriate
173   * quotation characters, etc.
174   */
175  protected String dateToString(Date d) {
176    return "'" + d.toString() + "'";
177  }
178}