001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.io.Text;
027import org.apache.hadoop.mapreduce.InputSplit;
028import org.apache.hadoop.mapreduce.RecordReader;
029import org.apache.hadoop.mapreduce.TaskAttemptContext;
030
031/**
032 * This class treats a line in the input as a key/value pair separated by a 
033 * separator character. The separator can be specified in config file 
034 * under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
035 * separator is the tab character ('\t').
036 */
037@InterfaceAudience.Public
038@InterfaceStability.Stable
039public class KeyValueLineRecordReader extends RecordReader<Text, Text> {
040  public static final String KEY_VALUE_SEPERATOR = 
041    "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
042  
043  private final LineRecordReader lineRecordReader;
044
045  private byte separator = (byte) '\t';
046
047  private Text innerValue;
048
049  private Text key;
050  
051  private Text value;
052  
053  public Class getKeyClass() { return Text.class; }
054  
055  public KeyValueLineRecordReader(Configuration conf)
056    throws IOException {
057    
058    lineRecordReader = new LineRecordReader();
059    String sepStr = conf.get(KEY_VALUE_SEPERATOR, "\t");
060    this.separator = (byte) sepStr.charAt(0);
061  }
062
063  public void initialize(InputSplit genericSplit,
064      TaskAttemptContext context) throws IOException {
065    lineRecordReader.initialize(genericSplit, context);
066  }
067  
068  public static int findSeparator(byte[] utf, int start, int length, 
069      byte sep) {
070    for (int i = start; i < (start + length); i++) {
071      if (utf[i] == sep) {
072        return i;
073      }
074    }
075    return -1;
076  }
077
078  public static void setKeyValue(Text key, Text value, byte[] line,
079      int lineLen, int pos) {
080    if (pos == -1) {
081      key.set(line, 0, lineLen);
082      value.set("");
083    } else {
084      key.set(line, 0, pos);
085      value.set(line, pos + 1, lineLen - pos - 1);
086    }
087  }
088  /** Read key/value pair in a line. */
089  public synchronized boolean nextKeyValue()
090    throws IOException {
091    byte[] line = null;
092    int lineLen = -1;
093    if (lineRecordReader.nextKeyValue()) {
094      innerValue = lineRecordReader.getCurrentValue();
095      line = innerValue.getBytes();
096      lineLen = innerValue.getLength();
097    } else {
098      return false;
099    }
100    if (line == null)
101      return false;
102    if (key == null) {
103      key = new Text();
104    }
105    if (value == null) {
106      value = new Text();
107    }
108    int pos = findSeparator(line, 0, lineLen, this.separator);
109    setKeyValue(key, value, line, lineLen, pos);
110    return true;
111  }
112  
113  public Text getCurrentKey() {
114    return key;
115  }
116
117  public Text getCurrentValue() {
118    return value;
119  }
120
121  public float getProgress() throws IOException {
122    return lineRecordReader.getProgress();
123  }
124  
125  public synchronized void close() throws IOException { 
126    lineRecordReader.close();
127  }
128}