001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapreduce.lib.chain;
019
020import java.io.IOException;
021
022import org.apache.hadoop.classification.InterfaceAudience;
023import org.apache.hadoop.classification.InterfaceStability;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.mapreduce.Job;
026import org.apache.hadoop.mapreduce.Mapper;
027import org.apache.hadoop.mapreduce.lib.chain.Chain.ChainBlockingQueue;
028
029/**
030 * The ChainMapper class allows to use multiple Mapper classes within a single
031 * Map task.
032 * 
033 * <p>
034 * The Mapper classes are invoked in a chained (or piped) fashion, the output of
035 * the first becomes the input of the second, and so on until the last Mapper,
036 * the output of the last Mapper will be written to the task's output.
037 * </p>
038 * <p>
039 * The key functionality of this feature is that the Mappers in the chain do not
040 * need to be aware that they are executed in a chain. This enables having
041 * reusable specialized Mappers that can be combined to perform composite
042 * operations within a single task.
043 * </p>
044 * <p>
045 * Special care has to be taken when creating chains that the key/values output
046 * by a Mapper are valid for the following Mapper in the chain. It is assumed
047 * all Mappers and the Reduce in the chain use matching output and input key and
048 * value classes as no conversion is done by the chaining code.
049 * </p>
050 * <p>
051 * Using the ChainMapper and the ChainReducer classes is possible to compose
052 * Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>. And
053 * immediate benefit of this pattern is a dramatic reduction in disk IO.
054 * </p>
055 * <p>
056 * IMPORTANT: There is no need to specify the output key/value classes for the
057 * ChainMapper, this is done by the addMapper for the last mapper in the chain.
058 * </p>
059 * ChainMapper usage pattern:
060 * <p/>
061 * 
062 * <pre>
063 * ...
064 * Job = new Job(conf);
065 * <p/>
066 * Configuration mapAConf = new Configuration(false);
067 * ...
068 * ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
069 *   Text.class, Text.class, true, mapAConf);
070 * <p/>
071 * Configuration mapBConf = new Configuration(false);
072 * ...
073 * ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
074 *   LongWritable.class, Text.class, false, mapBConf);
075 * <p/>
076 * ...
077 * <p/>
078 * job.waitForComplettion(true);
079 * ...
080 * </pre>
081 */
082@InterfaceAudience.Public
083@InterfaceStability.Stable
084public class ChainMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
085    Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
086
087  /**
088   * Adds a {@link Mapper} class to the chain mapper.
089   * 
090   * <p>
091   * The key and values are passed from one element of the chain to the next, by
092   * value. For the added Mapper the configuration given for it,
093   * <code>mapperConf</code>, have precedence over the job's Configuration. This
094   * precedence is in effect when the task is running.
095   * </p>
096   * <p>
097   * IMPORTANT: There is no need to specify the output key/value classes for the
098   * ChainMapper, this is done by the addMapper for the last mapper in the chain
099   * </p>
100   * 
101   * @param job
102   *          The job.
103   * @param klass
104   *          the Mapper class to add.
105   * @param inputKeyClass
106   *          mapper input key class.
107   * @param inputValueClass
108   *          mapper input value class.
109   * @param outputKeyClass
110   *          mapper output key class.
111   * @param outputValueClass
112   *          mapper output value class.
113   * @param mapperConf
114   *          a configuration for the Mapper class. It is recommended to use a
115   *          Configuration without default values using the
116   *          <code>Configuration(boolean loadDefaults)</code> constructor with
117   *          FALSE.
118   */
119  public static void addMapper(Job job, Class<? extends Mapper> klass,
120      Class<?> inputKeyClass, Class<?> inputValueClass,
121      Class<?> outputKeyClass, Class<?> outputValueClass,
122      Configuration mapperConf) throws IOException {
123    job.setMapperClass(ChainMapper.class);
124    job.setMapOutputKeyClass(outputKeyClass);
125    job.setMapOutputValueClass(outputValueClass);
126    Chain.addMapper(true, job, klass, inputKeyClass, inputValueClass,
127        outputKeyClass, outputValueClass, mapperConf);
128  }
129
130  private Chain chain;
131
132  protected void setup(Context context) {
133    chain = new Chain(true);
134    chain.setup(context.getConfiguration());
135  }
136
137  public void run(Context context) throws IOException, InterruptedException {
138
139    setup(context);
140
141    int numMappers = chain.getAllMappers().size();
142    if (numMappers == 0) {
143      return;
144    }
145
146    ChainBlockingQueue<Chain.KeyValuePair<?, ?>> inputqueue;
147    ChainBlockingQueue<Chain.KeyValuePair<?, ?>> outputqueue;
148    if (numMappers == 1) {
149      chain.runMapper(context, 0);
150    } else {
151      // add all the mappers with proper context
152      // add first mapper
153      outputqueue = chain.createBlockingQueue();
154      chain.addMapper(context, outputqueue, 0);
155      // add other mappers
156      for (int i = 1; i < numMappers - 1; i++) {
157        inputqueue = outputqueue;
158        outputqueue = chain.createBlockingQueue();
159        chain.addMapper(inputqueue, outputqueue, context, i);
160      }
161      // add last mapper
162      chain.addMapper(outputqueue, context, numMappers - 1);
163    }
164    
165    // start all threads
166    chain.startAllThreads();
167    
168    // wait for all threads
169    chain.joinAllThreads();
170  }
171}