
I am getting a java.io.IOException: s3n://bucket-name : 400 : Bad Request error while loading Redshift data through the spark-redshift library.

The Redshift cluster and the S3 bucket are both in the Mumbai region (ap-south-1).

Here is the full error stack:

2017-01-13 13:14:22 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, master): java.io.IOException: s3n://bucket-name : 400 : Bad Request
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:453)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
            at org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
            at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:476)
            at com.databricks.spark.redshift.RedshiftRecordReader.initialize(RedshiftInputFormat.scala:115)
            at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader$1.apply(RedshiftFileFormat.scala:92)
            at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader$1.apply(RedshiftFileFormat.scala:80)
            at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:279)
            at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:263)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
            at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
            at org.apache.spark.scheduler.Task.run(Task.scala:86)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.jets3t.service.impl.rest.HttpException: 400 Bad Request
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193)
            at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
            at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
            ... 30 more

And here is my Java code:

SparkContext sparkContext = SparkSession.builder().appName("CreditModeling").getOrCreate().sparkContext();
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", fs_s3a_awsAccessKeyId);
sparkContext.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", fs_s3a_awsSecretAccessKey);
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");

SQLContext sqlContext=new SQLContext(sparkContext);
Dataset dataset= sqlContext
        .read()
        .format("com.databricks.spark.redshift")
        .option("url", redshiftUrl)
        .option("query", query)
        .option("aws_iam_role", aws_iam_role)
        .option("tempdir", "s3a://bucket-name/temp-dir")
        .load();

I was able to solve the problem in Spark local mode by making the following changes (referred this):

1) I replaced the jets3t jar with version 0.9.4.

2) I changed the jets3t configuration properties to support AWS Signature Version 4, which the bucket's region requires, as follows:

Jets3tProperties myProperties = Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");

But now I am trying to run the job in cluster mode (Spark standalone mode or with Mesos as the resource manager) and the error appears again :(

Any help would be appreciated!

Amit Valse

2 Answers

Actual Problem:

Updating Jets3tProperties at runtime to support AWS S3 Signature Version 4 worked in local mode but not in cluster mode, because the properties were only updated in the driver JVM and not in any of the executor JVMs.

Solution:

I found a workaround to update the Jets3tProperties on all executors by referring to this link.

Following that link, I put an additional code snippet that updates the Jets3tProperties inside the .foreachPartition() function, so that it runs on the executors rather than only on the driver.

Here is the code:

 Dataset dataset= sqlContext
            .read()
            .format("com.databricks.spark.redshift")
            .option("url", redshiftUrl)
            .option("query", query)
            .option("aws_iam_role", aws_iam_role)
            .option("tempdir", "s3a://bucket-name/temp-dir")
            .load();

// The lambda runs on the executors, once per partition, so the jets3t
// settings are applied in the executor JVMs and not only in the driver.
// Setting the same values repeatedly has no further effect.
dataset.foreachPartition(partition -> {
    Jets3tProperties myProperties =
            Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
    myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
    myProperties
            .setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
    myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");
});
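
The snippet above applies the properties for every partition. If applying them once per executor JVM is preferred, a static guard is one option, since static state exists once per JVM. The following is only a minimal sketch under that assumption; the class name OncePerJvm and the method runOnce are hypothetical and are not part of Spark or jets3t.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: runs an action at most once per JVM.
class OncePerJvm {
    // Static state is held once per JVM (per classloader), so every
    // executor process gets its own independent flag.
    private static final AtomicBoolean DONE = new AtomicBoolean(false);

    // Runs the action only on the first call in this JVM.
    // Returns true if the action ran, false if it was skipped.
    static boolean runOnce(Runnable action) {
        if (DONE.compareAndSet(false, true)) {
            action.run();
            return true;
        }
        return false;
    }
}
```

Inside the foreachPartition lambda, the three setProperty calls could then be wrapped in OncePerJvm.runOnce(() -> { ... }), so later partitions on the same executor skip them.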
Amit Valse
  • Is there a better solution to this? I'm also stuck here. – Sudev Ambadi Aug 22 '17 at 11:47
  • @SudevAmbadi The answer to your question is no; there is no direct solution available anywhere, and this is the hack I had to put in. It needs to be handled through the Jets3t library. – Amit Valse Oct 07 '17 at 17:49

That stack implies that you're using the older s3n connector, which is based on jets3t. You are setting properties which only work with S3A, the newer connector. Use a URL like s3a:// to pick up the newer connector.

Given you are trying to use the V4 API, you'll need to set fs.s3a.endpoint too. The 400/bad-request response is one you'd see if you tried to auth with V4 against the central endpoint.
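
As a rough illustration of the S3A route, the sketch below only builds the configuration entries; it does not start Spark. It assumes the standard Hadoop S3A keys fs.s3a.access.key, fs.s3a.secret.key, fs.s3a.endpoint and fs.s3a.impl, each carrying the "spark.hadoop." prefix so that Spark copies it into the Hadoop configuration. The class name S3aSparkConf and the method s3aSettings are hypothetical. Note that the question's code maps fs.s3a.impl to NativeS3FileSystem, which may explain why s3a:// URLs still end up in the jets3t-based connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects S3A settings as Spark configuration entries.
class S3aSparkConf {
    static Map<String, String> s3aSettings(String accessKey, String secretKey, String endpoint) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Map the s3a scheme to the S3A file system, not to NativeS3FileSystem.
        conf.put("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        conf.put("spark.hadoop.fs.s3a.access.key", accessKey);
        conf.put("spark.hadoop.fs.s3a.secret.key", secretKey);
        // Region-specific endpoint, e.g. s3.ap-south-1.amazonaws.com for Mumbai.
        conf.put("spark.hadoop.fs.s3a.endpoint", endpoint);
        return conf;
    }
}
```

Each entry could then be passed with SparkSession.builder().config(key, value) before getOrCreate(), or as --conf key=value to spark-submit. This assumes the hadoop-aws module and a matching AWS SDK are on the classpath of the driver and the executors.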

stevel
  • Thanks for your reply @Steve Loughran :) I have replaced the variable names with actual values in the question. Now as you can see I have set the tempdir with a URL s3a:// as you have mentioned. – Amit Valse Jan 15 '17 at 15:38
  • Also I have put the actual fs.s3a.endpoint value for the Mumbai region. And you are right regarding the reason for the 400/bad-request, as you mentioned: "The 400/bad-request response is one you'd see if you tried to auth with v4 against the central endpoint". – Amit Valse Jan 15 '17 at 15:44
  • But all the changes I have made here work perfectly in local mode but not in cluster mode. So my guess is that they might just be getting updated on the driver JVM and not on the executor JVMs. Does this make sense? – Amit Valse Jan 15 '17 at 15:47
  • In my last comment I meant that the jets3t properties which I am overwriting might just be getting updated on the driver JVM and not on the executor JVMs. – Amit Valse Jan 15 '17 at 16:38
  • Certainly if you are playing with system properties for jets3t, they don't propagate. That doesn't hold for S3a properties set in the spark conf, "spark.hadoop.fs.s3a...": they will propagate, which is something I am confident of, given that is how I work with them. – stevel Jan 15 '17 at 17:00
  • Actually I also tried updating the [properties in jets3t source code](https://bitbucket.org/jmurty/jets3t/src/cf9ce49d7f44befa939fe8ca1578fb44c06b0daa/configs/jets3t.properties?at=Release-0.9.4&fileviewer=file-view-default), compiled it, and replaced ../spark-2.0.2-bin-hadoop2.7/jars/jets3t-0.9.3.jar with the new jar on all the nodes, but I still get the same error. – Amit Valse Jan 15 '17 at 17:26
  • Like I said: avoid s3n. Use s3a URLs. I've never tried v4 auth with jets3t; Hadoop only supports v4 auth with the S3a filesystem client. You must have s3a:// in the URL. – stevel Jan 15 '17 at 18:18
  • Yes, I have used only s3a URLs, but somehow I am getting s3n in the error. – Amit Valse Jan 15 '17 at 18:21
  • Well, that's going to be something wrong with your code or system setup. The good news: long-lived Spark processes are easy to hook up debuggers to. The bad news: that's pretty much the next step remaining. Try with logging at DEBUG level first, though. – stevel Jan 16 '17 at 10:57
  • Yes trying to find out in the DEBUG logs. Thanks for your help :) I will update here once I find the solution. – Amit Valse Jan 16 '17 at 11:09