001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.tools;
020
021import java.io.IOException;
022import java.util.Random;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.conf.Configured;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.io.Text;
033import org.apache.hadoop.mapreduce.Cluster;
034import org.apache.hadoop.mapreduce.Job;
035import org.apache.hadoop.mapreduce.JobContext;
036import org.apache.hadoop.mapreduce.JobSubmissionFiles;
037import org.apache.hadoop.tools.CopyListing.*;
038import org.apache.hadoop.tools.mapred.CopyMapper;
039import org.apache.hadoop.tools.mapred.CopyOutputFormat;
040import org.apache.hadoop.tools.util.DistCpUtils;
041import org.apache.hadoop.util.ShutdownHookManager;
042import org.apache.hadoop.util.Tool;
043import org.apache.hadoop.util.ToolRunner;
044
045import com.google.common.annotations.VisibleForTesting;
046
047/**
048 * DistCp is the main driver-class for DistCpV2.
049 * For command-line use, DistCp::main() orchestrates the parsing of command-line
050 * parameters and the launch of the DistCp job.
051 * For programmatic use, a DistCp object can be constructed by specifying
052 * options (in a DistCpOptions object), and DistCp::execute() may be used to
053 * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
054 * behaviour.
055 */
056@InterfaceAudience.Public
057@InterfaceStability.Evolving
058public class DistCp extends Configured implements Tool {
059
060  /**
061   * Priority of the shutdown hook.
062   */
063  static final int SHUTDOWN_HOOK_PRIORITY = 30;
064
065  static final Log LOG = LogFactory.getLog(DistCp.class);
066
067  private DistCpOptions inputOptions;
068  private Path metaFolder;
069
070  private static final String PREFIX = "_distcp";
071  private static final String WIP_PREFIX = "._WIP_";
072  private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
073  static final Random rand = new Random();
074
075  private boolean submitted;
076  private FileSystem jobFS;
077
078  /**
079   * Public Constructor. Creates DistCp object with specified input-parameters.
080   * (E.g. source-paths, target-location, etc.)
081   * @param inputOptions Options (indicating source-paths, target-location.)
082   * @param configuration The Hadoop configuration against which the Copy-mapper must run.
083   * @throws Exception
084   */
085  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
086    Configuration config = new Configuration(configuration);
087    config.addResource(DISTCP_DEFAULT_XML);
088    setConf(config);
089    this.inputOptions = inputOptions;
090    this.metaFolder   = createMetaFolderPath();
091  }
092
093  /**
094   * To be used with the ToolRunner. Not for public consumption.
095   */
096  @VisibleForTesting
097  DistCp() {}
098
099  /**
100   * Implementation of Tool::run(). Orchestrates the copy of source file(s)
101   * to target location, by:
102   *  1. Creating a list of files to be copied to target.
103   *  2. Launching a Map-only job to copy the files. (Delegates to execute().)
104   * @param argv List of arguments passed to DistCp, from the ToolRunner.
105   * @return On success, it returns 0. Else, -1.
106   */
107  @Override
108  public int run(String[] argv) {
109    if (argv.length < 1) {
110      OptionsParser.usage();
111      return DistCpConstants.INVALID_ARGUMENT;
112    }
113    
114    try {
115      inputOptions = (OptionsParser.parse(argv));
116      setTargetPathExists();
117      LOG.info("Input Options: " + inputOptions);
118    } catch (Throwable e) {
119      LOG.error("Invalid arguments: ", e);
120      System.err.println("Invalid arguments: " + e.getMessage());
121      OptionsParser.usage();      
122      return DistCpConstants.INVALID_ARGUMENT;
123    }
124    
125    try {
126      execute();
127    } catch (InvalidInputException e) {
128      LOG.error("Invalid input: ", e);
129      return DistCpConstants.INVALID_ARGUMENT;
130    } catch (DuplicateFileException e) {
131      LOG.error("Duplicate files in input path: ", e);
132      return DistCpConstants.DUPLICATE_INPUT;
133    } catch (AclsNotSupportedException e) {
134      LOG.error("ACLs not supported on at least one file system: ", e);
135      return DistCpConstants.ACLS_NOT_SUPPORTED;
136    } catch (XAttrsNotSupportedException e) {
137      LOG.error("XAttrs not supported on at least one file system: ", e);
138      return DistCpConstants.XATTRS_NOT_SUPPORTED;
139    } catch (Exception e) {
140      LOG.error("Exception encountered ", e);
141      return DistCpConstants.UNKNOWN_ERROR;
142    }
143    return DistCpConstants.SUCCESS;
144  }
145
146  /**
147   * Implements the core-execution. Creates the file-list for copy,
148   * and launches the Hadoop-job, to do the copy.
149   * @return Job handle
150   * @throws Exception
151   */
152  public Job execute() throws Exception {
153    Job job = createAndSubmitJob();
154
155    if (inputOptions.shouldBlock()) {
156      waitForJobCompletion(job);
157    }
158    return job;
159  }
160
161  /**
162   * Create and submit the mapreduce job.
163   * @return The mapreduce job object that has been submitted
164   */
165  public Job createAndSubmitJob() throws Exception {
166    assert inputOptions != null;
167    assert getConf() != null;
168    Job job = null;
169    try {
170      synchronized(this) {
171        //Don't cleanup while we are setting up.
172        metaFolder = createMetaFolderPath();
173        jobFS = metaFolder.getFileSystem(getConf());
174        job = createJob();
175      }
176      if (inputOptions.shouldUseDiff()) {
177        if (!DistCpSync.sync(inputOptions, getConf())) {
178          inputOptions.disableUsingDiff();
179        }
180      }
181      createInputFileListing(job);
182
183      job.submit();
184      submitted = true;
185    } finally {
186      if (!submitted) {
187        cleanup();
188      }
189    }
190
191    String jobID = job.getJobID().toString();
192    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
193    LOG.info("DistCp job-id: " + jobID);
194
195    return job;
196  }
197
198  /**
199   * Wait for the given job to complete.
200   * @param job the given mapreduce job that has already been submitted
201   */
202  public void waitForJobCompletion(Job job) throws Exception {
203    assert job != null;
204    if (!job.waitForCompletion(true)) {
205      throw new IOException("DistCp failure: Job " + job.getJobID()
206          + " has failed: " + job.getStatus().getFailureInfo());
207    }
208  }
209
210  /**
211   * Set targetPathExists in both inputOptions and job config,
212   * for the benefit of CopyCommitter
213   */
214  private void setTargetPathExists() throws IOException {
215    Path target = inputOptions.getTargetPath();
216    FileSystem targetFS = target.getFileSystem(getConf());
217    boolean targetExists = targetFS.exists(target);
218    inputOptions.setTargetPathExists(targetExists);
219    getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 
220        targetExists);
221  }
222  /**
223   * Create Job object for submitting it, with all the configuration
224   *
225   * @return Reference to job object.
226   * @throws IOException - Exception if any
227   */
228  private Job createJob() throws IOException {
229    String jobName = "distcp";
230    String userChosenName = getConf().get(JobContext.JOB_NAME);
231    if (userChosenName != null)
232      jobName += ": " + userChosenName;
233    Job job = Job.getInstance(getConf());
234    job.setJobName(jobName);
235    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
236    job.setJarByClass(CopyMapper.class);
237    configureOutputFormat(job);
238
239    job.setMapperClass(CopyMapper.class);
240    job.setNumReduceTasks(0);
241    job.setMapOutputKeyClass(Text.class);
242    job.setMapOutputValueClass(Text.class);
243    job.setOutputFormatClass(CopyOutputFormat.class);
244    job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
245    job.getConfiguration().set(JobContext.NUM_MAPS,
246                  String.valueOf(inputOptions.getMaxMaps()));
247
248    if (inputOptions.getSslConfigurationFile() != null) {
249      setupSSLConfig(job);
250    }
251
252    inputOptions.appendToConf(job.getConfiguration());
253    return job;
254  }
255
256  /**
257   * Setup ssl configuration on the job configuration to enable hsftp access
258   * from map job. Also copy the ssl configuration file to Distributed cache
259   *
260   * @param job - Reference to job's handle
261   * @throws java.io.IOException - Exception if unable to locate ssl config file
262   */
263  private void setupSSLConfig(Job job) throws IOException  {
264    Configuration configuration = job.getConfiguration();
265    Path sslConfigPath = new Path(configuration.
266        getResource(inputOptions.getSslConfigurationFile()).toString());
267
268    addSSLFilesToDistCache(job, sslConfigPath);
269    configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName());
270    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName());
271  }
272
273  /**
274   * Add SSL files to distributed cache. Trust store, key store and ssl config xml
275   *
276   * @param job - Job handle
277   * @param sslConfigPath - ssl Configuration file specified through options
278   * @throws IOException - If any
279   */
280  private void addSSLFilesToDistCache(Job job,
281                                      Path sslConfigPath) throws IOException {
282    Configuration configuration = job.getConfiguration();
283    FileSystem localFS = FileSystem.getLocal(configuration);
284
285    Configuration sslConf = new Configuration(false);
286    sslConf.addResource(sslConfigPath);
287
288    Path localStorePath = getLocalStorePath(sslConf,
289                            DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
290    job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
291                                      localFS.getWorkingDirectory()).toUri());
292    configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
293                      localStorePath.getName());
294
295    localStorePath = getLocalStorePath(sslConf,
296                             DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
297    job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
298                                      localFS.getWorkingDirectory()).toUri());
299    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION,
300                                      localStorePath.getName());
301
302    job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(),
303                                      localFS.getWorkingDirectory()).toUri());
304
305  }
306
307  /**
308   * Get Local Trust store/key store path
309   *
310   * @param sslConf - Config from SSL Client xml
311   * @param storeKey - Key for either trust store or key store
312   * @return - Path where the store is present
313   * @throws IOException -If any
314   */
315  private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException {
316    if (sslConf.get(storeKey) != null) {
317      return new Path(sslConf.get(storeKey));
318    } else {
319      throw new IOException("Store for " + storeKey + " is not set in " +
320          inputOptions.getSslConfigurationFile());
321    }
322  }
323
324  /**
325   * Setup output format appropriately
326   *
327   * @param job - Job handle
328   * @throws IOException - Exception if any
329   */
330  private void configureOutputFormat(Job job) throws IOException {
331    final Configuration configuration = job.getConfiguration();
332    Path targetPath = inputOptions.getTargetPath();
333    FileSystem targetFS = targetPath.getFileSystem(configuration);
334    targetPath = targetPath.makeQualified(targetFS.getUri(),
335                                          targetFS.getWorkingDirectory());
336    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
337      DistCpUtils.checkFileSystemAclSupport(targetFS);
338    }
339    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
340      DistCpUtils.checkFileSystemXAttrSupport(targetFS);
341    }
342    if (inputOptions.shouldAtomicCommit()) {
343      Path workDir = inputOptions.getAtomicWorkPath();
344      if (workDir == null) {
345        workDir = targetPath.getParent();
346      }
347      workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
348                                + rand.nextInt());
349      FileSystem workFS = workDir.getFileSystem(configuration);
350      if (!DistCpUtils.compareFs(targetFS, workFS)) {
351        throw new IllegalArgumentException("Work path " + workDir +
352            " and target path " + targetPath + " are in different file system");
353      }
354      CopyOutputFormat.setWorkingDirectory(job, workDir);
355    } else {
356      CopyOutputFormat.setWorkingDirectory(job, targetPath);
357    }
358    CopyOutputFormat.setCommitDirectory(job, targetPath);
359
360    Path logPath = inputOptions.getLogPath();
361    if (logPath == null) {
362      logPath = new Path(metaFolder, "_logs");
363    } else {
364      LOG.info("DistCp job log path: " + logPath);
365    }
366    CopyOutputFormat.setOutputPath(job, logPath);
367  }
368
369  /**
370   * Create input listing by invoking an appropriate copy listing
371   * implementation. Also add delegation tokens for each path
372   * to job's credential store
373   *
374   * @param job - Handle to job
375   * @return Returns the path where the copy listing is created
376   * @throws IOException - If any
377   */
378  protected Path createInputFileListing(Job job) throws IOException {
379    Path fileListingPath = getFileListingPath();
380    CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
381        job.getCredentials(), inputOptions);
382    copyListing.buildListing(fileListingPath, inputOptions);
383    return fileListingPath;
384  }
385
386  /**
387   * Get default name of the copy listing file. Use the meta folder
388   * to create the copy listing file
389   *
390   * @return - Path where the copy listing file has to be saved
391   * @throws IOException - Exception if any
392   */
393  protected Path getFileListingPath() throws IOException {
394    String fileListPathStr = metaFolder + "/fileList.seq";
395    Path path = new Path(fileListPathStr);
396    return new Path(path.toUri().normalize().toString());
397  }
398
399  /**
400   * Create a default working folder for the job, under the
401   * job staging directory
402   *
403   * @return Returns the working folder information
404   * @throws Exception - EXception if any
405   */
406  private Path createMetaFolderPath() throws Exception {
407    Configuration configuration = getConf();
408    Path stagingDir = JobSubmissionFiles.getStagingDir(
409            new Cluster(configuration), configuration);
410    Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
411    if (LOG.isDebugEnabled())
412      LOG.debug("Meta folder location: " + metaFolderPath);
413    configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
414    return metaFolderPath;
415  }
416
417  /**
418   * Main function of the DistCp program. Parses the input arguments (via OptionsParser),
419   * and invokes the DistCp::run() method, via the ToolRunner.
420   * @param argv Command-line arguments sent to DistCp.
421   */
422  public static void main(String argv[]) {
423    int exitCode;
424    try {
425      DistCp distCp = new DistCp();
426      Cleanup CLEANUP = new Cleanup(distCp);
427
428      ShutdownHookManager.get().addShutdownHook(CLEANUP,
429        SHUTDOWN_HOOK_PRIORITY);
430      exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
431    }
432    catch (Exception e) {
433      LOG.error("Couldn't complete DistCp operation: ", e);
434      exitCode = DistCpConstants.UNKNOWN_ERROR;
435    }
436    System.exit(exitCode);
437  }
438
439  /**
440   * Loads properties from distcp-default.xml into configuration
441   * object
442   * @return Configuration which includes properties from distcp-default.xml
443   */
444  private static Configuration getDefaultConf() {
445    Configuration config = new Configuration();
446    config.addResource(DISTCP_DEFAULT_XML);
447    return config;
448  }
449
450  private synchronized void cleanup() {
451    try {
452      if (metaFolder == null) return;
453
454      jobFS.delete(metaFolder, true);
455      metaFolder = null;
456    } catch (IOException e) {
457      LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
458    }
459  }
460
461  private boolean isSubmitted() {
462    return submitted;
463  }
464
465  private static class Cleanup implements Runnable {
466    private final DistCp distCp;
467
468    Cleanup(DistCp distCp) {
469      this.distCp = distCp;
470    }
471
472    @Override
473    public void run() {
474      if (distCp.isSubmitted()) return;
475
476      distCp.cleanup();
477    }
478  }
479}