001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.db; 020 021 import java.sql.ResultSet; 022 import java.sql.SQLException; 023 import java.util.ArrayList; 024 import java.util.List; 025 026 import org.apache.hadoop.classification.InterfaceAudience; 027 import org.apache.hadoop.classification.InterfaceStability; 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.mapreduce.InputSplit; 030 import org.apache.hadoop.mapreduce.MRJobConfig; 031 032 /** 033 * Implement DBSplitter over integer values. 034 */ 035 @InterfaceAudience.Public 036 @InterfaceStability.Evolving 037 public class IntegerSplitter implements DBSplitter { 038 public List<InputSplit> split(Configuration conf, ResultSet results, String colName) 039 throws SQLException { 040 041 long minVal = results.getLong(1); 042 long maxVal = results.getLong(2); 043 044 String lowClausePrefix = colName + " >= "; 045 String highClausePrefix = colName + " < "; 046 047 int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1); 048 if (numSplits < 1) { 049 numSplits = 1; 050 } 051 052 if (results.getString(1) == null && results.getString(2) == null) { 053 // Range is null to null. Return a null split accordingly. 054 List<InputSplit> splits = new ArrayList<InputSplit>(); 055 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 056 colName + " IS NULL", colName + " IS NULL")); 057 return splits; 058 } 059 060 // Get all the split points together. 061 List<Long> splitPoints = split(numSplits, minVal, maxVal); 062 List<InputSplit> splits = new ArrayList<InputSplit>(); 063 064 // Turn the split points into a set of intervals. 065 long start = splitPoints.get(0); 066 for (int i = 1; i < splitPoints.size(); i++) { 067 long end = splitPoints.get(i); 068 069 if (i == splitPoints.size() - 1) { 070 // This is the last one; use a closed interval. 071 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 072 lowClausePrefix + Long.toString(start), 073 colName + " <= " + Long.toString(end))); 074 } else { 075 // Normal open-interval case. 076 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 077 lowClausePrefix + Long.toString(start), 078 highClausePrefix + Long.toString(end))); 079 } 080 081 start = end; 082 } 083 084 if (results.getString(1) == null || results.getString(2) == null) { 085 // At least one extrema is null; add a null split. 086 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 087 colName + " IS NULL", colName + " IS NULL")); 088 } 089 090 return splits; 091 } 092 093 /** 094 * Returns a list of longs one element longer than the list of input splits. 095 * This represents the boundaries between input splits. 096 * All splits are open on the top end, except the last one. 097 * 098 * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals: 099 * 100 * [0, 5) 101 * [5, 8) 102 * [8, 12) 103 * [12, 18] note the closed interval for the last split. 104 */ 105 List<Long> split(long numSplits, long minVal, long maxVal) 106 throws SQLException { 107 108 List<Long> splits = new ArrayList<Long>(); 109 110 // Use numSplits as a hint. May need an extra task if the size doesn't 111 // divide cleanly. 112 113 long splitSize = (maxVal - minVal) / numSplits; 114 if (splitSize < 1) { 115 splitSize = 1; 116 } 117 118 long curVal = minVal; 119 120 while (curVal <= maxVal) { 121 splits.add(curVal); 122 curVal += splitSize; 123 } 124 125 if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) { 126 // We didn't end on the maxVal. Add that to the end of the list. 127 splits.add(maxVal); 128 } 129 130 return splits; 131 } 132 }