001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.io.LongWritable;
027import org.apache.hadoop.io.Text;
028
029/**
030 * This class treats a line in the input as a key/value pair separated by a 
031 * separator character. The separator can be specified in config file 
032 * under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
033 * separator is the tab character ('\t').
034 */
035@InterfaceAudience.Public
036@InterfaceStability.Stable
037public class KeyValueLineRecordReader implements RecordReader<Text, Text> {
038  
039  private final LineRecordReader lineRecordReader;
040
041  private byte separator = (byte) '\t';
042
043  private LongWritable dummyKey;
044
045  private Text innerValue;
046
047  public Class getKeyClass() { return Text.class; }
048  
049  public Text createKey() {
050    return new Text();
051  }
052  
053  public Text createValue() {
054    return new Text();
055  }
056
057  public KeyValueLineRecordReader(Configuration job, FileSplit split)
058    throws IOException {
059    
060    lineRecordReader = new LineRecordReader(job, split);
061    dummyKey = lineRecordReader.createKey();
062    innerValue = lineRecordReader.createValue();
063    String sepStr = job.get("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
064    this.separator = (byte) sepStr.charAt(0);
065  }
066
067  public static int findSeparator(byte[] utf, int start, int length, 
068      byte sep) {
069    return org.apache.hadoop.mapreduce.lib.input.
070      KeyValueLineRecordReader.findSeparator(utf, start, length, sep);
071  }
072
073  /** Read key/value pair in a line. */
074  public synchronized boolean next(Text key, Text value)
075    throws IOException {
076    byte[] line = null;
077    int lineLen = -1;
078    if (lineRecordReader.next(dummyKey, innerValue)) {
079      line = innerValue.getBytes();
080      lineLen = innerValue.getLength();
081    } else {
082      return false;
083    }
084    if (line == null)
085      return false;
086    int pos = findSeparator(line, 0, lineLen, this.separator);
087    org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader.
088      setKeyValue(key, value, line, lineLen, pos);
089    return true;
090  }
091  
092  public float getProgress() throws IOException {
093    return lineRecordReader.getProgress();
094  }
095  
096  public synchronized long getPos() throws IOException {
097    return lineRecordReader.getPos();
098  }
099
100  public synchronized void close() throws IOException { 
101    lineRecordReader.close();
102  }
103}