001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.db;
020
021import java.sql.ResultSet;
022import java.sql.SQLException;
023import java.math.BigDecimal;
024import java.util.ArrayList;
025import java.util.Iterator;
026import java.util.List;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.classification.InterfaceStability;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.mapreduce.InputSplit;
035import org.apache.hadoop.mapreduce.MRJobConfig;
036
037/**
038 * Implement DBSplitter over text strings.
039 */
040@InterfaceAudience.Public
041@InterfaceStability.Evolving
042public class TextSplitter extends BigDecimalSplitter {
043
044  private static final Log LOG = LogFactory.getLog(TextSplitter.class);
045
046  /**
047   * This method needs to determine the splits between two user-provided strings.
048   * In the case where the user's strings are 'A' and 'Z', this is not hard; we 
049   * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings
050   * beginning with each letter, etc.
051   *
052   * If a user has provided us with the strings "Ham" and "Haze", however, we need
053   * to create splits that differ in the third letter.
054   *
055   * The algorithm used is as follows:
056   * Since there are 2**16 unicode characters, we interpret characters as digits in
057   * base 65536. Given a string 's' containing characters s_0, s_1 .. s_n, we interpret
058   * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the
059   * low and high strings into floating-point values, we then use the BigDecimalSplitter
060   * to establish the even split points, then map the resulting floating point values
061   * back into strings.
062   */
063  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
064      throws SQLException {
065
066    LOG.warn("Generating splits for a textual index column.");
067    LOG.warn("If your database sorts in a case-insensitive order, "
068        + "this may result in a partial import or duplicate records.");
069    LOG.warn("You are strongly encouraged to choose an integral split column.");
070
071    String minString = results.getString(1);
072    String maxString = results.getString(2);
073
074    boolean minIsNull = false;
075
076    // If the min value is null, switch it to an empty string instead for purposes
077    // of interpolation. Then add [null, null] as a special case split.
078    if (null == minString) {
079      minString = "";
080      minIsNull = true;
081    }
082
083    if (null == maxString) {
084      // If the max string is null, then the min string has to be null too.
085      // Just return a special split for this case.
086      List<InputSplit> splits = new ArrayList<InputSplit>();
087      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
088          colName + " IS NULL", colName + " IS NULL"));
089      return splits;
090    }
091
092    // Use this as a hint. May need an extra task if the size doesn't
093    // divide cleanly.
094    int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
095
096    String lowClausePrefix = colName + " >= '";
097    String highClausePrefix = colName + " < '";
098
099    // If there is a common prefix between minString and maxString, establish it
100    // and pull it out of minString and maxString.
101    int maxPrefixLen = Math.min(minString.length(), maxString.length());
102    int sharedLen;
103    for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
104      char c1 = minString.charAt(sharedLen);
105      char c2 = maxString.charAt(sharedLen);
106      if (c1 != c2) {
107        break;
108      }
109    }
110
111    // The common prefix has length 'sharedLen'. Extract it from both.
112    String commonPrefix = minString.substring(0, sharedLen);
113    minString = minString.substring(sharedLen);
114    maxString = maxString.substring(sharedLen);
115
116    List<String> splitStrings = split(numSplits, minString, maxString, commonPrefix);
117    List<InputSplit> splits = new ArrayList<InputSplit>();
118
119    // Convert the list of split point strings into an actual set of InputSplits.
120    String start = splitStrings.get(0);
121    for (int i = 1; i < splitStrings.size(); i++) {
122      String end = splitStrings.get(i);
123
124      if (i == splitStrings.size() - 1) {
125        // This is the last one; use a closed interval.
126        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
127            lowClausePrefix + start + "'", colName + " <= '" + end + "'"));
128      } else {
129        // Normal open-interval case.
130        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
131            lowClausePrefix + start + "'", highClausePrefix + end + "'"));
132      }
133    }
134
135    if (minIsNull) {
136      // Add the special null split at the end.
137      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
138          colName + " IS NULL", colName + " IS NULL"));
139    }
140
141    return splits;
142  }
143
144  List<String> split(int numSplits, String minString, String maxString, String commonPrefix)
145      throws SQLException {
146
147    BigDecimal minVal = stringToBigDecimal(minString);
148    BigDecimal maxVal = stringToBigDecimal(maxString);
149
150    List<BigDecimal> splitPoints = split(new BigDecimal(numSplits), minVal, maxVal);
151    List<String> splitStrings = new ArrayList<String>();
152
153    // Convert the BigDecimal splitPoints into their string representations.
154    for (BigDecimal bd : splitPoints) {
155      splitStrings.add(commonPrefix + bigDecimalToString(bd));
156    }
157
158    // Make sure that our user-specified boundaries are the first and last entries
159    // in the array.
160    if (splitStrings.size() == 0 || !splitStrings.get(0).equals(commonPrefix + minString)) {
161      splitStrings.add(0, commonPrefix + minString);
162    }
163    if (splitStrings.size() == 1
164        || !splitStrings.get(splitStrings.size() - 1).equals(commonPrefix + maxString)) {
165      splitStrings.add(commonPrefix + maxString);
166    }
167
168    return splitStrings;
169  }
170
171  private final static BigDecimal ONE_PLACE = new BigDecimal(65536);
172
173  // Maximum number of characters to convert. This is to prevent rounding errors
174  // or repeating fractions near the very bottom from getting out of control. Note
175  // that this still gives us a huge number of possible splits.
176  private final static int MAX_CHARS = 8;
177
178  /**
179   * Return a BigDecimal representation of string 'str' suitable for use
180   * in a numerically-sorting order.
181   */
182  BigDecimal stringToBigDecimal(String str) {
183    BigDecimal result = BigDecimal.ZERO;
184    BigDecimal curPlace = ONE_PLACE; // start with 1/65536 to compute the first digit.
185
186    int len = Math.min(str.length(), MAX_CHARS);
187
188    for (int i = 0; i < len; i++) {
189      int codePoint = str.codePointAt(i);
190      result = result.add(tryDivide(new BigDecimal(codePoint), curPlace));
191      // advance to the next less significant place. e.g., 1/(65536^2) for the second char.
192      curPlace = curPlace.multiply(ONE_PLACE);
193    }
194
195    return result;
196  }
197
198  /**
199   * Return the string encoded in a BigDecimal.
200   * Repeatedly multiply the input value by 65536; the integer portion after such a multiplication
201   * represents a single character in base 65536. Convert that back into a char and create a
202   * string out of these until we have no data left.
203   */
204  String bigDecimalToString(BigDecimal bd) {
205    BigDecimal cur = bd.stripTrailingZeros();
206    StringBuilder sb = new StringBuilder();
207
208    for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
209      cur = cur.multiply(ONE_PLACE);
210      int curCodePoint = cur.intValue();
211      if (0 == curCodePoint) {
212        break;
213      }
214
215      cur = cur.subtract(new BigDecimal(curCodePoint));
216      sb.append(Character.toChars(curCodePoint));
217    }
218
219    return sb.toString();
220  }
221}