001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.db;
020
021 import java.sql.ResultSet;
022 import java.sql.SQLException;
023 import java.sql.Time;
024 import java.sql.Timestamp;
025 import java.sql.Types;
026 import java.util.ArrayList;
027 import java.util.Date;
028 import java.util.List;
029
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032
033 import org.apache.hadoop.classification.InterfaceAudience;
034 import org.apache.hadoop.classification.InterfaceStability;
035 import org.apache.hadoop.conf.Configuration;
036 import org.apache.hadoop.mapreduce.InputSplit;
037 import org.apache.hadoop.mapreduce.MRJobConfig;
038
039 /**
040 * Implement DBSplitter over date/time values.
041 * Make use of logic from IntegerSplitter, since date/time are just longs
042 * in Java.
043 */
044 @InterfaceAudience.Public
045 @InterfaceStability.Evolving
046 public class DateSplitter extends IntegerSplitter {
047
048 private static final Log LOG = LogFactory.getLog(DateSplitter.class);
049
050 public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
051 throws SQLException {
052
053 long minVal;
054 long maxVal;
055
056 int sqlDataType = results.getMetaData().getColumnType(1);
057 minVal = resultSetColToLong(results, 1, sqlDataType);
058 maxVal = resultSetColToLong(results, 2, sqlDataType);
059
060 String lowClausePrefix = colName + " >= ";
061 String highClausePrefix = colName + " < ";
062
063 int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);
064 if (numSplits < 1) {
065 numSplits = 1;
066 }
067
068 if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
069 // The range of acceptable dates is NULL to NULL. Just create a single split.
070 List<InputSplit> splits = new ArrayList<InputSplit>();
071 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
072 colName + " IS NULL", colName + " IS NULL"));
073 return splits;
074 }
075
076 // Gather the split point integers
077 List<Long> splitPoints = split(numSplits, minVal, maxVal);
078 List<InputSplit> splits = new ArrayList<InputSplit>();
079
080 // Turn the split points into a set of intervals.
081 long start = splitPoints.get(0);
082 Date startDate = longToDate(start, sqlDataType);
083 if (sqlDataType == Types.TIMESTAMP) {
084 // The lower bound's nanos value needs to match the actual lower-bound nanos.
085 try {
086 ((java.sql.Timestamp) startDate).setNanos(results.getTimestamp(1).getNanos());
087 } catch (NullPointerException npe) {
088 // If the lower bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
089 }
090 }
091
092 for (int i = 1; i < splitPoints.size(); i++) {
093 long end = splitPoints.get(i);
094 Date endDate = longToDate(end, sqlDataType);
095
096 if (i == splitPoints.size() - 1) {
097 if (sqlDataType == Types.TIMESTAMP) {
098 // The upper bound's nanos value needs to match the actual upper-bound nanos.
099 try {
100 ((java.sql.Timestamp) endDate).setNanos(results.getTimestamp(2).getNanos());
101 } catch (NullPointerException npe) {
102 // If the upper bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
103 }
104 }
105 // This is the last one; use a closed interval.
106 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
107 lowClausePrefix + dateToString(startDate),
108 colName + " <= " + dateToString(endDate)));
109 } else {
110 // Normal open-interval case.
111 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
112 lowClausePrefix + dateToString(startDate),
113 highClausePrefix + dateToString(endDate)));
114 }
115
116 start = end;
117 startDate = endDate;
118 }
119
120 if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
121 // Add an extra split to handle the null case that we saw.
122 splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
123 colName + " IS NULL", colName + " IS NULL"));
124 }
125
126 return splits;
127 }
128
129 /** Retrieve the value from the column in a type-appropriate manner and return
130 its timestamp since the epoch. If the column is null, then return Long.MIN_VALUE.
131 This will cause a special split to be generated for the NULL case, but may also
132 cause poorly-balanced splits if most of the actual dates are positive time
133 since the epoch, etc.
134 */
135 private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType) throws SQLException {
136 try {
137 switch (sqlDataType) {
138 case Types.DATE:
139 return rs.getDate(colNum).getTime();
140 case Types.TIME:
141 return rs.getTime(colNum).getTime();
142 case Types.TIMESTAMP:
143 return rs.getTimestamp(colNum).getTime();
144 default:
145 throw new SQLException("Not a date-type field");
146 }
147 } catch (NullPointerException npe) {
148 // null column. return minimum long value.
149 LOG.warn("Encountered a NULL date in the split column. Splits may be poorly balanced.");
150 return Long.MIN_VALUE;
151 }
152 }
153
154 /** Parse the long-valued timestamp into the appropriate SQL date type. */
155 private Date longToDate(long val, int sqlDataType) {
156 switch (sqlDataType) {
157 case Types.DATE:
158 return new java.sql.Date(val);
159 case Types.TIME:
160 return new java.sql.Time(val);
161 case Types.TIMESTAMP:
162 return new java.sql.Timestamp(val);
163 default: // Shouldn't ever hit this case.
164 return null;
165 }
166 }
167
168 /**
169 * Given a Date 'd', format it as a string for use in a SQL date
170 * comparison operation.
171 * @param d the date to format.
172 * @return the string representing this date in SQL with any appropriate
173 * quotation characters, etc.
174 */
175 protected String dateToString(Date d) {
176 return "'" + d.toString() + "'";
177 }
178 }