001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.io.Text;
027    import org.apache.hadoop.mapreduce.InputSplit;
028    import org.apache.hadoop.mapreduce.RecordReader;
029    import org.apache.hadoop.mapreduce.TaskAttemptContext;
030    
031    /**
032     * This class treats a line in the input as a key/value pair separated by a 
033     * separator character. The separator can be specified in config file 
034     * under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
035     * separator is the tab character ('\t').
036     */
037    @InterfaceAudience.Public
038    @InterfaceStability.Stable
039    public class KeyValueLineRecordReader extends RecordReader<Text, Text> {
040      public static final String KEY_VALUE_SEPERATOR = 
041        "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
042      
043      private final LineRecordReader lineRecordReader;
044    
045      private byte separator = (byte) '\t';
046    
047      private Text innerValue;
048    
049      private Text key;
050      
051      private Text value;
052      
053      public Class getKeyClass() { return Text.class; }
054      
055      public KeyValueLineRecordReader(Configuration conf)
056        throws IOException {
057        
058        lineRecordReader = new LineRecordReader();
059        String sepStr = conf.get(KEY_VALUE_SEPERATOR, "\t");
060        this.separator = (byte) sepStr.charAt(0);
061      }
062    
063      public void initialize(InputSplit genericSplit,
064          TaskAttemptContext context) throws IOException {
065        lineRecordReader.initialize(genericSplit, context);
066      }
067      
068      public static int findSeparator(byte[] utf, int start, int length, 
069          byte sep) {
070        for (int i = start; i < (start + length); i++) {
071          if (utf[i] == sep) {
072            return i;
073          }
074        }
075        return -1;
076      }
077    
078      public static void setKeyValue(Text key, Text value, byte[] line,
079          int lineLen, int pos) {
080        if (pos == -1) {
081          key.set(line, 0, lineLen);
082          value.set("");
083        } else {
084          key.set(line, 0, pos);
085          value.set(line, pos + 1, lineLen - pos - 1);
086        }
087      }
088      /** Read key/value pair in a line. */
089      public synchronized boolean nextKeyValue()
090        throws IOException {
091        byte[] line = null;
092        int lineLen = -1;
093        if (lineRecordReader.nextKeyValue()) {
094          innerValue = lineRecordReader.getCurrentValue();
095          line = innerValue.getBytes();
096          lineLen = innerValue.getLength();
097        } else {
098          return false;
099        }
100        if (line == null)
101          return false;
102        if (key == null) {
103          key = new Text();
104        }
105        if (value == null) {
106          value = new Text();
107        }
108        int pos = findSeparator(line, 0, lineLen, this.separator);
109        setKeyValue(key, value, line, lineLen, pos);
110        return true;
111      }
112      
113      public Text getCurrentKey() {
114        return key;
115      }
116    
117      public Text getCurrentValue() {
118        return value;
119      }
120    
121      public float getProgress() throws IOException {
122        return lineRecordReader.getProgress();
123      }
124      
125      public synchronized void close() throws IOException { 
126        lineRecordReader.close();
127      }
128    }