001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.lib;
020    
021    import org.apache.hadoop.util.ReflectionUtils;
022    import org.apache.hadoop.classification.InterfaceAudience;
023    import org.apache.hadoop.classification.InterfaceStability;
024    import org.apache.hadoop.mapred.MapRunnable;
025    import org.apache.hadoop.mapred.JobConf;
026    import org.apache.hadoop.mapred.Mapper;
027    import org.apache.hadoop.mapred.RecordReader;
028    import org.apache.hadoop.mapred.OutputCollector;
029    import org.apache.hadoop.mapred.Reporter;
030    import org.apache.hadoop.mapred.SkipBadRecords;
031    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    import java.io.IOException;
036    import java.util.concurrent.*;
037    
038    /**
039     * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
040     * <p>
041     * It can be used instead of the default implementation,
042     * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
043     * bound in order to improve throughput.
044     * <p>
045     * Map implementations using this MapRunnable must be thread-safe.
046     * <p>
047     * The Map-Reduce job has to be configured to use this MapRunnable class (using
048     * the JobConf.setMapRunnerClass method) and
049     * the number of thread the thread-pool can use with the
050     * <code>mapred.map.multithreadedrunner.threads</code> property, its default
051     * value is 10 threads.
052     * <p>
053     */
054    @InterfaceAudience.Public
055    @InterfaceStability.Stable
056    public class MultithreadedMapRunner<K1, V1, K2, V2>
057        implements MapRunnable<K1, V1, K2, V2> {
058    
059      private static final Log LOG =
060        LogFactory.getLog(MultithreadedMapRunner.class.getName());
061    
062      private JobConf job;
063      private Mapper<K1, V1, K2, V2> mapper;
064      private ExecutorService executorService;
065      private volatile IOException ioException;
066      private volatile RuntimeException runtimeException;
067      private boolean incrProcCount;
068    
069      @SuppressWarnings("unchecked")
070      public void configure(JobConf jobConf) {
071        int numberOfThreads =
072          jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
073        if (LOG.isDebugEnabled()) {
074          LOG.debug("Configuring jobConf " + jobConf.getJobName() +
075                    " to use " + numberOfThreads + " threads");
076        }
077    
078        this.job = jobConf;
079        //increment processed counter only if skipping feature is enabled
080        this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
081          SkipBadRecords.getAutoIncrMapperProcCount(job);
082        this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
083            jobConf);
084    
085        // Creating a threadpool of the configured size to execute the Mapper
086        // map method in parallel.
087        executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
088                                                 0L, TimeUnit.MILLISECONDS,
089                                                 new BlockingArrayQueue
090                                                   (numberOfThreads));
091      }
092    
093      /**
094       * A blocking array queue that replaces offer and add, which throws on a full
095       * queue, to a put, which waits on a full queue.
096       */
097      private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
098     
099        private static final long serialVersionUID = 1L;
100        public BlockingArrayQueue(int capacity) {
101          super(capacity);
102        }
103        public boolean offer(Runnable r) {
104          return add(r);
105        }
106        public boolean add(Runnable r) {
107          try {
108            put(r);
109          } catch (InterruptedException ie) {
110            Thread.currentThread().interrupt();
111          }
112          return true;
113        }
114      }
115    
116      private void checkForExceptionsFromProcessingThreads()
117          throws IOException, RuntimeException {
118        // Checking if a Mapper.map within a Runnable has generated an
119        // IOException. If so we rethrow it to force an abort of the Map
120        // operation thus keeping the semantics of the default
121        // implementation.
122        if (ioException != null) {
123          throw ioException;
124        }
125    
126        // Checking if a Mapper.map within a Runnable has generated a
127        // RuntimeException. If so we rethrow it to force an abort of the Map
128        // operation thus keeping the semantics of the default
129        // implementation.
130        if (runtimeException != null) {
131          throw runtimeException;
132        }
133      }
134    
135      public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
136                      Reporter reporter)
137        throws IOException {
138        try {
139          // allocate key & value instances these objects will not be reused
140          // because execution of Mapper.map is not serialized.
141          K1 key = input.createKey();
142          V1 value = input.createValue();
143    
144          while (input.next(key, value)) {
145    
146            executorService.execute(new MapperInvokeRunable(key, value, output,
147                                    reporter));
148    
149            checkForExceptionsFromProcessingThreads();
150    
151            // Allocate new key & value instances as mapper is running in parallel
152            key = input.createKey();
153            value = input.createValue();
154          }
155    
156          if (LOG.isDebugEnabled()) {
157            LOG.debug("Finished dispatching all Mappper.map calls, job "
158                      + job.getJobName());
159          }
160    
161          // Graceful shutdown of the Threadpool, it will let all scheduled
162          // Runnables to end.
163          executorService.shutdown();
164    
165          try {
166    
167            // Now waiting for all Runnables to end.
168            while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
169              if (LOG.isDebugEnabled()) {
170                LOG.debug("Awaiting all running Mappper.map calls to finish, job "
171                          + job.getJobName());
172              }
173    
174              // NOTE: while Mapper.map dispatching has concluded there are still
175              // map calls in progress and exceptions would be thrown.
176              checkForExceptionsFromProcessingThreads();
177    
178            }
179    
180            // NOTE: it could be that a map call has had an exception after the
181            // call for awaitTermination() returing true. And edge case but it
182            // could happen.
183            checkForExceptionsFromProcessingThreads();
184    
185          } catch (IOException ioEx) {
186            // Forcing a shutdown of all thread of the threadpool and rethrowing
187            // the IOException
188            executorService.shutdownNow();
189            throw ioEx;
190          } catch (InterruptedException iEx) {
191            throw new RuntimeException(iEx);
192          }
193    
194        } finally {
195          mapper.close();
196        }
197      }
198    
199    
200      /**
201       * Runnable to execute a single Mapper.map call from a forked thread.
202       */
203      private class MapperInvokeRunable implements Runnable {
204        private K1 key;
205        private V1 value;
206        private OutputCollector<K2, V2> output;
207        private Reporter reporter;
208    
209        /**
210         * Collecting all required parameters to execute a Mapper.map call.
211         * <p>
212         *
213         * @param key
214         * @param value
215         * @param output
216         * @param reporter
217         */
218        public MapperInvokeRunable(K1 key, V1 value,
219                                   OutputCollector<K2, V2> output,
220                                   Reporter reporter) {
221          this.key = key;
222          this.value = value;
223          this.output = output;
224          this.reporter = reporter;
225        }
226    
227        /**
228         * Executes a Mapper.map call with the given Mapper and parameters.
229         * <p>
230         * This method is called from the thread-pool thread.
231         *
232         */
233        public void run() {
234          try {
235            // map pair to output
236            MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
237            if(incrProcCount) {
238              reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
239                  SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
240            }
241          } catch (IOException ex) {
242            // If there is an IOException during the call it is set in an instance
243            // variable of the MultithreadedMapRunner from where it will be
244            // rethrown.
245            synchronized (MultithreadedMapRunner.this) {
246              if (MultithreadedMapRunner.this.ioException == null) {
247                MultithreadedMapRunner.this.ioException = ex;
248              }
249            }
250          } catch (RuntimeException ex) {
251            // If there is a RuntimeException during the call it is set in an
252            // instance variable of the MultithreadedMapRunner from where it will be
253            // rethrown.
254            synchronized (MultithreadedMapRunner.this) {
255              if (MultithreadedMapRunner.this.runtimeException == null) {
256                MultithreadedMapRunner.this.runtimeException = ex;
257              }
258            }
259          }
260        }
261      }
262    
263    }