001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.db;
020
021import java.sql.ResultSet;
022import java.sql.SQLException;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.mapreduce.InputSplit;
033import org.apache.hadoop.mapreduce.MRJobConfig;
034
035/**
036 * Implement DBSplitter over floating-point values.
037 */
038@InterfaceAudience.Public
039@InterfaceStability.Evolving
040public class FloatSplitter implements DBSplitter {
041
042  private static final Log LOG = LogFactory.getLog(FloatSplitter.class);
043
044  private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;
045
046  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
047      throws SQLException {
048
049    LOG.warn("Generating splits for a floating-point index column. Due to the");
050    LOG.warn("imprecise representation of floating-point values in Java, this");
051    LOG.warn("may result in an incomplete import.");
052    LOG.warn("You are strongly encouraged to choose an integral split column.");
053
054    List<InputSplit> splits = new ArrayList<InputSplit>();
055
056    if (results.getString(1) == null && results.getString(2) == null) {
057      // Range is null to null. Return a null split accordingly.
058      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
059          colName + " IS NULL", colName + " IS NULL"));
060      return splits;
061    }
062
063    double minVal = results.getDouble(1);
064    double maxVal = results.getDouble(2);
065
066    // Use this as a hint. May need an extra task if the size doesn't
067    // divide cleanly.
068    int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
069    double splitSize = (maxVal - minVal) / (double) numSplits;
070
071    if (splitSize < MIN_INCREMENT) {
072      splitSize = MIN_INCREMENT;
073    }
074
075    String lowClausePrefix = colName + " >= ";
076    String highClausePrefix = colName + " < ";
077
078    double curLower = minVal;
079    double curUpper = curLower + splitSize;
080
081    while (curUpper < maxVal) {
082      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
083          lowClausePrefix + Double.toString(curLower),
084          highClausePrefix + Double.toString(curUpper)));
085
086      curLower = curUpper;
087      curUpper += splitSize;
088    }
089
090    // Catch any overage and create the closed interval for the last split.
091    if (curLower <= maxVal || splits.size() == 1) {
092      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
093          lowClausePrefix + Double.toString(curLower),
094          colName + " <= " + Double.toString(maxVal)));
095    }
096
097    if (results.getString(1) == null || results.getString(2) == null) {
098      // At least one extrema is null; add a null split.
099      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
100          colName + " IS NULL", colName + " IS NULL"));
101    }
102
103    return splits;
104  }
105}