001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.util.ArrayList;
024    import java.util.List;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.mapreduce.InputSplit;
030    import org.apache.hadoop.mapreduce.MRJobConfig;
031    
032    /**
033     * Implement DBSplitter over integer values.
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Evolving
037    public class IntegerSplitter implements DBSplitter {
038      public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
039          throws SQLException {
040    
041        long minVal = results.getLong(1);
042        long maxVal = results.getLong(2);
043    
044        String lowClausePrefix = colName + " >= ";
045        String highClausePrefix = colName + " < ";
046    
047        int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
048        if (numSplits < 1) {
049          numSplits = 1;
050        }
051    
052        if (results.getString(1) == null && results.getString(2) == null) {
053          // Range is null to null. Return a null split accordingly.
054          List<InputSplit> splits = new ArrayList<InputSplit>();
055          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
056              colName + " IS NULL", colName + " IS NULL"));
057          return splits;
058        }
059    
060        // Get all the split points together.
061        List<Long> splitPoints = split(numSplits, minVal, maxVal);
062        List<InputSplit> splits = new ArrayList<InputSplit>();
063    
064        // Turn the split points into a set of intervals.
065        long start = splitPoints.get(0);
066        for (int i = 1; i < splitPoints.size(); i++) {
067          long end = splitPoints.get(i);
068    
069          if (i == splitPoints.size() - 1) {
070            // This is the last one; use a closed interval.
071            splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
072                lowClausePrefix + Long.toString(start),
073                colName + " <= " + Long.toString(end)));
074          } else {
075            // Normal open-interval case.
076            splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
077                lowClausePrefix + Long.toString(start),
078                highClausePrefix + Long.toString(end)));
079          }
080    
081          start = end;
082        }
083    
084        if (results.getString(1) == null || results.getString(2) == null) {
085          // At least one extrema is null; add a null split.
086          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
087              colName + " IS NULL", colName + " IS NULL"));
088        }
089    
090        return splits;
091      }
092    
093      /**
094       * Returns a list of longs one element longer than the list of input splits.
095       * This represents the boundaries between input splits.
096       * All splits are open on the top end, except the last one.
097       *
098       * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals:
099       *
100       * [0, 5)
101       * [5, 8)
102       * [8, 12)
103       * [12, 18] note the closed interval for the last split.
104       */
105      List<Long> split(long numSplits, long minVal, long maxVal)
106          throws SQLException {
107    
108        List<Long> splits = new ArrayList<Long>();
109    
110        // Use numSplits as a hint. May need an extra task if the size doesn't
111        // divide cleanly.
112    
113        long splitSize = (maxVal - minVal) / numSplits;
114        if (splitSize < 1) {
115          splitSize = 1;
116        }
117    
118        long curVal = minVal;
119    
120        while (curVal <= maxVal) {
121          splits.add(curVal);
122          curVal += splitSize;
123        }
124    
125        if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) {
126          // We didn't end on the maxVal. Add that to the end of the list.
127          splits.add(maxVal);
128        }
129    
130        return splits;
131      }
132    }