001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.util.ArrayList;
024    import java.util.List;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    
029    import org.apache.hadoop.classification.InterfaceAudience;
030    import org.apache.hadoop.classification.InterfaceStability;
031    import org.apache.hadoop.conf.Configuration;
032    import org.apache.hadoop.mapreduce.InputSplit;
033    import org.apache.hadoop.mapreduce.MRJobConfig;
034    
035    /**
036     * Implement DBSplitter over floating-point values.
037     */
038    @InterfaceAudience.Public
039    @InterfaceStability.Evolving
040    public class FloatSplitter implements DBSplitter {
041    
042      private static final Log LOG = LogFactory.getLog(FloatSplitter.class);
043    
044      private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;
045    
046      public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
047          throws SQLException {
048    
049        LOG.warn("Generating splits for a floating-point index column. Due to the");
050        LOG.warn("imprecise representation of floating-point values in Java, this");
051        LOG.warn("may result in an incomplete import.");
052        LOG.warn("You are strongly encouraged to choose an integral split column.");
053    
054        List<InputSplit> splits = new ArrayList<InputSplit>();
055    
056        if (results.getString(1) == null && results.getString(2) == null) {
057          // Range is null to null. Return a null split accordingly.
058          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
059              colName + " IS NULL", colName + " IS NULL"));
060          return splits;
061        }
062    
063        double minVal = results.getDouble(1);
064        double maxVal = results.getDouble(2);
065    
066        // Use this as a hint. May need an extra task if the size doesn't
067        // divide cleanly.
068        int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
069        double splitSize = (maxVal - minVal) / (double) numSplits;
070    
071        if (splitSize < MIN_INCREMENT) {
072          splitSize = MIN_INCREMENT;
073        }
074    
075        String lowClausePrefix = colName + " >= ";
076        String highClausePrefix = colName + " < ";
077    
078        double curLower = minVal;
079        double curUpper = curLower + splitSize;
080    
081        while (curUpper < maxVal) {
082          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
083              lowClausePrefix + Double.toString(curLower),
084              highClausePrefix + Double.toString(curUpper)));
085    
086          curLower = curUpper;
087          curUpper += splitSize;
088        }
089    
090        // Catch any overage and create the closed interval for the last split.
091        if (curLower <= maxVal || splits.size() == 1) {
092          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
093              lowClausePrefix + Double.toString(curLower),
094              colName + " <= " + Double.toString(maxVal)));
095        }
096    
097        if (results.getString(1) == null || results.getString(2) == null) {
098          // At least one extrema is null; add a null split.
099          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
100              colName + " IS NULL", colName + " IS NULL"));
101        }
102    
103        return splits;
104      }
105    }