
Goal: after applying transformations and such to my dataframe, I need to repartition(1) and then write it to S3 under a custom name. Many of the questions I have found on this subject talk about the inability to rename S3 objects, and I don't want to take the "copy, then delete the original" approach since that won't scale well.

Here is my working code:

# Read from the Glue Data Catalog, collapse to a single partition, and write out as CSV.
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = databaseName, table_name = tableName, transformation_ctx = "datasource0")
datasink4 = datasource0.toDF().repartition(1)
datasink4.write.format("csv").mode("overwrite").save("s3://bucket-name-here/" + tableName + "/" + tableName + ".csv")

tableName is a variable I defined earlier in the code. The code does not fail, but instead of a single object it creates: "s3://bucket-name-here/tableName/tableName.csv/part-0000-<random string of characters>.csv". In other words, the path I passed to save() becomes a folder containing an auto-named part file.

So it is repartitioning correctly, but it is not saving under the name I want. What am I doing wrong? How can I save directly from a DataFrame (or a DynamicFrame; I'm open to that) with a custom name that I define?

nojohnny101
  • If you're concerned with approaches that *"won't scale well"* you may want to focus more on the [expensive `repartition(1)`](https://stackoverflow.com/q/31610971/5858851) than the *"copy and delete"*. – pault Mar 12 '19 at 17:50
  • @pault thanks for pointing that out. I will consider that but was trying to tackle this problem first. – nojohnny101 Mar 12 '19 at 18:01
  • @William not sure what your ultimate goal is, but I usually avoid the repartition, write the file to hdfs, and concatenate the output from the CLI. You can probably modify [this answer](https://stackoverflow.com/a/47932523/5858851) to do the same thing on s3. **Edit** I also realized that the question I linked may answer your original question as well. – pault Mar 12 '19 at 18:07
  • This deviates from the original question but the destination is an S3 bucket that can be queried by Athena. If we build a data catalog with crawlers from a bunch of part files, will that work? Additionally, if we want to perform logic on that "table" (made up of a bunch of files) such as inserts/deletes, then how does that work across partitioned files? – nojohnny101 Mar 12 '19 at 18:25
  • @William Also not sure what your ultimate goal is, but you might try using the RDD method [`mapPartitionsWithIndex()`](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) to write each partition as a separate file in S3. (You would produce each file as a side-effect of the map function, and return something like `[None]` as the mapped partition.) – Jesse Amano Mar 12 '19 at 22:40
  • Further research has shown this is just not possible within pyspark. As other threads have suggested, one can execute a mv command (essentially a copy/delete) on the part file once it is written; a rough sketch of that follows below. – nojohnny101 Mar 15 '19 at 19:28
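
A minimal sketch of the "copy then delete the original" rename described in the comment above, run after the repartition(1) write from the question. This assumes boto3 is available in the Glue job; the bucket name and tableName are the same placeholders as in the question, and copy_object is fine here as long as the single CSV stays under S3's 5 GB copy limit:

import boto3

bucket = "bucket-name-here"
written_prefix = tableName + "/" + tableName + ".csv/"   # the "folder" Spark actually created
target_key = tableName + "/" + tableName + ".csv"        # the single object name we want

s3 = boto3.client("s3")

# Spark picks the part file name itself, so find it by listing the output folder.
objects = s3.list_objects_v2(Bucket=bucket, Prefix=written_prefix)["Contents"]
part_key = next(o["Key"] for o in objects if o["Key"].endswith(".csv"))

# "Rename" = copy the part file to the key we want, then delete everything Spark wrote.
s3.copy_object(Bucket=bucket, CopySource={"Bucket": bucket, "Key": part_key}, Key=target_key)
s3.delete_objects(Bucket=bucket, Delete={"Objects": [{"Key": o["Key"]} for o in objects]})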
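
And a rough sketch of the alternative suggested in Jesse Amano's comment: skip repartition(1) and have each partition upload itself to S3 under a name you control via mapPartitionsWithIndex. This produces one custom-named object per partition rather than a single file, and it assumes Python 3 with boto3 available on the executors; BUCKET and the key pattern are hypothetical placeholders:

import csv
import io
import boto3

BUCKET = "bucket-name-here"   # placeholder, as in the question
PREFIX = tableName            # one folder per table, one object per partition

def upload_partition(index, rows):
    # Serialize this partition's rows to CSV in memory.
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    for row in rows:
        writer.writerow(row)
    # Upload under a name we control; the write is a side effect of the map.
    boto3.client("s3").put_object(
        Bucket=BUCKET,
        Key="{0}/{0}-{1:05d}.csv".format(PREFIX, index),
        Body=buffer.getvalue().encode("utf-8"))
    # Return something so Spark has a mapped partition to materialize.
    return [None]

df = datasource0.toDF()
# count() forces evaluation so the uploads actually happen.
df.rdd.mapPartitionsWithIndex(upload_partition).count()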

0 Answers