/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/**
 * This class treats a line in the input as a key/value pair separated by a
 * separator character. The separator can be specified in the config file
 * under the attribute name
 * {@code mapreduce.input.keyvaluelinerecordreader.key.value.separator}.
 * The default separator is the tab character ('\t').
 *
 * <p>Only the first character of the configured value is used, and it is
 * narrowed to a single {@code byte}, so the separator is effectively limited
 * to characters that fit in one byte.
 *
 * <p>Lines are read through a wrapped {@link LineRecordReader}; this class
 * only splits each line it returns.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class KeyValueLineRecordReader implements RecordReader<Text, Text> {

  /** Configuration attribute that holds the key/value separator. */
  private static final String SEPARATOR_KEY =
      "mapreduce.input.keyvaluelinerecordreader.key.value.separator";

  /** Delegate that reads the raw lines from the split. */
  private final LineRecordReader lineRecordReader;

  /** Separator byte passed to {@link #findSeparator} for every line. */
  private final byte separator;

  /** Key object handed to the delegate; its contents are never read here. */
  private final LongWritable dummyKey;

  /** Reusable buffer that receives each raw line from the delegate. */
  private final Text innerValue;

  /**
   * Returns the class of the keys produced by this reader.
   *
   * @return {@code Text.class}
   */
  // The raw return type is kept on purpose so existing overriders and callers
  // of this public, stable API keep compiling unchanged.
  @SuppressWarnings("rawtypes")
  public Class getKeyClass() {
    return Text.class;
  }

  /**
   * Creates a new, empty key object.
   *
   * @return a fresh {@link Text} instance
   */
  public Text createKey() {
    return new Text();
  }

  /**
   * Creates a new, empty value object.
   *
   * @return a fresh {@link Text} instance
   */
  public Text createValue() {
    return new Text();
  }

  /**
   * Creates a reader over the given split, using the separator configured in
   * {@code job} (tab when the attribute is not set).
   *
   * @param job   configuration supplying the separator attribute; also passed
   *              to the wrapped {@link LineRecordReader}
   * @param split the file split to read; passed to the wrapped
   *              {@link LineRecordReader}
   * @throws IOException if constructing the wrapped line reader fails
   * @throws IllegalArgumentException if the separator attribute is set to an
   *         empty string
   */
  public KeyValueLineRecordReader(Configuration job, FileSplit split)
      throws IOException {
    // Resolve and validate the separator BEFORE building the delegate, so a
    // bad configuration fails without leaving behind whatever resources the
    // delegate may acquire while being constructed.
    String sepStr = job.get(SEPARATOR_KEY, "\t");
    if (sepStr.length() == 0) {
      throw new IllegalArgumentException(
          "Configuration attribute " + SEPARATOR_KEY
          + " must contain at least one character");
    }
    // Only the first character is honoured; the cast keeps its low 8 bits.
    this.separator = (byte) sepStr.charAt(0);

    lineRecordReader = new LineRecordReader(job, split);
    dummyKey = lineRecordReader.createKey();
    innerValue = lineRecordReader.createValue();
  }

  /**
   * Locates the separator in a byte range. This is a thin forwarder to
   * {@code org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader
   * .findSeparator}; the result is whatever that method returns.
   * NOTE(review): presumably the index of the first {@code sep} within the
   * range, or a negative value when absent - confirm against the delegate.
   *
   * @param utf    the bytes to search
   * @param start  offset at which the search range begins
   * @param length number of bytes in the search range
   * @param sep    the separator byte to look for
   * @return the position reported by the delegate
   */
  public static int findSeparator(byte[] utf, int start, int length,
      byte sep) {
    return org.apache.hadoop.mapreduce.lib.input.
        KeyValueLineRecordReader.findSeparator(utf, start, length, sep);
  }

  /**
   * Read key/value pair in a line.
   *
   * <p>Reads the next line through the wrapped line reader, looks for the
   * separator in it, and hands the line plus separator position to
   * {@code org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader
   * .setKeyValue}, which populates {@code key} and {@code value}.
   *
   * @param key   receives the key part of the line
   * @param value receives the value part of the line
   * @return {@code true} if a line was read; {@code false} if the wrapped
   *         reader reported that no further line is available
   * @throws IOException if the wrapped reader fails
   */
  public synchronized boolean next(Text key, Text value)
      throws IOException {
    if (!lineRecordReader.next(dummyKey, innerValue)) {
      return false;
    }
    // The backing array may be longer than the line; only the first lineLen
    // bytes are considered, which is why the length is passed separately.
    byte[] line = innerValue.getBytes();
    int lineLen = innerValue.getLength();
    int pos = findSeparator(line, 0, lineLen, this.separator);
    org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader.
        setKeyValue(key, value, line, lineLen, pos);
    return true;
  }

  /**
   * Reports progress through the split, as reported by the wrapped line
   * reader.
   *
   * @return the wrapped reader's progress value
   * @throws IOException if the wrapped reader fails
   */
  public float getProgress() throws IOException {
    return lineRecordReader.getProgress();
  }

  /**
   * Reports the current position, as reported by the wrapped line reader.
   *
   * @return the wrapped reader's position
   * @throws IOException if the wrapped reader fails
   */
  public synchronized long getPos() throws IOException {
    return lineRecordReader.getPos();
  }

  /**
   * Closes the wrapped line reader.
   *
   * @throws IOException if closing the wrapped reader fails
   */
  public synchronized void close() throws IOException {
    lineRecordReader.close();
  }
}