001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.pipes;
020
021import java.io.IOException;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.net.URL;
025import java.net.URLClassLoader;
026import java.security.AccessController;
027import java.security.PrivilegedAction;
028import java.util.StringTokenizer;
029
030import org.apache.commons.cli.BasicParser;
031import org.apache.commons.cli.CommandLine;
032import org.apache.commons.cli.Option;
033import org.apache.commons.cli.OptionBuilder;
034import org.apache.commons.cli.Options;
035import org.apache.commons.cli.ParseException;
036import org.apache.commons.cli.Parser;
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.apache.hadoop.classification.InterfaceAudience;
040import org.apache.hadoop.classification.InterfaceStability;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.conf.Configured;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.io.Text;
046import org.apache.hadoop.mapred.FileInputFormat;
047import org.apache.hadoop.mapred.FileOutputFormat;
048import org.apache.hadoop.mapred.InputFormat;
049import org.apache.hadoop.mapred.JobClient;
050import org.apache.hadoop.mapred.JobConf;
051import org.apache.hadoop.mapred.Mapper;
052import org.apache.hadoop.mapred.OutputFormat;
053import org.apache.hadoop.mapred.Partitioner;
054import org.apache.hadoop.mapred.Reducer;
055import org.apache.hadoop.mapred.RunningJob;
056import org.apache.hadoop.mapred.lib.HashPartitioner;
057import org.apache.hadoop.mapred.lib.LazyOutputFormat;
058import org.apache.hadoop.mapred.lib.NullOutputFormat;
059import org.apache.hadoop.mapreduce.MRJobConfig;
060import org.apache.hadoop.mapreduce.filecache.DistributedCache;
061import org.apache.hadoop.util.ExitUtil;
062import org.apache.hadoop.util.GenericOptionsParser;
063import org.apache.hadoop.util.Tool;
064
065/**
066 * The main entry point and job submitter. It may either be used as a command
067 * line-based or API-based method to launch Pipes jobs.
068 */
069@InterfaceAudience.Public
070@InterfaceStability.Stable
071public class Submitter extends Configured implements Tool {
072
073  protected static final Log LOG = LogFactory.getLog(Submitter.class);
074  public static final String PRESERVE_COMMANDFILE = 
075    "mapreduce.pipes.commandfile.preserve";
076  public static final String EXECUTABLE = "mapreduce.pipes.executable";
077  public static final String INTERPRETOR = 
078    "mapreduce.pipes.executable.interpretor";
079  public static final String IS_JAVA_MAP = "mapreduce.pipes.isjavamapper";
080  public static final String IS_JAVA_RR = "mapreduce.pipes.isjavarecordreader";
081  public static final String IS_JAVA_RW = "mapreduce.pipes.isjavarecordwriter";
082  public static final String IS_JAVA_REDUCE = "mapreduce.pipes.isjavareducer";
083  public static final String PARTITIONER = "mapreduce.pipes.partitioner";
084  public static final String INPUT_FORMAT = "mapreduce.pipes.inputformat";
085  public static final String PORT = "mapreduce.pipes.command.port";
086  
087  public Submitter() {
088    this(new Configuration());
089  }
090  
091  public Submitter(Configuration conf) {
092    setConf(conf);
093  }
094  
095  /**
096   * Get the URI of the application's executable.
097   * @param conf
098   * @return the URI where the application's executable is located
099   */
100  public static String getExecutable(JobConf conf) {
101    return conf.get(Submitter.EXECUTABLE);
102  }
103
104  /**
105   * Set the URI for the application's executable. Normally this is a hdfs: 
106   * location.
107   * @param conf
108   * @param executable The URI of the application's executable.
109   */
110  public static void setExecutable(JobConf conf, String executable) {
111    conf.set(Submitter.EXECUTABLE, executable);
112  }
113
114  /**
115   * Set whether the job is using a Java RecordReader.
116   * @param conf the configuration to modify
117   * @param value the new value
118   */
119  public static void setIsJavaRecordReader(JobConf conf, boolean value) {
120    conf.setBoolean(Submitter.IS_JAVA_RR, value);
121  }
122
123  /**
124   * Check whether the job is using a Java RecordReader
125   * @param conf the configuration to check
126   * @return is it a Java RecordReader?
127   */
128  public static boolean getIsJavaRecordReader(JobConf conf) {
129    return conf.getBoolean(Submitter.IS_JAVA_RR, false);
130  }
131
132  /**
133   * Set whether the Mapper is written in Java.
134   * @param conf the configuration to modify
135   * @param value the new value
136   */
137  public static void setIsJavaMapper(JobConf conf, boolean value) {
138    conf.setBoolean(Submitter.IS_JAVA_MAP, value);
139  }
140
141  /**
142   * Check whether the job is using a Java Mapper.
143   * @param conf the configuration to check
144   * @return is it a Java Mapper?
145   */
146  public static boolean getIsJavaMapper(JobConf conf) {
147    return conf.getBoolean(Submitter.IS_JAVA_MAP, false);
148  }
149
150  /**
151   * Set whether the Reducer is written in Java.
152   * @param conf the configuration to modify
153   * @param value the new value
154   */
155  public static void setIsJavaReducer(JobConf conf, boolean value) {
156    conf.setBoolean(Submitter.IS_JAVA_REDUCE, value);
157  }
158
159  /**
160   * Check whether the job is using a Java Reducer.
161   * @param conf the configuration to check
162   * @return is it a Java Reducer?
163   */
164  public static boolean getIsJavaReducer(JobConf conf) {
165    return conf.getBoolean(Submitter.IS_JAVA_REDUCE, false);
166  }
167
168  /**
169   * Set whether the job will use a Java RecordWriter.
170   * @param conf the configuration to modify
171   * @param value the new value to set
172   */
173  public static void setIsJavaRecordWriter(JobConf conf, boolean value) {
174    conf.setBoolean(Submitter.IS_JAVA_RW, value);
175  }
176
177  /**
178   * Will the reduce use a Java RecordWriter?
179   * @param conf the configuration to check
180   * @return true, if the output of the job will be written by Java
181   */
182  public static boolean getIsJavaRecordWriter(JobConf conf) {
183    return conf.getBoolean(Submitter.IS_JAVA_RW, false);
184  }
185
186  /**
187   * Set the configuration, if it doesn't already have a value for the given
188   * key.
189   * @param conf the configuration to modify
190   * @param key the key to set
191   * @param value the new "default" value to set
192   */
193  private static void setIfUnset(JobConf conf, String key, String value) {
194    if (conf.get(key) == null) {
195      conf.set(key, value);
196    }
197  }
198
199  /**
200   * Save away the user's original partitioner before we override it.
201   * @param conf the configuration to modify
202   * @param cls the user's partitioner class
203   */
204  static void setJavaPartitioner(JobConf conf, Class cls) {
205    conf.set(Submitter.PARTITIONER, cls.getName());
206  }
207  
208  /**
209   * Get the user's original partitioner.
210   * @param conf the configuration to look in
211   * @return the class that the user submitted
212   */
213  static Class<? extends Partitioner> getJavaPartitioner(JobConf conf) {
214    return conf.getClass(Submitter.PARTITIONER, 
215                         HashPartitioner.class,
216                         Partitioner.class);
217  }
218
219  /**
220   * Does the user want to keep the command file for debugging? If this is
221   * true, pipes will write a copy of the command data to a file in the
222   * task directory named "downlink.data", which may be used to run the C++
223   * program under the debugger. You probably also want to set 
224   * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from
225   * being deleted.
226   * To run using the data file, set the environment variable 
227   * "mapreduce.pipes.commandfile" to point to the file.
228   * @param conf the configuration to check
229   * @return will the framework save the command file?
230   */
231  public static boolean getKeepCommandFile(JobConf conf) {
232    return conf.getBoolean(Submitter.PRESERVE_COMMANDFILE, false);
233  }
234
235  /**
236   * Set whether to keep the command file for debugging
237   * @param conf the configuration to modify
238   * @param keep the new value
239   */
240  public static void setKeepCommandFile(JobConf conf, boolean keep) {
241    conf.setBoolean(Submitter.PRESERVE_COMMANDFILE, keep);
242  }
243
244  /**
245   * Submit a job to the map/reduce cluster. All of the necessary modifications
246   * to the job to run under pipes are made to the configuration.
247   * @param conf the job to submit to the cluster (MODIFIED)
248   * @throws IOException
249   * @deprecated Use {@link Submitter#runJob(JobConf)}
250   */
251  @Deprecated
252  public static RunningJob submitJob(JobConf conf) throws IOException {
253    return runJob(conf);
254  }
255
256  /**
257   * Submit a job to the map/reduce cluster. All of the necessary modifications
258   * to the job to run under pipes are made to the configuration.
259   * @param conf the job to submit to the cluster (MODIFIED)
260   * @throws IOException
261   */
262  public static RunningJob runJob(JobConf conf) throws IOException {
263    setupPipesJob(conf);
264    return JobClient.runJob(conf);
265  }
266
267  /**
268   * Submit a job to the Map-Reduce framework.
269   * This returns a handle to the {@link RunningJob} which can be used to track
270   * the running-job.
271   * 
272   * @param conf the job configuration.
273   * @return a handle to the {@link RunningJob} which can be used to track the
274   *         running-job.
275   * @throws IOException
276   */
277  public static RunningJob jobSubmit(JobConf conf) throws IOException {
278    setupPipesJob(conf);
279    return new JobClient(conf).submitJob(conf);
280  }
281  
282  private static void setupPipesJob(JobConf conf) throws IOException {
283    // default map output types to Text
284    if (!getIsJavaMapper(conf)) {
285      conf.setMapRunnerClass(PipesMapRunner.class);
286      // Save the user's partitioner and hook in our's.
287      setJavaPartitioner(conf, conf.getPartitionerClass());
288      conf.setPartitionerClass(PipesPartitioner.class);
289    }
290    if (!getIsJavaReducer(conf)) {
291      conf.setReducerClass(PipesReducer.class);
292      if (!getIsJavaRecordWriter(conf)) {
293        conf.setOutputFormat(NullOutputFormat.class);
294      }
295    }
296    String textClassname = Text.class.getName();
297    setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
298    setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
299    setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
300    setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);
301    
302    // Use PipesNonJavaInputFormat if necessary to handle progress reporting
303    // from C++ RecordReaders ...
304    if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
305      conf.setClass(Submitter.INPUT_FORMAT, 
306                    conf.getInputFormat().getClass(), InputFormat.class);
307      conf.setInputFormat(PipesNonJavaInputFormat.class);
308    }
309    
310    String exec = getExecutable(conf);
311    if (exec == null) {
312      throw new IllegalArgumentException("No application program defined.");
313    }
314    // add default debug script only when executable is expressed as
315    // <path>#<executable>
316    if (exec.contains("#")) {
317      // set default gdb commands for map and reduce task 
318      String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
319      setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
320      setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT,defScript);
321    }
322    URI[] fileCache = DistributedCache.getCacheFiles(conf);
323    if (fileCache == null) {
324      fileCache = new URI[1];
325    } else {
326      URI[] tmp = new URI[fileCache.length+1];
327      System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
328      fileCache = tmp;
329    }
330    try {
331      fileCache[0] = new URI(exec);
332    } catch (URISyntaxException e) {
333      IOException ie = new IOException("Problem parsing execable URI " + exec);
334      ie.initCause(e);
335      throw ie;
336    }
337    DistributedCache.setCacheFiles(fileCache, conf);
338  }
339
340  /**
341   * A command line parser for the CLI-based Pipes job submitter.
342   */
343  static class CommandLineParser {
344    private Options options = new Options();
345    
346    void addOption(String longName, boolean required, String description, 
347                   String paramName) {
348      Option option = OptionBuilder.withArgName(paramName).hasArgs(1).withDescription(description).isRequired(required).create(longName);
349      options.addOption(option);
350    }
351    
352    void addArgument(String name, boolean required, String description) {
353      Option option = OptionBuilder.withArgName(name).hasArgs(1).withDescription(description).isRequired(required).create();
354      options.addOption(option);
355
356    }
357
358    Parser createParser() {
359      Parser result = new BasicParser();
360      return result;
361    }
362    
363    void printUsage() {
364      // The CLI package should do this for us, but I can't figure out how
365      // to make it print something reasonable.
366      System.out.println("bin/hadoop pipes");
367      System.out.println("  [-input <path>] // Input directory");
368      System.out.println("  [-output <path>] // Output directory");
369      System.out.println("  [-jar <jar file> // jar filename");
370      System.out.println("  [-inputformat <class>] // InputFormat class");
371      System.out.println("  [-map <class>] // Java Map class");
372      System.out.println("  [-partitioner <class>] // Java Partitioner");
373      System.out.println("  [-reduce <class>] // Java Reduce class");
374      System.out.println("  [-writer <class>] // Java RecordWriter");
375      System.out.println("  [-program <executable>] // executable URI");
376      System.out.println("  [-reduces <num>] // number of reduces");
377      System.out.println("  [-lazyOutput <true/false>] // createOutputLazily");
378      System.out.println();
379      GenericOptionsParser.printGenericCommandUsage(System.out);
380    }
381  }
382  
383  private static <InterfaceType> 
384  Class<? extends InterfaceType> getClass(CommandLine cl, String key, 
385                                          JobConf conf, 
386                                          Class<InterfaceType> cls
387                                         ) throws ClassNotFoundException {
388    return conf.getClassByName(cl.getOptionValue(key)).asSubclass(cls);
389  }
390
391  @Override
392  public int run(String[] args) throws Exception {
393    CommandLineParser cli = new CommandLineParser();
394    if (args.length == 0) {
395      cli.printUsage();
396      return 1;
397    }
398    cli.addOption("input", false, "input path to the maps", "path");
399    cli.addOption("output", false, "output path from the reduces", "path");
400    
401    cli.addOption("jar", false, "job jar file", "path");
402    cli.addOption("inputformat", false, "java classname of InputFormat", 
403                  "class");
404    //cli.addArgument("javareader", false, "is the RecordReader in Java");
405    cli.addOption("map", false, "java classname of Mapper", "class");
406    cli.addOption("partitioner", false, "java classname of Partitioner", 
407                  "class");
408    cli.addOption("reduce", false, "java classname of Reducer", "class");
409    cli.addOption("writer", false, "java classname of OutputFormat", "class");
410    cli.addOption("program", false, "URI to application executable", "class");
411    cli.addOption("reduces", false, "number of reduces", "num");
412    cli.addOption("jobconf", false, 
413        "\"n1=v1,n2=v2,..\" (Deprecated) Optional. Add or override a JobConf property.",
414        "key=val");
415    cli.addOption("lazyOutput", false, "Optional. Create output lazily",
416                  "boolean");
417    Parser parser = cli.createParser();
418    try {
419      
420      GenericOptionsParser genericParser = new GenericOptionsParser(getConf(), args);
421      CommandLine results = parser.parse(cli.options, genericParser.getRemainingArgs());
422      
423      JobConf job = new JobConf(getConf());
424      
425      if (results.hasOption("input")) {
426        FileInputFormat.setInputPaths(job, results.getOptionValue("input"));
427      }
428      if (results.hasOption("output")) {
429        FileOutputFormat.setOutputPath(job, 
430          new Path(results.getOptionValue("output")));
431      }
432      if (results.hasOption("jar")) {
433        job.setJar(results.getOptionValue("jar"));
434      }
435      if (results.hasOption("inputformat")) {
436        setIsJavaRecordReader(job, true);
437        job.setInputFormat(getClass(results, "inputformat", job,
438                                     InputFormat.class));
439      }
440      if (results.hasOption("javareader")) {
441        setIsJavaRecordReader(job, true);
442      }
443      if (results.hasOption("map")) {
444        setIsJavaMapper(job, true);
445        job.setMapperClass(getClass(results, "map", job, Mapper.class));
446      }
447      if (results.hasOption("partitioner")) {
448        job.setPartitionerClass(getClass(results, "partitioner", job,
449                                          Partitioner.class));
450      }
451      if (results.hasOption("reduce")) {
452        setIsJavaReducer(job, true);
453        job.setReducerClass(getClass(results, "reduce", job, Reducer.class));
454      }
455      if (results.hasOption("reduces")) {
456        job.setNumReduceTasks(Integer.parseInt( 
457                                           results.getOptionValue("reduces")));
458      }
459      if (results.hasOption("writer")) {
460        setIsJavaRecordWriter(job, true);
461        job.setOutputFormat(getClass(results, "writer", job, 
462                                      OutputFormat.class));
463      }
464      
465      if (results.hasOption("lazyOutput")) {
466        if (Boolean.parseBoolean(results.getOptionValue("lazyOutput"))) {
467          LazyOutputFormat.setOutputFormatClass(job,
468              job.getOutputFormat().getClass());
469        }
470      }
471      
472      if (results.hasOption("program")) {
473        setExecutable(job, results.getOptionValue("program"));
474      }
475      if (results.hasOption("jobconf")) {
476        LOG.warn("-jobconf option is deprecated, please use -D instead.");
477        String options = results.getOptionValue("jobconf");
478        StringTokenizer tokenizer = new StringTokenizer(options, ",");
479        while (tokenizer.hasMoreTokens()) {
480          String keyVal = tokenizer.nextToken().trim();
481          String[] keyValSplit = keyVal.split("=");
482          job.set(keyValSplit[0], keyValSplit[1]);
483        }
484      }
485      // if they gave us a jar file, include it into the class path
486      String jarFile = job.getJar();
487      if (jarFile != null) {
488        final URL[] urls = new URL[]{ FileSystem.getLocal(job).
489            pathToFile(new Path(jarFile)).toURL()};
490        //FindBugs complains that creating a URLClassLoader should be
491        //in a doPrivileged() block. 
492        ClassLoader loader =
493          AccessController.doPrivileged(
494              new PrivilegedAction<ClassLoader>() {
495                public ClassLoader run() {
496                  return new URLClassLoader(urls);
497                }
498              }
499            );
500        job.setClassLoader(loader);
501      }
502      
503      runJob(job);
504      return 0;
505    } catch (ParseException pe) {
506      LOG.info("Error : " + pe);
507      cli.printUsage();
508      return 1;
509    }
510    
511  }
512  
513  /**
514   * Submit a pipes job based on the command line arguments.
515   * @param args
516   */
517  public static void main(String[] args) throws Exception {
518    int exitCode =  new Submitter().run(args);
519    ExitUtil.terminate(exitCode);
520  }
521
522}