3

I am running a Spark job which processes about 2 TB of data. The processing involves:

  1. Read the data (Avro files)
  2. Explode on a column which is a map type
  3. OrderBy key from the exploded column
  4. Filter the DataFrame: I have a very small set of 7 keys (call it keyset) that I want to filter the df for. I do df.filter(col("key").isin(keyset: _*))
  5. I write this df to a parquet (this dataframe is very small)
  6. Then I filter the original dataframe again for all the keys which are not in the keyset, df.filter(!col("key").isin(keyset: _*)), and write this to a parquet. This is the larger dataset.

The original Avro data is about 2 TB and the processing takes about 1 hour; I would like to optimize it. I am caching the dataframe after step 3 and using 6000 shuffle partitions, with min executors = 1000, max executors = 2000, executor memory = 20 G, executor cores = 2. Any other suggestions for optimization? Would a left join perform better than the filter?
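For reference, a minimal sketch of the above pipeline in code (the map column name, output paths, and key values below are placeholders, not the real ones):

  // Sketch only: "map_col", the paths, and the key values are placeholders
  import org.apache.spark.sql.functions.{col, explode}

  val df = spark.read.format("avro").load("/path/to/input")    // 1. read the Avro data
    .select(explode(col("map_col")))                           // 2. explode the map -> "key", "value" columns
    .orderBy("key")                                            // 3. order by the exploded key
    .cache()                                                   // cached after step 3

  val keyset = Seq("k1", "k2", "k3", "k4", "k5", "k6", "k7")   // the 7 keys

  df.filter(col("key").isin(keyset: _*))                       // 4. rows whose key is in the keyset
    .write.parquet("/path/to/small_output")                    // 5. small output

  df.filter(!col("key").isin(keyset: _*))                      // 6. everything else (the larger dataset)
    .write.parquet("/path/to/large_output")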

ap2014
  • How many nodes are there in the Spark cluster and what kind of storage drives are being used? Reading and writing 4 TB of data in total (2 TB read + 2 TB write) in one hour gives 1.1 GB/s, which, depending on the configuration, might be close to the physical limit or not. – Hristo Iliev Apr 25 '20 at 17:52

3 Answers

3

All looks right to me. If you have a small dataset then isin is okay.

1) Ensure that you can increase the number of cores: executor cores = 5

More than 5 cores per executor is not recommended. This is based on a study showing that an application with more than 5 concurrent threads per executor starts to hamper performance.
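For example, on spark-submit (only the 5 cores matter here; the memory value just mirrors what is already in the question):

  spark-submit --executor-cores 5 --executor-memory 20G ...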

2) Ensure that you have a good/uniform partition structure.

Example (only for debugging purposes, not for production):

  // Show how many records each Spark partition holds (debug only)
  import org.apache.spark.sql.functions.spark_partition_id
  yourcacheddataframe.groupBy(spark_partition_id()).count().show()

This will print the Spark partition id and how many records exist in each partition. Based on that you can repartition if you want more parallelism.
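For example, a rough sketch (the target partition count of 2000 is purely illustrative; pick it based on what the counts above show):

  // Evenly redistribute the cached data across a chosen number of partitions
  val evened = yourcacheddataframe.repartition(2000)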

3) spark.dynamicAllocation.enabled could be another option.

For example:

  spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=100 --conf spark.shuffle.service.enabled=true

along with all other required props applies only to that job. If you set these props in spark-defaults.conf, they are applied to all jobs.
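For example, the spark-defaults.conf equivalents of the flags above would look like:

  spark.dynamicAllocation.enabled                    true
  spark.shuffle.service.enabled                      true
  spark.dynamicAllocation.cachedExecutorIdleTimeout  100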

With all of these options your processing time might come down.

Ram Ghadiyaram
  • +1 Does dynamic allocation work within the same Spark app/job? Isn't this more about sharing across multiple Spark apps? – Aravind Yarram Apr 25 '20 at 18:01
  • If you use `spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=100 --conf spark.shuffle.service.enabled=true` along with all other required props, that applies to that job. If you put these props in `spark-defaults.conf`, that applies to all the applications running in the cluster. – Ram Ghadiyaram Apr 26 '20 at 00:21
  • I meant to ask if it can reuse the "free resources" within the same app – Aravind Yarram Apr 26 '20 at 01:22
  • Yes. Based on the outstanding workload it will allocate executors and they will be reused. If you are interested, see [ExecutorAllocationManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala) for a complete and better understanding. – Ram Ghadiyaram Apr 26 '20 at 02:18
0

spark.dynamicAllocation.enabled is already enabled.

Partition sizes are quite uneven (based on the sizes of the output parquet part files), since I am doing an orderBy on the key and some keys are more frequent than others.

The keyset is a really small set (7 elements).

ap2014
0

On top of what has been mentioned, a few suggestions depending on your requirements and cluster:

  1. If the job can run at 20g executor memory and 5 cores, you may be able to fit more workers by decreasing the executor memory and keeping 5 cores
  2. Is the orderBy actually required? Spark ensures that rows are ordered within partitions, but not between partitions, which usually isn't terribly useful.
  3. Are the files required to be in specific locations? If not, adding a
  df.withColumn("in_keyset", when(col("key").isin(keyset), lit(1)).otherwise(lit(0))) \
      .write.partitionBy("in_keyset").parquet(...)

may speed up the operation by avoiding reading in and exploding the data twice. The partitionBy ensures that the items in the keyset end up in a different directory from the other keys.
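If it helps, a sketch (Scala, like the question; the output path is a placeholder) of reading back just the keyset side afterwards. The filter on the partition column means only the in_keyset=1 directory is scanned:

  import org.apache.spark.sql.functions.col

  // Partition pruning: only files under .../in_keyset=1/ are read
  val keysetDf = spark.read.parquet("/path/to/output")
    .where(col("in_keyset") === 1)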

ayplam
  • The files are supposed to be in different Hive tables. That is why I was doing the filtering twice. I added caching to prevent reading twice – ap2014 Apr 27 '20 at 16:06
  • What do you mean by Spark ensures that rows are ordered within partitions? What does it order by? – ap2014 Apr 27 '20 at 20:48
  • Let's say hypothetically your data is `[1,2,3,4,5,6,7,8]` and it is partitioned into two separate files. part-0001 can contain `[1,3,5,7]` while part-0002 can contain `[2,4,6,8]`. The values are only ordered within the partitions, I don't believe that part-0001 is guaranteed to contain all the smallest values. Also the `partitionBy` may do what you want - see https://stackoverflow.com/questions/31341498/save-spark-dataframe-as-dynamic-partitioned-table-in-hive. – ayplam Apr 29 '20 at 06:06