DistCp (distributed copy) is a tool used for large inter/intra-cluster copying. It uses MapReduce to effect its distribution, error handling and recovery, and reporting. It expands a list of files and directories into input to map tasks, each of which will copy a partition of the files specified in the source list.
[The erstwhile implementation of DistCp](http://hadoop.apache.org/docs/r1.2.1/distcp.html) has its share of quirks and drawbacks, both in its usage and in its extensibility and performance. The purpose of the DistCp refactor was to fix these shortcomings, enabling it to be used and extended programmatically. New paradigms have been introduced to improve runtime and setup performance, while simultaneously retaining the legacy behaviour as default.
This document describes the design of the new DistCp, its new features, their optimal use, and any deviation from the legacy implementation.
The most common invocation of DistCp is an inter-cluster copy:
    bash$ hadoop distcp hdfs://nn1:8020/foo/bar \
                        hdfs://nn2:8020/bar/foo
This will expand the namespace under /foo/bar on nn1 into a temporary file, partition its contents among a set of map tasks, and start a copy on each NodeManager from nn1 to nn2.
One can also specify multiple source directories on the command line:
    bash$ hadoop distcp hdfs://nn1:8020/foo/a \
                        hdfs://nn1:8020/foo/b \
                        hdfs://nn2:8020/bar/foo
Or, equivalently, from a file using the -f option:
    bash$ hadoop distcp -f hdfs://nn1:8020/srclist \
                        hdfs://nn2:8020/bar/foo
Where srclist contains
    hdfs://nn1:8020/foo/a
    hdfs://nn1:8020/foo/b
When copying from multiple sources, DistCp will abort the copy with an error message if two sources collide, but collisions at the destination are resolved per the options specified. By default, files already existing at the destination are skipped (i.e. not replaced by the source file). A count of skipped files is reported at the end of each job, but it may be inaccurate if a copier failed for some subset of its files, but succeeded on a later attempt.
It is important that each NodeManager can reach and communicate with both the source and destination file systems. For HDFS, both the source and destination must be running the same version of the protocol or use a backwards-compatible protocol; see [Copying Between Versions](#Copying_Between_Versions_of_HDFS).
After a copy, it is recommended that one generates and cross-checks a listing of the source and destination to verify that the copy was truly successful. Since DistCp employs both Map/Reduce and the FileSystem API, issues in or between any of the three could adversely and silently affect the copy. Some have had success running with -update enabled to perform a second pass, but users should be acquainted with its semantics before attempting this.
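The cross-check recommended above can be sketched in a few lines of Python. The sketch assumes each listing has already been reduced to a mapping from path (relative to the copy root) to file length, for example by parsing a recursive listing. The function name `compare_listings` and the sample data are illustrative and not part of DistCp.

```python
def compare_listings(source, target):
    """Compare two {relative_path: length} listings.

    Returns (missing, extra, mismatched):
      missing    - paths present in source but absent from target
      extra      - paths present in target but absent from source
      mismatched - paths present in both whose lengths differ
    """
    missing = sorted(set(source) - set(target))
    extra = sorted(set(target) - set(source))
    mismatched = sorted(
        path for path in set(source) & set(target)
        if source[path] != target[path]
    )
    return missing, extra, mismatched


# Hypothetical listings, keyed by path relative to the copy root.
src = {"a/part-0": 128, "a/part-1": 64, "b/part-0": 32}
dst = {"a/part-0": 128, "a/part-1": 60, "c/stale": 8}

missing, extra, mismatched = compare_listings(src, dst)
print(missing, extra, mismatched)
```

Equal lengths do not prove equal contents, so a comparison of checksums is the stricter check where the file systems support it.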
It’s also worth noting that if another client is still writing to a source file, the copy will likely fail. Attempting to overwrite a file being written at the destination should also fail on HDFS. If a source file is (re)moved before it is copied, the copy will fail with a FileNotFoundException.
Please refer to the detailed Command Line Reference for information on all the options available in DistCp.
-update is used to copy files from source that don’t exist at the target or differ from the target version. -overwrite overwrites target-files that exist at the target.
The Update and Overwrite options warrant special attention since their handling of source-paths varies from the defaults in a very subtle manner. Consider a copy from /source/first/ and /source/second/ to /target/, where the source paths have the following contents:
    hdfs://nn1:8020/source/first/1
    hdfs://nn1:8020/source/first/2
    hdfs://nn1:8020/source/second/10
    hdfs://nn1:8020/source/second/20
When DistCp is invoked without -update or -overwrite, the DistCp defaults would create directories first/ and second/, under /target. Thus:
distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
would yield the following contents in /target:
    hdfs://nn2:8020/target/first/1
    hdfs://nn2:8020/target/first/2
    hdfs://nn2:8020/target/second/10
    hdfs://nn2:8020/target/second/20
When either -update or -overwrite is specified, the contents of the source-directories are copied to target, and not the source directories themselves. Thus:
distcp -update hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
would yield the following contents in /target:
    hdfs://nn2:8020/target/1
    hdfs://nn2:8020/target/2
    hdfs://nn2:8020/target/10
    hdfs://nn2:8020/target/20
By extension, if both source folders contained a file with the same name (say, 0), then both sources would map an entry to /target/0 at the destination. Rather than permit this conflict, DistCp will abort.
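The path mapping described above can be modelled as follows. This is a simplified sketch and not DistCp's own code: it covers only flat source directories copied to an existing target, and the function name `map_targets` and its `sync_contents` flag are made up for illustration.

```python
import posixpath


def map_targets(sources, target, sync_contents=False):
    """Map source files to destination paths.

    sources: {source_dir: [file names directly under it]}
    sync_contents: False models the default behaviour (source directories
    are recreated under target); True models -update / -overwrite (only
    the directories' contents are copied).
    Raises ValueError if two sources map to the same destination path.
    """
    mapping = {}
    for src_dir, names in sources.items():
        if sync_contents:
            base = target
        else:
            base = posixpath.join(target, posixpath.basename(src_dir.rstrip("/")))
        for name in names:
            dst = posixpath.join(base, name)
            if dst in mapping:
                raise ValueError(f"duplicate target path: {dst}")
            mapping[dst] = posixpath.join(src_dir, name)
    return mapping


sources = {"/source/first": ["1", "2"], "/source/second": ["10", "20"]}
print(sorted(map_targets(sources, "/target")))
print(sorted(map_targets(sources, "/target", sync_contents=True)))
```

Adding a file named 0 to both source directories makes the `sync_contents=True` call raise, which mirrors the abort described above.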
Now, consider the following copy operation:
distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
With sources/sizes:
    hdfs://nn1:8020/source/first/1     32
    hdfs://nn1:8020/source/first/2     32
    hdfs://nn1:8020/source/second/10   64
    hdfs://nn1:8020/source/second/20   32
And destination/sizes:
    hdfs://nn2:8020/target/1    32
    hdfs://nn2:8020/target/10   32
    hdfs://nn2:8020/target/20   64
Will effect:
    hdfs://nn2:8020/target/1    32
    hdfs://nn2:8020/target/2    32
    hdfs://nn2:8020/target/10   64
    hdfs://nn2:8020/target/20   32
1 is skipped because the file-length and contents match. 2 is copied because it doesn’t exist at the target. 10 and 20 are overwritten since the contents don’t match the source.
If -update is used, 1 is skipped because the file-length and contents match. 2 is copied because it doesn’t exist at the target. 10 and 20 are overwritten since the contents don’t match the source. However, if -append is additionally used, then only 20 is overwritten (source length less than destination) and 10 is appended with the change in file (if the files match up to the destination’s original length).
If -overwrite is used, 1 is overwritten as well.
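The per-file decision can be written down as a small function. This is a hedged sketch rather than the CopyMapper logic itself: the name `decide` and the mode strings are invented here. Unless told otherwise it assumes that contents match exactly when lengths are equal, and that a longer source always shares the destination's prefix. Those assumptions exist only so the example can run on the sizes listed above without real checksums.

```python
def decide(src_len, dst_len, mode="update", contents_match=None):
    """Return 'skip', 'copy', 'overwrite' or 'append' for one file.

    src_len / dst_len: file lengths; dst_len is None if the target is absent.
    mode: 'update', 'update+append' or 'overwrite'.
    contents_match: whether checksums agree; defaults to "lengths are equal",
    an assumption made only so the example needs no real checksums.
    """
    if dst_len is None:
        return "copy"
    if mode == "overwrite":
        return "overwrite"
    if contents_match is None:
        contents_match = src_len == dst_len
    if src_len == dst_len and contents_match:
        return "skip"
    # Assumes the common prefix matches whenever the source is longer.
    if mode == "update+append" and src_len > dst_len:
        return "append"
    return "overwrite"


# (source length, destination length) for the files in the example above.
sizes = {"1": (32, 32), "2": (32, None), "10": (64, 32), "20": (32, 64)}
for mode in ("update", "update+append", "overwrite"):
    print(mode, {name: decide(s, d, mode) for name, (s, d) in sizes.items()})
```

The append branch follows the rule that -append applies only when the source file is longer than the destination file.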
The -diff option syncs files from a source cluster to a target cluster with a snapshot diff. It copies, renames and removes files in the snapshot diff list.
The -update option must be included when the -diff option is in use.
Most cloud providers don’t work well with sync at the moment.
Usage:
hadoop distcp -update -diff <from_snapshot> <to_snapshot> <source> <destination>
Example:
hadoop distcp -update -diff snap1 snap2 /src/ /dst/
The command above applies changes from snapshot snap1 to snap2 (i.e. snapshot diff from snap1 to snap2) in /src/ to /dst/. Obviously, it requires /src/ to have both snapshots snap1 and snap2. But the destination /dst/ must also have a snapshot with the same name as <from_snapshot>, in this case snap1. The destination /dst/ should not have new file operations (create, rename, delete) since snap1. Note that when this command finishes, a new snapshot snap2 will NOT be created at /dst/.
-update is required to use -diff option.
For instance, in /src/, if 1.txt is added and 2.txt is deleted after the creation of snap1 and before creation of snap2, the command above will copy 1.txt from /src/ to /dst/ and delete 2.txt from /dst/.
Sync behavior will be elaborated using experiments below.
Some preparations before we start.
    # Create source and destination directories
    hdfs dfs -mkdir /src/ /dst/
    # Allow snapshot on source
    hdfs dfsadmin -allowSnapshot /src/
    # Create a snapshot (empty one)
    hdfs dfs -createSnapshot /src/ snap1
    # Allow snapshot on destination
    hdfs dfsadmin -allowSnapshot /dst/
    # Create a from_snapshot with the same name
    hdfs dfs -createSnapshot /dst/ snap1
    # Put one text file under /src/
    echo "This is the 1st text file." > 1.txt
    hdfs dfs -put 1.txt /src/
    # Create the second snapshot
    hdfs dfs -createSnapshot /src/ snap2
    # Put another text file under /src/
    echo "This is the 2nd text file." > 2.txt
    hdfs dfs -put 2.txt /src/
    # Create the third snapshot
    hdfs dfs -createSnapshot /src/ snap3
Then we run distcp sync:
hadoop distcp -update -diff snap1 snap2 /src/ /dst/
The command above should succeed. 1.txt will be copied from /src/ to /dst/. Again, -update option is required.
If we run the same command again, we will get DistCp sync failed exception because the destination has added a new file 1.txt since snap1. That being said, if we remove 1.txt manually from /dst/ and run the sync, the command will succeed.
First do a clean up from Experiment 1.
hdfs dfs -rm -skipTrash /dst/1.txt
Run the sync command. Note that <to_snapshot> has been changed from snap2 in Experiment 1 to snap3.
hadoop distcp -update -diff snap1 snap3 /src/ /dst/
Both 1.txt and 2.txt will be copied to /dst/.
Continuing from the end of Experiment 2:
    hdfs dfs -rm -skipTrash /dst/2.txt
    # Create snap2 at destination, it contains 1.txt
    hdfs dfs -createSnapshot /dst/ snap2
    # Delete 1.txt from source
    hdfs dfs -rm -skipTrash /src/1.txt
    # Create snap4 at source, it only contains 2.txt
    hdfs dfs -createSnapshot /src/ snap4
Run sync command now:
hadoop distcp -update -diff snap2 snap4 /src/ /dst/
2.txt is copied and 1.txt is deleted under /dst/.
Note that, though both /src/ and /dst/ have a snapshot with the same name snap2, the snapshots don’t need to have the same content. That means, if you have a 1.txt in /dst/’s snap2 whose contents differ from the source’s, 1.txt will still be removed from /dst/. The sync command doesn’t check the contents of the files that are going to be deleted. It simply follows the snapshot diff list between <from_snapshot> and <to_snapshot>.
Also, if we delete 1.txt from /dst/ before creating snap2 on /dst/ in the steps above, so that /dst/’s snap2 doesn’t have 1.txt before running the sync command, the command will still succeed. It won’t throw an exception while trying to delete 1.txt from /dst/, which doesn’t exist.
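The two observations above (contents are not compared before deletion, and deleting an absent file is not an error) can be illustrated with a toy model. This is not how DistCp is implemented: `apply_diff` and its `+`/`-` entries are a made-up simplification of a snapshot diff list, and rename and modify entries are left out.

```python
def apply_diff(dest_files, diff, source_files):
    """Apply a simplified snapshot-diff list to a destination.

    dest_files / source_files: {name: contents}
    diff: list of (op, name) pairs, op being '+' (created) or '-' (deleted).
    Deleting a name that is absent from dest_files is not an error.
    """
    result = dict(dest_files)
    for op, name in diff:
        if op == "-":
            result.pop(name, None)  # contents are never inspected
        elif op == "+":
            result[name] = source_files[name]
        else:
            raise ValueError(f"unsupported diff op: {op}")
    return result


# Modelled on the experiment above: between snap2 and snap4 the source
# lost 1.txt and gained 2.txt.
diff = [("-", "1.txt"), ("+", "2.txt")]
source = {"2.txt": "This is the 2nd text file."}

print(apply_diff({"1.txt": "some other contents"}, diff, source))
print(apply_diff({}, diff, source))
```

Both calls end with only 2.txt at the destination, whether 1.txt held different contents or was already gone.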
This section only applies to HDFS.
If the target and all of the source pathnames are in the /.reserved/raw hierarchy, then ‘raw’ namespace extended attributes will be preserved. ‘raw’ xattrs are used by the system for internal functions such as encryption meta data. They are only visible to users when accessed through the /.reserved/raw hierarchy.
raw xattrs are preserved based solely on whether /.reserved/raw prefixes are supplied. The -p (preserve, see below) flag does not impact preservation of raw xattrs.
To prevent raw xattrs from being preserved, simply do not use the /.reserved/raw prefix on any of the source and target paths.
If the /.reserved/raw prefix is specified on only a subset of the source and target paths, an error will be displayed and a non-0 exit code returned.
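The all-or-nothing rule for the /.reserved/raw prefix can be sketched as below. The function name `preserve_raw_xattrs` is hypothetical, and the sketch works on plain pathnames without a scheme or authority, which is a simplification.

```python
RAW_PREFIX = "/.reserved/raw"


def preserve_raw_xattrs(source_paths, target_path):
    """Return True if raw xattrs would be preserved, False if not.

    Raises ValueError when only some of the paths carry the prefix,
    the case in which DistCp reports an error.
    """
    paths = list(source_paths) + [target_path]
    flags = [p == RAW_PREFIX or p.startswith(RAW_PREFIX + "/") for p in paths]
    if all(flags):
        return True
    if not any(flags):
        return False
    raise ValueError("/.reserved/raw must prefix all paths or none")


print(preserve_raw_xattrs(["/.reserved/raw/src"], "/.reserved/raw/dst"))
print(preserve_raw_xattrs(["/src"], "/dst"))
```

A mixed call such as `preserve_raw_xattrs(["/.reserved/raw/src"], "/dst")` raises, which corresponds to the non-zero exit code described above.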
Flag | Description | Notes |
---|---|---|
-p[rbugpcaxt] | Preserve r: replication number b: block size u: user g: group p: permission c: checksum-type a: ACL x: XAttr t: timestamp | When -update is specified, status updates will not be synchronized unless the file sizes also differ (i.e. unless the file is re-created). If -pa is specified, DistCp also preserves the permissions, because ACLs are a super-set of permissions. The option -pr is only valid if neither the source nor the target directory is erasure coded. Note: If no -p options are specified, block size is preserved by default. |
-i | Ignore failures | As explained in the Appendix, this option will keep more accurate statistics about the copy than the default case. It also preserves logs from failed copies, which can be valuable for debugging. Finally, a failing map will not cause the job to fail before all splits are attempted. |
-log <logdir> | Write logs to <logdir> | DistCp keeps logs of each file it attempts to copy as map output. If a map fails, the log output will not be retained if it is re-executed. |
-v | Log additional info (path, size) in the SKIP/COPY log | This option can only be used with -log option. |
-m <num_maps> | Maximum number of simultaneous copies | Specify the number of maps to copy data. Note that more maps may not necessarily improve throughput. |
-overwrite | Overwrite destination | If a map fails and -i is not specified, all the files in the split, not only those that failed, will be recopied. As discussed in the Usage documentation, it also changes the semantics for generating destination paths, so users should use this carefully. |
-update | Overwrite if source and destination differ in size, blocksize, or checksum | As noted in the preceding, this is not a “sync” operation. The criteria examined are the source and destination file sizes, blocksizes, and checksums; if they differ, the source file replaces the destination file. As discussed in the Usage documentation, it also changes the semantics for generating destination paths, so users should use this carefully. |
-append | Incremental copy of file with same name but different length | If the source file is greater in length than the destination file, the checksum of the common length part is compared. If the checksum matches, only the difference is copied using read and append functionalities. The -append option only works with -update without -skipcrccheck. |
-f <urilist_uri> | Use list at <urilist_uri> as src list | This is equivalent to listing each source on the command line. The urilist_uri list should be a fully qualified URI. |
-filters | The path to a file containing a list of pattern strings, one string per line, such that paths matching the pattern will be excluded from the copy. | Support regular expressions specified by java.util.regex.Pattern. |
-filelimit <n> | Limit the total number of files to be <= n | Deprecated! Ignored in the new DistCp. |
-sizelimit <n> | Limit the total size to be <= n bytes | Deprecated! Ignored in the new DistCp. |
-delete | Delete the files existing in the dst but not in src | The deletion is done by FS Shell, so the trash will be used if it is enabled. Delete is applicable only with the update or overwrite options. |
-strategy {dynamic|uniformsize} | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) If “dynamic” is specified, DynamicInputFormat is used instead. (This is described in the Architecture section, under InputFormats.) |
-bandwidth | Specify bandwidth per map, in MB/second. | Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the net bandwidth used tends towards the specified value. |
-atomic {-tmp <tmp_dir>} | Specify atomic commit, with optional tmp directory. | -atomic instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, -tmp may be used to specify the location of the tmp-target. If not specified, a default is chosen. Note: tmp_dir must be on the final target cluster. |
-async | Run DistCp asynchronously. Quits as soon as the Hadoop Job is launched. | The Hadoop Job-id is logged, for tracking. |
-diff <oldSnapshot> <newSnapshot> | Use snapshot diff report between given two snapshots to identify the difference between source and target, and apply the diff to the target to make it in sync with source. | This option is valid only with -update option and the following conditions should be satisfied. |
-rdiff <newSnapshot> <oldSnapshot> | Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot <oldSnapshot> was created on the target, and apply the diff reversely to the target, and copy modified files from the source’s <oldSnapshot>, to make the target the same as <oldSnapshot>. | This option is valid only with -update option and the following conditions should be satisfied. |
-numListstatusThreads | Number of threads to use for building file listing | At most 40 threads. |
-skipcrccheck | Whether to skip CRC checks between source and target paths. | |
-blocksperchunk <blocksperchunk> | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of <blocksperchunk> blocks to be transferred in parallel, and reassembled on the destination. By default, <blocksperchunk> is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. |
-copybuffersize <copybuffersize> | Size of the copy buffer to use. By default, <copybuffersize> is set to 8192B | |
-xtrack <path> | Save information about missing source files to the specified path. | This option is only valid with -update option. This is an experimental property and it cannot be used with -atomic option. |
-direct | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
The components of the new DistCp may be classified into the following categories:
The DistCp Driver components are responsible for:
Parsing the arguments passed to the DistCp command on the command-line, via:
Assembling the command arguments into an appropriate DistCpOptions object, and initializing DistCp. These arguments include:
Orchestrating the copy operation by:
The parser-elements are exercised only from the command-line (or if DistCp::run() is invoked). The DistCp class may also be used programmatically, by constructing the DistCpOptions object, and initializing a DistCp object appropriately.
The copy-listing-generator classes are responsible for creating the list of files/directories to be copied from source. They examine the contents of the source-paths (files/directories, including wild-cards), and record all paths that need copy into a SequenceFile, for consumption by the DistCp Hadoop Job. The main classes in this module include:
Based on whether a source-file-list is specified in the DistCpOptions, the source-listing is generated in one of the following ways:
One may customize the method by which the copy-listing is constructed by providing a custom implementation of the CopyListing interface. The behaviour of DistCp differs here from the legacy DistCp, in how paths are considered for copy.
One may also customize the filtering of files which shouldn’t be copied by passing the current supported implementation of CopyFilter interface or a new implementation can be written. This can be specified by setting the distcp.filters.class in the DistCpOptions:
The legacy implementation only lists those paths that must definitely be copied on to target. E.g. if a file already exists at the target (and -overwrite isn’t specified), the file isn’t even considered in the MapReduce Copy Job. Determining this during setup (i.e. before the MapReduce Job) involves file-size and checksum-comparisons that are potentially time-consuming.
The new DistCp postpones such checks until the MapReduce Job, thus reducing setup time. Performance is enhanced further since these checks are parallelized across multiple maps.
The InputFormats and MapReduce components are responsible for the actual copy of files and directories from the source to the destination path. The listing-file created during copy-listing generation is consumed at this point, when the copy is carried out. The classes of interest here include:
UniformSizeInputFormat: This implementation of org.apache.hadoop.mapreduce.InputFormat provides equivalence with Legacy DistCp in balancing load across maps. The aim of the UniformSizeInputFormat is to make each map copy roughly the same number of bytes. To that end, the listing file is split into groups of paths, such that the sum of file-sizes in each InputSplit is nearly equal to that of every other split. The splitting isn’t always perfect, but its trivial implementation keeps the setup-time low.
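The idea of balancing splits by byte count can be sketched with a simple greedy pass over the listing. This is an illustration of the technique under stated assumptions, not the actual UniformSizeInputFormat code: the function name, the in-memory list of (path, size) pairs and the "close a split once it reaches the per-map target" rule are all choices made for this example.

```python
def uniform_size_splits(listing, num_maps):
    """Group (path, size) entries, in listing order, into at most num_maps splits.

    A split is closed once it has reached the per-map byte target; the
    final split takes whatever remains.
    """
    total = sum(size for _, size in listing)
    target = -(-total // num_maps) if total else 0  # ceiling division
    splits, current, current_bytes = [], [], 0
    for path, size in listing:
        current.append(path)
        current_bytes += size
        if current_bytes >= target and len(splits) < num_maps - 1:
            splits.append(current)
            current, current_bytes = [], 0
    if current:
        splits.append(current)
    return splits


# Hypothetical listing: 200 bytes in total.
listing = [("a", 40), ("b", 30), ("c", 30), ("d", 60), ("e", 20), ("f", 20)]
print(uniform_size_splits(listing, 2))
print(uniform_size_splits(listing, 3))
```

With two maps each split carries 100 bytes. With three maps the splits carry 70, 90 and 40 bytes, which shows why such a single pass is cheap but not perfectly even.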
DynamicInputFormat and DynamicRecordReader: The DynamicInputFormat implements org.apache.hadoop.mapreduce.InputFormat, and is new to DistCp. The listing-file is split into several “chunk-files”, the exact number of chunk-files being a multiple of the number of maps requested for in the Hadoop Job. Each map task is “assigned” one of the chunk-files (by renaming the chunk to the task’s id), before the Job is launched. Paths are read from each chunk using the DynamicRecordReader, and processed in the CopyMapper. After all the paths in a chunk are processed, the current chunk is deleted and a new chunk is acquired. The process continues until no more chunks are available. This “dynamic” approach allows faster map-tasks to consume more paths than slower ones, thus speeding up the DistCp job overall.
CopyMapper: This class implements the physical file-copy. The input-paths are checked against the input-options (specified in the Job’s Configuration), to determine whether a file needs copy. A file will be copied only if at least one of the following is true:
CopyCommitter: This class is responsible for the commit-phase of the DistCp job, including:
By default, DistCp makes an attempt to size each map comparably so that each copies roughly the same number of bytes. Note that files are the finest level of granularity, so increasing the number of simultaneous copiers (i.e. maps) may not always increase the number of simultaneous copies nor the overall throughput.
The new DistCp also provides a strategy to “dynamically” size maps, allowing faster data-nodes to copy more bytes than slower nodes. With -strategy dynamic (explained in the Architecture), rather than assigning a fixed set of source-files to each map-task, the files are split into several sets, or chunks. The number of chunks exceeds the number of maps, usually by a factor of 2-3. Each map picks up and copies all files listed in a chunk. When a chunk is exhausted, a new chunk is acquired and processed, until no more chunks remain.
By not assigning a source-path to a fixed map, faster map-tasks (i.e. data-nodes) are able to consume more chunks, and thus copy more data, than slower nodes. While this distribution isn’t uniform, it is fair with regard to each mapper’s capacity.
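A small simulation shows why pulling chunks on demand helps when map tasks run at different speeds. It is a sketch only: `simulate_dynamic`, the chunk sizes and the speeds are invented for illustration, and the real DynamicInputFormat hands out chunk-files rather than byte counts.

```python
import heapq
from collections import deque


def simulate_dynamic(chunks, speeds):
    """Simulate map tasks pulling chunks from a shared pool.

    chunks: list of chunk sizes in bytes, handed out in order.
    speeds: bytes per second for each map task.
    Returns (bytes copied by each task, time at which the last task finishes).
    """
    pool = deque(chunks)
    copied = [0] * len(speeds)
    # Heap of (time the task becomes free, task index); all start at t=0.
    free_at = [(0.0, i) for i in range(len(speeds))]
    heapq.heapify(free_at)
    finish = 0.0
    while pool:
        now, task = heapq.heappop(free_at)
        size = pool.popleft()
        copied[task] += size
        done = now + size / speeds[task]
        finish = max(finish, done)
        heapq.heappush(free_at, (done, task))
    return copied, finish


# Six 100-byte chunks; task 0 copies twice as fast as task 1.
copied, finish = simulate_dynamic([100] * 6, [100, 50])
print(copied, finish)
```

In this run the faster task copies 400 bytes, the slower one 200, and both finish after 4 seconds. A fixed assignment of three chunks per task would take 6 seconds, because the slower task would need 300 / 50 seconds on its own.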
The dynamic-strategy is implemented by the DynamicInputFormat. It provides superior performance under most conditions.
Tuning the number of maps to the size of the source and destination clusters, the size of the copy, and the available bandwidth is recommended for long-running and regularly run jobs.
For copying between two different major versions of Hadoop (e.g. between 1.X and 2.X), one will usually use WebHdfsFileSystem. Unlike the previous HftpFileSystem, webhdfs is available for both read and write operations, so DistCp can be run on both the source and the destination cluster. The remote cluster is specified as webhdfs://<namenode_hostname>:<http_port>. When copying between clusters of the same major Hadoop version (e.g. between 2.X and 2.X), use the hdfs protocol for better performance.
Use the “swebhdfs://” scheme when webhdfs is secured with SSL. For more information see SSL Configurations for SWebHDFS.
As has been mentioned in the preceding, should a map fail to copy one of its inputs, there will be several side-effects.
DistCp works with Object Stores such as Amazon S3, Azure WASB and OpenStack Swift.
Prerequisites
DistCp can be used to upload data
hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1
To download data
hadoop distcp s3a://bucket/generated/results hdfs://nn1:8020/results
To copy data between object stores
    hadoop distcp s3a://bucket/generated/results \
      wasb://updates@example.blob.core.windows.net
And to copy data within an object store
    hadoop distcp wasb://updates@example.blob.core.windows.net/current \
      wasb://updates@example.blob.core.windows.net/old
And to use -update to only copy changed files.
    hadoop distcp -update -numListstatusThreads 20 \
      swift://history.cluster1/2016 \
      hdfs://nn1:8020/history/2016
Because object stores are slow to list files, consider setting the -numListstatusThreads option when performing a -update operation on a large directory tree (the limit is 40 threads).
When DistCp -update is used with object stores, generally only the modification time and length of the individual files are compared, not any checksums. The fact that most object stores do have valid timestamps for directories is irrelevant; only the file timestamps are compared. However, it is important to have the clock of the client computers close to that of the infrastructure, so that timestamps are consistent between the client/HDFS cluster and that of the object store. Otherwise, changed files may be missed/copied too often.
Notes
The -atomic option causes a rename of the temporary data, so significantly increases the time to commit work at the end of the operation. Furthermore, as Object Stores other than (optionally) wasb:// do not offer atomic renames of directories, the -atomic operation doesn’t actually deliver what is promised. Avoid.
The -append option is not supported.
The -diff and -rdiff options are not supported.
CRC checking will not be performed, irrespective of the value of the -skipcrccheck flag.
All -p options, including those to preserve permissions, user and group information, attributes, checksums and replication, are generally ignored. The wasb:// connector will preserve the information, but not enforce the permissions.
Some object store connectors offer an option for in-memory buffering of output —for example the S3A connector. Using such option while copying large files may trigger some form of out of memory event, be it a heap overflow or a YARN container termination. This is particularly common if the network bandwidth between the cluster and the object store is limited (such as when working with remote object stores). It is best to disable/avoid such options and rely on disk buffering.
Copy operations within a single object store still take place in the Hadoop cluster, even when the object store implements a more efficient COPY operation internally.
That is, an operation such as
hadoop distcp s3a://bucket/datasets/set1 s3a://bucket/datasets/set2
Copies each byte down to the Hadoop worker nodes and back to the bucket. As well as being slow, it means that charges may be incurred.
The -direct option can be used to write to object store target paths directly, avoiding the potentially very expensive temporary file rename operations that would otherwise occur.
Why does -update not create the parent source-directory under a pre-existing target directory?
The behaviour of -update and -overwrite is described in detail in the Usage section of this document. In short, if either option is used with a pre-existing destination directory, the contents of each source directory are copied over, rather than the source-directory itself. This behaviour is consistent with the legacy DistCp implementation as well.
How does the new DistCp differ in semantics from the Legacy DistCp?
Why does the new DistCp use more maps than legacy DistCp?
Legacy DistCp works by figuring out what files need to be actually copied to target before the copy-job is launched, and then launching as many maps as required for copy. So if a majority of the files need to be skipped (because they already exist, for example), fewer maps will be needed. As a consequence, the time spent in setup (i.e. before the M/R job) is higher. The new DistCp calculates only the contents of the source-paths. It doesn’t try to filter out what files can be skipped. That decision is put off till the M/R job runs. This is much faster (vis-a-vis execution-time), but the number of maps launched will be as specified in the -m option, or 20 (default) if unspecified.
Why does DistCp not run faster when more maps are specified?
At present, the smallest unit of work for DistCp is a file, i.e. a file is processed by only one map. Increasing the number of maps to a value exceeding the number of files would yield no performance benefit. The number of maps launched would equal the number of files.
Why does DistCp run out of memory?
If the number of individual files/directories being copied from the source path(s) is extremely large (e.g. 1,000,000 paths), DistCp might run out of memory while determining the list of paths for copy. This is not unique to the new DistCp implementation. To get around this, consider changing the -Xmx JVM heap-size parameters, as follows:
    bash$ export HADOOP_CLIENT_OPTS="-Xms64m -Xmx1024m"
    bash$ hadoop distcp /source /target