
The input PySpark dataframe has one row per key_id and date_month. For one random key_id it looks like this:

+--------+-------------+---------+---------+
| key_id | date_month  | value_1 | value_2 |
+--------+-------------+---------+---------+
|      1 | 2019-02-01  |   1.135 | 'a'     |
|      1 | 2019-03-01  |   0.165 | 'b'     |
|      1 | 2019-04-01  |     0.0 | null    |
+--------+-------------+---------+---------+

It needs to be resampled to weekly granularity to look like this:

+--------+-------------+---------+---------+
| key_id |  date_week  | value_1 | value_2 |
+--------+-------------+---------+---------+
|      1 | 2019-02-04  |   1.135 | 'a'     |
|      1 | 2019-02-11  |   1.135 | 'a'     |
|      1 | 2019-02-18  |   1.135 | 'a'     |
|      1 | 2019-02-25  |   1.135 | 'a'     |
|      1 | 2019-03-04  |   0.165 | 'b'     |
|      1 | 2019-03-11  |   0.165 | 'b'     |
|      1 | 2019-03-18  |   0.165 | 'b'     |
|      1 | 2019-03-25  |   0.165 | 'b'     |
|      1 | 2019-04-01  |     0.0 | null    |
|      1 | 2019-04-08  |     0.0 | null    |
|      1 | 2019-04-15  |     0.0 | null    |
|      1 | 2019-04-22  |     0.0 | null    |
|      1 | 2019-04-29  |     0.0 | null    |
+--------+-------------+---------+---------+

Currently this takes ~30 lines of code that switch between PySpark dataframes and Pandas: wrangling the date range, joins, etc.

Is there a straightforward way of doing this in PySpark?

I tried Pandas resampling from months to weeks, but I cannot figure out how to make it work when my "primary key" is the combination of date_month and key_id.

At the moment the number of rows in the initial dataframe is low (~250K), so I guess converting the PySpark dataframe with toPandas() and then doing the transformation in Pandas is a viable option.
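
For context, the toPandas() route I have in mind looks roughly like this (a simplified sketch, not my actual code; names like monthly_sdf are placeholders, and it reindexes each key_id onto the Mondays of its months, forward-filling the monthly values):

import pandas as pd

pdf = monthly_sdf.toPandas()  # monthly_sdf: the monthly PySpark dataframe above (placeholder name)
pdf['date_month'] = pd.to_datetime(pdf['date_month'])

def month_to_weeks(g):
    # index one key_id's months, then reindex onto the Mondays of those months,
    # forward-filling each month's values
    g = g.set_index('date_month').sort_index()
    mondays = pd.date_range(g.index.min(),
                            g.index.max() + pd.offsets.MonthEnd(0),
                            freq='W-MON')
    return g.reindex(mondays, method='ffill').rename_axis('date_week')

weekly = (pdf.groupby('key_id', group_keys=True)
             .apply(month_to_weeks)
             .drop(columns='key_id')
             .reset_index())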

Denys

1 Answer


The solution below involves making a mapper of months to weeks (where the weeks are represented by the Mondays within each month), and joining it onto your original data.

Boring section to mimic your data:

## Replicate your data, using a left join so that value_2 ends up null for April
## Convert the date string to a date type

import pyspark.sql.functions as F

c = ['key_id','date_month','value_1']
d = [(1,'2019-02-01',1.135),
     (1,'2019-03-01',0.165),
     (1,'2019-04-01',0.0)]

c2 = ['date_month','value_2']
d2 = [('2019-02-01','a'),
      ('2019-03-01','b')]

df = spark.createDataFrame(d,c)
df2 = spark.createDataFrame(d2,c2)

test_df = df.join(df2, how = 'left', on = 'date_month')

test_df_date = test_df.withColumn('date_month', F.to_date(test_df['date_month']))

test_df_date.orderBy('date_month').show() 

Your data:

+----------+------+-------+-------+
|date_month|key_id|value_1|value_2|
+----------+------+-------+-------+
|2019-02-01|     1|  1.135|      a|
|2019-03-01|     1|  0.165|      b|
|2019-04-01|     1|    0.0|   null|
+----------+------+-------+-------+

Build the mapper, using a neat trick from: get all the dates between two dates in Spark DataFrame

This ends with a mapper from each month to the week beginnings (Mondays) within that month. (You could apply the same expansion straight to your raw data instead of creating a mapper; there is a sketch of that variant at the end of this answer.)

## Build month to week mapper

## Get first and last of each month, and number of days between
months = test_df_date.select('date_month').distinct()
months = months.withColumn('date_month_end', F.last_day(F.col('date_month')))
months = months.withColumn('days', F.datediff(F.col('date_month_end'), 
                                              F.col('date_month')))

## Use trick from https://stackoverflow.com/questions/51745007/get-all-the-dates-between-two-dates-in-spark-dataframe
## Adds a column 'day_in_month' with one row for each day in the month, from first to last.
months = months.withColumn("repeat", F.expr("split(repeat(',', days), ',')"))\
    .select("*", F.posexplode("repeat").alias("day_in_month", "val"))\
    .drop("repeat", "val", "days")\
    .withColumn("day_in_month", F.expr("date_add(date_month, day_in_month)"))

## Add integer day-of-week value (Sunday == 1, Monday == 2)
## Filter to Mondays
## Rename and drop columns
months = months.withColumn('day', F.dayofweek(F.col('day_in_month')))
months = months.filter(F.col('day') == 2)
month_week_mapper = months.withColumnRenamed('day_in_month', 'date_week')\
    .drop('day', 'date_month_end')

month_week_mapper.orderBy('date_week').show()

Mapper is as follows:

+----------+----------+
|date_month| date_week|
+----------+----------+
|2019-02-01|2019-02-04|
|2019-02-01|2019-02-11|
|2019-02-01|2019-02-18|
|2019-02-01|2019-02-25|
|2019-03-01|2019-03-04|
|2019-03-01|2019-03-11|
|2019-03-01|2019-03-18|
|2019-03-01|2019-03-25|
|2019-04-01|2019-04-01|
|2019-04-01|2019-04-08|
|2019-04-01|2019-04-15|
|2019-04-01|2019-04-22|
|2019-04-01|2019-04-29|
+----------+----------+

Then we perform a left join onto the original data, so that each month gets joined to each of its respective weeks. The final chained call just drops the excess column and reorders the rows/columns to match your desired output.

## Perform the join, and do some cleanup to get results into order/format specified above. 
out_df = test_df_date.join(month_week_mapper, on = 'date_month', how = 'left')

out_df.drop('date_month')\
    .select('key_id','date_week','value_1','value_2')\
    .orderBy('date_week')\
    .show()
## Gives me an output of:
+------+----------+-------+-------+
|key_id| date_week|value_1|value_2|
+------+----------+-------+-------+
|     1|2019-02-04|  1.135|      a|
|     1|2019-02-11|  1.135|      a|
|     1|2019-02-18|  1.135|      a|
|     1|2019-02-25|  1.135|      a|
|     1|2019-03-04|  0.165|      b|
|     1|2019-03-11|  0.165|      b|
|     1|2019-03-18|  0.165|      b|
|     1|2019-03-25|  0.165|      b|
|     1|2019-04-01|    0.0|   null|
|     1|2019-04-08|    0.0|   null|
|     1|2019-04-15|    0.0|   null|
|     1|2019-04-22|    0.0|   null|
|     1|2019-04-29|    0.0|   null|
+------+----------+-------+-------+

That should work with your key_id column, though you will need to test it with some slightly more varied data to be sure.
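
For example, a quick sanity check with a hypothetical second key_id (the extra row below is made up, not part of your data) is to confirm that every (key_id, date_month) pair expands to one row per Monday of that month:

## Add a made-up second key and re-run the join, counting rows per key and month
extra = spark.createDataFrame([(2, '2019-02-01', 9.99, 'c')],
                              ['key_id', 'date_month', 'value_1', 'value_2'])
test_df_2 = test_df_date.unionByName(extra.withColumn('date_month', F.to_date('date_month')))

test_df_2.join(month_week_mapper, on = 'date_month', how = 'left')\
    .groupBy('key_id', 'date_month')\
    .count()\
    .orderBy('key_id', 'date_month')\
    .show()
## key_id 1 should show counts of 4, 4 and 5 (Feb, Mar, Apr);
## key_id 2 only has February, so a single count of 4.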

I would definitely advocate doing something like the above rather than converting to Pandas and back again. df.toPandas() is pretty slow, and if the size of your data increases over time the Pandas method will eventually fail, and you (or whoever maintains the code) will run into this issue then anyway.
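
As noted earlier, you could also apply the expansion straight to the raw data instead of building a separate mapper. A rough sketch of that variant (this is an assumption that you are on Spark 2.4+, where sequence() is available; it is not part of the mapper approach above):

## Expand each month into its days, keep only Mondays (dayofweek == 2), and rename
direct = test_df_date\
    .withColumn('day_in_month',
                F.explode(F.expr("sequence(date_month, last_day(date_month), interval 1 day)")))\
    .filter(F.dayofweek('day_in_month') == 2)\
    .withColumnRenamed('day_in_month', 'date_week')\
    .select('key_id', 'date_week', 'value_1', 'value_2')

direct.orderBy('date_week').show()
## Should give the same weekly output as the mapper approach above.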

alzinos