/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.*;
import java.lang.reflect.*;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.mapred.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

/**
 * A generic RecordReader that can hand out different recordReaders
 * for each chunk in a {@link CombineFileSplit}.
 * A CombineFileSplit can combine data chunks from multiple files.
 * This class allows using different RecordReaders for processing
 * these data chunks from different files.
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {

  /**
   * The constructor signature that every chunk-level reader class passed to
   * {@link #CombineFileRecordReader(JobConf, CombineFileSplit, Reporter, Class)}
   * must declare: (split, conf, reporter, index of the chunk within the split).
   */
  static final Class<?>[] constructorSignature = new Class<?>[]
                                         {CombineFileSplit.class,
                                          Configuration.class,
                                          Reporter.class,
                                          Integer.class};

  protected CombineFileSplit split;
  protected JobConf jc;
  protected Reporter reporter;
  protected Class<RecordReader<K, V>> rrClass;
  protected Constructor<RecordReader<K, V>> rrConstructor;
  protected FileSystem fs;

  /** Index of the next chunk to open (0-based). */
  protected int idx;
  /** Bytes accounted for by fully-consumed chunks so far. */
  protected long progress;
  /** Reader for the current chunk; null before the first/after the last chunk. */
  protected RecordReader<K, V> curReader;

  /**
   * Read the next key/value pair, transparently advancing to the reader for
   * the next chunk whenever the current one is exhausted.
   *
   * @return true if a record was read, false once every chunk is consumed
   */
  @Override
  public boolean next(K key, V value) throws IOException {

    while ((curReader == null) || !curReader.next(key, value)) {
      if (!initNextRecordReader()) {
        return false;
      }
    }
    return true;
  }

  // NOTE(review): createKey/createValue throw NullPointerException if the
  // split contained zero chunks (curReader is never initialized) — confirm
  // callers never construct this reader over an empty CombineFileSplit.
  @Override
  public K createKey() {
    return curReader.createKey();
  }

  @Override
  public V createValue() {
    return curReader.createValue();
  }

  /**
   * Return the amount of data processed so far, in bytes.
   */
  @Override
  public long getPos() throws IOException {
    return progress;
  }

  @Override
  public void close() throws IOException {
    if (curReader != null) {
      curReader.close();
      curReader = null;
    }
  }

  /**
   * Return progress based on the amount of data processed so far,
   * as a fraction in [0, 1].
   */
  @Override
  public float getProgress() throws IOException {
    long totalLength = split.getLength();
    // Guard against a zero-length split: 0f/0f would be NaN, and
    // Math.min(1.0f, NaN) propagates the NaN to the framework.
    if (totalLength == 0) {
      return 1.0f;
    }
    return Math.min(1.0f, progress / (float) totalLength);
  }

  /**
   * A generic RecordReader that can hand out different recordReaders
   * for each chunk in the CombineFileSplit.
   *
   * @param job      job configuration; also receives per-chunk input metadata
   * @param split    the combined split whose chunks will be read in order
   * @param reporter progress reporter, also handed to each chunk reader
   * @param rrClass  chunk reader class; must declare a constructor matching
   *                 {@link #constructorSignature}
   * @throws RuntimeException if rrClass lacks the required constructor
   */
  public CombineFileRecordReader(JobConf job, CombineFileSplit split,
                                 Reporter reporter,
                                 Class<RecordReader<K, V>> rrClass)
    throws IOException {
    this.split = split;
    this.jc = job;
    this.rrClass = rrClass;
    this.reporter = reporter;
    this.idx = 0;
    this.curReader = null;
    this.progress = 0;

    try {
      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
      rrConstructor.setAccessible(true);
    } catch (Exception e) {
      throw new RuntimeException(rrClass.getName() +
                                 " does not have valid constructor", e);
    }
    initNextRecordReader();
  }

  /**
   * Get the record reader for the next chunk in this CombineFileSplit.
   *
   * @return true if a reader for the next chunk was created,
   *         false when all chunks have been processed
   */
  protected boolean initNextRecordReader() throws IOException {

    if (curReader != null) {
      curReader.close();
      curReader = null;
      if (idx > 0) {
        progress += split.getLength(idx-1);    // done processing so far
      }
    }

    // if all chunks have been processed, nothing more to do.
    if (idx == split.getNumPaths()) {
      return false;
    }

    reporter.progress();

    // get a record reader for the idx-th chunk
    try {
      curReader =  rrConstructor.newInstance(new Object []
                            {split, jc, reporter, Integer.valueOf(idx)});

      // setup some helper config variables.
      jc.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString());
      jc.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx));
      // NOTE(review): MAP_INPUT_PATH is assigned the chunk *length* here —
      // verify against MRJobConfig that this key is intended to carry the
      // input length rather than a path.
      jc.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx));
    } catch (Exception e) {
      throw new RuntimeException (e);
    }
    idx++;
    return true;
  }
}