6

Using pyspark I'm reading a dataframe from parquet files on Amazon S3 like

dataS3 = sql.read.parquet("s3a://" + s3_bucket_in)

This works without problems. But then I try to write the data

dataS3.write.parquet("s3a://" + s3_bucket_out)

I do get the following exception

py4j.protocol.Py4JJavaError: An error occurred while calling o39.parquet.
: java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: s3a://<s3_bucket_out>_temporary

It seems to me that Spark is trying to create a _temporary folder first, before it is writing to write into the given bucket. Can this be prevent somehow, so that spark is writing directly to the given output bucket?

asmaier
  • 11,132
  • 11
  • 76
  • 103

2 Answers2

10

You can't eliminate the _temporary file as that's used to keep the intermediate work of a query hidden until it's complete

But that's OK, as this isn't the problem. The problem is that the output committer gets a bit confused trying to write to the root directory (can't delete it, see)

You need to write to a subdirectory under a bucket, with a full prefix. e.g. s3a://mybucket/work/out .

I should add that trying to commit data to S3A is not reliable, precisely because of the way it mimics rename() by what is something like ls -rlf src | xargs -p8 -I% "cp % dst/% && rm %". Because ls has delayed consistency on S3, it can miss newly created files, so not copy them.

See: Improving Apache Spark for the details.

Right now, you can only reliably commit to s3a by writing to HDFS and then copying. EMR s3 works around this by using DynamoDB to offer a consistent listing

stevel
  • 12,567
  • 1
  • 39
  • 50
  • Could I then force spark to not put the `_temporary` folder onto S3, but store it locally instead? – asmaier Sep 28 '17 at 09:40
  • no, because it's a core part of the commit algorithm. Executors write data under _temporary; when all the workers are finished then the driver commits it with rename...which only works within a single filesystem. – stevel Oct 01 '17 at 19:57
  • @SteveLoughran Hello Steeve, I've just see this conference https://www.youtube.com/watch?v=BgHrff5yAQo where they thank you by the way, is there an better way 1 year after or is it still the same issue ? – Kiwy Nov 16 '18 at 12:13
  • Yes, the S3A Committers in Hadoop 3.1 (shipping in HDP-3.0) don't use rename to commit work. We lifted a lot of code from Ryan Blye. Ryan is busy on Iceberg these days too: take a look at that – stevel Nov 18 '18 at 15:54
4

I had the same issue when writing the root of S3 bucket:

df.save("s3://bucketname")

I resolved it by adding a / after the bucket name:

df.save("s3://bucketname/")
Cristik
  • 30,989
  • 25
  • 91
  • 127