
I need to fetch data from DynamoDB tables with Spark using Java. It works fine with a user's access key and secret key:

import org.apache.hadoop.mapred.JobConf;

// Static credentials set directly on the JobConf
final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.input.tableName", tableName);
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
jobConf.set("dynamodb.awsAccessKeyId", accessKey);
jobConf.set("dynamodb.awsSecretAccessKey", secretKey);
jobConf.set("dynamodb.endpoint", endpoint);

I need to use an AWS assumed role and STS (at least for security reasons) to fetch data from DynamoDB, specifically with Spark. Is it possible? I found that it is possible to use an assumed role to access AWS S3 with Spark (https://issues.apache.org/jira/browse/HADOOP-12537, https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html), but I haven't found a similar option for DynamoDB.
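
For reference, the S3A assumed-role setup from those links looks roughly like this (a sketch based on the hadoop-aws documentation for Hadoop 3.1+; the role ARN is a placeholder and the property names should be checked against your Hadoop version):

import org.apache.hadoop.conf.Configuration;

// Sketch only: S3A assumed-role configuration per the hadoop-aws docs (Hadoop 3.1+).
// The role ARN below is a placeholder.
final Configuration hadoopConf = sc.hadoopConfiguration();
hadoopConf.set("fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
hadoopConf.set("fs.s3a.assumed.role.arn", "arn:aws:iam::XXXXXXX:role/assume-role-S3-ReadOnly");
hadoopConf.set("fs.s3a.assumed.role.session.name", "assumed-role-session");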

To obtain the STS temporary credentials I use the following code:

import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;

AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
AssumeRoleRequest assumeRequest = new AssumeRoleRequest()
        .withRoleArn(roleArn)  // arn:aws:iam::XXXXXXX:role/assume-role-DynamoDB-ReadOnly
        .withDurationSeconds(3600)
        .withRoleSessionName("assumed-role-session");
AssumeRoleResult assumeResult = stsClient.assumeRole(assumeRequest);
Credentials credentials = assumeResult.getCredentials();

Invoking credentials.getAccessKeyId(), credentials.getSecretAccessKey() and credentials.getSessionToken() returns the generated temporary credentials. With these credentials I could successfully fetch data from DynamoDB using the AWS SDK for Java's AmazonDynamoDBClient (a non-Spark approach).
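
For context, the non-Spark approach mentioned above amounts to roughly the following (a sketch using AmazonDynamoDBClientBuilder; the region and table name are placeholders):

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;

// Sketch only: use the temporary STS credentials with a plain DynamoDB client.
// Region and table name are placeholders.
BasicSessionCredentials sessionCredentials = new BasicSessionCredentials(
        credentials.getAccessKeyId(),
        credentials.getSecretAccessKey(),
        credentials.getSessionToken());

AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(sessionCredentials))
        .withRegion(Regions.US_EAST_1)
        .build();

ScanResult result = dynamoDB.scan(new ScanRequest().withTableName(tableName));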

Is it possible with Spark? Does Spark allow setting something like the following: jobConf.set("dynamodb.awsSessionToken", sessionToken)?

Vasyl Sarzhynskyi

1 Answer


Looking through the connector code, you may be able to use dynamodb.customAWSCredentialsProvider with an instance of com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider to get what you want working.

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java#L30

https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html
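
In other words, the hint amounts to something like the hypothetical snippet below. Note that it does not work as-is (presumably because the connector has to construct the provider class itself, and STSAssumeRoleSessionCredentialsProvider has no no-arg constructor), which is what the wrapper in the edit below addresses.

// Hypothetical first attempt -- does NOT work as-is: the connector must be able to
// instantiate the provider class on its own, and STSAssumeRoleSessionCredentialsProvider
// has no no-arg constructor. See the wrapper in the edit below.
jobConf.set("dynamodb.customAWSCredentialsProvider",
        "com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider");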


EDIT: So this was a little harder than I first thought. I ended up implementing my own wrapper around STSAssumeRoleSessionCredentialsProvider.

package foo.bar;

import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.AWSSessionCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

/**
 * Hadoop-configurable credentials provider that delegates to
 * STSAssumeRoleSessionCredentialsProvider. The role ARN and session name are
 * read from the Hadoop configuration, so the connector can instantiate this
 * class itself and configure it via JobConf settings.
 */
public class HadoopSTSAssumeRoleSessionCredentialsProvider
        implements AWSSessionCredentialsProvider, Configurable {

    private static final String ROLE_ARN_CONF = "assumed.creds.role.arn";
    private static final String SESSION_NAME_CONF = "assumed.creds.session.name";

    private Configuration configuration;
    private STSAssumeRoleSessionCredentialsProvider delegate;

    @Override
    public AWSSessionCredentials getCredentials() {
        return delegate.getCredentials();
    }

    @Override
    public void refresh() {
        delegate.refresh();
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration = configuration;
        String roleArn = configuration.get(ROLE_ARN_CONF);
        String sessionName = configuration.get(SESSION_NAME_CONF);

        if (roleArn == null || roleArn.isEmpty() || sessionName == null || sessionName.isEmpty()) {
            throw new IllegalStateException("Please set " + ROLE_ARN_CONF + " and "
                    + SESSION_NAME_CONF + " before use.");
        }
        // Build the real STS provider once the role ARN and session name are known.
        delegate = new STSAssumeRoleSessionCredentialsProvider.Builder(
                roleArn, sessionName).build();
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }
}

And then you can use it like this:

val ddbConf: JobConf = new JobConf(sc.hadoopConfiguration)

ddbConf.set("dynamodb.customAWSCredentialsProvider",
    "foo.bar.HadoopSTSAssumeRoleSessionCredentialsProvider")
ddbConf.set("assumed.creds.role.arn", "roleArn")
ddbConf.set("assumed.creds.session.name", "sessionName")
vkubushyn
  • I tried the above solution but I get java.lang.RuntimeException: Custom AWSCredentialsProvider not found error. Anyone facing the same issue? – YBathia Sep 10 '19 at 20:55
  • If it's not able to find the class then it's probably not on your classpath or there is an issue with the package/class name. I ended up building a superjar with my custom credential provider included. One thing to note is that if there are issues with the IAM role or something of that nature, the exceptions will be silent. There's not a very good logging mechanism for those kinds of errors inside Spark. – vkubushyn Sep 11 '19 at 22:09
  • I am getting the ClassNotFoundException as well. I am using an uber jar and have verified that the class exists in the jar on all of the workers. Anyone else run into this? – taylorcressy May 15 '20 at 04:01
  • So we use the Java SDK to create/call an AWS Glue job. Not sure how this would work on native Hadoop though: `Map<String, String> arguments = new HashMap<>();` `arguments.put("--extra-jars", "s3://myBucket/some.jar");` `new StartJobRunRequest().withArguments(arguments)...` You may want to look at https://stackoverflow.com/questions/37132559/add-jars-to-a-spark-job-spark-submit The `spark.executor.extraClassPath` is probably your ticket. – vkubushyn Jul 20 '20 at 18:46