/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MRJobConfig;

/**
 * Implement DBSplitter over floating-point values.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class FloatSplitter implements DBSplitter {

  private static final Log LOG = LogFactory.getLog(FloatSplitter.class);

  /** Absolute lower bound applied to the width of a single split. */
  private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;

  /**
   * Partition the range described by the current row of {@code results}
   * into half-open intervals {@code colName >= lo AND colName < hi},
   * followed by one closed interval ending at the upper bound.
   *
   * <p>Column 1 of the row is read as the lower bound and column 2 as the
   * upper bound. This method does not advance the cursor, so the caller must
   * already have positioned {@code results} on the row to use. If both
   * bounds are SQL NULL the only split returned is
   * {@code colName IS NULL}; if exactly one bound is NULL, a
   * {@code colName IS NULL} split is appended after the range splits.</p>
   *
   * @param conf configuration from which {@link MRJobConfig#NUM_MAPS} is
   *     read as a hint for the number of splits; values below 1 are treated
   *     as 1
   * @param results result set whose current row holds the two bounds
   * @param colName name of the column used to build the WHERE-clause
   *     fragments
   * @return the generated splits, in ascending order of lower bound
   * @throws SQLException if reading from {@code results} fails
   */
  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
      throws SQLException {

    LOG.warn("Generating splits for a floating-point index column. Due to the");
    LOG.warn("imprecise representation of floating-point values in Java, this");
    LOG.warn("may result in an incomplete import.");
    LOG.warn("You are strongly encouraged to choose an integral split column.");

    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Read each bound exactly once. getDouble() yields 0 for SQL NULL, so
    // wasNull() is consulted immediately after each read to tell the two apart.
    double minVal = results.getDouble(1);
    boolean minIsNull = results.wasNull();
    double maxVal = results.getDouble(2);
    boolean maxIsNull = results.wasNull();

    if (minIsNull && maxIsNull) {
      // Range is null to null. Return a null split accordingly.
      splits.add(nullSplit(colName));
      return splits;
    }

    // Use this as a hint. May need an extra task if the size doesn't
    // divide cleanly. A non-positive setting would yield a negative or
    // non-finite step, so clamp it to a single split.
    int numSplits = Math.max(1, conf.getInt(MRJobConfig.NUM_MAPS, 1));
    double splitSize = (maxVal - minVal) / (double) numSplits;

    // The step must be large enough to actually move a value inside
    // [minVal, maxVal]. A step smaller than one ulp of the larger-magnitude
    // bound is absorbed by rounding, "curUpper += splitSize" stops advancing,
    // and the loop below would never terminate.
    double minStep =
        Math.max(MIN_INCREMENT, Math.max(Math.ulp(minVal), Math.ulp(maxVal)));
    if (splitSize < minStep) {
      splitSize = minStep;
    }

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    double curLower = minVal;
    double curUpper = curLower + splitSize;

    while (curUpper < maxVal) {
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          lowClausePrefix + Double.toString(curLower),
          highClausePrefix + Double.toString(curUpper)));

      curLower = curUpper;
      curUpper += splitSize;
    }

    // Catch any overage and create the closed interval for the last split.
    // It is also emitted when exactly one split has been produced so far.
    if (curLower <= maxVal || splits.size() == 1) {
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          lowClausePrefix + Double.toString(curLower),
          colName + " <= " + Double.toString(maxVal)));
    }

    if (minIsNull || maxIsNull) {
      // At least one extrema is null; add a null split.
      splits.add(nullSplit(colName));
    }

    return splits;
  }

  /** Build the split whose lower and upper clauses are both "colName IS NULL". */
  private static InputSplit nullSplit(String colName) {
    String isNullClause = colName + " IS NULL";
    return new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        isNullClause, isNullClause);
  }
}