
At the moment I have 9 functions which perform specific calculations on a data frame, including average balance per month, rolling P&L, period start balances, and ratio calculations.

Each of those functions produces a Spark data frame with the same structure: the first columns are the group-by columns that the function accepts (1 column if there is a single group-by variable, 2 columns if there are 2, and so on), and the final column holds the specific statistic, examples of which I listed at the beginning.

Because each of those functions does a different calculation, I need to produce a data frame for each one and then join them to produce a report.

I join them on the group-by variables because they are common to all of them (each individual statistic report).

But doing 7-8 or even more joins is very slow.
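For illustration, the current approach looks roughly like this (the function names, key columns and statistic columns are made up):

# each hypothetical function returns the group-by keys plus one statistic column
avg_df = average_balance_per_month(df, ["month", "account"])
pnl_df = rolling_pnl(df, ["month", "account"])
ratio_df = ratio_calculation(df, ["month", "account"])

report = (avg_df
    .join(pnl_df, on=["month", "account"])
    .join(ratio_df, on=["month", "account"]))  # ...and 6 more joins like these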

Is there a way to add those columns together without using joins?

Thank you.

Toby Djelyinski
  • Can you not use window functions for those operations, and then dedup the df retaining the required columns. – samkart Dec 03 '19 at 11:09
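A rough sketch of the window-function idea from the comment above, assuming a single group-by column month and made-up input columns:

from pyspark.sql import Window, functions as F

w = Window.partitionBy("month")  # the group-by columns

report = (df
    .withColumn("avg_balance", F.avg("balance").over(w))  # hypothetical statistics
    .withColumn("rolling_pnl", F.sum("pnl").over(w))
    .select("month", "avg_balance", "rolling_pnl")
    .dropDuplicates())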

1 Answer


I can think of multiple approaches, but this looks like a good use case for the new pandas UDF Spark API.

You can define one grouped-map UDF. The UDF receives each aggregation group as a pandas DataFrame. You apply the 9 aggregate functions to the group and return a pandas DataFrame with 9 additional aggregated columns. Spark then combines the returned pandas DataFrames into one large Spark DataFrame.

e.g.:

# given you want to aggregate average and ratio
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("month long, avg double, ratio double", PandasUDFType.GROUPED_MAP)
def compute(pdf):
    # pdf is a pandas.DataFrame holding all rows of one group
    pdf['avg'] = compute_avg(pdf)      # placeholder aggregate helpers
    pdf['ratio'] = compute_ratio(pdf)
    # the returned frame must match the declared schema
    return pdf[['month', 'avg', 'ratio']]

df.groupby("month").apply(compute).show()

See Pandas-UDF#Grouped-Map
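On Spark 3.x the same grouped-map pattern can also be written with applyInPandas instead of the decorator; a minimal sketch reusing the same placeholder helpers:

def compute(pdf):
    pdf['avg'] = compute_avg(pdf)      # placeholder aggregate helpers
    pdf['ratio'] = compute_ratio(pdf)
    return pdf[['month', 'avg', 'ratio']]

df.groupby("month").applyInPandas(compute, schema="month long, avg double, ratio double").show()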

If your cluster is on a lower Spark version, you have 2 options:

  1. Stick to the DataFrame API and write custom aggregate functions. See this answer. They have a horrible API, but usage would look like this (for statistics that can be expressed with built-in aggregate functions, see also the sketch after these two options).
df.groupBy(df.month).agg(
  my_avg_func(df.amount).alias('avg'),
  my_ratio_func(df.amount).alias('ratio'),
)
  2. Fall back to the good ol' RDD map-reduce API.
 # pseudocode: a real implementation would use aggregateByKey with a zero
 # accumulator and a merge function, since the accumulator dict and the raw
 # records have different shapes
 def _create_tuple_key(record):
     return (record.month, record)

 def _compute_stats(acc, record):
     acc['count'] += 1
     acc['avg'] = _accumulate_avg(acc['count'], record)
     acc['ratio'] = _accumulate_ratio(acc['count'], record)
     return acc

 df.rdd.map(_create_tuple_key).reduceByKey(_compute_stats)
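For statistics that can be expressed with Spark's built-in aggregate functions, no custom aggregate is needed at all; a single .agg() call computes several statistics in one pass (the input column names here are hypothetical):

from pyspark.sql import functions as F

report = df.groupBy("month").agg(
    F.avg("balance").alias("avg_balance"),
    (F.sum("pnl") / F.sum("balance")).alias("ratio"),
)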
dre-hh
  • isn't turning each statistic spark df into a pandas df slow? because we are talking about transforming more than 10 spark dfs into pandas dfs? – Toby Djelyinski Dec 03 '19 at 10:20
  • the code will not turn each statistic df into a pandas df. A pandas df will be created for each aggregate group of your data, so let's say for each month of data. So pandas is just used as a general-purpose data structure for the grouped records. This should be way faster than joining 9 dataframes. Joining is an operation which needs data shuffling and Spark communication with the worker nodes. With this strategy this happens only 1 time, whereas in the above case it happens 9 times. – dre-hh Dec 03 '19 at 10:30
  • The code will shuffle groups of data to each worker. Then on each worker a pandas dataframe will be created 1 time for each group. The computation functions then run locally on the pandas in-memory data structure. Then Spark will combine the grouped results. This resembles the original rdd map-reduce pattern. Check out the docs I linked; they now call it the "split-apply-combine" pattern. – dre-hh Dec 03 '19 at 10:32
  • Btw. your original code might just be slow because you were not caching temporary computation results, and each additional join recomputes the previous aggregations again and again. It is not enough to save the temp dataframe as a variable; before you join, you need to call df.cache or df.persist (cache is just an alias for persist with a default option). However, a pandas UDF is a more elegant approach as it computes all 9 stats on the fly. – dre-hh Dec 03 '19 at 10:41
  • with your last comment, are you saying that performance might be improved if, when I return the df from each function, I do a .cache()? btw. thanks for the detailed answer! – Toby Djelyinski Dec 03 '19 at 12:17
  • Yes. Without seeing the code I am not 100% sure, but I would try it as an easy temporary optimization. Store each df in a separate variable, perform the aggregation, and persist the df in memory, disk, or both (df.cache will use persist with the MEMORY_AND_DISK option). Then perform the join on the persisted dfs; a sketch follows this comment thread. – dre-hh Dec 03 '19 at 12:44
  • @dre-hh How does your code handle different groupBy columns? According to the question we could want to calculate the average based on col1, col2 and the ratio based on col1, col2, col3. How does your code aggregate these two outputs then? Wouldn't a join be necessary based on some key? – wypul Dec 04 '19 at 10:36
  • @wypul I did not focus in the answer on the group-by strategy. The question also says `produce a spark data frame that has the same group by variables`. If the groups are different, computing, caching and joining dataframes is the clearer strategy. If you are not satisfied with the performance, group by the most overlapping group and then apply a pandas udf on that group; then write multiple aggregate functions that aggregate the subgroup on different subgroup variables and return multiple columns for each subgroup. – dre-hh Dec 04 '19 at 11:08
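A small sketch of the caching suggestion discussed in the comments above (the per-statistic functions and the key column are hypothetical):

from pyspark import StorageLevel

avg_df = average_balance_per_month(df, ["month"]).persist(StorageLevel.MEMORY_AND_DISK)  # or simply .cache()
pnl_df = rolling_pnl(df, ["month"]).cache()

report = avg_df.join(pnl_df, on=["month"])  # the join now reuses the persisted results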