001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred.lib; 020 021import org.apache.hadoop.util.ReflectionUtils; 022import org.apache.hadoop.classification.InterfaceAudience; 023import org.apache.hadoop.classification.InterfaceStability; 024import org.apache.hadoop.mapred.MapRunnable; 025import org.apache.hadoop.mapred.JobConf; 026import org.apache.hadoop.mapred.Mapper; 027import org.apache.hadoop.mapred.RecordReader; 028import org.apache.hadoop.mapred.OutputCollector; 029import org.apache.hadoop.mapred.Reporter; 030import org.apache.hadoop.mapred.SkipBadRecords; 031import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034 035import java.io.IOException; 036import java.util.concurrent.*; 037 038/** 039 * Multithreaded implementation for {@link MapRunnable}. 040 * <p> 041 * It can be used instead of the default implementation, 042 * of {@link org.apache.hadoop.mapred.MapRunner}, when the Map 043 * operation is not CPU bound in order to improve throughput. 044 * <p> 045 * Map implementations using this MapRunnable must be thread-safe. 046 * <p> 047 * The Map-Reduce job has to be configured to use this MapRunnable class (using 048 * the JobConf.setMapRunnerClass method) and 049 * the number of threads the thread-pool can use with the 050 * <code>mapred.map.multithreadedrunner.threads</code> property, its default 051 * value is 10 threads. 052 * <p> 053 */ 054@InterfaceAudience.Public 055@InterfaceStability.Stable 056public class MultithreadedMapRunner<K1, V1, K2, V2> 057 implements MapRunnable<K1, V1, K2, V2> { 058 059 private static final Log LOG = 060 LogFactory.getLog(MultithreadedMapRunner.class.getName()); 061 062 private JobConf job; 063 private Mapper<K1, V1, K2, V2> mapper; 064 private ExecutorService executorService; 065 private volatile IOException ioException; 066 private volatile RuntimeException runtimeException; 067 private boolean incrProcCount; 068 069 @SuppressWarnings("unchecked") 070 public void configure(JobConf jobConf) { 071 int numberOfThreads = 072 jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10); 073 if (LOG.isDebugEnabled()) { 074 LOG.debug("Configuring jobConf " + jobConf.getJobName() + 075 " to use " + numberOfThreads + " threads"); 076 } 077 078 this.job = jobConf; 079 //increment processed counter only if skipping feature is enabled 080 this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 081 SkipBadRecords.getAutoIncrMapperProcCount(job); 082 this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), 083 jobConf); 084 085 // Creating a threadpool of the configured size to execute the Mapper 086 // map method in parallel. 087 executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 088 0L, TimeUnit.MILLISECONDS, 089 new BlockingArrayQueue 090 (numberOfThreads)); 091 } 092 093 /** 094 * A blocking array queue that replaces offer and add, which throws on a full 095 * queue, to a put, which waits on a full queue. 096 */ 097 private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> { 098 099 private static final long serialVersionUID = 1L; 100 public BlockingArrayQueue(int capacity) { 101 super(capacity); 102 } 103 public boolean offer(Runnable r) { 104 return add(r); 105 } 106 public boolean add(Runnable r) { 107 try { 108 put(r); 109 } catch (InterruptedException ie) { 110 Thread.currentThread().interrupt(); 111 } 112 return true; 113 } 114 } 115 116 private void checkForExceptionsFromProcessingThreads() 117 throws IOException, RuntimeException { 118 // Checking if a Mapper.map within a Runnable has generated an 119 // IOException. If so we rethrow it to force an abort of the Map 120 // operation thus keeping the semantics of the default 121 // implementation. 122 if (ioException != null) { 123 throw ioException; 124 } 125 126 // Checking if a Mapper.map within a Runnable has generated a 127 // RuntimeException. If so we rethrow it to force an abort of the Map 128 // operation thus keeping the semantics of the default 129 // implementation. 130 if (runtimeException != null) { 131 throw runtimeException; 132 } 133 } 134 135 public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, 136 Reporter reporter) 137 throws IOException { 138 try { 139 // allocate key & value instances these objects will not be reused 140 // because execution of Mapper.map is not serialized. 141 K1 key = input.createKey(); 142 V1 value = input.createValue(); 143 144 while (input.next(key, value)) { 145 146 executorService.execute(new MapperInvokeRunable(key, value, output, 147 reporter)); 148 149 checkForExceptionsFromProcessingThreads(); 150 151 // Allocate new key & value instances as mapper is running in parallel 152 key = input.createKey(); 153 value = input.createValue(); 154 } 155 156 if (LOG.isDebugEnabled()) { 157 LOG.debug("Finished dispatching all Mappper.map calls, job " 158 + job.getJobName()); 159 } 160 161 // Graceful shutdown of the Threadpool, it will let all scheduled 162 // Runnables to end. 163 executorService.shutdown(); 164 165 try { 166 167 // Now waiting for all Runnables to end. 168 while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { 169 if (LOG.isDebugEnabled()) { 170 LOG.debug("Awaiting all running Mappper.map calls to finish, job " 171 + job.getJobName()); 172 } 173 174 // NOTE: while Mapper.map dispatching has concluded there are still 175 // map calls in progress and exceptions would be thrown. 176 checkForExceptionsFromProcessingThreads(); 177 178 } 179 180 // NOTE: it could be that a map call has had an exception after the 181 // call for awaitTermination() returing true. And edge case but it 182 // could happen. 183 checkForExceptionsFromProcessingThreads(); 184 185 } catch (IOException ioEx) { 186 // Forcing a shutdown of all thread of the threadpool and rethrowing 187 // the IOException 188 executorService.shutdownNow(); 189 throw ioEx; 190 } catch (InterruptedException iEx) { 191 throw new RuntimeException(iEx); 192 } 193 194 } finally { 195 mapper.close(); 196 } 197 } 198 199 200 /** 201 * Runnable to execute a single Mapper.map call from a forked thread. 202 */ 203 private class MapperInvokeRunable implements Runnable { 204 private K1 key; 205 private V1 value; 206 private OutputCollector<K2, V2> output; 207 private Reporter reporter; 208 209 /** 210 * Collecting all required parameters to execute a Mapper.map call. 211 * <p> 212 * 213 * @param key 214 * @param value 215 * @param output 216 * @param reporter 217 */ 218 public MapperInvokeRunable(K1 key, V1 value, 219 OutputCollector<K2, V2> output, 220 Reporter reporter) { 221 this.key = key; 222 this.value = value; 223 this.output = output; 224 this.reporter = reporter; 225 } 226 227 /** 228 * Executes a Mapper.map call with the given Mapper and parameters. 229 * <p> 230 * This method is called from the thread-pool thread. 231 * 232 */ 233 public void run() { 234 try { 235 // map pair to output 236 MultithreadedMapRunner.this.mapper.map(key, value, output, reporter); 237 if(incrProcCount) { 238 reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 239 SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1); 240 } 241 } catch (IOException ex) { 242 // If there is an IOException during the call it is set in an instance 243 // variable of the MultithreadedMapRunner from where it will be 244 // rethrown. 245 synchronized (MultithreadedMapRunner.this) { 246 if (MultithreadedMapRunner.this.ioException == null) { 247 MultithreadedMapRunner.this.ioException = ex; 248 } 249 } 250 } catch (RuntimeException ex) { 251 // If there is a RuntimeException during the call it is set in an 252 // instance variable of the MultithreadedMapRunner from where it will be 253 // rethrown. 254 synchronized (MultithreadedMapRunner.this) { 255 if (MultithreadedMapRunner.this.runtimeException == null) { 256 MultithreadedMapRunner.this.runtimeException = ex; 257 } 258 } 259 } 260 } 261 } 262 263}