001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.input; 020 021import java.io.IOException; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.classification.InterfaceStability; 025import org.apache.hadoop.io.Text; 026import org.apache.hadoop.io.Writable; 027import org.apache.hadoop.io.WritableComparable; 028import org.apache.hadoop.mapreduce.InputSplit; 029import org.apache.hadoop.mapreduce.RecordReader; 030import org.apache.hadoop.mapreduce.TaskAttemptContext; 031 032/** 033 * This class converts the input keys and values to their String forms by 034 * calling toString() method. This class to SequenceFileAsTextInputFormat 035 * class is as LineRecordReader class to TextInputFormat class. 036 */ 037@InterfaceAudience.Public 038@InterfaceStability.Stable 039public class SequenceFileAsTextRecordReader 040 extends RecordReader<Text, Text> { 041 042 private final SequenceFileRecordReader<WritableComparable<?>, Writable> 043 sequenceFileRecordReader; 044 045 private Text key; 046 private Text value; 047 048 public SequenceFileAsTextRecordReader() 049 throws IOException { 050 sequenceFileRecordReader = 051 new SequenceFileRecordReader<WritableComparable<?>, Writable>(); 052 } 053 054 public void initialize(InputSplit split, TaskAttemptContext context) 055 throws IOException, InterruptedException { 056 sequenceFileRecordReader.initialize(split, context); 057 } 058 059 @Override 060 public Text getCurrentKey() 061 throws IOException, InterruptedException { 062 return key; 063 } 064 065 @Override 066 public Text getCurrentValue() 067 throws IOException, InterruptedException { 068 return value; 069 } 070 071 /** Read key/value pair in a line. */ 072 public synchronized boolean nextKeyValue() 073 throws IOException, InterruptedException { 074 if (!sequenceFileRecordReader.nextKeyValue()) { 075 return false; 076 } 077 if (key == null) { 078 key = new Text(); 079 } 080 if (value == null) { 081 value = new Text(); 082 } 083 key.set(sequenceFileRecordReader.getCurrentKey().toString()); 084 value.set(sequenceFileRecordReader.getCurrentValue().toString()); 085 return true; 086 } 087 088 public float getProgress() throws IOException, InterruptedException { 089 return sequenceFileRecordReader.getProgress(); 090 } 091 092 public synchronized void close() throws IOException { 093 sequenceFileRecordReader.close(); 094 } 095}