001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.util.ReflectionUtils;
026
027 /** Default {@link MapRunnable} implementation.*/
028 @InterfaceAudience.Public
029 @InterfaceStability.Stable
030 public class MapRunner<K1, V1, K2, V2>
031 implements MapRunnable<K1, V1, K2, V2> {
032
033 private Mapper<K1, V1, K2, V2> mapper;
034 private boolean incrProcCount;
035
036 @SuppressWarnings("unchecked")
037 public void configure(JobConf job) {
038 this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
039 //increment processed counter only if skipping feature is enabled
040 this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 &&
041 SkipBadRecords.getAutoIncrMapperProcCount(job);
042 }
043
044 public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
045 Reporter reporter)
046 throws IOException {
047 try {
048 // allocate key & value instances that are re-used for all entries
049 K1 key = input.createKey();
050 V1 value = input.createValue();
051
052 while (input.next(key, value)) {
053 // map pair to output
054 mapper.map(key, value, output, reporter);
055 if(incrProcCount) {
056 reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
057 SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
058 }
059 }
060 } finally {
061 mapper.close();
062 }
063 }
064
065 protected Mapper<K1, V1, K2, V2> getMapper() {
066 return mapper;
067 }
068 }