001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.db; 020 021import java.sql.ResultSet; 022import java.sql.SQLException; 023import java.sql.Time; 024import java.sql.Timestamp; 025import java.sql.Types; 026import java.util.ArrayList; 027import java.util.Date; 028import java.util.List; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032 033import org.apache.hadoop.classification.InterfaceAudience; 034import org.apache.hadoop.classification.InterfaceStability; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.mapreduce.InputSplit; 037import org.apache.hadoop.mapreduce.MRJobConfig; 038 039/** 040 * Implement DBSplitter over date/time values. 041 * Make use of logic from IntegerSplitter, since date/time are just longs 042 * in Java. 043 */ 044@InterfaceAudience.Public 045@InterfaceStability.Evolving 046public class DateSplitter extends IntegerSplitter { 047 048 private static final Log LOG = LogFactory.getLog(DateSplitter.class); 049 050 public List<InputSplit> split(Configuration conf, ResultSet results, String colName) 051 throws SQLException { 052 053 long minVal; 054 long maxVal; 055 056 int sqlDataType = results.getMetaData().getColumnType(1); 057 minVal = resultSetColToLong(results, 1, sqlDataType); 058 maxVal = resultSetColToLong(results, 2, sqlDataType); 059 060 String lowClausePrefix = colName + " >= "; 061 String highClausePrefix = colName + " < "; 062 063 int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1); 064 if (numSplits < 1) { 065 numSplits = 1; 066 } 067 068 if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) { 069 // The range of acceptable dates is NULL to NULL. Just create a single split. 070 List<InputSplit> splits = new ArrayList<InputSplit>(); 071 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 072 colName + " IS NULL", colName + " IS NULL")); 073 return splits; 074 } 075 076 // Gather the split point integers 077 List<Long> splitPoints = split(numSplits, minVal, maxVal); 078 List<InputSplit> splits = new ArrayList<InputSplit>(); 079 080 // Turn the split points into a set of intervals. 081 long start = splitPoints.get(0); 082 Date startDate = longToDate(start, sqlDataType); 083 if (sqlDataType == Types.TIMESTAMP) { 084 // The lower bound's nanos value needs to match the actual lower-bound nanos. 085 try { 086 ((java.sql.Timestamp) startDate).setNanos(results.getTimestamp(1).getNanos()); 087 } catch (NullPointerException npe) { 088 // If the lower bound was NULL, we'll get an NPE; just ignore it and don't set nanos. 089 } 090 } 091 092 for (int i = 1; i < splitPoints.size(); i++) { 093 long end = splitPoints.get(i); 094 Date endDate = longToDate(end, sqlDataType); 095 096 if (i == splitPoints.size() - 1) { 097 if (sqlDataType == Types.TIMESTAMP) { 098 // The upper bound's nanos value needs to match the actual upper-bound nanos. 099 try { 100 ((java.sql.Timestamp) endDate).setNanos(results.getTimestamp(2).getNanos()); 101 } catch (NullPointerException npe) { 102 // If the upper bound was NULL, we'll get an NPE; just ignore it and don't set nanos. 103 } 104 } 105 // This is the last one; use a closed interval. 106 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 107 lowClausePrefix + dateToString(startDate), 108 colName + " <= " + dateToString(endDate))); 109 } else { 110 // Normal open-interval case. 111 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 112 lowClausePrefix + dateToString(startDate), 113 highClausePrefix + dateToString(endDate))); 114 } 115 116 start = end; 117 startDate = endDate; 118 } 119 120 if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) { 121 // Add an extra split to handle the null case that we saw. 122 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( 123 colName + " IS NULL", colName + " IS NULL")); 124 } 125 126 return splits; 127 } 128 129 /** Retrieve the value from the column in a type-appropriate manner and return 130 its timestamp since the epoch. If the column is null, then return Long.MIN_VALUE. 131 This will cause a special split to be generated for the NULL case, but may also 132 cause poorly-balanced splits if most of the actual dates are positive time 133 since the epoch, etc. 134 */ 135 private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType) throws SQLException { 136 try { 137 switch (sqlDataType) { 138 case Types.DATE: 139 return rs.getDate(colNum).getTime(); 140 case Types.TIME: 141 return rs.getTime(colNum).getTime(); 142 case Types.TIMESTAMP: 143 return rs.getTimestamp(colNum).getTime(); 144 default: 145 throw new SQLException("Not a date-type field"); 146 } 147 } catch (NullPointerException npe) { 148 // null column. return minimum long value. 149 LOG.warn("Encountered a NULL date in the split column. Splits may be poorly balanced."); 150 return Long.MIN_VALUE; 151 } 152 } 153 154 /** Parse the long-valued timestamp into the appropriate SQL date type. */ 155 private Date longToDate(long val, int sqlDataType) { 156 switch (sqlDataType) { 157 case Types.DATE: 158 return new java.sql.Date(val); 159 case Types.TIME: 160 return new java.sql.Time(val); 161 case Types.TIMESTAMP: 162 return new java.sql.Timestamp(val); 163 default: // Shouldn't ever hit this case. 164 return null; 165 } 166 } 167 168 /** 169 * Given a Date 'd', format it as a string for use in a SQL date 170 * comparison operation. 171 * @param d the date to format. 172 * @return the string representing this date in SQL with any appropriate 173 * quotation characters, etc. 174 */ 175 protected String dateToString(Date d) { 176 return "'" + d.toString() + "'"; 177 } 178}