10

I am performing a rolling median calculation on individual time series dataframes, then I want to concat/append the results.

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
SeriesAppend=[]

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, 
    series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", 
        collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

SeriesAppend

[DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float], DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float]]

When I attempt to .show():

'list' object has no attribute 'show'
Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'

I realize this is saying the object is a list of dataframes. How do I convert to a single dataframe?

I know that the following solution works for an explicit number of dataframes, but I want my for-loop to be agnostic to the number of dataframes:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

Is there a way to generalize this to non-explicit dataframe names?

mwhee
  • 652
  • 2
  • 6
  • 17
  • 1
    I guess you need `union`. Have a look at this [answer](https://stackoverflow.com/a/33744540/9274732), a method to union several dataframes from a list is explicited – Ben.T May 29 '19 at 16:19
  • 2
    union them all together. One way is to use `functools.reduce` and do the following: `reduce(lambda a, b: a.union(b), SeriesAppend[1:], SeriesAppend[0])` – pault May 29 '19 at 16:20
  • 1
    Possible duplicate of [Spark unionAll multiple dataframes](https://stackoverflow.com/questions/37612622/spark-unionall-multiple-dataframes). Second answer is for pyspark. – pault May 29 '19 at 16:29
  • If you add `"ID"` into your window `w` as another partitionBy argument, you do not need to do the for loop and union at all. Just subset the dataframe into the ids you want `test_df = test_df.where(col("ID").isin(series_list))` and you are good to go. – Richard Nemeth May 29 '19 at 18:13
  • Richard, that suggestion would work, but I will not know all my ID's. For instance, there will be somewhere around 30k series, but the exact N is not determined. – mwhee May 29 '19 at 18:58
  • 2
    @mwhee what do you mean by explicit number of dataframes? the point of using `reduce` is to perform the function (here union) as many times as you need it. If you do `df = reduce(DataFrame.unionAll, SeriesAppend)` outside of the `for` loop, you don't need to specify the number of dataframe anywhere. Or there is something else I missed/don't understand? – Ben.T May 29 '19 at 19:51

2 Answers2

18

Thanks everyone! To sum up - the solution uses Reduce and unionAll:

from functools import reduce
from pyspark.sql import DataFrame

SeriesAppend=[]

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, 
    series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", 
         collect_list("metric").over(w)) \
         .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

df_series = reduce(DataFrame.unionAll, SeriesAppend)
mwhee
  • 652
  • 2
  • 6
  • 17
2

Another option would be to union your dataframes as you loop through, rather than collect them in a list and union afterwards. You can achieve this by setting a unioned_df variable to 'None' before the loop, and on the first iteration of the loop, setting the unioned_df to the current dataframe. All subsequent iterations of the loop then are unioned to the (now existing) unioned_df.

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
unioned_df = None

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, 
        series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", collect_list("metric").over(w)) \
                                 .withColumn("rolling_median", median_udf("list"))

    # If unioned_df doesn't exist, create it using current iteration of series_sorted.
    # Otherwise append current iteration of series_sorted to the existing unioned_df.
    if not unioned_df:
        unioned_df = series_sorted
    else:
        unioned_df = unioned_df.union(series_sorted)
Daniel
  • 73
  • 8
  • Thanks, I think this is the more straightforward answer and what I instinctively thought of, but @mwhee's answer above seems more elegant. Do you know which of the two is more "pyspark-y"? – AHegde Jul 29 '23 at 18:53