001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.math.BigDecimal;
024    import java.util.ArrayList;
025    import java.util.Iterator;
026    import java.util.List;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    import org.apache.hadoop.classification.InterfaceAudience;
032    import org.apache.hadoop.classification.InterfaceStability;
033    import org.apache.hadoop.conf.Configuration;
034    import org.apache.hadoop.mapreduce.InputSplit;
035    import org.apache.hadoop.mapreduce.MRJobConfig;
036    
037    /**
038     * Implement DBSplitter over text strings.
039     */
040    @InterfaceAudience.Public
041    @InterfaceStability.Evolving
042    public class TextSplitter extends BigDecimalSplitter {
043    
044      private static final Log LOG = LogFactory.getLog(TextSplitter.class);
045    
046      /**
047       * This method needs to determine the splits between two user-provided strings.
048       * In the case where the user's strings are 'A' and 'Z', this is not hard; we 
049       * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings
050       * beginning with each letter, etc.
051       *
052       * If a user has provided us with the strings "Ham" and "Haze", however, we need
053       * to create splits that differ in the third letter.
054       *
055       * The algorithm used is as follows:
056       * Since there are 2**16 unicode characters, we interpret characters as digits in
057       * base 65536. Given a string 's' containing characters s_0, s_1 .. s_n, we interpret
058       * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the
059       * low and high strings into floating-point values, we then use the BigDecimalSplitter
060       * to establish the even split points, then map the resulting floating point values
061       * back into strings.
062       */
063      public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
064          throws SQLException {
065    
066        LOG.warn("Generating splits for a textual index column.");
067        LOG.warn("If your database sorts in a case-insensitive order, "
068            + "this may result in a partial import or duplicate records.");
069        LOG.warn("You are strongly encouraged to choose an integral split column.");
070    
071        String minString = results.getString(1);
072        String maxString = results.getString(2);
073    
074        boolean minIsNull = false;
075    
076        // If the min value is null, switch it to an empty string instead for purposes
077        // of interpolation. Then add [null, null] as a special case split.
078        if (null == minString) {
079          minString = "";
080          minIsNull = true;
081        }
082    
083        if (null == maxString) {
084          // If the max string is null, then the min string has to be null too.
085          // Just return a special split for this case.
086          List<InputSplit> splits = new ArrayList<InputSplit>();
087          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
088              colName + " IS NULL", colName + " IS NULL"));
089          return splits;
090        }
091    
092        // Use this as a hint. May need an extra task if the size doesn't
093        // divide cleanly.
094        int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
095    
096        String lowClausePrefix = colName + " >= '";
097        String highClausePrefix = colName + " < '";
098    
099        // If there is a common prefix between minString and maxString, establish it
100        // and pull it out of minString and maxString.
101        int maxPrefixLen = Math.min(minString.length(), maxString.length());
102        int sharedLen;
103        for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
104          char c1 = minString.charAt(sharedLen);
105          char c2 = maxString.charAt(sharedLen);
106          if (c1 != c2) {
107            break;
108          }
109        }
110    
111        // The common prefix has length 'sharedLen'. Extract it from both.
112        String commonPrefix = minString.substring(0, sharedLen);
113        minString = minString.substring(sharedLen);
114        maxString = maxString.substring(sharedLen);
115    
116        List<String> splitStrings = split(numSplits, minString, maxString, commonPrefix);
117        List<InputSplit> splits = new ArrayList<InputSplit>();
118    
119        // Convert the list of split point strings into an actual set of InputSplits.
120        String start = splitStrings.get(0);
121        for (int i = 1; i < splitStrings.size(); i++) {
122          String end = splitStrings.get(i);
123    
124          if (i == splitStrings.size() - 1) {
125            // This is the last one; use a closed interval.
126            splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
127                lowClausePrefix + start + "'", colName + " <= '" + end + "'"));
128          } else {
129            // Normal open-interval case.
130            splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
131                lowClausePrefix + start + "'", highClausePrefix + end + "'"));
132          }
133        }
134    
135        if (minIsNull) {
136          // Add the special null split at the end.
137          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
138              colName + " IS NULL", colName + " IS NULL"));
139        }
140    
141        return splits;
142      }
143    
144      List<String> split(int numSplits, String minString, String maxString, String commonPrefix)
145          throws SQLException {
146    
147        BigDecimal minVal = stringToBigDecimal(minString);
148        BigDecimal maxVal = stringToBigDecimal(maxString);
149    
150        List<BigDecimal> splitPoints = split(new BigDecimal(numSplits), minVal, maxVal);
151        List<String> splitStrings = new ArrayList<String>();
152    
153        // Convert the BigDecimal splitPoints into their string representations.
154        for (BigDecimal bd : splitPoints) {
155          splitStrings.add(commonPrefix + bigDecimalToString(bd));
156        }
157    
158        // Make sure that our user-specified boundaries are the first and last entries
159        // in the array.
160        if (splitStrings.size() == 0 || !splitStrings.get(0).equals(commonPrefix + minString)) {
161          splitStrings.add(0, commonPrefix + minString);
162        }
163        if (splitStrings.size() == 1
164            || !splitStrings.get(splitStrings.size() - 1).equals(commonPrefix + maxString)) {
165          splitStrings.add(commonPrefix + maxString);
166        }
167    
168        return splitStrings;
169      }
170    
171      private final static BigDecimal ONE_PLACE = new BigDecimal(65536);
172    
173      // Maximum number of characters to convert. This is to prevent rounding errors
174      // or repeating fractions near the very bottom from getting out of control. Note
175      // that this still gives us a huge number of possible splits.
176      private final static int MAX_CHARS = 8;
177    
178      /**
179       * Return a BigDecimal representation of string 'str' suitable for use
180       * in a numerically-sorting order.
181       */
182      BigDecimal stringToBigDecimal(String str) {
183        BigDecimal result = BigDecimal.ZERO;
184        BigDecimal curPlace = ONE_PLACE; // start with 1/65536 to compute the first digit.
185    
186        int len = Math.min(str.length(), MAX_CHARS);
187    
188        for (int i = 0; i < len; i++) {
189          int codePoint = str.codePointAt(i);
190          result = result.add(tryDivide(new BigDecimal(codePoint), curPlace));
191          // advance to the next less significant place. e.g., 1/(65536^2) for the second char.
192          curPlace = curPlace.multiply(ONE_PLACE);
193        }
194    
195        return result;
196      }
197    
198      /**
199       * Return the string encoded in a BigDecimal.
200       * Repeatedly multiply the input value by 65536; the integer portion after such a multiplication
201       * represents a single character in base 65536. Convert that back into a char and create a
202       * string out of these until we have no data left.
203       */
204      String bigDecimalToString(BigDecimal bd) {
205        BigDecimal cur = bd.stripTrailingZeros();
206        StringBuilder sb = new StringBuilder();
207    
208        for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
209          cur = cur.multiply(ONE_PLACE);
210          int curCodePoint = cur.intValue();
211          if (0 == curCodePoint) {
212            break;
213          }
214    
215          cur = cur.subtract(new BigDecimal(curCodePoint));
216          sb.append(Character.toChars(curCodePoint));
217        }
218    
219        return sb.toString();
220      }
221    }