/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A wrapper class for a record reader that handles a single file split. It
 * delegates most of the methods to the wrapped instance. A concrete subclass
 * needs to provide a constructor that calls this parent constructor with the
 * appropriate input format. The subclass constructor must satisfy the specific
 * constructor signature that is required by
 * <code>CombineFileRecordReader</code>.
 *
 * Subclassing is needed to get a concrete record reader wrapper because of the
 * constructor requirement.
 *
 * @see CombineFileRecordReader
 * @see CombineFileInputFormat
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class CombineFileRecordReaderWrapper<K,V>
  extends RecordReader<K,V> {
  /** The single-file split carved out of the combine split at index idx. */
  private final FileSplit fileSplit;
  /** The underlying reader that every RecordReader call is forwarded to. */
  private final RecordReader<K,V> delegate;

  /**
   * Creates a reader for the idx-th chunk of the given combine split by
   * building an equivalent single-file {@link FileSplit} and asking the
   * supplied input format for a record reader over it.
   *
   * @param inputFormat the input format whose record reader is wrapped
   * @param split the combine split containing multiple file chunks
   * @param context the task attempt context
   * @param idx the index of the chunk within the combine split to read
   * @throws IOException if the underlying reader cannot be created
   * @throws InterruptedException if reader creation is interrupted
   */
  protected CombineFileRecordReaderWrapper(FileInputFormat<K,V> inputFormat,
    CombineFileSplit split, TaskAttemptContext context, Integer idx)
    throws IOException, InterruptedException {
    fileSplit = new FileSplit(split.getPath(idx),
      split.getOffset(idx),
      split.getLength(idx),
      split.getLocations());

    delegate = inputFormat.createRecordReader(fileSplit, context);
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
    // it really should be the same file split at the time the wrapper instance
    // was created
    assert fileSplitIsValid(context);

    // Intentionally initialize the delegate with the per-file split captured
    // at construction time, not the combine split passed in.
    delegate.initialize(fileSplit, context);
  }

  /**
   * Sanity check (effective only when assertions are enabled) that the split
   * parameters recorded in the task configuration match the per-file split
   * captured at construction time.
   *
   * @param context the task attempt context supplying the configuration
   * @return true if start offset, length, and path all match
   */
  private boolean fileSplitIsValid(TaskAttemptContext context) {
    Configuration conf = context.getConfiguration();
    long offset = conf.getLong(MRJobConfig.MAP_INPUT_START, 0L);
    if (fileSplit.getStart() != offset) {
      return false;
    }
    // NOTE(review): despite its name, MRJobConfig.MAP_INPUT_PATH is read as a
    // long and compared against the split LENGTH here. Confirm the constant
    // maps to the map-input-length property (as it historically does in
    // MRJobConfig) before "fixing" this to a path comparison.
    long length = conf.getLong(MRJobConfig.MAP_INPUT_PATH, 0L);
    if (fileSplit.getLength() != length) {
      return false;
    }
    String path = conf.get(MRJobConfig.MAP_INPUT_FILE);
    if (!fileSplit.getPath().toString().equals(path)) {
      return false;
    }
    return true;
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return delegate.nextKeyValue();
  }

  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return delegate.getCurrentKey();
  }

  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    return delegate.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return delegate.getProgress();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}