28

I'm trying to make my Spark Streaming application reading his input from a S3 directory but I keep getting this exception after launching it with spark-submit script:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
    at MainClass$.main(MainClass.scala:1190)
    at MainClass.main(MainClass.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I'm setting those variables through this block of code as suggested here http://spark.apache.org/docs/latest/ec2-scripts.html (bottom of the page):

val ssc = new org.apache.spark.streaming.StreamingContext(
  conf,
  Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",args(3))

args(2) and args(3) are my AWS Access Key ID and Secrete Access Key of course.

Why it keeps saying they are not set?

EDIT: I tried also this way but I get the same exception:

val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")
gprivitera
  • 933
  • 1
  • 8
  • 22

9 Answers9

26

Odd. Try also doing a .set on the sparkContext. Try also exporting env variables before you start the application:

export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>

^^this is how we do it.

UPDATE: According to @tribbloid the above broke in 1.3.0, now you have to faff around for ages and ages with hdfs-site.xml, or your can do (and this works in a spark-shell):

val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
samthebest
  • 30,803
  • 25
  • 102
  • 142
  • 1
    You mean exporting them from the shell in the master and all slaves before using spark-submit? Or doing it inside the application with sys.env? – gprivitera Jun 05 '14 at 13:19
  • 1
    In the shell before you execute the application. Just in that shell, no need to execute it in any other shell other than the shell you execute the application. – samthebest Jun 05 '14 at 14:26
  • 1
    Exporting env variables before starting the application worked! Thank you! – gprivitera Jun 05 '14 at 15:01
  • 1
    Note: Setting `AWS_ACCESS_KEY_ID` or `fs.s3n.awsAccessKeyId` in the SparkConf().set(...) didn't do it. Setting `AWS_ACCESS_KEY_ID` in the Env or setting it in `spark-env.sh` did work. Sadly the non-working cases *should work*. – Joseph Lust Jan 21 '15 at 20:21
  • 2
    This no longer works after Spark 1.3. Now if you want to set up it statically. You have to add hdfs-site.xml into Spark's conf directory. There is no way to set it in commandline. I don't know the point of this design but it just happened – tribbloid Aug 16 '15 at 00:09
  • what would be the option if you had to juggle different keys at same time but didn't want to hardcode them in code using `hadoopConf`? – horatio1701d Jul 30 '16 at 09:18
22

The following configuration works for me, make sure you also set "fs.s3.impl":

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")      
val sc = new SparkContext(conf)
val hadoopConf=sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)
harel
  • 525
  • 5
  • 21
  • 1
    Much cleaner than dumping secrets into the Env. – Joseph Lust Jan 21 '15 at 20:25
  • 2
    It appears that in Python the hadoopConfiguration attribute is not available. Any ideas of a workaround? – Evan Zamir Jul 14 '15 at 16:36
  • In pyspark it's `hadoopConf=sc._jsc.hadoopConfiguration()` – Tristan Reid Jan 15 '16 at 01:02
  • @JosephLust cleaner is to not set these at all but to use IAM roles. You are forcing something to manage and protect these secrets when it isn't required. – Jayson Minard Jan 19 '16 at 20:24
  • @JaysonMinard good point. We don't use EC2 Roles on our Spark cluster due to multi-tenancy of jobs, where specific jobs are restricted to specific buckets. Our distributed configuration service takes care of providing the proper secrets to the proper job. – Joseph Lust Jan 20 '16 at 02:20
3

On AWS EMR the above suggestions did not work. Instead I updated the following properties in the conf/core-site.xml:

fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey with your S3 credentials.

  • "Above" doesn't mean anything when the order of answers can change at any time. For EMR the answer from @ChristopherB looks correct. – Jayson Minard Jan 19 '16 at 20:21
3

For those using EMR, use the Spark build as described at https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark and just reference S3 with the s3:// URI. No need to set S3 implementation or additional configuration as credentials are set by IAM or role.

ChristopherB
  • 2,038
  • 14
  • 18
3

I wanted to put the credentials more securely in a config file on one of my encrypted partitions. So I did export HADOOP_CONF_DIR=~/Private/.aws/hadoop_conf before running my spark application, and put a file in that directory (encrypted via ecryptfs) called core-site.xml containing the credentials like this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>my_aws_access_key_id_here</value>
  </property>
  <property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>my_aws_secret_access_key_here</value>
  </property>
</configuration>

HADOOP_CONF_DIR can also be set in conf/spark-env.sh.

nealmcb
  • 12,479
  • 7
  • 66
  • 91
2

Latest EMR releases (tested on 4.6.0) require the following configuration:

val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

Although in most cases out of the box config should work - this is if you have different S3 credentials from the ones you launched the cluster with.

Dan Osipov
  • 1,429
  • 12
  • 15
1

In java, the following are the lines of code. You have to add AWS creds in SparkContext only and not SparkSession.

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);
sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);
Atihska
  • 4,803
  • 10
  • 56
  • 98
0

this works for me in 1.4.1 shell:

val conf = sc.getConf
conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
conf.set("spark.hadoop.fs.s3.awsAccessKeyId", <your access key>)
conf.set("spark.hadoop.fs.s3.awsSecretAccessKey", <your secret key>)
SparkHadoopUtil.get.conf.addResource(SparkHadoopUtil.get.newConfiguration(conf))
...
sqlContext.read.parquet("s3://...")
ru2nuts
  • 21
  • 2
  • This is technically the same as the answer from @harel, you just set from higher up in the configuration tree instead of going to the Hadoop configuration first. But it is the same. – Jayson Minard Jan 19 '16 at 20:23
  • Not the same for spark shell - you already have sc and sqlContext in the shell, @harel's answer creates a new conf and new sc. – ru2nuts Jan 22 '16 at 17:01
0

Augmenting @nealmcb's answer, the most straightforward way to do this is to define

HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop 

in conf/spark-env.sh or export that env variable in ~/.bashrc or ~/.bash_profile.

That will work as long as you can access s3 through hadoop. For instance, if you can run

hadoop fs -ls s3n://path/

then hadoop can see the s3 path.

If hadoop can't see the path, follow the advice of contained in How can I access S3/S3n from a local Hadoop 2.6 installation?

Community
  • 1
  • 1
Bob Baxley
  • 3,551
  • 1
  • 22
  • 28