001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MRJobConfig;
035    
036    /**
037     * Implement DBSplitter over BigDecimal values.
038     */
039    @InterfaceAudience.Public
040    @InterfaceStability.Evolving
041    public class BigDecimalSplitter implements DBSplitter {
042      private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class);
043    
044      public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
045          throws SQLException {
046    
047        BigDecimal minVal = results.getBigDecimal(1);
048        BigDecimal maxVal = results.getBigDecimal(2);
049    
050        String lowClausePrefix = colName + " >= ";
051        String highClausePrefix = colName + " < ";
052    
053        BigDecimal numSplits = new BigDecimal(conf.getInt(MRJobConfig.NUM_MAPS, 1));
054    
055        if (minVal == null && maxVal == null) {
056          // Range is null to null. Return a null split accordingly.
057          List<InputSplit> splits = new ArrayList<InputSplit>();
058          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
059              colName + " IS NULL", colName + " IS NULL"));
060          return splits;
061        }
062    
063        if (minVal == null || maxVal == null) {
064          // Don't know what is a reasonable min/max value for interpolation. Fail.
065          LOG.error("Cannot find a range for NUMERIC or DECIMAL fields with one end NULL.");
066          return null;
067        }
068    
069        // Get all the split points together.
070        List<BigDecimal> splitPoints = split(numSplits, minVal, maxVal);
071        List<InputSplit> splits = new ArrayList<InputSplit>();
072    
073        // Turn the split points into a set of intervals.
074        BigDecimal start = splitPoints.get(0);
075        for (int i = 1; i < splitPoints.size(); i++) {
076          BigDecimal end = splitPoints.get(i);
077    
078          if (i == splitPoints.size() - 1) {
079            // This is the last one; use a closed interval.
080            splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
081                lowClausePrefix + start.toString(),
082                colName + " <= " + end.toString()));
083          } else {
084            // Normal open-interval case.
085            splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
086                lowClausePrefix + start.toString(),
087                highClausePrefix + end.toString()));
088          }
089    
090          start = end;
091        }
092    
093        return splits;
094      }
095    
096      private static final BigDecimal MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
097    
098      /**
099       * Divide numerator by denominator. If impossible in exact mode, use rounding.
100       */
101      protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
102        try {
103          return numerator.divide(denominator);
104        } catch (ArithmeticException ae) {
105          return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
106        }
107      }
108    
109      /**
110       * Returns a list of BigDecimals one element longer than the list of input splits.
111       * This represents the boundaries between input splits.
112       * All splits are open on the top end, except the last one.
113       *
114       * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals:
115       *
116       * [0, 5)
117       * [5, 8)
118       * [8, 12)
119       * [12, 18] note the closed interval for the last split.
120       */
121      List<BigDecimal> split(BigDecimal numSplits, BigDecimal minVal, BigDecimal maxVal)
122          throws SQLException {
123    
124        List<BigDecimal> splits = new ArrayList<BigDecimal>();
125    
126        // Use numSplits as a hint. May need an extra task if the size doesn't
127        // divide cleanly.
128    
129        BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), (numSplits));
130        if (splitSize.compareTo(MIN_INCREMENT) < 0) {
131          splitSize = MIN_INCREMENT;
132          LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT");
133        }
134    
135        BigDecimal curVal = minVal;
136    
137        while (curVal.compareTo(maxVal) <= 0) {
138          splits.add(curVal);
139          curVal = curVal.add(splitSize);
140        }
141    
142        if (splits.get(splits.size() - 1).compareTo(maxVal) != 0 || splits.size() == 1) {
143          // We didn't end on the maxVal. Add that to the end of the list.
144          splits.add(maxVal);
145        }
146    
147        return splits;
148      }
149    }