NOTE: Hadoop’s s3: and s3n: connectors have been removed. Please use s3a: as the connector to data hosted in S3 with Apache Hadoop.
Consult the s3n documentation for migration instructions.
See also:
Apache Hadoop’s hadoop-aws module provides support for AWS integration. applications to easily use this support.
To include the S3A client in Apache Hadoop’s default classpath:
Make sure thatHADOOP_OPTIONAL_TOOLS in includes hadoop-aws in its list of optional modules to add in the classpath.
For client side interaction, you can declare that relevant JARs must be loaded in your ~/.hadooprc file:
hadoop_add_to_classpath_tools hadoop-aws
The settings in this file does not propagate to deployed applications, but it will work for local clients such as the hadoop fs command.
Hadoop’s “S3A” client offers high-performance IO against Amazon S3 object store and compatible implementations.
There other Hadoop connectors to S3. Only S3A is actively maintained by the Hadoop project itself.
S3A depends upon two JARs, alongside hadoop-common and its dependencies.
The versions of hadoop-common and hadoop-aws must be identical.
To import the libraries into a Maven build, add hadoop-aws JAR to the build dependencies; it will pull in a compatible aws-sdk JAR.
The hadoop-aws JAR does not declare any dependencies other than that dependencies unique to it, the AWS SDK JAR. This is simplify excluding/tuning Hadoop dependency JARs in downstream applications. The hadoop-client or hadoop-common dependency must be declared
<properties> <!-- Your exact Hadoop version here--> <hadoop.version>3.0.0</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aws</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies>
Amazon S3 is an example of “an object store”. In order to achieve scalability and especially high availability, S3 has —as many other cloud object stores have done— relaxed some of the constraints which classic “POSIX” filesystems promise.
The S3Guard feature attempts to address some of these, but it cannot do so completely. Do read these warnings and consider how they apply.
For further discussion on these topics, please consult The Hadoop FileSystem API Definition.
Amazon S3 is an example of “an object store”. In order to achieve scalability and especially high availability, S3 has —as many other cloud object stores have done— relaxed some of the constraints which classic “POSIX” filesystems promise.
The S3A clients mimics directories by:
Here are some of the consequences:
The final three issues surface when using S3 as the immediate destination of work, as opposed to HDFS or other “real” filesystem.
The S3A committers are the sole mechanism available to safely save the output of queries directly into S3 object stores through the S3A filesystem.
The object authorization model of S3 is much different from the file authorization model of HDFS and traditional file systems. The S3A client simply reports stub information from APIs that would query this metadata:
S3A does not really enforce any authorization checks on these stub permissions. Users authenticate to an S3 bucket using AWS credentials. It’s possible that object ACLs have been defined to enforce authorization at the S3 side, but this happens entirely within the S3 service, not within the S3A implementation.
Your AWS credentials not only pay for services, they offer read and write access to the data. Anyone with the credentials can not only read your datasets —they can delete them.
Do not inadvertently share these credentials through means such as
If you do any of these: change your credentials immediately!
Except when interacting with public S3 buckets, the S3A client needs the credentials needed to interact with buckets.
The client supports multiple authentication mechanisms and can be configured as to which mechanisms to use, and their order of use. Custom implementations of com.amazonaws.auth.AWSCredentialsProvider may also be used.
<property> <name>fs.s3a.access.key</name> <description>AWS access key ID. Omit for IAM role-based or provider-based authentication.</description> </property> <property> <name>fs.s3a.secret.key</name> <description>AWS secret key. Omit for IAM role-based or provider-based authentication.</description> </property> <property> <name></name> <description> Comma-separated class names of credential provider classes which implement com.amazonaws.auth.AWSCredentialsProvider. These are loaded and queried in sequence for a valid set of credentials. Each listed class must implement one of the following means of construction, which are attempted in order: 1. a public constructor accepting and org.apache.hadoop.conf.Configuration, 2. a public static method named getInstance that accepts no arguments and returns an instance of com.amazonaws.auth.AWSCredentialsProvider, or 3. a public default constructor. Specifying org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider allows anonymous access to a publicly accessible S3 bucket without any credentials. Please note that allowing anonymous access to an S3 bucket compromises security and therefore is unsuitable for most use cases. It can be useful for accessing public data sets without requiring AWS credentials. If unspecified, then the default list of credential provider classes, queried in sequence, is: 1. org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider: supports static configuration of AWS access key ID and secret access key. See also fs.s3a.access.key and fs.s3a.secret.key. 2. com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports configuration of AWS access key ID and secret access key in environment variables named AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN as documented in the AWS SDK. 3. com.amazonaws.auth.InstanceProfileCredentialsProvider: supports use of instance profile credentials if running in an EC2 VM. </description> </property> <property> <name>fs.s3a.session.token</name> <description> Session token, when using org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider as one of the providers. </description> </property>
S3A supports configuration via the standard AWS environment variables.
The core environment variables are for the access key and associated secret:
export export AWS_SECRET_ACCESS_KEY=my.secret.key
If the environment variable AWS_SESSION_TOKEN is set, session authentication using “Temporary Security Credentials” is enabled; the Key ID and secret key must be set to the credentials for that specific sesssion.
These environment variables can be used to set the authentication credentials instead of properties in the Hadoop configuration.
Important: These environment variables are generally not propagated from client to server when YARN applications are launched. That is: having the AWS environment variables set when an application is launched will not permit the launched application to access S3 resources. The environment variables must (somehow) be set on the hosts/processes where the work is executed.
The standard way to authenticate is with an access key and secret key using the properties in the configuration file.
The S3A client follows the following authentication chain:
S3A can be configured to obtain client authentication providers from classes which integrate with the AWS SDK by implementing the com.amazonaws.auth.AWSCredentialsProvider Interface. This is done by listing the implementation classes, in order of preference, in the configuration option
Important: AWS Credential Providers are distinct from Hadoop Credential Providers. As will be covered later, Hadoop Credential Providers allow passwords and other secrets to be stored and transferred more securely than in XML configuration files. AWS Credential Providers are classes which can be used by the Amazon AWS SDK to obtain an AWS login from a different source in the system, including environment variables, JVM properties and configuration files.
There are three AWS Credential Providers inside the hadoop-aws JAR:
classname | description |
org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider | Session Credentials |
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | Simple name/secret credentials |
org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider | Anonymous Login |
There are also many in the Amazon SDKs, in particular two which are automatically set up in the authentication chain:
classname | description |
com.amazonaws.auth.InstanceProfileCredentialsProvider | EC2 Metadata Credentials |
com.amazonaws.auth.EnvironmentVariableCredentialsProvider | AWS Environment Variables |
Applications running in EC2 may associate an IAM role with the VM and query the EC2 Instance Metadata Service for credentials to access S3. Within the AWS SDK, this functionality is provided by InstanceProfileCredentialsProvider, which internally enforces a singleton instance in order to prevent throttling problem.
Temporary Security Credentials can be obtained from the Amazon Security Token Service; these consist of an access key, a secret key, and a session token.
To authenticate with these:
<property> <name></name> <value>org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider</value> </property> <property> <name>fs.s3a.access.key</name> <value>SESSION-ACCESS-KEY</value> </property> <property> <name>fs.s3a.secret.key</name> <value>SESSION-SECRET-KEY</value> </property> <property> <name>fs.s3a.session.token</name> <value>SECRET-SESSION-TOKEN</value> </property>
The lifetime of session credentials are fixed when the credentials are issued; once they expire the application will no longer be able to authenticate to AWS.
Specifying org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider allows anonymous access to a publicly accessible S3 bucket without any credentials. It can be useful for accessing public data sets without requiring AWS credentials.
<property> <name></name> <value>org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider</value> </property>
Once this is done, there’s no need to supply any credentials in the Hadoop configuration or via environment variables.
This option can be used to verify that an object store does not permit unauthenticated access: that is, if an attempt to list a bucket is made using the anonymous credentials, it should fail —unless explicitly opened up for broader access.
hadoop fs -ls \ -D \ s3a://landsat-pds/
Allowing anonymous access to an S3 bucket compromises security and therefore is unsuitable for most use cases.
If a list of credential providers is given in, then the Anonymous Credential provider must come last. If not, credential providers listed after it will be ignored.
Simple name/secret credentials with SimpleAWSCredentialsProvider
This is is the standard credential provider, which supports the secret key in fs.s3a.access.key and token in fs.s3a.secret.key values. It does not support authentication with logins credentials declared in the URLs.
<property> <name></name> <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value> </property>
Apart from its lack of support of user:password details being included in filesystem URLs (a dangerous practise that is strongly discouraged), this provider acts exactly at the basic authenticator used in the default authentication chain.
This means that the default S3A authentication chain can be defined as
<property> <name></name> <value> org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider, com.amazonaws.auth.EnvironmentVariableCredentialsProvider, com.amazonaws.auth.InstanceProfileCredentialsProvider </value> </property>
It is critical that you never share or leak your AWS credentials. Loss of credentials can leak/lose all your data, run up large bills, and significantly damage your organisation.
Never share your secrets.
Never commit your secrets into an SCM repository. The git secrets can help here.
Avoid using s3a URLs which have key and secret in the URL. This is dangerous as the secrets leak into the logs.
Never include AWS credentials in bug reports, files attached to them, or similar.
If you use the AWS_ environment variables, your list of environment variables is equally sensitive.
Never use root credentials. Use IAM user accounts, with each user/application having its own set of credentials.
Use IAM permissions to restrict the permissions individual users and applications have. This is best done through roles, rather than configuring individual users.
Avoid passing in secrets to Hadoop applications/commands on the command line. The command line of any launched program is visible to all users on a Unix system (via ps), and preserved in command histories.
Explore using IAM Assumed Roles for role-based permissions management: a specific S3A connection can be made with a different assumed role and permissions from the primary user account.
Consider a workflow in which usera and applications are issued with short-lived session credentials, configuring S3A to use these through the TemporaryAWSCredentialsProvider.
Have a secure process in place for cancelling and re-issuing credentials for users and applications. Test it regularly by using it to refresh credentials.
When running in EC2, the IAM EC2 instance credential provider will automatically obtain the credentials needed to access AWS services in the role the EC2 VM was deployed as. This credential provider is enabled in S3A by default.
The safest way to keep the AWS login keys a secret within Hadoop is to use Hadoop Credentials.
The Hadoop Credential Provider Framework allows secure “Credential Providers” to keep secrets outside Hadoop configuration files, storing them in encrypted files in local or Hadoop filesystems, and including them in requests.
The S3A configuration options with sensitive data (fs.s3a.secret.key, fs.s3a.access.key, fs.s3a.session.token and fs.s3a.server-side-encryption.key) can have their data saved to a binary file stored, with the values being read in when the S3A filesystem URL is used for data access. The reference to this credential provider then declareed in the hadoop configuration.
For additional reading on the Hadoop Credential Provider API see: Credential Provider API.
The following configuration options can be storeed in Hadoop Credential Provider stores.
fs.s3a.access.key fs.s3a.secret.key fs.s3a.session.token fs.s3a.server-side-encryption.key fs.s3a.server-side-encryption-algorithm
The first three are for authentication; the final two for encryption. Of the latter, only the encryption key can be considered “sensitive”. However, being able to include the algorithm in the credentials allows for a JCECKS file to contain all the options needed to encrypt new data written to S3.
A credential file can be created on any Hadoop filesystem; when creating one on HDFS or a Unix filesystem the permissions are automatically set to keep the file private to the reader —though as directory permissions are not touched, users should verify that the directory containing the file is readable only by the current user.
hadoop credential create fs.s3a.access.key -value 123 \ -provider jceks:// hadoop credential create fs.s3a.secret.key -value 456 \ -provider jceks://
A credential file can be listed, to see what entries are kept inside it
hadoop credential list -provider jceks:// Listing aliases for CredentialProvider: jceks:// fs.s3a.secret.key fs.s3a.access.key
At this point, the credentials are ready for use.
The URL to the provider must be set in the configuration property, either on the command line or in XML configuration files.
<property> <name></name> <value>jceks://</value> <description>Path to interrogate for protected credentials.</description> </property>
Because this property only supplies the path to the secrets file, the configuration option itself is no longer a sensitive item.
The property is global to all filesystems and secrets. There is another property, which only lists credential providers for S3A filesystems. The two properties are combined into one, with the list of providers in the fs.s3a. property taking precedence over that of the list (i.e. they are prepended to the common list).
<property> <name></name> <value /> <description> Optional comma separated list of credential providers, a list which is prepended to that set in </description> </property>
This was added to support binding different credential providers on a per bucket basis, without adding alternative secrets in the credential list. However, some applications (e.g Hive) prevent the list of credential providers from being dynamically updated by users. As per-bucket secrets are now supported, it is better to include per-bucket keys in JCEKS files and other sources of credentials.
Once the provider is set in the Hadoop configuration, hadoop commands work exactly as if the secrets were in an XML file.
hadoop distcp \ hdfs:// s3a://glacier1/ hadoop fs -ls s3a://glacier1/
The path to the provider can also be set on the command line:
hadoop distcp \ -D \ hdfs:// s3a://glacier1/ hadoop fs \ -D \ -ls s3a://glacier1/
Because the provider path is not itself a sensitive secret, there is no risk from placing its declaration on the command line.
All S3A client options are configured with options with the prefix fs.s3a..
The client supports Per-bucket configuration to allow different buckets to override the shared settings. This is commonly used to change the endpoint, encryption and authentication mechanisms of buckets. S3Guard options, various minor options.
Here are the S3A properties for use in production. The S3Guard options are documented in the S3Guard documenents; some testing-related options are covered in Testing.
<property> <name>fs.s3a.connection.maximum</name> <value>15</value> <description>Controls the maximum number of simultaneous connections to S3.</description> </property> <property> <name>fs.s3a.connection.ssl.enabled</name> <value>true</value> <description>Enables or disables SSL connections to S3.</description> </property> <property> <name>fs.s3a.endpoint</name> <description>AWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region ( is assumed. </description> </property> <property> <name></name> <value>false</value> <description>Enable S3 path style access ie disabling the default virtual hosting behaviour. Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting. </description> </property> <property> <name></name> <description>Hostname of the (optional) proxy server for S3 connections.</description> </property> <property> <name>fs.s3a.proxy.port</name> <description>Proxy server port. If this property is not set but is, port 80 or 443 is assumed (consistent with the value of fs.s3a.connection.ssl.enabled).</description> </property> <property> <name>fs.s3a.proxy.username</name> <description>Username for authenticating with proxy server.</description> </property> <property> <name>fs.s3a.proxy.password</name> <description>Password for authenticating with proxy server.</description> </property> <property> <name>fs.s3a.proxy.domain</name> <description>Domain for authenticating with proxy server.</description> </property> <property> <name>fs.s3a.proxy.workstation</name> <description>Workstation for authenticating with proxy server.</description> </property> <property> <name>fs.s3a.attempts.maximum</name> <value>20</value> <description>How many times we should retry commands on transient errors.</description> </property> <property> <name>fs.s3a.connection.establish.timeout</name> <value>5000</value> <description>Socket connection setup timeout in milliseconds.</description> </property> <property> <name>fs.s3a.connection.timeout</name> <value>200000</value> <description>Socket connection timeout in milliseconds.</description> </property> <property> <name>fs.s3a.paging.maximum</name> <value>5000</value> <description>How many keys to request from S3 when doing directory listings at a time.</description> </property> <property> <name>fs.s3a.threads.max</name> <value>10</value> <description> Maximum number of concurrent active (part)uploads, which each use a thread from the threadpool.</description> </property> <property> <name>fs.s3a.socket.send.buffer</name> <value>8192</value> <description>Socket send buffer hint to amazon connector. Represented in bytes.</description> </property> <property> <name>fs.s3a.socket.recv.buffer</name> <value>8192</value> <description>Socket receive buffer hint to amazon connector. Represented in bytes.</description> </property> <property> <name>fs.s3a.threads.keepalivetime</name> <value>60</value> <description>Number of seconds a thread can be idle before being terminated.</description> </property> <property> <name></name> <value>5</value> <description>Number of (part)uploads allowed to the queue before blocking additional uploads.</description> </property> <property> <name>fs.s3a.multipart.size</name> <value>100M</value> <description>How big (in bytes) to split upload or copy operations up into. A suffix from the set {K,M,G,T,P} may be used to scale the numeric value. </description> </property> <property> <name>fs.s3a.multipart.threshold</name> <value>2147483647</value> <description>How big (in bytes) to split upload or copy operations up into. This also controls the partition size in renamed files, as rename() involves copying the source file(s). A suffix from the set {K,M,G,T,P} may be used to scale the numeric value. </description> </property> <property> <name>fs.s3a.multiobjectdelete.enable</name> <value>true</value> <description>When enabled, multiple single-object delete requests are replaced by a single 'delete multiple objects'-request, reducing the number of requests. Beware: legacy S3-compatible object stores might not support this request. </description> </property> <property> <name>fs.s3a.acl.default</name> <description>Set a canned ACL for newly created and copied objects. Value may be Private, PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead, or BucketOwnerFullControl.</description> </property> <property> <name>fs.s3a.multipart.purge</name> <value>false</value> <description>True if you want to purge existing multipart uploads that may not have been completed/aborted correctly</description> </property> <property> <name>fs.s3a.multipart.purge.age</name> <value>86400</value> <description>Minimum age in seconds of multipart uploads to purge</description> </property> <property> <name>fs.s3a.signing-algorithm</name> <description>Override the default signing algorithm so legacy implementations can still be used</description> </property> <property> <name>fs.s3a.server-side-encryption-algorithm</name> <description>Specify a server-side encryption algorithm for s3a: file system. Unset by default. It supports the following values: 'AES256' (for SSE-S3), 'SSE-KMS' and 'SSE-C' </description> </property> <property> <name>fs.s3a.server-side-encryption.key</name> <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty, you'll be using your default's S3 KMS key, otherwise you should set this property to the specific KMS key id.</description> </property> <property> <name>fs.s3a.buffer.dir</name> <value>${hadoop.tmp.dir}/s3a</value> <description>Comma separated list of directories that will be used to buffer file uploads to.</description> </property> <property> <name>fs.s3a.block.size</name> <value>32M</value> <description>Block size to use when reading files using s3a: file system. </description> </property> <property> <name>fs.s3a.user.agent.prefix</name> <value></value> <description> Sets a custom value that will be prepended to the User-Agent header sent in HTTP requests to the S3 back-end by S3AFileSystem. The User-Agent header always includes the Hadoop version number followed by a string generated by the AWS SDK. An example is "User-Agent: Hadoop 2.8.0, aws-sdk-java/1.10.6". If this optional property is set, then its value is prepended to create a customized User-Agent. For example, if this configuration property was set to "MyApp", then an example of the resulting User-Agent would be "User-Agent: MyApp, Hadoop 2.8.0, aws-sdk-java/1.10.6". </description> </property> <property> <name>fs.s3a.impl</name> <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> <description>The implementation class of the S3A Filesystem</description> </property> <property> <name>fs.AbstractFileSystem.s3a.impl</name> <value>org.apache.hadoop.fs.s3a.S3A</value> <description>The implementation class of the S3A AbstractFileSystem.</description> </property> <property> <name>fs.s3a.readahead.range</name> <value>64K</value> <description>Bytes to read ahead during a seek() before closing and re-opening the S3 HTTP connection. This option will be overridden if any call to setReadahead() is made to an open stream.</description> </property> <property> <name>fs.s3a.list.version</name> <value>2</value> <description>Select which version of the S3 SDK's List Objects API to use. Currently support 2 (default) and 1 (older API).</description> </property>
The S3A client makes a best-effort attempt at recovering from network failures; this section covers the details of what it does.
The S3A divides exceptions returned by the AWS SDK into different categories, and chooses a different retry policy based on their type and whether or not the failing operation is idempotent.
These are all considered unrecoverable: S3A will make no attempt to recover from them.
The status code 400, Bad Request usually means that the request is unrecoverable; it’s the generic “No” response. Very rarely it does recover, which is why it is in this category, rather than that of unrecoverable failures.
These failures will be retried with a fixed sleep interval set in fs.s3a.retry.interval, up to the limit set in fs.s3a.retry.limit.
Some network failures are considered to be retriable if they occur on idempotent operations; there’s no way to know if they happened after the request was processed by S3.
These failures will be retried with a fixed sleep interval set in fs.s3a.retry.interval, up to the limit set in fs.s3a.retry.limit.
Important: DELETE is considered idempotent, hence: FileSystem.delete() and FileSystem.rename() will retry their delete requests on any of these failures.
The issue of whether delete should be idempotent has been a source of historical controversy in Hadoop.
Because S3 is eventually consistent and doesn’t support an atomic create-no-overwrite operation, the choice is more ambiguous.
Currently S3A considers delete to be idempotent because it is convenient for many workflows, including the commit protocols. Just be aware that in the presence of transient failures, more things may be deleted than expected. (For anyone who considers this to be the wrong decision: rebuild the hadoop-aws module with the constant S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT set to false).
When S3A or Dynamo DB returns a response indicating that requests from the caller are being throttled, an exponential back-off with an initial interval and a maximum number of requests.
<property> <name>fs.s3a.retry.throttle.limit</name> <value>${fs.s3a.attempts.maximum}</value> <description> Number of times to retry any throttled request. </description> </property> <property> <name>fs.s3a.retry.throttle.interval</name> <value>1000ms</value> <description> Interval between retry attempts on throttled requests. </description> </property>
Throttling of S3 requests is all too common; it is caused by too many clients trying to access the same shard of S3 Storage. This generally happen if there are too many reads, those being the most common in Hadoop applications. This problem is exacerbated by Hive’s partitioning strategy used when storing data, such as partitioning by year and then month. This results in paths with little or no variation at their start, which ends up in all the data being stored in the same shard(s).
Here are some expensive operations; the more of these taking place against part of an S3 bucket, the more load it experiences. * Many clients trying to list directories or calling getFileStatus on paths (LIST and HEAD requests respectively) * The GET requests issued when reading data. * Random IO used when reading columnar data (ORC, Parquet) means that many more GET requests than a simple one-per-file read. * The number of active writes to that part of the S3 bucket.
A special case is when enough data has been written into part of an S3 bucket that S3 decides to split the data across more than one shard: this is believed to be one by some copy operation which can take some time. While this is under way, S3 clients access data under these paths will be throttled more than usual.
Mitigation strategies
Different S3 buckets can be accessed with different S3A client configurations. This allows for different endpoints, data read and write strategies, as well as login details.
As an example, a configuration could have a base configuration to use the IAM role information available when deployed in Amazon EC2.
<property> <name></name> <value>com.amazonaws.auth.InstanceProfileCredentialsProvider</value> </property>
This will become the default authentication mechanism for S3A buckets.
A bucket s3a://nightly/ used for nightly data can then be given a session key:
<property> <name>fs.s3a.bucket.nightly.access.key</name> <value>AKAACCESSKEY-2</value> </property> <property> <name>fs.s3a.bucket.nightly.secret.key</name> <value>SESSIONSECRETKEY</value> </property> <property> <name>fs.s3a.bucket.nightly.session.token</name> <value>Short-lived-session-token</value> </property> <property> <name></name> <value>org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider</value> </property>
Finally, the public s3a://landsat-pds/ bucket can be accessed anonymously:
<property> <name></name> <value>org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider</value> </property>
Secrets in JCEKS files or provided by other Hadoop credential providers can also be configured on a per bucket basis. The S3A client will look for the per-bucket secrets be
Consider a JCEKS file with six keys:
fs.s3a.access.key fs.s3a.secret.key fs.s3a.server-side-encryption-algorithm fs.s3a.bucket.nightly.access.key fs.s3a.bucket.nightly.secret.key fs.s3a.bucket.nightly.session.token fs.s3a.bucket.nightly.server-side-encryption.key fs.s3a.bucket.nightly.server-side-encryption-algorithm
When accessing the bucket s3a://nightly/, the per-bucket configuration options for that bucket will be used, here the access keys and token, and including the encryption algorithm and key.
S3 Buckets are hosted in different “regions”, the default being “US-East”. The S3A client talks to this region by default, issuing HTTP requests to the server
S3A can work with buckets from any region. Each region has its own S3 endpoint, documented by Amazon.
While it is generally simpler to use the default endpoint, working with V4-signing-only regions (Frankfurt, Seoul) requires the endpoint to be identified. Expect better performance from direct connections —traceroute will give you some insight.
If the wrong endpoint is used, the request may fail. This may be reported as a 301/redirect error, or as a 400 Bad Request: take these as cues to check the endpoint setting of a bucket.
Here is a list of properties defining all AWS S3 regions, current as of June 2017:
<!-- This is the default endpoint, which can be used to interact with any v2 region. --> <property> <name>central.endpoint</name> <value></value> </property> <property> <name>canada.endpoint</name> <value></value> </property> <property> <name>frankfurt.endpoint</name> <value></value> </property> <property> <name>ireland.endpoint</name> <value></value> </property> <property> <name>london.endpoint</name> <value></value> </property> <property> <name>mumbai.endpoint</name> <value></value> </property> <property> <name>ohio.endpoint</name> <value></value> </property> <property> <name>oregon.endpoint</name> <value></value> </property> <property> <name>sao-paolo.endpoint</name> <value></value> </property> <property> <name>seoul.endpoint</name> <value></value> </property> <property> <name>singapore.endpoint</name> <value></value> </property> <property> <name>sydney.endpoint</name> <value></value> </property> <property> <name>tokyo.endpoint</name> <value></value> </property> <property> <name>virginia.endpoint</name> <value>${central.endpoint}</value> </property>
This list can be used to specify the endpoint of individual buckets, for example for buckets in the central and EU/Ireland endpoints.
<property> <name>fs.s3a.bucket.landsat-pds.endpoint</name> <value>${central.endpoint}</value> <description>The endpoint for s3a://landsat-pds URLs</description> </property> <property> <name></name> <value>${ireland.endpoint}</value> <description>The endpoint for s3a://eu-dataset URLs</description> </property>
Why explicitly declare a bucket bound to the central endpoint? It ensures that if the default endpoint is changed to a new region, data store in US-east is still reachable.
The original S3A client implemented file writes by buffering all data to disk as it was written to the OutputStream. Only when the stream’s close() method was called would the upload start.
This made output slow, especially on large uploads, and could even fill up the disk space of small (virtual) disks.
Hadoop 2.7 added the S3AFastOutputStream alternative, which Hadoop 2.8 expanded. It is now considered stable and has replaced the original S3AOutputStream, which is no longer shipped in hadoop.
The “fast” output stream
Because it starts uploading while data is still being written, it offers significant benefits when very large amounts of data are generated. The in memory buffering mechanisms may also offer speedup when running adjacent to S3 endpoints, as disks are not used for intermediate data storage.
<property> <name></name> <value>disk</value> <description> The buffering mechanism to use. Values: disk, array, bytebuffer. "disk" will use the directories listed in fs.s3a.buffer.dir as the location(s) to save data prior to being uploaded. "array" uses arrays in the JVM heap "bytebuffer" uses off-heap memory within the JVM. Both "array" and "bytebuffer" will consume memory in a single stream up to the number of blocks set by: fs.s3a.multipart.size * If using either of these mechanisms, keep this value low The total number of threads performing work across all threads is set by fs.s3a.threads.max, with values setting the number of queued work items. </description> </property> <property> <name>fs.s3a.multipart.size</name> <value>100M</value> <description>How big (in bytes) to split upload or copy operations up into. A suffix from the set {K,M,G,T,P} may be used to scale the numeric value. </description> </property> <property> <name></name> <value>8</value> <description> Maximum Number of blocks a single output stream can have active (uploading, or queued to the central FileSystem instance's pool of queued operations. This stops a single stream overloading the shared thread pool. </description> </property>
If the amount of data written to a stream is below that set in fs.s3a.multipart.size, the upload is performed in the OutputStream.close() operation —as with the original output stream.
The published Hadoop metrics monitor include live queue length and upload operation counts, so identifying when there is a backlog of work/ a mismatch between data generation rates and network bandwidth. Per-stream statistics can also be logged by calling toString() on the current stream.
Files being written are still invisible until the write completes in the close() call, which will block until the upload is completed.
When is set to disk, all data is buffered to local hard disks prior to upload. This minimizes the amount of memory consumed, and so eliminates heap size as the limiting factor in queued uploads —exactly as the original “direct to disk” buffering.
<property> <name></name> <value>disk</value> </property> <property> <name>fs.s3a.buffer.dir</name> <value>${hadoop.tmp.dir}/s3a</value> <description>Comma separated list of directories that will be used to buffer file uploads to.</description> </property>
This is the default buffer mechanism. The amount of data which can be buffered is limited by the amount of available disk space.
When is set to bytebuffer, all data is buffered in “Direct” ByteBuffers prior to upload. This may be faster than buffering to disk, and, if disk space is small (for example, tiny EC2 VMs), there may not be much disk space to buffer with.
The ByteBuffers are created in the memory of the JVM, but not in the Java Heap itself. The amount of data which can be buffered is limited by the Java runtime, the operating system, and, for YARN applications, the amount of memory requested for each container.
The slower the upload bandwidth to S3, the greater the risk of running out of memory —and so the more care is needed in tuning the upload settings.
<property> <name></name> <value>bytebuffer</value> </property>
When is set to array, all data is buffered in byte arrays in the JVM’s heap prior to upload. This may be faster than buffering to disk.
The amount of data which can be buffered is limited by the available size of the JVM heap heap. The slower the write bandwidth to S3, the greater the risk of heap overflows. This risk can be mitigated by tuning the upload settings.
<property> <name></name> <value>array</value> </property>
Both the Array and Byte buffer buffer mechanisms can consume very large amounts of memory, on-heap or off-heap respectively. The disk buffer mechanism does not use much memory up, but will consume hard disk capacity.
If there are many output streams being written to in a single process, the amount of memory or disk used is the multiple of all stream’s active memory/disk use.
Careful tuning may be needed to reduce the risk of running out memory, especially if the data is buffered in memory.
There are a number parameters which can be tuned:
The total number of threads available in the filesystem for data uploads or any other queued filesystem operation. This is set in fs.s3a.threads.max
The number of operations which can be queued for execution:, awaiting a thread:
The number of blocks which a single output stream can have active, that is: being uploaded by a thread, or queued in the filesystem thread queue:
How long an idle thread can stay in the thread pool before it is retired: fs.s3a.threads.keepalivetime
When the maximum allowed number of active blocks of a single stream is reached, no more blocks can be uploaded from that stream until one or more of those active blocks’ uploads completes. That is: a write() call which would trigger an upload of a now full datablock, will instead block until there is capacity in the queue.
How does that come together?
As the pool of threads set in fs.s3a.threads.max is shared (and intended to be used across all threads), a larger number here can allow for more parallel operations. However, as uploads require network bandwidth, adding more threads does not guarantee speedup.
The extra queue of tasks for the thread pool ( covers all ongoing background S3A operations (future plans include: parallelized rename operations, asynchronous directory operations).
When using memory buffering, a small value of limits the amount of memory which can be consumed per stream.
When using disk buffering a larger value of does not consume much memory. But it may result in a large number of blocks to compete with other filesystem operations.
We recommend a low value of; enough to start background upload without overloading other parts of the system, then experiment to see if higher values deliver more throughput —especially from VMs running on EC2.
<property> <name></name> <value>4</value> <description> Maximum Number of blocks a single output stream can have active (uploading, or queued to the central FileSystem instance's pool of queued operations. This stops a single stream overloading the shared thread pool. </description> </property> <property> <name>fs.s3a.threads.max</name> <value>10</value> <description>The total number of threads available in the filesystem for data uploads *or any other queued filesystem operation*.</description> </property> <property> <name></name> <value>5</value> <description>The number of operations which can be queued for execution</description> </property> <property> <name>fs.s3a.threads.keepalivetime</name> <value>60</value> <description>Number of seconds a thread can be idle before being terminated.</description> </property>
There are two mechanisms for cleaning up after leftover multipart uploads: - Hadoop s3guard CLI commands for listing and deleting uploads by their age. Documented in the S3Guard section. - The configuration parameter fs.s3a.multipart.purge, covered below.
If a large stream write operation is interrupted, there may be intermediate partitions uploaded to S3 —data which will be billed for.
These charges can be reduced by enabling fs.s3a.multipart.purge, and setting a purge time in seconds, such as 86400 seconds —24 hours. When an S3A FileSystem instance is instantiated with the purge time greater than zero, it will, on startup, delete all outstanding partition requests older than this time.
<property> <name>fs.s3a.multipart.purge</name> <value>true</value> <description>True if you want to purge existing multipart uploads that may not have been completed/aborted correctly</description> </property> <property> <name>fs.s3a.multipart.purge.age</name> <value>86400</value> <description>Minimum age in seconds of multipart uploads to purge</description> </property>
If an S3A client is instantiated with fs.s3a.multipart.purge=true, it will delete all out of date uploads in the entire bucket. That is: it will affect all multipart uploads to that bucket, from all applications.
Leaving fs.s3a.multipart.purge to its default, false, means that the client will not make any attempt to reset or change the partition rate.
The best practise for using this option is to disable multipart purges in normal use of S3A, enabling only in manual/scheduled housekeeping operations.
The S3A Filesystem client supports the notion of input policies, similar to that of the Posix fadvise() API call. This tunes the behavior of the S3A client to optimise HTTP GET requests for the different use cases.
See Improving data input performance through fadvise for the details.
S3A metrics can be monitored through Hadoop’s metrics2 framework. S3A creates its own metrics system called s3a-file-system, and each instance of the client will create its own metrics source, named with a JVM-unique numerical ID.
As a simple example, the following can be added to to write all S3A metrics to a log file every 10 seconds: *.period=10
Lines in that file will be structured like the following:
1511208770680 s3aFileSystem.s3aFileSystem: Context=s3aFileSystem, s3aFileSystemId=892b02bb-7b30-4ffe-80ca-3a9935e1d96e, bucket=bucket,, files_created=1, files_copied=2, files_copied_bytes=10000, files_deleted=5, fake_directories_deleted=3, directories_created=3, directories_deleted=0, ignored_errors=0, op_copy_from_local_file=0, op_exists=0, op_get_file_status=15, op_glob_status=0, op_is_directory=0, op_is_file=0, op_list_files=0, op_list_located_status=0, op_list_status=3, op_mkdirs=1, op_rename=2, object_copy_requests=0, object_delete_requests=6, object_list_requests=23, object_continue_list_requests=0, object_metadata_requests=46, object_multipart_aborted=0, object_put_bytes=0, object_put_requests=4, object_put_requests_completed=4, stream_write_failures=0, stream_write_block_uploads=0, stream_write_block_uploads_committed=0, stream_write_block_uploads_aborted=0, stream_write_total_time=0, stream_write_total_data=0, s3guard_metadatastore_put_path_request=10, s3guard_metadatastore_initialization=0, object_put_requests_active=0, object_put_bytes_pending=0, stream_write_block_uploads_active=0, stream_write_block_uploads_pending=0, stream_write_block_uploads_data_pending=0, S3guard_metadatastore_put_path_latencyNumOps=0, S3guard_metadatastore_put_path_latency50thPercentileLatency=0, S3guard_metadatastore_put_path_latency75thPercentileLatency=0, S3guard_metadatastore_put_path_latency90thPercentileLatency=0, S3guard_metadatastore_put_path_latency95thPercentileLatency=0, S3guard_metadatastore_put_path_latency99thPercentileLatency=0
Depending on other configuration, metrics from other systems, contexts, etc. may also get recorded, for example the following:
1511208770680 metricssystem.MetricsSystem: Context=metricssystem,, NumActiveSources=1, NumAllSources=1, NumActiveSinks=1, NumAllSinks=0, Sink_fileNumOps=2, Sink_fileAvgTime=1.0, Sink_fileDropped=0, Sink_fileQsize=0, SnapshotNumOps=5, SnapshotAvgTime=0.0, PublishNumOps=2, PublishAvgTime=0.0, DroppedPubAll=0
Note that low-level metrics from the AWS SDK itself are not currently included in these metrics.
Hadoop’s distcp tool is often used to copy data between a Hadoop cluster and Amazon S3. See Copying Data Between a Cluster and Amazon S3 for details on S3 copying specifically.
The distcp update command tries to do incremental updates of data. It is straightforward to verify when files do not match when they are of different length, but not when they are the same size.
Distcp addresses this by comparing file checksums on the source and destination filesystems, which it tries to do even if the filesystems have incompatible checksum algorithms.
The S3A connector can provide the HTTP etag header to the caller as the checksum of the uploaded file. Doing so will break distcp operations between hdfs and s3a.
For this reason, the etag-as-checksum feature is disabled by default.
<property> <name>fs.s3a.etag.checksum.enabled</name> <value>false</value> <description> Should calls to getFileChecksum() return the etag value of the remote object. WARNING: if enabled, distcp operations between HDFS and S3 will fail unless -skipcrccheck is set. </description> </property>
If enabled, distcp between two S3 buckets can use the checksum to compare objects. Their checksums should be identical if they were either each uploaded as a single file PUT, or, if in a multipart PUT, in blocks of the same size, as configured by the value fs.s3a.multipart.size.
To disable checksum verification in distcp, use the -skipcrccheck option:
hadoop distcp -update -skipcrccheck /user/alice/datasets s3a://alice-backup/datasets