001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.lib;
020
021import org.apache.hadoop.util.ReflectionUtils;
022import org.apache.hadoop.classification.InterfaceAudience;
023import org.apache.hadoop.classification.InterfaceStability;
024import org.apache.hadoop.mapred.MapRunnable;
025import org.apache.hadoop.mapred.JobConf;
026import org.apache.hadoop.mapred.Mapper;
027import org.apache.hadoop.mapred.RecordReader;
028import org.apache.hadoop.mapred.OutputCollector;
029import org.apache.hadoop.mapred.Reporter;
030import org.apache.hadoop.mapred.SkipBadRecords;
031import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
035import java.io.IOException;
036import java.util.concurrent.*;
037
/**
 * Multithreaded implementation for
 * {@link org.apache.hadoop.mapred.MapRunnable}.
 * <p>
 * It can be used instead of the default implementation,
 * {@link org.apache.hadoop.mapred.MapRunner}, when the Map operation is not
 * CPU bound in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the JobConf.setMapRunnerClass method) and
 * the number of threads the thread-pool can use with the
 * <code>mapred.map.multithreadedrunner.threads</code> property; its default
 * value is 10 threads.
 */
054@InterfaceAudience.Public
055@InterfaceStability.Stable
056public class MultithreadedMapRunner<K1, V1, K2, V2>
057    implements MapRunnable<K1, V1, K2, V2> {
058
059  private static final Log LOG =
060    LogFactory.getLog(MultithreadedMapRunner.class.getName());
061
062  private JobConf job;
063  private Mapper<K1, V1, K2, V2> mapper;
064  private ExecutorService executorService;
065  private volatile IOException ioException;
066  private volatile RuntimeException runtimeException;
067  private boolean incrProcCount;
068
069  @SuppressWarnings("unchecked")
070  public void configure(JobConf jobConf) {
071    int numberOfThreads =
072      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
073    if (LOG.isDebugEnabled()) {
074      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
075                " to use " + numberOfThreads + " threads");
076    }
077
078    this.job = jobConf;
079    //increment processed counter only if skipping feature is enabled
080    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
081      SkipBadRecords.getAutoIncrMapperProcCount(job);
082    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
083        jobConf);
084
085    // Creating a threadpool of the configured size to execute the Mapper
086    // map method in parallel.
087    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
088                                             0L, TimeUnit.MILLISECONDS,
089                                             new BlockingArrayQueue
090                                               (numberOfThreads));
091  }
092
093  /**
094   * A blocking array queue that replaces offer and add, which throws on a full
095   * queue, to a put, which waits on a full queue.
096   */
097  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
098 
099    private static final long serialVersionUID = 1L;
100    public BlockingArrayQueue(int capacity) {
101      super(capacity);
102    }
103    public boolean offer(Runnable r) {
104      return add(r);
105    }
106    public boolean add(Runnable r) {
107      try {
108        put(r);
109      } catch (InterruptedException ie) {
110        Thread.currentThread().interrupt();
111      }
112      return true;
113    }
114  }
115
116  private void checkForExceptionsFromProcessingThreads()
117      throws IOException, RuntimeException {
118    // Checking if a Mapper.map within a Runnable has generated an
119    // IOException. If so we rethrow it to force an abort of the Map
120    // operation thus keeping the semantics of the default
121    // implementation.
122    if (ioException != null) {
123      throw ioException;
124    }
125
126    // Checking if a Mapper.map within a Runnable has generated a
127    // RuntimeException. If so we rethrow it to force an abort of the Map
128    // operation thus keeping the semantics of the default
129    // implementation.
130    if (runtimeException != null) {
131      throw runtimeException;
132    }
133  }
134
135  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
136                  Reporter reporter)
137    throws IOException {
138    try {
139      // allocate key & value instances these objects will not be reused
140      // because execution of Mapper.map is not serialized.
141      K1 key = input.createKey();
142      V1 value = input.createValue();
143
144      while (input.next(key, value)) {
145
146        executorService.execute(new MapperInvokeRunable(key, value, output,
147                                reporter));
148
149        checkForExceptionsFromProcessingThreads();
150
151        // Allocate new key & value instances as mapper is running in parallel
152        key = input.createKey();
153        value = input.createValue();
154      }
155
156      if (LOG.isDebugEnabled()) {
157        LOG.debug("Finished dispatching all Mappper.map calls, job "
158                  + job.getJobName());
159      }
160
161      // Graceful shutdown of the Threadpool, it will let all scheduled
162      // Runnables to end.
163      executorService.shutdown();
164
165      try {
166
167        // Now waiting for all Runnables to end.
168        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
169          if (LOG.isDebugEnabled()) {
170            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
171                      + job.getJobName());
172          }
173
174          // NOTE: while Mapper.map dispatching has concluded there are still
175          // map calls in progress and exceptions would be thrown.
176          checkForExceptionsFromProcessingThreads();
177
178        }
179
180        // NOTE: it could be that a map call has had an exception after the
181        // call for awaitTermination() returing true. And edge case but it
182        // could happen.
183        checkForExceptionsFromProcessingThreads();
184
185      } catch (IOException ioEx) {
186        // Forcing a shutdown of all thread of the threadpool and rethrowing
187        // the IOException
188        executorService.shutdownNow();
189        throw ioEx;
190      } catch (InterruptedException iEx) {
191        throw new RuntimeException(iEx);
192      }
193
194    } finally {
195      mapper.close();
196    }
197  }
198
199
200  /**
201   * Runnable to execute a single Mapper.map call from a forked thread.
202   */
203  private class MapperInvokeRunable implements Runnable {
204    private K1 key;
205    private V1 value;
206    private OutputCollector<K2, V2> output;
207    private Reporter reporter;
208
209    /**
210     * Collecting all required parameters to execute a Mapper.map call.
211     * <p>
212     *
213     * @param key
214     * @param value
215     * @param output
216     * @param reporter
217     */
218    public MapperInvokeRunable(K1 key, V1 value,
219                               OutputCollector<K2, V2> output,
220                               Reporter reporter) {
221      this.key = key;
222      this.value = value;
223      this.output = output;
224      this.reporter = reporter;
225    }
226
227    /**
228     * Executes a Mapper.map call with the given Mapper and parameters.
229     * <p>
230     * This method is called from the thread-pool thread.
231     *
232     */
233    public void run() {
234      try {
235        // map pair to output
236        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
237        if(incrProcCount) {
238          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
239              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
240        }
241      } catch (IOException ex) {
242        // If there is an IOException during the call it is set in an instance
243        // variable of the MultithreadedMapRunner from where it will be
244        // rethrown.
245        synchronized (MultithreadedMapRunner.this) {
246          if (MultithreadedMapRunner.this.ioException == null) {
247            MultithreadedMapRunner.this.ioException = ex;
248          }
249        }
250      } catch (RuntimeException ex) {
251        // If there is a RuntimeException during the call it is set in an
252        // instance variable of the MultithreadedMapRunner from where it will be
253        // rethrown.
254        synchronized (MultithreadedMapRunner.this) {
255          if (MultithreadedMapRunner.this.runtimeException == null) {
256            MultithreadedMapRunner.this.runtimeException = ex;
257          }
258        }
259      }
260    }
261  }
262
263}