
The following code is used to load data from Oracle into S3.

source_data = (spark.read.format("jdbc")
    .option("url", url).option("dbtable", "scott.emp").option("fetchSize", "10000")
    .option("user", user).option("password", password).option("driver", driver)
    .load())
hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.setBoolean("fs.s3a.sse.enabled", True)
source_data.write.mode("overwrite").parquet("s3a://bucket1/folder1/emp")

I am able to read the data, but the code throws the following error when writing to S3:

org.apache.hadoop.fs.FileAlreadyExistsException: Can't make directory for path 's3a://bucket1/folder1' since it is a file.
at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:861)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1881)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:313)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:162)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:567)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Questions:

  1. Does s3a have some sort of cache which needs to be cleared?
  2. Is s3a a good option over s3?
Sow

1 Answer


The S3A connector treats keys that end in "/" as directory markers and keys that don't as files, and it won't let you create anything under a path that is already a file. Here s3a://bucket1/folder1 exists as a plain file, so the writer can't create the directory for s3a://bucket1/folder1/emp underneath it.

Delete it, either from the CLI/console as below or from PySpark as in the sketch after this list:

  • Hadoop CLI: hadoop fs -rm -R s3a://bucket1/folder1
  • AWS console/CLI: the usual way
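
If you'd rather do the cleanup from PySpark itself, here is a minimal sketch. It assumes the SparkSession and DataFrame from the question (spark, source_data) and the same s3a://bucket1/folder1 destination, and goes through the Hadoop FileSystem API via Py4J:

sc = spark.sparkContext
hadoopConf = sc._jsc.hadoopConfiguration()
Path = sc._jvm.org.apache.hadoop.fs.Path

# the parent path that the error reports as being a plain file
parent = Path("s3a://bucket1/folder1")
fs = parent.getFileSystem(hadoopConf)

# delete it only if it really is a file; recursive delete, same as hadoop fs -rm -R
if fs.exists(parent) and fs.getFileStatus(parent).isFile():
    fs.delete(parent, True)

# the parquet write can now create s3a://bucket1/folder1/emp
source_data.write.mode("overwrite").parquet("s3a://bucket1/folder1/emp")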

Does s3a have some sort of cache which needs to be cleared?

Yes, but unless you have it turned on, it's not the problem. The cache is there to handle s3 CRUD inconsistencies.
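
For what it's worth, a minimal sketch to check whether that cache (S3Guard) is enabled on your build; the property name fs.s3a.metadatastore.impl and the NullMetadataStore default come from the Hadoop 3.x S3A documentation, so treat them as an assumption on other Hadoop lines:

hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
# NullMetadataStore means S3Guard is off and no metadata cache is in play
store = hadoopConf.get("fs.s3a.metadatastore.impl",
                       "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore")
print("S3Guard metadata store:", store)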

Is s3a a good option over s3?

It's the best open-source S3 connector for applications using the Hadoop API. The EMR one is closed source. The Presto one has some nice touches, but it's optimised for the specific use cases Presto handles.

stevel
  • Hello Steve. I was writing to an s3 compatible storage (with spark), which works with the s3a connector, and got an org.apache.hadoop.fs.FileAlreadyExistsException. This happened even though the write mode was set to "overwrite". How should this be handled? It seems to work incorrectly. – pavel_orekhov Jul 09 '23 at 13:49
  • Saw HADOOP-15542, and it seems to be different from what I have, because my exception complains about a plain file, not a file that looks like a directory. Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: s3a://hdp-temp/orekhov-ps/s3a-job-magic_prod/event_type=action_kit_clicked/event_date=2023-02-08/part-16199-c8c26c63-07dd-45ed-8014-2dbb26b07546.c000.zlib.orc already exists – pavel_orekhov Jul 09 '23 at 14:00
  • Pavel, you are using MinIO standalone, aren't you? If so: look for Hadoop JIRAs related to that. If not, create a new one and we can go from there (if you need to create a new JIRA account, say "stevel said I should" in the justification). – stevel Jul 10 '23 at 15:54
  • Steve, my problem was solved by setting spark.hadoop.orc.overwrite.output.file to true as suggested here: https://stackoverflow.com/questions/57471781/spark-filealreadyexistsexception-on-stage-failure. Btw, could you tell me, are you in any slack chats for this topic? I am currently learning about the s3a committer, and I want to ask a few questions about magic vs directory committers, I recently learned about the magic committer, but when I read the spark-cloud integration guide, they recommend using the directory committer, so, I'm a little confused. – pavel_orekhov Jul 10 '23 at 16:16
  • I am not using MinIO standalone, I am using OBS from Huawei, btw. – pavel_orekhov Jul 10 '23 at 16:19
  • Steve, btw, using the magic committer, the _SUCCESS file gets written into the .spark-staging directory, which gets removed right away after the job has finished. Is it a bug? Maybe my spark version isn't new enough (2.4.5). – pavel_orekhov Jul 10 '23 at 16:51
  • 1. You are doing insert overwrite, aren't you? That doesn't work that well with AWS S3; if your store's rename is O(1) it may work. – stevel Jul 11 '23 at 13:13
  • 2. reading on committers; https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_release_2021-05-17 – stevel Jul 11 '23 at 13:14
  • 3. worry less about spark version and more about hadoop dependencies, go for 3.3.5 or later. this lets you list a second location to save _SUCCESS files if you want to keep the stats, and has many other s3-related bits of work. – stevel Jul 11 '23 at 13:15
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/254436/discussion-between-pavel-orekhov-and-stevel). – pavel_orekhov Jul 11 '23 at 13:22