
I have a large CSV file with data in the following format.

cityId1,name,address,.......,zip

cityId2,name,address,.......,zip

cityId1,name,address,.......,zip

........

cityIdN,name,address,.......,zip

I am performing the following operations on the above CSV file:

  1. Group by cityId as the key, with a list of resources as the value

    val df2 = df1.groupBy($"cityId").agg(collect_list(struct(cols.head, cols.tail: _*)) as "resources")

  2. Convert it to a JSON RDD

    val jsonDataRdd2 = df2.toJSON.rdd

  3. Iterate through each partition and upload to S3 per key

  • I cannot use the DataFrame partitionBy write because of business logic constraints (how other services read from S3).
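
Put together, the three steps above can be sketched roughly as follows. This is a minimal local-mode sketch, not the production job: the sample rows, the object name `GroupAndUploadSketch` and the `uploadToS3` helper are hypothetical stand-ins (the helper only prints), and `col("cityId")` is used in place of `$"cityId"` so the grouping function does not depend on `spark.implicits`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct}

object GroupAndUploadSketch {

  // Hypothetical stand-in for the real S3 client call; it only logs here.
  def uploadToS3(json: String): Unit =
    println(s"would upload ${json.length} characters")

  // Step 1: one row per cityId, with every original row collected into "resources".
  def groupByCity(df1: DataFrame): DataFrame = {
    val cols = df1.columns.toSeq
    df1.groupBy(col("cityId"))
      .agg(collect_list(struct(cols.head, cols.tail: _*)).as("resources"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("group-and-upload-sketch")
      .master("local[*]") // assumption: local run, for illustration only
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for the CSV content.
    val df1 = Seq(
      ("cityId1", "name1", "zip1"),
      ("cityId2", "name2", "zip2"),
      ("cityId1", "name3", "zip3")
    ).toDF("cityId", "name", "zip")

    val df2 = groupByCity(df1)

    // Step 2: one JSON string per cityId.
    val jsonDataRdd2 = df2.toJSON.rdd

    // Step 3: walk each partition and upload record by record.
    jsonDataRdd2.foreachPartition { records =>
      records.foreach(uploadToS3)
    }

    spark.stop()
  }
}
```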

My Questions:

  • What is the default size of a spark partition?
  • Let's say the default partition size is X MB and the DataFrame contains one large record whose key holds Y MB of data (Y > X). What would happen in this scenario?
  • Do I need to worry about having the same key in different partitions in that case?
Jumpo
  • hard to follow. can u show code pls? – thebluephantom Oct 23 '20 at 11:46
  • Does this answer your question? [How does spark.csv determine the number of partitions on read?](https://stackoverflow.com/questions/50496935/how-does-spark-csv-determine-the-number-of-partitions-on-read) – suj1th Oct 23 '20 at 14:28

1 Answer


In answer to your questions:

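  • When reading from secondary storage (S3, HDFS), partition size by default follows the block size of the file system, typically 128MB or 256MB. For DataFrame file sources the upper bound is set by spark.sql.files.maxPartitionBytes (128MB by default). With RDDs you can ask for a partition count at read time (e.g. the minPartitions argument of sc.textFile); a DataFrame has to be repartitioned after the read. (For JDBC and Spark Structured Streaming the partitions are dynamic in size.)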

  • Applying 'wide transformations' or repartitioning will most likely change both the number and the size of partitions. The size of a given partition has a maximum value; in Spark 2.4.x this maximum increased to 8GB. So, if any transformation (e.g. collect_list in combination with groupBy) generates more than this maximum, you get an error and the program aborts. You therefore need to partition wisely or, in your case, have a sufficient number of partitions for the aggregation; see the spark.sql.shuffle.partitions parameter.

  • Spark's parallel processing model relies on each key being allocated, via hash partitioning, range partitioning, etc., to one and only one partition; this is what shuffling does. So there is no issue when iterating through a partition with foreachPartition or mapPartitions: after the groupBy, all data for a given key sits in a single partition.
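
To put rough numbers on the first point, the arithmetic below approximates how DataFrame file sources pick a target split size from spark.sql.files.maxPartitionBytes (128MB by default), spark.sql.files.openCostInBytes (4MB by default) and the available parallelism. It is a simplified sketch of that logic, not Spark's actual code, and the object and function names are hypothetical.

```scala
object SplitSizeSketch {
  val MB: Long = 1024L * 1024L

  // Approximate target split size: capped by maxPartitionBytes, but smaller
  // when the input is small relative to the number of cores.
  def maxSplitBytes(
      totalFileBytes: Long,
      numFiles: Int,
      parallelism: Int,
      maxPartitionBytes: Long = 128 * MB, // spark.sql.files.maxPartitionBytes default
      openCostInBytes: Long = 4 * MB      // spark.sql.files.openCostInBytes default
  ): Long = {
    // Each file is padded with the estimated cost of opening it.
    val totalBytes = totalFileBytes + numFiles * openCostInBytes
    val bytesPerCore = totalBytes / parallelism
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }

  // Number of chunks a single splittable file is cut into (ceiling division).
  def splitsForSingleFile(fileBytes: Long, splitBytes: Long): Long =
    (fileBytes + splitBytes - 1) / splitBytes

  def main(args: Array[String]): Unit = {
    val fileBytes = 10240 * MB // assumption: one 10GB uncompressed CSV, 8 cores
    val split = maxSplitBytes(fileBytes, numFiles = 1, parallelism = 8)
    println(s"target split: ${split / MB} MB, splits: ${splitsForSingleFile(fileBytes, split)}")
  }
}
```

Under these assumptions a large file is read in pieces of at most 128MB, while a small file is cut into smaller pieces so that every core gets some work.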
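
The last point can be illustrated without Spark at all. The plain Scala sketch below mimics the idea behind an RDD hash partitioner (non-negative hashCode modulo the partition count); the object name, the function name and the sample rows are hypothetical. DataFrame shuffles use a different hash function, but the property is the same: a given key always maps to exactly one partition.

```scala
object HashPartitionSketch {

  // Non-negative hashCode modulo the number of partitions.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    // Made-up rows; the first element plays the role of cityId.
    val rows = Seq(
      "cityId1" -> "row A",
      "cityId2" -> "row B",
      "cityId1" -> "row C",
      "cityId3" -> "row D"
    )

    // Every row with the same cityId lands in the same bucket.
    val placed = rows.groupBy { case (cityId, _) => partitionFor(cityId, numPartitions) }
    placed.toSeq.sortBy(_._1).foreach { case (partition, rs) =>
      println(s"partition $partition: ${rs.mkString(", ")}")
    }
  }
}
```

Several keys may share a partition, but one key never spans two, which is why a per-partition loop sees each cityId exactly once after the groupBy.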

thebluephantom
  • would you please take a look at this question? https://stackoverflow.com/q/72397146/6640504 , Many thanks. – M_Gh May 27 '22 at 15:45