I have two CSVs, loaded as df_sales and df_products. I want to use PySpark to:

  1. Join df_sales and df_products on product_id: df_merged = df_sales.join(df_products, df_sales.product_id == df_products.product_id, "inner")
  2. Compute the sum of df_sales.num_pieces_sold per product: df_sales.groupby("product_id").agg(sum("num_pieces_sold"))

Both 1 and 2 would require df_sales to be shuffled on product_id.

How can I avoid shuffling df_sales twice?

Morteza
figs_and_nuts

1 Answer

One way to do what you ask is to use repartition to shuffle the dataframe once, and then cache to keep the result in memory:

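from pyspark.sql import functions as F

# Shuffle df_sales once on product_id and keep the result in memory
cached_df_sales = df_sales.repartition("product_id").cache()

# and then do your work on the cached dataframe
df_merged = cached_df_sales.join(
    df_products, cached_df_sales.product_id == df_products.product_id, "inner")
df_totals = cached_df_sales.groupBy("product_id").agg(F.sum("num_pieces_sold"))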

However, I am not sure this is a good idea. Depending on its size, caching the entire df_sales dataframe might take a lot of memory. Also, the groupBy will only shuffle two columns of the dataframe, which could turn out to be rather inexpensive. I would start by making sure of that before trying to avoid a shuffle.
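One way to check how many shuffles a query actually needs is to look at its physical plan, where each shuffle shows up as an Exchange operator. The sketch below captures what df.explain() prints and counts those operators. The helper names plan_text and count_exchanges are hypothetical, and the text layout of the plan is not a stable interface, so treat the count as a rough indicator rather than a precise measurement.

```python
import contextlib
import io


def plan_text(df):
    """Return the text that df.explain() prints (the physical plan)."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def count_exchanges(plan):
    """Count lines whose operator name starts with 'Exchange'.

    Tree-drawing characters are stripped first, so names such as
    'BroadcastExchange' or 'ReusedExchange' are not counted.
    """
    count = 0
    for line in plan.splitlines():
        if line.lstrip(" +-:|").startswith("Exchange"):
            count += 1
    return count


# Hypothetical usage, assuming df_sales is already loaded:
# totals = df_sales.groupBy("product_id").sum("num_pieces_sold")
# print(count_exchanges(plan_text(totals)))
```

Comparing the count with and without the repartition step shows whether the extra shuffle is really being avoided.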

More generally, before trying to optimize anything, write it simply, run it, see what takes time and focus on that.
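To see what takes time, it helps to remember that Spark is lazy: nothing runs until an action is called, so the timer has to wrap an action. A minimal sketch, where timed is a hypothetical helper and the commented usage assumes Spark 3.x (for the "noop" write format) and dataframes that are already loaded:

```python
import time


def timed(label, action):
    """Run a zero-argument callable, print its wall-clock time,
    and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.2f}s")
    return result, elapsed


# Hypothetical usage, assuming df_sales is already loaded.
# The "noop" format executes the full query without writing any output.
# totals = df_sales.groupBy("product_id").sum("num_pieces_sold")
# timed("aggregate", lambda: totals.write.format("noop").mode("overwrite").save())
```

Running each variant a few times and comparing the numbers gives a fairer picture than a single run, since the first run often includes file listing and JVM warm-up.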

Oli
  • Yeah, you are right. I had actually tried caching the repartitioned data, but it ended up being significantly slower than not caching it. I had read somewhere that the output of a shuffle operation is always persisted on the reduce side and reused later when possible. I wonder why that is not happening in my example. – figs_and_nuts Jan 02 '23 at 08:13