001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.tools; 020 021import java.io.IOException; 022import java.util.Random; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.classification.InterfaceStability; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.conf.Configured; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.io.Text; 033import org.apache.hadoop.mapreduce.Cluster; 034import org.apache.hadoop.mapreduce.Job; 035import org.apache.hadoop.mapreduce.JobContext; 036import org.apache.hadoop.mapreduce.JobSubmissionFiles; 037import org.apache.hadoop.tools.CopyListing.*; 038import org.apache.hadoop.tools.mapred.CopyMapper; 039import org.apache.hadoop.tools.mapred.CopyOutputFormat; 040import org.apache.hadoop.tools.util.DistCpUtils; 041import org.apache.hadoop.util.ShutdownHookManager; 042import org.apache.hadoop.util.Tool; 043import org.apache.hadoop.util.ToolRunner; 044 045import com.google.common.annotations.VisibleForTesting; 046 047/** 048 * DistCp is the main driver-class for DistCpV2. 049 * For command-line use, DistCp::main() orchestrates the parsing of command-line 050 * parameters and the launch of the DistCp job. 051 * For programmatic use, a DistCp object can be constructed by specifying 052 * options (in a DistCpOptions object), and DistCp::execute() may be used to 053 * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune 054 * behaviour. 055 */ 056@InterfaceAudience.Public 057@InterfaceStability.Evolving 058public class DistCp extends Configured implements Tool { 059 060 /** 061 * Priority of the shutdown hook. 062 */ 063 static final int SHUTDOWN_HOOK_PRIORITY = 30; 064 065 static final Log LOG = LogFactory.getLog(DistCp.class); 066 067 private DistCpOptions inputOptions; 068 private Path metaFolder; 069 070 private static final String PREFIX = "_distcp"; 071 private static final String WIP_PREFIX = "._WIP_"; 072 private static final String DISTCP_DEFAULT_XML = "distcp-default.xml"; 073 static final Random rand = new Random(); 074 075 private boolean submitted; 076 private FileSystem jobFS; 077 078 /** 079 * Public Constructor. Creates DistCp object with specified input-parameters. 080 * (E.g. source-paths, target-location, etc.) 081 * @param inputOptions Options (indicating source-paths, target-location.) 082 * @param configuration The Hadoop configuration against which the Copy-mapper must run. 083 * @throws Exception 084 */ 085 public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception { 086 Configuration config = new Configuration(configuration); 087 config.addResource(DISTCP_DEFAULT_XML); 088 setConf(config); 089 this.inputOptions = inputOptions; 090 this.metaFolder = createMetaFolderPath(); 091 } 092 093 /** 094 * To be used with the ToolRunner. Not for public consumption. 095 */ 096 @VisibleForTesting 097 DistCp() {} 098 099 /** 100 * Implementation of Tool::run(). Orchestrates the copy of source file(s) 101 * to target location, by: 102 * 1. Creating a list of files to be copied to target. 103 * 2. Launching a Map-only job to copy the files. (Delegates to execute().) 104 * @param argv List of arguments passed to DistCp, from the ToolRunner. 105 * @return On success, it returns 0. Else, -1. 106 */ 107 @Override 108 public int run(String[] argv) { 109 if (argv.length < 1) { 110 OptionsParser.usage(); 111 return DistCpConstants.INVALID_ARGUMENT; 112 } 113 114 try { 115 inputOptions = (OptionsParser.parse(argv)); 116 setTargetPathExists(); 117 LOG.info("Input Options: " + inputOptions); 118 } catch (Throwable e) { 119 LOG.error("Invalid arguments: ", e); 120 System.err.println("Invalid arguments: " + e.getMessage()); 121 OptionsParser.usage(); 122 return DistCpConstants.INVALID_ARGUMENT; 123 } 124 125 try { 126 execute(); 127 } catch (InvalidInputException e) { 128 LOG.error("Invalid input: ", e); 129 return DistCpConstants.INVALID_ARGUMENT; 130 } catch (DuplicateFileException e) { 131 LOG.error("Duplicate files in input path: ", e); 132 return DistCpConstants.DUPLICATE_INPUT; 133 } catch (AclsNotSupportedException e) { 134 LOG.error("ACLs not supported on at least one file system: ", e); 135 return DistCpConstants.ACLS_NOT_SUPPORTED; 136 } catch (XAttrsNotSupportedException e) { 137 LOG.error("XAttrs not supported on at least one file system: ", e); 138 return DistCpConstants.XATTRS_NOT_SUPPORTED; 139 } catch (Exception e) { 140 LOG.error("Exception encountered ", e); 141 return DistCpConstants.UNKNOWN_ERROR; 142 } 143 return DistCpConstants.SUCCESS; 144 } 145 146 /** 147 * Implements the core-execution. Creates the file-list for copy, 148 * and launches the Hadoop-job, to do the copy. 149 * @return Job handle 150 * @throws Exception 151 */ 152 public Job execute() throws Exception { 153 Job job = createAndSubmitJob(); 154 155 if (inputOptions.shouldBlock()) { 156 waitForJobCompletion(job); 157 } 158 return job; 159 } 160 161 /** 162 * Create and submit the mapreduce job. 163 * @return The mapreduce job object that has been submitted 164 */ 165 public Job createAndSubmitJob() throws Exception { 166 assert inputOptions != null; 167 assert getConf() != null; 168 Job job = null; 169 try { 170 synchronized(this) { 171 //Don't cleanup while we are setting up. 172 metaFolder = createMetaFolderPath(); 173 jobFS = metaFolder.getFileSystem(getConf()); 174 job = createJob(); 175 } 176 if (inputOptions.shouldUseDiff()) { 177 if (!DistCpSync.sync(inputOptions, getConf())) { 178 inputOptions.disableUsingDiff(); 179 } 180 } 181 createInputFileListing(job); 182 183 job.submit(); 184 submitted = true; 185 } finally { 186 if (!submitted) { 187 cleanup(); 188 } 189 } 190 191 String jobID = job.getJobID().toString(); 192 job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); 193 LOG.info("DistCp job-id: " + jobID); 194 195 return job; 196 } 197 198 /** 199 * Wait for the given job to complete. 200 * @param job the given mapreduce job that has already been submitted 201 */ 202 public void waitForJobCompletion(Job job) throws Exception { 203 assert job != null; 204 if (!job.waitForCompletion(true)) { 205 throw new IOException("DistCp failure: Job " + job.getJobID() 206 + " has failed: " + job.getStatus().getFailureInfo()); 207 } 208 } 209 210 /** 211 * Set targetPathExists in both inputOptions and job config, 212 * for the benefit of CopyCommitter 213 */ 214 private void setTargetPathExists() throws IOException { 215 Path target = inputOptions.getTargetPath(); 216 FileSystem targetFS = target.getFileSystem(getConf()); 217 boolean targetExists = targetFS.exists(target); 218 inputOptions.setTargetPathExists(targetExists); 219 getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 220 targetExists); 221 } 222 /** 223 * Create Job object for submitting it, with all the configuration 224 * 225 * @return Reference to job object. 226 * @throws IOException - Exception if any 227 */ 228 private Job createJob() throws IOException { 229 String jobName = "distcp"; 230 String userChosenName = getConf().get(JobContext.JOB_NAME); 231 if (userChosenName != null) 232 jobName += ": " + userChosenName; 233 Job job = Job.getInstance(getConf()); 234 job.setJobName(jobName); 235 job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions)); 236 job.setJarByClass(CopyMapper.class); 237 configureOutputFormat(job); 238 239 job.setMapperClass(CopyMapper.class); 240 job.setNumReduceTasks(0); 241 job.setMapOutputKeyClass(Text.class); 242 job.setMapOutputValueClass(Text.class); 243 job.setOutputFormatClass(CopyOutputFormat.class); 244 job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false"); 245 job.getConfiguration().set(JobContext.NUM_MAPS, 246 String.valueOf(inputOptions.getMaxMaps())); 247 248 if (inputOptions.getSslConfigurationFile() != null) { 249 setupSSLConfig(job); 250 } 251 252 inputOptions.appendToConf(job.getConfiguration()); 253 return job; 254 } 255 256 /** 257 * Setup ssl configuration on the job configuration to enable hsftp access 258 * from map job. Also copy the ssl configuration file to Distributed cache 259 * 260 * @param job - Reference to job's handle 261 * @throws java.io.IOException - Exception if unable to locate ssl config file 262 */ 263 private void setupSSLConfig(Job job) throws IOException { 264 Configuration configuration = job.getConfiguration(); 265 Path sslConfigPath = new Path(configuration. 266 getResource(inputOptions.getSslConfigurationFile()).toString()); 267 268 addSSLFilesToDistCache(job, sslConfigPath); 269 configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName()); 270 configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName()); 271 } 272 273 /** 274 * Add SSL files to distributed cache. Trust store, key store and ssl config xml 275 * 276 * @param job - Job handle 277 * @param sslConfigPath - ssl Configuration file specified through options 278 * @throws IOException - If any 279 */ 280 private void addSSLFilesToDistCache(Job job, 281 Path sslConfigPath) throws IOException { 282 Configuration configuration = job.getConfiguration(); 283 FileSystem localFS = FileSystem.getLocal(configuration); 284 285 Configuration sslConf = new Configuration(false); 286 sslConf.addResource(sslConfigPath); 287 288 Path localStorePath = getLocalStorePath(sslConf, 289 DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION); 290 job.addCacheFile(localStorePath.makeQualified(localFS.getUri(), 291 localFS.getWorkingDirectory()).toUri()); 292 configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION, 293 localStorePath.getName()); 294 295 localStorePath = getLocalStorePath(sslConf, 296 DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION); 297 job.addCacheFile(localStorePath.makeQualified(localFS.getUri(), 298 localFS.getWorkingDirectory()).toUri()); 299 configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION, 300 localStorePath.getName()); 301 302 job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(), 303 localFS.getWorkingDirectory()).toUri()); 304 305 } 306 307 /** 308 * Get Local Trust store/key store path 309 * 310 * @param sslConf - Config from SSL Client xml 311 * @param storeKey - Key for either trust store or key store 312 * @return - Path where the store is present 313 * @throws IOException -If any 314 */ 315 private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException { 316 if (sslConf.get(storeKey) != null) { 317 return new Path(sslConf.get(storeKey)); 318 } else { 319 throw new IOException("Store for " + storeKey + " is not set in " + 320 inputOptions.getSslConfigurationFile()); 321 } 322 } 323 324 /** 325 * Setup output format appropriately 326 * 327 * @param job - Job handle 328 * @throws IOException - Exception if any 329 */ 330 private void configureOutputFormat(Job job) throws IOException { 331 final Configuration configuration = job.getConfiguration(); 332 Path targetPath = inputOptions.getTargetPath(); 333 FileSystem targetFS = targetPath.getFileSystem(configuration); 334 targetPath = targetPath.makeQualified(targetFS.getUri(), 335 targetFS.getWorkingDirectory()); 336 if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { 337 DistCpUtils.checkFileSystemAclSupport(targetFS); 338 } 339 if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) { 340 DistCpUtils.checkFileSystemXAttrSupport(targetFS); 341 } 342 if (inputOptions.shouldAtomicCommit()) { 343 Path workDir = inputOptions.getAtomicWorkPath(); 344 if (workDir == null) { 345 workDir = targetPath.getParent(); 346 } 347 workDir = new Path(workDir, WIP_PREFIX + targetPath.getName() 348 + rand.nextInt()); 349 FileSystem workFS = workDir.getFileSystem(configuration); 350 if (!DistCpUtils.compareFs(targetFS, workFS)) { 351 throw new IllegalArgumentException("Work path " + workDir + 352 " and target path " + targetPath + " are in different file system"); 353 } 354 CopyOutputFormat.setWorkingDirectory(job, workDir); 355 } else { 356 CopyOutputFormat.setWorkingDirectory(job, targetPath); 357 } 358 CopyOutputFormat.setCommitDirectory(job, targetPath); 359 360 Path logPath = inputOptions.getLogPath(); 361 if (logPath == null) { 362 logPath = new Path(metaFolder, "_logs"); 363 } else { 364 LOG.info("DistCp job log path: " + logPath); 365 } 366 CopyOutputFormat.setOutputPath(job, logPath); 367 } 368 369 /** 370 * Create input listing by invoking an appropriate copy listing 371 * implementation. Also add delegation tokens for each path 372 * to job's credential store 373 * 374 * @param job - Handle to job 375 * @return Returns the path where the copy listing is created 376 * @throws IOException - If any 377 */ 378 protected Path createInputFileListing(Job job) throws IOException { 379 Path fileListingPath = getFileListingPath(); 380 CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(), 381 job.getCredentials(), inputOptions); 382 copyListing.buildListing(fileListingPath, inputOptions); 383 return fileListingPath; 384 } 385 386 /** 387 * Get default name of the copy listing file. Use the meta folder 388 * to create the copy listing file 389 * 390 * @return - Path where the copy listing file has to be saved 391 * @throws IOException - Exception if any 392 */ 393 protected Path getFileListingPath() throws IOException { 394 String fileListPathStr = metaFolder + "/fileList.seq"; 395 Path path = new Path(fileListPathStr); 396 return new Path(path.toUri().normalize().toString()); 397 } 398 399 /** 400 * Create a default working folder for the job, under the 401 * job staging directory 402 * 403 * @return Returns the working folder information 404 * @throws Exception - EXception if any 405 */ 406 private Path createMetaFolderPath() throws Exception { 407 Configuration configuration = getConf(); 408 Path stagingDir = JobSubmissionFiles.getStagingDir( 409 new Cluster(configuration), configuration); 410 Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt())); 411 if (LOG.isDebugEnabled()) 412 LOG.debug("Meta folder location: " + metaFolderPath); 413 configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString()); 414 return metaFolderPath; 415 } 416 417 /** 418 * Main function of the DistCp program. Parses the input arguments (via OptionsParser), 419 * and invokes the DistCp::run() method, via the ToolRunner. 420 * @param argv Command-line arguments sent to DistCp. 421 */ 422 public static void main(String argv[]) { 423 int exitCode; 424 try { 425 DistCp distCp = new DistCp(); 426 Cleanup CLEANUP = new Cleanup(distCp); 427 428 ShutdownHookManager.get().addShutdownHook(CLEANUP, 429 SHUTDOWN_HOOK_PRIORITY); 430 exitCode = ToolRunner.run(getDefaultConf(), distCp, argv); 431 } 432 catch (Exception e) { 433 LOG.error("Couldn't complete DistCp operation: ", e); 434 exitCode = DistCpConstants.UNKNOWN_ERROR; 435 } 436 System.exit(exitCode); 437 } 438 439 /** 440 * Loads properties from distcp-default.xml into configuration 441 * object 442 * @return Configuration which includes properties from distcp-default.xml 443 */ 444 private static Configuration getDefaultConf() { 445 Configuration config = new Configuration(); 446 config.addResource(DISTCP_DEFAULT_XML); 447 return config; 448 } 449 450 private synchronized void cleanup() { 451 try { 452 if (metaFolder == null) return; 453 454 jobFS.delete(metaFolder, true); 455 metaFolder = null; 456 } catch (IOException e) { 457 LOG.error("Unable to cleanup meta folder: " + metaFolder, e); 458 } 459 } 460 461 private boolean isSubmitted() { 462 return submitted; 463 } 464 465 private static class Cleanup implements Runnable { 466 private final DistCp distCp; 467 468 Cleanup(DistCp distCp) { 469 this.distCp = distCp; 470 } 471 472 @Override 473 public void run() { 474 if (distCp.isSubmitted()) return; 475 476 distCp.cleanup(); 477 } 478 } 479}