
I have a big PySpark DataFrame which I want to save to myfile (a .tsv file) for further use. To do that, I defined the following code:

import csv

with open(myfile, "a") as csv_file:
    writer = csv.writer(csv_file, delimiter='\t')
    writer.writerow(["vertex", "id_source", "id_target", "similarity"])

    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
        # make_part_filter is defined elsewhere in the script (it filters rows by partition index)
        part_rdd = joinDesrdd_df.rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
        data_from_part_rdd = part_rdd.collect()
        vertex_list = set()

        for row in data_from_part_rdd:
            writer.writerow([....])

    csv_file.flush()

My code cannot get past this step; it generates an exception:

1. In the workers' log:

19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/2 finished with state KILLED exitStatus 143
14: 19/07/22 08:58:57 INFO ExternalShuffleBlockResolver: Application app-20190722085320-0000 removed, cleanupLocalDirs = true
14: 19/07/22 08:58:57 INFO Worker: Cleaning up local directories for application app-20190722085320-0000
 5: 19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/1 finished with state KILLED exitStatus 143
 7: 19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/14 finished with state KILLED exitStatus 143
...

2. In the job execution log:

Traceback (most recent call last):
  File "/project/6008168/tamouze/RWLastVersion2207/module1.py", line 306, in <module>
    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 88, in rdd
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o528.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
+- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])

I don't know why this piece of my code generates the exception. Note that on small data the execution is fine, but with big data it is not.

Also, what is the best way to save a PySpark DataFrame for further use?

Update: I tried to replace the above loop with the following:

joinDesrdd_df.withColumn("par_id",col('id_source')%50).repartition(50, 'par_id').write.format('parquet').partitionBy("par_id").save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

I get a similar exception:

19/07/22 21:10:18 INFO TaskSetManager: Finished task 653.0 in stage 11.0 (TID 2257) in 216940 ms on 172.16.140.237 (executor 14) (1017/1024)
19/07/22 21:11:32 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(par_id#328, 50)
+- *(12) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258, par_id#328])
   +- Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
      +- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])
         +- *(11) Project [id_source#263, id_target#292, similarity#258]
            +- *(11) BroadcastHashJoin [instance_target#65], [instance#291], Inner, BuildRight

1 Answer


I'd suggest using Spark's native write functionality:

joinDesrdd_df.write.format('csv').option("header", "true").save("path/to/the/output/csv/folder")

Spark will save each partition of the DataFrame as a separate CSV file in the specified path. You can control the number of files with the repartition method, which gives you control over how much data each file contains.
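
For example, here is a minimal sketch of capping the file count and producing tab-separated output to match your original .tsv goal. The output path and the value 10 are arbitrary placeholders:

# Reduce the number of partitions to cap the number of output files,
# then write tab-separated CSV with a header row.
joinDesrdd_df.coalesce(10). \
 write.format('csv'). \
 option("header", "true"). \
 option("sep", "\t"). \
 mode("overwrite"). \
 save("path/to/the/output/tsv/folder")

coalesce avoids a full shuffle when you are only reducing the number of partitions, whereas repartition always shuffles the data.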

I'd also suggest using the ORC or Parquet format for big datasets, since these columnar formats are much better suited for storing large amounts of data.

For example, for Parquet:

from pyspark.sql.functions import col

joinDesrdd_df.withColumn("par_id", col('id_source') % 50). \
 repartition(50, 'par_id').write.format('parquet'). \
 save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

To read it back into a DataFrame:

df = spark.read. \
 parquet("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")
– Richard Nemeth
  • I want to try your suggestion, but in this case how can I re-read all the generated partitions? Suppose I want to save as Parquet, what should your line of code be? – bib Jul 23 '19 at 02:52
  • @bib this post has some good answers on reading partitioned data from a Parquet file: https://stackoverflow.com/questions/33650421/reading-dataframe-from-partitioned-parquet-file – vikrant rana Jul 28 '19 at 16:58
  • Using the repartition method as suggested above, I'm still running into OOM. – bib Aug 01 '19 at 18:51
  • Was your data skewed on the partition column? – vikrant rana Aug 03 '19 at 20:35
  • Also, could you please show us the code, i.e. how you are creating that big DataFrame before writing it to disk? – vikrant rana Aug 04 '19 at 06:54