001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.*;
022import java.lang.reflect.*;
023
024import org.apache.hadoop.fs.FileSystem;
025
026import org.apache.hadoop.mapreduce.*;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030
031/**
032 * A generic RecordReader that can hand out different recordReaders
033 * for each chunk in a {@link CombineFileSplit}.
034 * A CombineFileSplit can combine data chunks from multiple files. 
035 * This class allows using different RecordReaders for processing
036 * these data chunks from different files.
037 * @see CombineFileSplit
038 */
039@InterfaceAudience.Public
040@InterfaceStability.Stable
041public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
042
043  static final Class [] constructorSignature = new Class [] 
044                                         {CombineFileSplit.class,
045                                          TaskAttemptContext.class,
046                                          Integer.class};
047
048  protected CombineFileSplit split;
049  protected Class<? extends RecordReader<K,V>> rrClass;
050  protected Constructor<? extends RecordReader<K,V>> rrConstructor;
051  protected FileSystem fs;
052  protected TaskAttemptContext context;
053  
054  protected int idx;
055  protected long progress;
056  protected RecordReader<K, V> curReader;
057
058  public void initialize(InputSplit split,
059      TaskAttemptContext context) throws IOException, InterruptedException {
060    this.split = (CombineFileSplit)split;
061    this.context = context;
062    if (null != this.curReader) {
063      this.curReader.initialize(split, context);
064    }
065  }
066  
067  public boolean nextKeyValue() throws IOException, InterruptedException {
068
069    while ((curReader == null) || !curReader.nextKeyValue()) {
070      if (!initNextRecordReader()) {
071        return false;
072      }
073    }
074    return true;
075  }
076
077  public K getCurrentKey() throws IOException, InterruptedException {
078    return curReader.getCurrentKey();
079  }
080  
081  public V getCurrentValue() throws IOException, InterruptedException {
082    return curReader.getCurrentValue();
083  }
084  
085  public void close() throws IOException {
086    if (curReader != null) {
087      curReader.close();
088      curReader = null;
089    }
090  }
091  
092  /**
093   * return progress based on the amount of data processed so far.
094   */
095  public float getProgress() throws IOException, InterruptedException {
096    long subprogress = 0;    // bytes processed in current split
097    if (null != curReader) {
098      // idx is always one past the current subsplit's true index.
099      subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
100    }
101    return Math.min(1.0f,  (progress + subprogress)/(float)(split.getLength()));
102  }
103  
104  /**
105   * A generic RecordReader that can hand out different recordReaders
106   * for each chunk in the CombineFileSplit.
107   */
108  public CombineFileRecordReader(CombineFileSplit split,
109                                 TaskAttemptContext context,
110                                 Class<? extends RecordReader<K,V>> rrClass)
111    throws IOException {
112    this.split = split;
113    this.context = context;
114    this.rrClass = rrClass;
115    this.idx = 0;
116    this.curReader = null;
117    this.progress = 0;
118
119    try {
120      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
121      rrConstructor.setAccessible(true);
122    } catch (Exception e) {
123      throw new RuntimeException(rrClass.getName() + 
124                                 " does not have valid constructor", e);
125    }
126    initNextRecordReader();
127  }
128  
129  /**
130   * Get the record reader for the next chunk in this CombineFileSplit.
131   */
132  protected boolean initNextRecordReader() throws IOException {
133
134    if (curReader != null) {
135      curReader.close();
136      curReader = null;
137      if (idx > 0) {
138        progress += split.getLength(idx-1);    // done processing so far
139      }
140    }
141
142    // if all chunks have been processed, nothing more to do.
143    if (idx == split.getNumPaths()) {
144      return false;
145    }
146
147    context.progress();
148
149    // get a record reader for the idx-th chunk
150    try {
151      Configuration conf = context.getConfiguration();
152      // setup some helper config variables.
153      conf.set(MRJobConfig.MAP_INPUT_FILE, split.getPath(idx).toString());
154      conf.setLong(MRJobConfig.MAP_INPUT_START, split.getOffset(idx));
155      conf.setLong(MRJobConfig.MAP_INPUT_PATH, split.getLength(idx));
156
157      curReader =  rrConstructor.newInstance(new Object [] 
158                            {split, context, Integer.valueOf(idx)});
159
160      if (idx > 0) {
161        // initialize() for the first RecordReader will be called by MapTask;
162        // we're responsible for initializing subsequent RecordReaders.
163        curReader.initialize(split, context);
164      }
165    } catch (Exception e) {
166      throw new RuntimeException (e);
167    }
168    idx++;
169    return true;
170  }
171}