/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.tools;

import java.io.IOException;
import java.net.URL;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.tools.CopyListing.*;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.CopyOutputFormat;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
046 047import com.google.common.annotations.VisibleForTesting; 048 049/** 050 * DistCp is the main driver-class for DistCpV2. 051 * For command-line use, DistCp::main() orchestrates the parsing of command-line 052 * parameters and the launch of the DistCp job. 053 * For programmatic use, a DistCp object can be constructed by specifying 054 * options (in a DistCpOptions object), and DistCp::execute() may be used to 055 * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune 056 * behaviour. 057 */ 058@InterfaceAudience.Public 059@InterfaceStability.Evolving 060public class DistCp extends Configured implements Tool { 061 062 /** 063 * Priority of the shutdown hook. 064 */ 065 static final int SHUTDOWN_HOOK_PRIORITY = 30; 066 067 static final Log LOG = LogFactory.getLog(DistCp.class); 068 069 private DistCpOptions inputOptions; 070 private Path metaFolder; 071 072 private static final String PREFIX = "_distcp"; 073 private static final String WIP_PREFIX = "._WIP_"; 074 private static final String DISTCP_DEFAULT_XML = "distcp-default.xml"; 075 static final Random rand = new Random(); 076 077 private boolean submitted; 078 private FileSystem jobFS; 079 080 /** 081 * Public Constructor. Creates DistCp object with specified input-parameters. 082 * (E.g. source-paths, target-location, etc.) 083 * @param inputOptions Options (indicating source-paths, target-location.) 084 * @param configuration The Hadoop configuration against which the Copy-mapper must run. 085 * @throws Exception 086 */ 087 public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception { 088 Configuration config = new Configuration(configuration); 089 config.addResource(DISTCP_DEFAULT_XML); 090 setConf(config); 091 this.inputOptions = inputOptions; 092 this.metaFolder = createMetaFolderPath(); 093 } 094 095 /** 096 * To be used with the ToolRunner. Not for public consumption. 
097 */ 098 @VisibleForTesting 099 DistCp() {} 100 101 /** 102 * Implementation of Tool::run(). Orchestrates the copy of source file(s) 103 * to target location, by: 104 * 1. Creating a list of files to be copied to target. 105 * 2. Launching a Map-only job to copy the files. (Delegates to execute().) 106 * @param argv List of arguments passed to DistCp, from the ToolRunner. 107 * @return On success, it returns 0. Else, -1. 108 */ 109 @Override 110 public int run(String[] argv) { 111 if (argv.length < 1) { 112 OptionsParser.usage(); 113 return DistCpConstants.INVALID_ARGUMENT; 114 } 115 116 try { 117 inputOptions = (OptionsParser.parse(argv)); 118 setTargetPathExists(); 119 LOG.info("Input Options: " + inputOptions); 120 } catch (Throwable e) { 121 LOG.error("Invalid arguments: ", e); 122 System.err.println("Invalid arguments: " + e.getMessage()); 123 OptionsParser.usage(); 124 return DistCpConstants.INVALID_ARGUMENT; 125 } 126 127 try { 128 execute(); 129 } catch (InvalidInputException e) { 130 LOG.error("Invalid input: ", e); 131 return DistCpConstants.INVALID_ARGUMENT; 132 } catch (DuplicateFileException e) { 133 LOG.error("Duplicate files in input path: ", e); 134 return DistCpConstants.DUPLICATE_INPUT; 135 } catch (AclsNotSupportedException e) { 136 LOG.error("ACLs not supported on at least one file system: ", e); 137 return DistCpConstants.ACLS_NOT_SUPPORTED; 138 } catch (XAttrsNotSupportedException e) { 139 LOG.error("XAttrs not supported on at least one file system: ", e); 140 return DistCpConstants.XATTRS_NOT_SUPPORTED; 141 } catch (Exception e) { 142 LOG.error("Exception encountered ", e); 143 return DistCpConstants.UNKNOWN_ERROR; 144 } 145 return DistCpConstants.SUCCESS; 146 } 147 148 /** 149 * Implements the core-execution. Creates the file-list for copy, 150 * and launches the Hadoop-job, to do the copy. 
151 * @return Job handle 152 * @throws Exception 153 */ 154 public Job execute() throws Exception { 155 Job job = createAndSubmitJob(); 156 157 if (inputOptions.shouldBlock()) { 158 waitForJobCompletion(job); 159 } 160 return job; 161 } 162 163 /** 164 * Create and submit the mapreduce job. 165 * @return The mapreduce job object that has been submitted 166 */ 167 public Job createAndSubmitJob() throws Exception { 168 assert inputOptions != null; 169 assert getConf() != null; 170 Job job = null; 171 try { 172 synchronized(this) { 173 //Don't cleanup while we are setting up. 174 metaFolder = createMetaFolderPath(); 175 jobFS = metaFolder.getFileSystem(getConf()); 176 job = createJob(); 177 } 178 if (inputOptions.shouldUseDiff()) { 179 DistCpSync distCpSync = new DistCpSync(inputOptions, getConf()); 180 if (distCpSync.sync()) { 181 createInputFileListingWithDiff(job, distCpSync); 182 } else { 183 throw new Exception("DistCp sync failed, input options: " 184 + inputOptions); 185 } 186 } 187 188 // Fallback to default DistCp if without "diff" option or sync failed. 189 if (!inputOptions.shouldUseDiff()) { 190 createInputFileListing(job); 191 } 192 193 job.submit(); 194 submitted = true; 195 } finally { 196 if (!submitted) { 197 cleanup(); 198 } 199 } 200 201 String jobID = job.getJobID().toString(); 202 job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); 203 LOG.info("DistCp job-id: " + jobID); 204 205 return job; 206 } 207 208 /** 209 * Wait for the given job to complete. 
210 * @param job the given mapreduce job that has already been submitted 211 */ 212 public void waitForJobCompletion(Job job) throws Exception { 213 assert job != null; 214 if (!job.waitForCompletion(true)) { 215 throw new IOException("DistCp failure: Job " + job.getJobID() 216 + " has failed: " + job.getStatus().getFailureInfo()); 217 } 218 } 219 220 /** 221 * Set targetPathExists in both inputOptions and job config, 222 * for the benefit of CopyCommitter 223 */ 224 private void setTargetPathExists() throws IOException { 225 Path target = inputOptions.getTargetPath(); 226 FileSystem targetFS = target.getFileSystem(getConf()); 227 boolean targetExists = targetFS.exists(target); 228 inputOptions.setTargetPathExists(targetExists); 229 getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 230 targetExists); 231 } 232 /** 233 * Create Job object for submitting it, with all the configuration 234 * 235 * @return Reference to job object. 236 * @throws IOException - Exception if any 237 */ 238 private Job createJob() throws IOException { 239 String jobName = "distcp"; 240 String userChosenName = getConf().get(JobContext.JOB_NAME); 241 if (userChosenName != null) 242 jobName += ": " + userChosenName; 243 Job job = Job.getInstance(getConf()); 244 job.setJobName(jobName); 245 job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions)); 246 job.setJarByClass(CopyMapper.class); 247 configureOutputFormat(job); 248 249 job.setMapperClass(CopyMapper.class); 250 job.setNumReduceTasks(0); 251 job.setMapOutputKeyClass(Text.class); 252 job.setMapOutputValueClass(Text.class); 253 job.setOutputFormatClass(CopyOutputFormat.class); 254 job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false"); 255 job.getConfiguration().set(JobContext.NUM_MAPS, 256 String.valueOf(inputOptions.getMaxMaps())); 257 258 if (inputOptions.getSslConfigurationFile() != null) { 259 setupSSLConfig(job); 260 } 261 262 inputOptions.appendToConf(job.getConfiguration()); 263 return 
job; 264 } 265 266 /** 267 * Setup ssl configuration on the job configuration to enable hsftp access 268 * from map job. Also copy the ssl configuration file to Distributed cache 269 * 270 * @param job - Reference to job's handle 271 * @throws java.io.IOException - Exception if unable to locate ssl config file 272 */ 273 private void setupSSLConfig(Job job) throws IOException { 274 Configuration configuration = job.getConfiguration(); 275 URL sslFileUrl = configuration.getResource(inputOptions 276 .getSslConfigurationFile()); 277 if (sslFileUrl == null) { 278 throw new IOException( 279 "Given ssl configuration file doesn't exist in class path : " 280 + inputOptions.getSslConfigurationFile()); 281 } 282 Path sslConfigPath = new Path(sslFileUrl.toString()); 283 284 addSSLFilesToDistCache(job, sslConfigPath); 285 configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName()); 286 configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName()); 287 } 288 289 /** 290 * Add SSL files to distributed cache. 
Trust store, key store and ssl config xml 291 * 292 * @param job - Job handle 293 * @param sslConfigPath - ssl Configuration file specified through options 294 * @throws IOException - If any 295 */ 296 private void addSSLFilesToDistCache(Job job, 297 Path sslConfigPath) throws IOException { 298 Configuration configuration = job.getConfiguration(); 299 FileSystem localFS = FileSystem.getLocal(configuration); 300 301 Configuration sslConf = new Configuration(false); 302 sslConf.addResource(sslConfigPath); 303 304 Path localStorePath = getLocalStorePath(sslConf, 305 DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION); 306 job.addCacheFile(localStorePath.makeQualified(localFS.getUri(), 307 localFS.getWorkingDirectory()).toUri()); 308 configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION, 309 localStorePath.getName()); 310 311 localStorePath = getLocalStorePath(sslConf, 312 DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION); 313 job.addCacheFile(localStorePath.makeQualified(localFS.getUri(), 314 localFS.getWorkingDirectory()).toUri()); 315 configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION, 316 localStorePath.getName()); 317 318 job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(), 319 localFS.getWorkingDirectory()).toUri()); 320 321 } 322 323 /** 324 * Get Local Trust store/key store path 325 * 326 * @param sslConf - Config from SSL Client xml 327 * @param storeKey - Key for either trust store or key store 328 * @return - Path where the store is present 329 * @throws IOException -If any 330 */ 331 private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException { 332 if (sslConf.get(storeKey) != null) { 333 return new Path(sslConf.get(storeKey)); 334 } else { 335 throw new IOException("Store for " + storeKey + " is not set in " + 336 inputOptions.getSslConfigurationFile()); 337 } 338 } 339 340 /** 341 * Setup output format appropriately 342 * 343 * @param job - Job handle 344 * @throws IOException - 
Exception if any 345 */ 346 private void configureOutputFormat(Job job) throws IOException { 347 final Configuration configuration = job.getConfiguration(); 348 Path targetPath = inputOptions.getTargetPath(); 349 FileSystem targetFS = targetPath.getFileSystem(configuration); 350 targetPath = targetPath.makeQualified(targetFS.getUri(), 351 targetFS.getWorkingDirectory()); 352 if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { 353 DistCpUtils.checkFileSystemAclSupport(targetFS); 354 } 355 if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) { 356 DistCpUtils.checkFileSystemXAttrSupport(targetFS); 357 } 358 if (inputOptions.shouldAtomicCommit()) { 359 Path workDir = inputOptions.getAtomicWorkPath(); 360 if (workDir == null) { 361 workDir = targetPath.getParent(); 362 } 363 workDir = new Path(workDir, WIP_PREFIX + targetPath.getName() 364 + rand.nextInt()); 365 FileSystem workFS = workDir.getFileSystem(configuration); 366 if (!FileUtil.compareFs(targetFS, workFS)) { 367 throw new IllegalArgumentException("Work path " + workDir + 368 " and target path " + targetPath + " are in different file system"); 369 } 370 CopyOutputFormat.setWorkingDirectory(job, workDir); 371 } else { 372 CopyOutputFormat.setWorkingDirectory(job, targetPath); 373 } 374 CopyOutputFormat.setCommitDirectory(job, targetPath); 375 376 Path logPath = inputOptions.getLogPath(); 377 if (logPath == null) { 378 logPath = new Path(metaFolder, "_logs"); 379 } else { 380 LOG.info("DistCp job log path: " + logPath); 381 } 382 CopyOutputFormat.setOutputPath(job, logPath); 383 } 384 385 /** 386 * Create input listing by invoking an appropriate copy listing 387 * implementation. 
Also add delegation tokens for each path 388 * to job's credential store 389 * 390 * @param job - Handle to job 391 * @return Returns the path where the copy listing is created 392 * @throws IOException - If any 393 */ 394 protected Path createInputFileListing(Job job) throws IOException { 395 Path fileListingPath = getFileListingPath(); 396 CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(), 397 job.getCredentials(), inputOptions); 398 copyListing.buildListing(fileListingPath, inputOptions); 399 return fileListingPath; 400 } 401 402 /** 403 * Create input listing based on snapshot diff report. 404 * @param job - Handle to job 405 * @param distCpSync the class wraps the snapshot diff report 406 * @return Returns the path where the copy listing is created 407 * @throws IOException - If any 408 */ 409 private Path createInputFileListingWithDiff(Job job, DistCpSync distCpSync) 410 throws IOException { 411 Path fileListingPath = getFileListingPath(); 412 CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(), 413 job.getCredentials(), distCpSync); 414 copyListing.buildListing(fileListingPath, inputOptions); 415 return fileListingPath; 416 } 417 418 /** 419 * Get default name of the copy listing file. 
Use the meta folder 420 * to create the copy listing file 421 * 422 * @return - Path where the copy listing file has to be saved 423 * @throws IOException - Exception if any 424 */ 425 protected Path getFileListingPath() throws IOException { 426 String fileListPathStr = metaFolder + "/fileList.seq"; 427 Path path = new Path(fileListPathStr); 428 return new Path(path.toUri().normalize().toString()); 429 } 430 431 /** 432 * Create a default working folder for the job, under the 433 * job staging directory 434 * 435 * @return Returns the working folder information 436 * @throws Exception - Exception if any 437 */ 438 private Path createMetaFolderPath() throws Exception { 439 Configuration configuration = getConf(); 440 Path stagingDir = JobSubmissionFiles.getStagingDir( 441 new Cluster(configuration), configuration); 442 Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt())); 443 if (LOG.isDebugEnabled()) 444 LOG.debug("Meta folder location: " + metaFolderPath); 445 configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString()); 446 return metaFolderPath; 447 } 448 449 /** 450 * Main function of the DistCp program. Parses the input arguments (via OptionsParser), 451 * and invokes the DistCp::run() method, via the ToolRunner. 452 * @param argv Command-line arguments sent to DistCp. 
453 */ 454 public static void main(String argv[]) { 455 int exitCode; 456 try { 457 DistCp distCp = new DistCp(); 458 Cleanup CLEANUP = new Cleanup(distCp); 459 460 ShutdownHookManager.get().addShutdownHook(CLEANUP, 461 SHUTDOWN_HOOK_PRIORITY); 462 exitCode = ToolRunner.run(getDefaultConf(), distCp, argv); 463 } 464 catch (Exception e) { 465 LOG.error("Couldn't complete DistCp operation: ", e); 466 exitCode = DistCpConstants.UNKNOWN_ERROR; 467 } 468 System.exit(exitCode); 469 } 470 471 /** 472 * Loads properties from distcp-default.xml into configuration 473 * object 474 * @return Configuration which includes properties from distcp-default.xml 475 */ 476 private static Configuration getDefaultConf() { 477 Configuration config = new Configuration(); 478 config.addResource(DISTCP_DEFAULT_XML); 479 return config; 480 } 481 482 private synchronized void cleanup() { 483 try { 484 if (metaFolder == null) return; 485 486 jobFS.delete(metaFolder, true); 487 metaFolder = null; 488 } catch (IOException e) { 489 LOG.error("Unable to cleanup meta folder: " + metaFolder, e); 490 } 491 } 492 493 private boolean isSubmitted() { 494 return submitted; 495 } 496 497 private static class Cleanup implements Runnable { 498 private final DistCp distCp; 499 500 Cleanup(DistCp distCp) { 501 this.distCp = distCp; 502 } 503 504 @Override 505 public void run() { 506 if (distCp.isSubmitted()) return; 507 508 distCp.cleanup(); 509 } 510 } 511}