5

I'm trying to write an ETL process that merges two datasets before a union I add a column to each dataset, the fresher dataset gets 2's, the older dataset gets 1's, then if rows have duplicate primary keys I drop the row that has a 1 in the old/new column. I've tried writing this in several ways, most recently by doing an:

orderBy(keys, desc(old/new)).dropDuplicates(keys)

But on large datasets I always get massive slowdowns with a message that says:

16/09/21 20:31:45 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0  time so far)
16/09/21 20:32:00 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1  time so far)
16/09/21 20:32:16 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2  times so far)
16/09/21 20:32:31 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3  times so far)
16/09/21 20:32:47 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (4  times so far)
16/09/21 20:33:02 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (5  times so far)
16/09/21 20:33:18 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (6  times so far)
16/09/21 20:33:33 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (7  times so far)
16/09/21 20:33:49 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (8  times so far)
16/09/21 20:34:04 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (9  times so far)
16/09/21 20:34:19 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (10  times so far)
16/09/21 20:34:35 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (11  times so far)
16/09/21 20:34:50 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (12  times so far)
16/09/21 20:35:06 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (13  times so far)
16/09/21 20:35:21 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (14  times so far)
16/09/21 20:35:37 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (15  times so far)
16/09/21 20:35:52 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (16  times so far)
16/09/21 20:36:07 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (17  times so far)
16/09/21 20:36:23 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (18  times so far)
16/09/21 20:36:38 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (19  times so far)
16/09/21 20:36:53 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (20  times so far)
16/09/21 20:37:09 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (21  times so far)
16/09/21 20:37:24 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (22  times so far)
16/09/21 20:37:40 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (23  times so far)
16/09/21 20:37:55 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (24  times so far)
16/09/21 20:38:10 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (25  times so far)
16/09/21 20:38:25 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (26  times so far)
16/09/21 20:38:41 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (27  times so far)
16/09/21 20:38:56 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (28  times so far)
16/09/21 20:39:25 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0  time so far)
16/09/21 20:39:45 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1  time so far)
16/09/21 20:40:05 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2  times so far)
16/09/21 20:40:26 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3  times so far)
16/09/21 20:40:46 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (4  times so far)
16/09/21 20:41:07 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (5  times so far)
16/09/21 20:41:27 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (6  times so far)
16/09/21 20:41:47 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (7  times so far)
16/09/21 20:42:07 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (8  times so far)
16/09/21 20:42:28 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (9  times so far)
16/09/21 20:42:49 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (10  times so far)
16/09/21 20:43:09 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (11  times so far)
16/09/21 20:43:30 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (12  times so far)
16/09/21 20:43:50 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (13  times so far)
16/09/21 20:44:11 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (14  times so far)
16/09/21 20:44:31 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (15  times so far)
16/09/21 20:44:52 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (16  times so far)
16/09/21 20:45:13 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (17  times so far)
16/09/21 20:45:33 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (18  times so far)
16/09/21 20:45:53 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (19  times so far)
16/09/21 20:46:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (20  times so far)
16/09/21 20:46:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (21  times so far)
16/09/21 20:46:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (22  times so far)
16/09/21 20:47:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (23  times so far)
16/09/21 20:47:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (24  times so far)
16/09/21 20:47:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (25  times so far)
16/09/21 20:48:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (26  times so far)
16/09/21 20:48:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (27  times so far)
16/09/21 20:48:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (28  times so far)

And upon inspection of the Spark UI there's only one thread that's working overtime while the rest have already finished.
enter image description here
Is it possible to get that spread out among threads?

Brady Auen
  • 215
  • 3
  • 13

1 Answers1

1

You approach this problem in a way which by design amplifies any possible issues related to the data skew. Since you start with reordering data by key and indicator variable you shuffle data first, possibly creating highly unbalanced partition. Any reduction applied after that won't be able to compensate this.

There at least two methods which can be used to achieve the same results while fully benefit from the map side reduction. I explained both in my answer to SPARK DataFrame: select the first row of each group so just to reiterate:

  • You can use struct ordering to choose minimum / maximum row per group.
  • You can use statically typed Dataset with groupByKey followed by reduceGroups.
Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
  • How could I be creating a skew if I'm doing an orderBy on primary keys? I've tried your Window.partitionBy method and had the issue, but I'll explore the other alternatives you provided. – Brady Auen Sep 21 '16 at 21:45
  • Window functions will do pretty much the same thing (repartition first) so it is not something you want to do in this case. `orderBy` is more or less equivalent to `partitionBy` so if some keys are particularly common it will result in non-uniform distribution. What you want in case like this is an effective map-side combine. It may not resolve the problem but it should at least improve overall performance. – zero323 Sep 21 '16 at 21:51
  • Right, but in this case my keys are primary keys/composite keys so they should all be unique and I'm still getting the issue. – Brady Auen Sep 22 '16 at 14:58
  • 1
    @zero323 I am trying to read from oracle and store in parquet file , while storing parquet i am df .write.format("parquet") .mode("overwrite") .partitionBy("year","month") .save(parquet_file) But there is a lot of shuffling going on , and data being spilling to disk in 100GBs , and out of 30 executors only one is taking a lot time ...how to paralallize it ? – BdEngineer Jan 09 '19 at 15:13
  • @BdEngineer I am stuck with same scenario, while performing saveAsTable lots of data is spilling to disk and only 1 executor is working at a time. Were you able to solve your issue, I know it was 4 years ago so hope you remember it – Himanshu Jan 19 '23 at 13:37