001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.db;
020    
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.util.ArrayList;
024    import java.util.List;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.mapreduce.InputSplit;
030    
031    /**
032     * Implement DBSplitter over boolean values.
033     */
034    @InterfaceAudience.Public
035    @InterfaceStability.Evolving
036    public class BooleanSplitter implements DBSplitter {
037      public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
038          throws SQLException {
039    
040        List<InputSplit> splits = new ArrayList<InputSplit>();
041    
042        if (results.getString(1) == null && results.getString(2) == null) {
043          // Range is null to null. Return a null split accordingly.
044          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
045              colName + " IS NULL", colName + " IS NULL"));
046          return splits;
047        }
048    
049        boolean minVal = results.getBoolean(1);
050        boolean maxVal = results.getBoolean(2);
051    
052        // Use one or two splits.
053        if (!minVal) {
054          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
055              colName + " = FALSE", colName + " = FALSE"));
056        }
057    
058        if (maxVal) {
059          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
060              colName + " = TRUE", colName + " = TRUE"));
061        }
062    
063        if (results.getString(1) == null || results.getString(2) == null) {
064          // Include a null value.
065          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
066              colName + " IS NULL", colName + " IS NULL"));
067        }
068    
069        return splits;
070      }
071    }