/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.lang.reflect.Constructor;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
 * A generic RecordReader that can hand out a different RecordReader
 * for each chunk in a {@link CombineFileSplit}.
 * A CombineFileSplit can combine data chunks from multiple files;
 * this class allows a different RecordReader to be used for
 * processing the chunks from each of those files.
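 *
 * <p>For illustration only: a subclass of {@link CombineFileInputFormat}
 * might wire this reader up as sketched below, where MyChunkReader is a
 * hypothetical RecordReader whose constructor takes
 * (CombineFileSplit, TaskAttemptContext, Integer):
 * <pre>{@code
 * public class MyCombineInputFormat
 *     extends CombineFileInputFormat<LongWritable, Text> {
 *   public RecordReader<LongWritable, Text> createRecordReader(
 *       InputSplit split, TaskAttemptContext context) throws IOException {
 *     return new CombineFileRecordReader<LongWritable, Text>(
 *         (CombineFileSplit) split, context, MyChunkReader.class);
 *   }
 * }
 * }</pre>
 *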
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {

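  /**
   * The constructor signature that every per-chunk RecordReader class must
   * provide: (CombineFileSplit, TaskAttemptContext, Integer), where the
   * Integer is the index of the chunk within the split that the new reader
   * should consume.
   */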
  static final Class<?>[] constructorSignature = new Class<?>[]
                                         {CombineFileSplit.class,
                                          TaskAttemptContext.class,
                                          Integer.class};

  protected CombineFileSplit split;
  protected Constructor<? extends RecordReader<K,V>> rrConstructor;
  protected TaskAttemptContext context;

  protected int idx;                       // always one past the chunk curReader is consuming
  protected long progress;                 // bytes consumed by fully-processed chunks
  protected RecordReader<K, V> curReader;  // reader for the current chunk, if any
  @Override
  public void initialize(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    this.split = (CombineFileSplit)split;
    this.context = context;
    // the constructor has already created the reader for the first chunk;
    // initialize it now that the framework has initialized this reader.
    if (null != this.curReader) {
      this.curReader.initialize(split, context);
    }
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    // when the current chunk's reader is exhausted, advance to the next
    // chunk until one of the readers produces a key/value pair.
    while ((curReader == null) || !curReader.nextKeyValue()) {
      if (!initNextRecordReader()) {
        return false;
      }
    }
    return true;
  }

  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return curReader.getCurrentKey();
  }

  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    return curReader.getCurrentValue();
  }

  @Override
  public void close() throws IOException {
    if (curReader != null) {
      curReader.close();
      curReader = null;
    }
  }

  /**
   * Return progress based on the amount of data processed so far.
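   * Fully-processed chunks count their whole length; the current chunk
   * contributes its reader's fractional progress. For example (illustrative
   * numbers), with chunks of 100 and 300 bytes, finishing the first chunk
   * and half of the second gives (100 + 0.5 * 300) / 400 = 0.625.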
   */
  @Override
  public float getProgress() throws IOException, InterruptedException {
    long subprogress = 0;    // bytes processed in the current chunk
    if (null != curReader) {
      // idx is always one past the current chunk's true index.
      subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
    }
    return Math.min(1.0f, (progress + subprogress)/(float)(split.getLength()));
  }

  /**
   * Construct the reader for a {@link CombineFileSplit}. A new RecordReader
   * of class rrClass is created for each chunk of the split; rrClass must
   * therefore declare a constructor taking
   * (CombineFileSplit, TaskAttemptContext, Integer).
   */
  public CombineFileRecordReader(CombineFileSplit split,
                                 TaskAttemptContext context,
                                 Class<? extends RecordReader<K,V>> rrClass)
    throws IOException {
    this.split = split;
    this.context = context;
    this.idx = 0;
    this.curReader = null;
    this.progress = 0;

    try {
      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
      rrConstructor.setAccessible(true);
    } catch (Exception e) {
      throw new RuntimeException(rrClass.getName() +
                                 " does not have a valid constructor", e);
    }
    initNextRecordReader();
  }

  /**
   * Advance to the record reader for the next chunk in this
   * CombineFileSplit, closing the current reader first.
   *
   * @return true if a reader for another chunk was created, false once
   *         all chunks have been processed
   */
  protected boolean initNextRecordReader() throws IOException {

    if (curReader != null) {
      curReader.close();
      curReader = null;
      if (idx > 0) {
        progress += split.getLength(idx-1);    // add the bytes of the chunk just finished
      }
    }

    // if all chunks have been processed, nothing more to do.
    if (idx == split.getNumPaths()) {
      return false;
    }

    context.progress();

    // get a record reader for the idx-th chunk
    try {
      Configuration conf = context.getConfiguration();
      // setup some helper config variables.
      conf.set(MRJobConfig.MAP_INPUT_FILE, split.getPath(idx).toString());
      conf.setLong(MRJobConfig.MAP_INPUT_START, split.getOffset(idx));
      conf.setLong(MRJobConfig.MAP_INPUT_PATH, split.getLength(idx));
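      // note: MAP_INPUT_PATH, despite its name, is used here to record the
      // chunk's length, not its path.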

      curReader = rrConstructor.newInstance(new Object []
                            {split, context, Integer.valueOf(idx)});

      if (idx > 0) {
        // initialize() for the first RecordReader will be called by MapTask;
        // we're responsible for initializing subsequent RecordReaders.
        curReader.initialize(split, context);
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    idx++;
    return true;
  }
}