001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.io.LongWritable;
027 import org.apache.hadoop.io.Text;
028
029 /**
030 * This class treats a line in the input as a key/value pair separated by a
031 * separator character. The separator can be specified in config file
032 * under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
033 * separator is the tab character ('\t').
034 */
035 @InterfaceAudience.Public
036 @InterfaceStability.Stable
037 public class KeyValueLineRecordReader implements RecordReader<Text, Text> {
038
039 private final LineRecordReader lineRecordReader;
040
041 private byte separator = (byte) '\t';
042
043 private LongWritable dummyKey;
044
045 private Text innerValue;
046
047 public Class getKeyClass() { return Text.class; }
048
049 public Text createKey() {
050 return new Text();
051 }
052
053 public Text createValue() {
054 return new Text();
055 }
056
057 public KeyValueLineRecordReader(Configuration job, FileSplit split)
058 throws IOException {
059
060 lineRecordReader = new LineRecordReader(job, split);
061 dummyKey = lineRecordReader.createKey();
062 innerValue = lineRecordReader.createValue();
063 String sepStr = job.get("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
064 this.separator = (byte) sepStr.charAt(0);
065 }
066
067 public static int findSeparator(byte[] utf, int start, int length,
068 byte sep) {
069 return org.apache.hadoop.mapreduce.lib.input.
070 KeyValueLineRecordReader.findSeparator(utf, start, length, sep);
071 }
072
073 /** Read key/value pair in a line. */
074 public synchronized boolean next(Text key, Text value)
075 throws IOException {
076 byte[] line = null;
077 int lineLen = -1;
078 if (lineRecordReader.next(dummyKey, innerValue)) {
079 line = innerValue.getBytes();
080 lineLen = innerValue.getLength();
081 } else {
082 return false;
083 }
084 if (line == null)
085 return false;
086 int pos = findSeparator(line, 0, lineLen, this.separator);
087 org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader.
088 setKeyValue(key, value, line, lineLen, pos);
089 return true;
090 }
091
092 public float getProgress() throws IOException {
093 return lineRecordReader.getProgress();
094 }
095
096 public synchronized long getPos() throws IOException {
097 return lineRecordReader.getPos();
098 }
099
100 public synchronized void close() throws IOException {
101 lineRecordReader.close();
102 }
103 }