001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.db;
020
021 import java.sql.ResultSet;
022 import java.sql.SQLException;
023 import java.util.ArrayList;
024 import java.util.List;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.mapreduce.InputSplit;
030 import org.apache.hadoop.mapreduce.MRJobConfig;
031
032 /**
033 * Implement DBSplitter over integer values.
034 */
035 @InterfaceAudience.Public
036 @InterfaceStability.Evolving
037 public class IntegerSplitter implements DBSplitter {
038 public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
039 throws SQLException {
040
041 long minVal = results.getLong(1);
042 long maxVal = results.getLong(2);
043
044 String lowClausePrefix = colName + " >= ";
045 String highClausePrefix = colName + " < ";
046
047 int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
048 if (numSplits < 1) {
049 numSplits = 1;
050 }
051
052 if (results.getString(1) == null && results.getString(2) == null) {
053 // Range is null to null. Return a null split accordingly.
054 List<InputSplit> splits = new ArrayList<InputSplit>();
055 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
056 colName + " IS NULL", colName + " IS NULL"));
057 return splits;
058 }
059
060 // Get all the split points together.
061 List<Long> splitPoints = split(numSplits, minVal, maxVal);
062 List<InputSplit> splits = new ArrayList<InputSplit>();
063
064 // Turn the split points into a set of intervals.
065 long start = splitPoints.get(0);
066 for (int i = 1; i < splitPoints.size(); i++) {
067 long end = splitPoints.get(i);
068
069 if (i == splitPoints.size() - 1) {
070 // This is the last one; use a closed interval.
071 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
072 lowClausePrefix + Long.toString(start),
073 colName + " <= " + Long.toString(end)));
074 } else {
075 // Normal open-interval case.
076 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
077 lowClausePrefix + Long.toString(start),
078 highClausePrefix + Long.toString(end)));
079 }
080
081 start = end;
082 }
083
084 if (results.getString(1) == null || results.getString(2) == null) {
085 // At least one extrema is null; add a null split.
086 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
087 colName + " IS NULL", colName + " IS NULL"));
088 }
089
090 return splits;
091 }
092
093 /**
094 * Returns a list of longs one element longer than the list of input splits.
095 * This represents the boundaries between input splits.
096 * All splits are open on the top end, except the last one.
097 *
098 * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals:
099 *
100 * [0, 5)
101 * [5, 8)
102 * [8, 12)
103 * [12, 18] note the closed interval for the last split.
104 */
105 List<Long> split(long numSplits, long minVal, long maxVal)
106 throws SQLException {
107
108 List<Long> splits = new ArrayList<Long>();
109
110 // Use numSplits as a hint. May need an extra task if the size doesn't
111 // divide cleanly.
112
113 long splitSize = (maxVal - minVal) / numSplits;
114 if (splitSize < 1) {
115 splitSize = 1;
116 }
117
118 long curVal = minVal;
119
120 while (curVal <= maxVal) {
121 splits.add(curVal);
122 curVal += splitSize;
123 }
124
125 if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) {
126 // We didn't end on the maxVal. Add that to the end of the list.
127 splits.add(maxVal);
128 }
129
130 return splits;
131 }
132 }