001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.io.Text;
026    import org.apache.hadoop.io.Writable;
027    import org.apache.hadoop.io.WritableComparable;
028    import org.apache.hadoop.mapreduce.InputSplit;
029    import org.apache.hadoop.mapreduce.RecordReader;
030    import org.apache.hadoop.mapreduce.TaskAttemptContext;
031    
032    /**
033     * This class converts the input keys and values to their String forms by
034     * calling toString() method. This class to SequenceFileAsTextInputFormat
035     * class is as LineRecordReader class to TextInputFormat class.
036     */
037    @InterfaceAudience.Public
038    @InterfaceStability.Stable
039    public class SequenceFileAsTextRecordReader
040      extends RecordReader<Text, Text> {
041      
042      private final SequenceFileRecordReader<WritableComparable<?>, Writable>
043        sequenceFileRecordReader;
044    
045      private Text key;
046      private Text value;
047    
048      public SequenceFileAsTextRecordReader()
049        throws IOException {
050        sequenceFileRecordReader =
051          new SequenceFileRecordReader<WritableComparable<?>, Writable>();
052      }
053    
054      public void initialize(InputSplit split, TaskAttemptContext context)
055          throws IOException, InterruptedException {
056        sequenceFileRecordReader.initialize(split, context);
057      }
058    
059      @Override
060      public Text getCurrentKey() 
061          throws IOException, InterruptedException {
062        return key;
063      }
064      
065      @Override
066      public Text getCurrentValue() 
067          throws IOException, InterruptedException {
068        return value;
069      }
070      
071      /** Read key/value pair in a line. */
072      public synchronized boolean nextKeyValue() 
073          throws IOException, InterruptedException {
074        if (!sequenceFileRecordReader.nextKeyValue()) {
075          return false;
076        }
077        if (key == null) {
078          key = new Text(); 
079        }
080        if (value == null) {
081          value = new Text(); 
082        }
083        key.set(sequenceFileRecordReader.getCurrentKey().toString());
084        value.set(sequenceFileRecordReader.getCurrentValue().toString());
085        return true;
086      }
087      
088      public float getProgress() throws IOException,  InterruptedException {
089        return sequenceFileRecordReader.getProgress();
090      }
091      
092      public synchronized void close() throws IOException {
093        sequenceFileRecordReader.close();
094      }
095    }