001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.db; 020 021 import java.sql.ResultSet; 022 import java.sql.SQLException; 023 import java.math.BigDecimal; 024 import java.util.ArrayList; 025 import java.util.Iterator; 026 import java.util.List; 027 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 031 import org.apache.hadoop.classification.InterfaceAudience; 032 import org.apache.hadoop.classification.InterfaceStability; 033 import org.apache.hadoop.conf.Configuration; 034 import org.apache.hadoop.mapreduce.InputSplit; 035 import org.apache.hadoop.mapreduce.MRJobConfig; 036 037 /** 038 * Implement DBSplitter over text strings. 039 */ 040 @InterfaceAudience.Public 041 @InterfaceStability.Evolving 042 public class TextSplitter extends BigDecimalSplitter { 043 044 private static final Log LOG = LogFactory.getLog(TextSplitter.class); 045 046 /** 047 * This method needs to determine the splits between two user-provided strings. 048 * In the case where the user's strings are 'A' and 'Z', this is not hard; we 049 * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings 050 * beginning with each letter, etc. 051 * 052 * If a user has provided us with the strings "Ham" and "Haze", however, we need 053 * to create splits that differ in the third letter. 054 * 055 * The algorithm used is as follows: 056 * Since there are 2**16 unicode characters, we interpret characters as digits in 057 * base 65536. Given a string 's' containing characters s_0, s_1 .. s_n, we interpret 058 * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the 059 * low and high strings into floating-point values, we then use the BigDecimalSplitter 060 * to establish the even split points, then map the resulting floating point values 061 * back into strings. 062 */ 063 public List<InputSplit> split(Configuration conf, ResultSet results, String colName) 064 throws SQLException { 065 066 LOG.warn("Generating splits for a textual index column."); 067 LOG.warn("If your database sorts in a case-insensitive order, " 068 + "this may result in a partial import or duplicate records."); 069 LOG.warn("You are strongly encouraged to choose an integral split column."); 070 071 String minString = results.getString(1); 072 String maxString = results.getString(2); 073 074 boolean minIsNull = false; 075 076 // If the min value is null, switch it to an empty string instead for purposes 077 // of interpolation. Then add [null, null] as a special case split. 078 if (null == minString) { 079 minString = ""; 080 minIsNull = true; 081 } 082 083 if (null == maxString) { 084 // If the max string is null, then the min string has to be null too. 085 // Just return a special split for this case. 086 List<InputSplit> splits = new ArrayList<InputSplit>(); 087 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 088 colName + " IS NULL", colName + " IS NULL")); 089 return splits; 090 } 091 092 // Use this as a hint. May need an extra task if the size doesn't 093 // divide cleanly. 094 int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1); 095 096 String lowClausePrefix = colName + " >= '"; 097 String highClausePrefix = colName + " < '"; 098 099 // If there is a common prefix between minString and maxString, establish it 100 // and pull it out of minString and maxString. 101 int maxPrefixLen = Math.min(minString.length(), maxString.length()); 102 int sharedLen; 103 for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) { 104 char c1 = minString.charAt(sharedLen); 105 char c2 = maxString.charAt(sharedLen); 106 if (c1 != c2) { 107 break; 108 } 109 } 110 111 // The common prefix has length 'sharedLen'. Extract it from both. 112 String commonPrefix = minString.substring(0, sharedLen); 113 minString = minString.substring(sharedLen); 114 maxString = maxString.substring(sharedLen); 115 116 List<String> splitStrings = split(numSplits, minString, maxString, commonPrefix); 117 List<InputSplit> splits = new ArrayList<InputSplit>(); 118 119 // Convert the list of split point strings into an actual set of InputSplits. 120 String start = splitStrings.get(0); 121 for (int i = 1; i < splitStrings.size(); i++) { 122 String end = splitStrings.get(i); 123 124 if (i == splitStrings.size() - 1) { 125 // This is the last one; use a closed interval. 126 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 127 lowClausePrefix + start + "'", colName + " <= '" + end + "'")); 128 } else { 129 // Normal open-interval case. 130 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 131 lowClausePrefix + start + "'", highClausePrefix + end + "'")); 132 } 133 } 134 135 if (minIsNull) { 136 // Add the special null split at the end. 137 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 138 colName + " IS NULL", colName + " IS NULL")); 139 } 140 141 return splits; 142 } 143 144 List<String> split(int numSplits, String minString, String maxString, String commonPrefix) 145 throws SQLException { 146 147 BigDecimal minVal = stringToBigDecimal(minString); 148 BigDecimal maxVal = stringToBigDecimal(maxString); 149 150 List<BigDecimal> splitPoints = split(new BigDecimal(numSplits), minVal, maxVal); 151 List<String> splitStrings = new ArrayList<String>(); 152 153 // Convert the BigDecimal splitPoints into their string representations. 154 for (BigDecimal bd : splitPoints) { 155 splitStrings.add(commonPrefix + bigDecimalToString(bd)); 156 } 157 158 // Make sure that our user-specified boundaries are the first and last entries 159 // in the array. 160 if (splitStrings.size() == 0 || !splitStrings.get(0).equals(commonPrefix + minString)) { 161 splitStrings.add(0, commonPrefix + minString); 162 } 163 if (splitStrings.size() == 1 164 || !splitStrings.get(splitStrings.size() - 1).equals(commonPrefix + maxString)) { 165 splitStrings.add(commonPrefix + maxString); 166 } 167 168 return splitStrings; 169 } 170 171 private final static BigDecimal ONE_PLACE = new BigDecimal(65536); 172 173 // Maximum number of characters to convert. This is to prevent rounding errors 174 // or repeating fractions near the very bottom from getting out of control. Note 175 // that this still gives us a huge number of possible splits. 176 private final static int MAX_CHARS = 8; 177 178 /** 179 * Return a BigDecimal representation of string 'str' suitable for use 180 * in a numerically-sorting order. 181 */ 182 BigDecimal stringToBigDecimal(String str) { 183 BigDecimal result = BigDecimal.ZERO; 184 BigDecimal curPlace = ONE_PLACE; // start with 1/65536 to compute the first digit. 185 186 int len = Math.min(str.length(), MAX_CHARS); 187 188 for (int i = 0; i < len; i++) { 189 int codePoint = str.codePointAt(i); 190 result = result.add(tryDivide(new BigDecimal(codePoint), curPlace)); 191 // advance to the next less significant place. e.g., 1/(65536^2) for the second char. 192 curPlace = curPlace.multiply(ONE_PLACE); 193 } 194 195 return result; 196 } 197 198 /** 199 * Return the string encoded in a BigDecimal. 200 * Repeatedly multiply the input value by 65536; the integer portion after such a multiplication 201 * represents a single character in base 65536. Convert that back into a char and create a 202 * string out of these until we have no data left. 203 */ 204 String bigDecimalToString(BigDecimal bd) { 205 BigDecimal cur = bd.stripTrailingZeros(); 206 StringBuilder sb = new StringBuilder(); 207 208 for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) { 209 cur = cur.multiply(ONE_PLACE); 210 int curCodePoint = cur.intValue(); 211 if (0 == curCodePoint) { 212 break; 213 } 214 215 cur = cur.subtract(new BigDecimal(curCodePoint)); 216 sb.append(Character.toChars(curCodePoint)); 217 } 218 219 return sb.toString(); 220 } 221 }