001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.tools;
020
021import java.io.IOException;
022import java.net.URL;
023import java.util.Random;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.conf.Configured;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.FileUtil;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.io.Text;
035import org.apache.hadoop.mapreduce.Cluster;
036import org.apache.hadoop.mapreduce.Job;
037import org.apache.hadoop.mapreduce.JobContext;
038import org.apache.hadoop.mapreduce.JobSubmissionFiles;
039import org.apache.hadoop.tools.CopyListing.*;
040import org.apache.hadoop.tools.mapred.CopyMapper;
041import org.apache.hadoop.tools.mapred.CopyOutputFormat;
042import org.apache.hadoop.tools.util.DistCpUtils;
043import org.apache.hadoop.util.ShutdownHookManager;
044import org.apache.hadoop.util.Tool;
045import org.apache.hadoop.util.ToolRunner;
046
047import com.google.common.annotations.VisibleForTesting;
048
049/**
050 * DistCp is the main driver-class for DistCpV2.
051 * For command-line use, DistCp::main() orchestrates the parsing of command-line
052 * parameters and the launch of the DistCp job.
053 * For programmatic use, a DistCp object can be constructed by specifying
054 * options (in a DistCpOptions object), and DistCp::execute() may be used to
055 * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
056 * behaviour.
057 */
058@InterfaceAudience.Public
059@InterfaceStability.Evolving
060public class DistCp extends Configured implements Tool {
061
062  /**
063   * Priority of the shutdown hook.
064   */
065  static final int SHUTDOWN_HOOK_PRIORITY = 30;
066
067  static final Log LOG = LogFactory.getLog(DistCp.class);
068
069  private DistCpOptions inputOptions;
070  private Path metaFolder;
071
072  private static final String PREFIX = "_distcp";
073  private static final String WIP_PREFIX = "._WIP_";
074  private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
075  static final Random rand = new Random();
076
077  private boolean submitted;
078  private FileSystem jobFS;
079
080  /**
081   * Public Constructor. Creates DistCp object with specified input-parameters.
082   * (E.g. source-paths, target-location, etc.)
083   * @param inputOptions Options (indicating source-paths, target-location.)
084   * @param configuration The Hadoop configuration against which the Copy-mapper must run.
085   * @throws Exception
086   */
087  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
088    Configuration config = new Configuration(configuration);
089    config.addResource(DISTCP_DEFAULT_XML);
090    setConf(config);
091    this.inputOptions = inputOptions;
092    this.metaFolder   = createMetaFolderPath();
093  }
094
095  /**
096   * To be used with the ToolRunner. Not for public consumption.
097   */
098  @VisibleForTesting
099  DistCp() {}
100
101  /**
102   * Implementation of Tool::run(). Orchestrates the copy of source file(s)
103   * to target location, by:
104   *  1. Creating a list of files to be copied to target.
105   *  2. Launching a Map-only job to copy the files. (Delegates to execute().)
106   * @param argv List of arguments passed to DistCp, from the ToolRunner.
107   * @return On success, it returns 0. Else, -1.
108   */
109  @Override
110  public int run(String[] argv) {
111    if (argv.length < 1) {
112      OptionsParser.usage();
113      return DistCpConstants.INVALID_ARGUMENT;
114    }
115    
116    try {
117      inputOptions = (OptionsParser.parse(argv));
118      setTargetPathExists();
119      LOG.info("Input Options: " + inputOptions);
120    } catch (Throwable e) {
121      LOG.error("Invalid arguments: ", e);
122      System.err.println("Invalid arguments: " + e.getMessage());
123      OptionsParser.usage();      
124      return DistCpConstants.INVALID_ARGUMENT;
125    }
126    
127    try {
128      execute();
129    } catch (InvalidInputException e) {
130      LOG.error("Invalid input: ", e);
131      return DistCpConstants.INVALID_ARGUMENT;
132    } catch (DuplicateFileException e) {
133      LOG.error("Duplicate files in input path: ", e);
134      return DistCpConstants.DUPLICATE_INPUT;
135    } catch (AclsNotSupportedException e) {
136      LOG.error("ACLs not supported on at least one file system: ", e);
137      return DistCpConstants.ACLS_NOT_SUPPORTED;
138    } catch (XAttrsNotSupportedException e) {
139      LOG.error("XAttrs not supported on at least one file system: ", e);
140      return DistCpConstants.XATTRS_NOT_SUPPORTED;
141    } catch (Exception e) {
142      LOG.error("Exception encountered ", e);
143      return DistCpConstants.UNKNOWN_ERROR;
144    }
145    return DistCpConstants.SUCCESS;
146  }
147
148  /**
149   * Implements the core-execution. Creates the file-list for copy,
150   * and launches the Hadoop-job, to do the copy.
151   * @return Job handle
152   * @throws Exception
153   */
154  public Job execute() throws Exception {
155    Job job = createAndSubmitJob();
156
157    if (inputOptions.shouldBlock()) {
158      waitForJobCompletion(job);
159    }
160    return job;
161  }
162
163  /**
164   * Create and submit the mapreduce job.
165   * @return The mapreduce job object that has been submitted
166   */
167  public Job createAndSubmitJob() throws Exception {
168    assert inputOptions != null;
169    assert getConf() != null;
170    Job job = null;
171    try {
172      synchronized(this) {
173        //Don't cleanup while we are setting up.
174        metaFolder = createMetaFolderPath();
175        jobFS = metaFolder.getFileSystem(getConf());
176        job = createJob();
177      }
178      if (inputOptions.shouldUseDiff()) {
179        DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
180        if (distCpSync.sync()) {
181          createInputFileListingWithDiff(job, distCpSync);
182        } else {
183          throw new Exception("DistCp sync failed, input options: "
184              + inputOptions);
185        }
186      }
187
188      // Fallback to default DistCp if without "diff" option or sync failed.
189      if (!inputOptions.shouldUseDiff()) {
190        createInputFileListing(job);
191      }
192
193      job.submit();
194      submitted = true;
195    } finally {
196      if (!submitted) {
197        cleanup();
198      }
199    }
200
201    String jobID = job.getJobID().toString();
202    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
203    LOG.info("DistCp job-id: " + jobID);
204
205    return job;
206  }
207
208  /**
209   * Wait for the given job to complete.
210   * @param job the given mapreduce job that has already been submitted
211   */
212  public void waitForJobCompletion(Job job) throws Exception {
213    assert job != null;
214    if (!job.waitForCompletion(true)) {
215      throw new IOException("DistCp failure: Job " + job.getJobID()
216          + " has failed: " + job.getStatus().getFailureInfo());
217    }
218  }
219
220  /**
221   * Set targetPathExists in both inputOptions and job config,
222   * for the benefit of CopyCommitter
223   */
224  private void setTargetPathExists() throws IOException {
225    Path target = inputOptions.getTargetPath();
226    FileSystem targetFS = target.getFileSystem(getConf());
227    boolean targetExists = targetFS.exists(target);
228    inputOptions.setTargetPathExists(targetExists);
229    getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 
230        targetExists);
231  }
232  /**
233   * Create Job object for submitting it, with all the configuration
234   *
235   * @return Reference to job object.
236   * @throws IOException - Exception if any
237   */
238  private Job createJob() throws IOException {
239    String jobName = "distcp";
240    String userChosenName = getConf().get(JobContext.JOB_NAME);
241    if (userChosenName != null)
242      jobName += ": " + userChosenName;
243    Job job = Job.getInstance(getConf());
244    job.setJobName(jobName);
245    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
246    job.setJarByClass(CopyMapper.class);
247    configureOutputFormat(job);
248
249    job.setMapperClass(CopyMapper.class);
250    job.setNumReduceTasks(0);
251    job.setMapOutputKeyClass(Text.class);
252    job.setMapOutputValueClass(Text.class);
253    job.setOutputFormatClass(CopyOutputFormat.class);
254    job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
255    job.getConfiguration().set(JobContext.NUM_MAPS,
256                  String.valueOf(inputOptions.getMaxMaps()));
257
258    if (inputOptions.getSslConfigurationFile() != null) {
259      setupSSLConfig(job);
260    }
261
262    inputOptions.appendToConf(job.getConfiguration());
263    return job;
264  }
265
266  /**
267   * Setup ssl configuration on the job configuration to enable hsftp access
268   * from map job. Also copy the ssl configuration file to Distributed cache
269   *
270   * @param job - Reference to job's handle
271   * @throws java.io.IOException - Exception if unable to locate ssl config file
272   */
273  private void setupSSLConfig(Job job) throws IOException  {
274    Configuration configuration = job.getConfiguration();
275    URL sslFileUrl = configuration.getResource(inputOptions
276        .getSslConfigurationFile());
277    if (sslFileUrl == null) {
278      throw new IOException(
279          "Given ssl configuration file doesn't exist in class path : "
280              + inputOptions.getSslConfigurationFile());
281    }
282    Path sslConfigPath = new Path(sslFileUrl.toString());
283
284    addSSLFilesToDistCache(job, sslConfigPath);
285    configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName());
286    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName());
287  }
288
289  /**
290   * Add SSL files to distributed cache. Trust store, key store and ssl config xml
291   *
292   * @param job - Job handle
293   * @param sslConfigPath - ssl Configuration file specified through options
294   * @throws IOException - If any
295   */
296  private void addSSLFilesToDistCache(Job job,
297                                      Path sslConfigPath) throws IOException {
298    Configuration configuration = job.getConfiguration();
299    FileSystem localFS = FileSystem.getLocal(configuration);
300
301    Configuration sslConf = new Configuration(false);
302    sslConf.addResource(sslConfigPath);
303
304    Path localStorePath = getLocalStorePath(sslConf,
305                            DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
306    job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
307                                      localFS.getWorkingDirectory()).toUri());
308    configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
309                      localStorePath.getName());
310
311    localStorePath = getLocalStorePath(sslConf,
312                             DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
313    job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
314                                      localFS.getWorkingDirectory()).toUri());
315    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION,
316                                      localStorePath.getName());
317
318    job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(),
319                                      localFS.getWorkingDirectory()).toUri());
320
321  }
322
323  /**
324   * Get Local Trust store/key store path
325   *
326   * @param sslConf - Config from SSL Client xml
327   * @param storeKey - Key for either trust store or key store
328   * @return - Path where the store is present
329   * @throws IOException -If any
330   */
331  private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException {
332    if (sslConf.get(storeKey) != null) {
333      return new Path(sslConf.get(storeKey));
334    } else {
335      throw new IOException("Store for " + storeKey + " is not set in " +
336          inputOptions.getSslConfigurationFile());
337    }
338  }
339
340  /**
341   * Setup output format appropriately
342   *
343   * @param job - Job handle
344   * @throws IOException - Exception if any
345   */
346  private void configureOutputFormat(Job job) throws IOException {
347    final Configuration configuration = job.getConfiguration();
348    Path targetPath = inputOptions.getTargetPath();
349    FileSystem targetFS = targetPath.getFileSystem(configuration);
350    targetPath = targetPath.makeQualified(targetFS.getUri(),
351                                          targetFS.getWorkingDirectory());
352    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
353      DistCpUtils.checkFileSystemAclSupport(targetFS);
354    }
355    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
356      DistCpUtils.checkFileSystemXAttrSupport(targetFS);
357    }
358    if (inputOptions.shouldAtomicCommit()) {
359      Path workDir = inputOptions.getAtomicWorkPath();
360      if (workDir == null) {
361        workDir = targetPath.getParent();
362      }
363      workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
364                                + rand.nextInt());
365      FileSystem workFS = workDir.getFileSystem(configuration);
366      if (!FileUtil.compareFs(targetFS, workFS)) {
367        throw new IllegalArgumentException("Work path " + workDir +
368            " and target path " + targetPath + " are in different file system");
369      }
370      CopyOutputFormat.setWorkingDirectory(job, workDir);
371    } else {
372      CopyOutputFormat.setWorkingDirectory(job, targetPath);
373    }
374    CopyOutputFormat.setCommitDirectory(job, targetPath);
375
376    Path logPath = inputOptions.getLogPath();
377    if (logPath == null) {
378      logPath = new Path(metaFolder, "_logs");
379    } else {
380      LOG.info("DistCp job log path: " + logPath);
381    }
382    CopyOutputFormat.setOutputPath(job, logPath);
383  }
384
385  /**
386   * Create input listing by invoking an appropriate copy listing
387   * implementation. Also add delegation tokens for each path
388   * to job's credential store
389   *
390   * @param job - Handle to job
391   * @return Returns the path where the copy listing is created
392   * @throws IOException - If any
393   */
394  protected Path createInputFileListing(Job job) throws IOException {
395    Path fileListingPath = getFileListingPath();
396    CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
397        job.getCredentials(), inputOptions);
398    copyListing.buildListing(fileListingPath, inputOptions);
399    return fileListingPath;
400  }
401
402  /**
403   * Create input listing based on snapshot diff report.
404   * @param job - Handle to job
405   * @param distCpSync the class wraps the snapshot diff report
406   * @return Returns the path where the copy listing is created
407   * @throws IOException - If any
408   */
409  private Path createInputFileListingWithDiff(Job job, DistCpSync distCpSync)
410      throws IOException {
411    Path fileListingPath = getFileListingPath();
412    CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(),
413        job.getCredentials(), distCpSync);
414    copyListing.buildListing(fileListingPath, inputOptions);
415    return fileListingPath;
416  }
417
418  /**
419   * Get default name of the copy listing file. Use the meta folder
420   * to create the copy listing file
421   *
422   * @return - Path where the copy listing file has to be saved
423   * @throws IOException - Exception if any
424   */
425  protected Path getFileListingPath() throws IOException {
426    String fileListPathStr = metaFolder + "/fileList.seq";
427    Path path = new Path(fileListPathStr);
428    return new Path(path.toUri().normalize().toString());
429  }
430
431  /**
432   * Create a default working folder for the job, under the
433   * job staging directory
434   *
435   * @return Returns the working folder information
436   * @throws Exception - Exception if any
437   */
438  private Path createMetaFolderPath() throws Exception {
439    Configuration configuration = getConf();
440    Path stagingDir = JobSubmissionFiles.getStagingDir(
441            new Cluster(configuration), configuration);
442    Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
443    if (LOG.isDebugEnabled())
444      LOG.debug("Meta folder location: " + metaFolderPath);
445    configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
446    return metaFolderPath;
447  }
448
449  /**
450   * Main function of the DistCp program. Parses the input arguments (via OptionsParser),
451   * and invokes the DistCp::run() method, via the ToolRunner.
452   * @param argv Command-line arguments sent to DistCp.
453   */
454  public static void main(String argv[]) {
455    int exitCode;
456    try {
457      DistCp distCp = new DistCp();
458      Cleanup CLEANUP = new Cleanup(distCp);
459
460      ShutdownHookManager.get().addShutdownHook(CLEANUP,
461        SHUTDOWN_HOOK_PRIORITY);
462      exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
463    }
464    catch (Exception e) {
465      LOG.error("Couldn't complete DistCp operation: ", e);
466      exitCode = DistCpConstants.UNKNOWN_ERROR;
467    }
468    System.exit(exitCode);
469  }
470
471  /**
472   * Loads properties from distcp-default.xml into configuration
473   * object
474   * @return Configuration which includes properties from distcp-default.xml
475   */
476  private static Configuration getDefaultConf() {
477    Configuration config = new Configuration();
478    config.addResource(DISTCP_DEFAULT_XML);
479    return config;
480  }
481
482  private synchronized void cleanup() {
483    try {
484      if (metaFolder == null) return;
485
486      jobFS.delete(metaFolder, true);
487      metaFolder = null;
488    } catch (IOException e) {
489      LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
490    }
491  }
492
493  private boolean isSubmitted() {
494    return submitted;
495  }
496
497  private static class Cleanup implements Runnable {
498    private final DistCp distCp;
499
500    Cleanup(DistCp distCp) {
501      this.distCp = distCp;
502    }
503
504    @Override
505    public void run() {
506      if (distCp.isSubmitted()) return;
507
508      distCp.cleanup();
509    }
510  }
511}