
Is there a way to merge two tables in pyspark with respect to a date, where one table contains events linked to a date, and the other contains additional information associated with a period, with a start and an end date?

There are similar topics for python, but none for pyspark, like the one presented (using numpy) in this answer. My idea is not to get only one piece of information but all the available information from my right table.

In this example, I would get in df1, based on the id, all the information available in df2 for this id, where the event_date is included between the start_period and the end_period.

import datetime

df1 = spark.createDataFrame([
    (1, 'a', datetime.datetime(2021, 1, 1)),
    (1, 'b', datetime.datetime(2021, 1, 5)),
    (1, 'c', datetime.datetime(2021, 1, 24)),
    (2, 'd', datetime.datetime(2021, 1, 10)),
    (2, 'e', datetime.datetime(2021, 1, 15))], ['id', 'event', 'event_date'])


df2 = spark.createDataFrame([
    (1, 'Xxz45', 'XX013', datetime.datetime(2021, 1, 1), datetime.datetime(2021, 1, 10)),
    (1, 'Xasz', 'XX014', datetime.datetime(2021, 1, 11), datetime.datetime(2021, 1, 22)),
    (1, 'Xbbd', 'XX015', datetime.datetime(2021, 1, 23), datetime.datetime(2021, 1, 26)),
    (1, 'Xaaq', 'XX016', datetime.datetime(2021, 1, 27), datetime.datetime(2021, 1, 31))], ['id', 'info1', 'info2', 'start_period', 'end_period'])

[EDIT] The expected output would be (merging on id, with event_date included in the period):

df_results = spark.createDataFrame([
    (1, 'a', datetime.datetime(2021, 1, 1), 'Xxz45', 'XX013'),
    (1, 'b', datetime.datetime(2021, 1, 5), 'Xxz45', 'XX013'),
    (1, 'c', datetime.datetime(2021, 1, 24), 'Xbbd', 'XX015'),
    (2, 'd', datetime.datetime(2021, 1, 10), None, None),
    (2, 'e', datetime.datetime(2021, 1, 15), None, None)], ['id', 'event', 'event_date', 'info1', 'info2'])
Alex Germain
  • given df1 and df2, what is your expected output? I'm guessing a simple join with event_date between the two periods does not suffice? – ScootCork Oct 22 '21 at 20:49
  • You are right, that's what I tried in `python` using `numpy`. Due to the amount of data (6M rows for df1 and 250k for df2), and due to the internal platform, I have to work in `pyspark`, and tbh I don't know how I could do this kind of join where df1 is between the two periods described in df2. But I'm happy to read that it exists, do you have any hint to do it? – Alex Germain Oct 24 '21 at 08:09
  • Would something like this be a good solution (in terms of accuracy / time consumption)? `df_results = df1.join(df2, df1.id == df2.id, "left").filter(df2.start_period <= df1.event_date).filter(df1.event_date <= df2.end_period)` Or would it be better to go through an SQL statement? – Alex Germain Oct 24 '21 at 11:58
  • This option would unfortunately drop the non-matched ones (`id=2` in this case) – Alex Germain Oct 24 '21 at 12:31

2 Answers


You can left join df1 with df2 with the condition start_period <= event_date <= end_period:

from pyspark.sql import functions as F

(df1
    # join on id AND event_date falling within [start_period, end_period]
    .join(df2,
          on=[df1['id'] == df2['id'],
              (df1['event_date'] >= df2['start_period']) & (df1['event_date'] <= df2['end_period'])],
          how='left')
    .drop(df2['id'])  # keep only df1's id column
    .drop('start_period', 'end_period')
    .show()
)

# Output
# +---+-----+-------------------+-----+-----+
# | id|event|         event_date|info1|info2|
# +---+-----+-------------------+-----+-----+
# |  1|    a|2021-01-01 00:00:00|Xxz45|XX013|
# |  1|    b|2021-01-05 00:00:00|Xxz45|XX013|
# |  1|    c|2021-01-24 00:00:00| Xbbd|XX015|
# |  2|    d|2021-01-10 00:00:00| null| null|
# |  2|    e|2021-01-15 00:00:00| null| null|
# +---+-----+-------------------+-----+-----+
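
For completeness, since the comments asked whether an SQL statement would be better: the same range join can be expressed in Spark SQL, and both forms go through the same optimizer, so it is mostly a matter of style. A minimal sketch, using illustrative view names `events` and `periods`:

df1.createOrReplaceTempView('events')
df2.createOrReplaceTempView('periods')

# LEFT JOIN keeps events without a matching period (the id=2 rows get nulls)
spark.sql("""
    SELECT e.id, e.event, e.event_date, p.info1, p.info2
    FROM events e
    LEFT JOIN periods p
      ON e.id = p.id
     AND e.event_date BETWEEN p.start_period AND p.end_period
""").show()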
pltc
  • I was not far off, trying to filter it after the join! I didn't think about including a condition in the `join`! Many thanks. – Alex Germain Oct 25 '21 at 06:44

What you can do is write a UDF that creates a new column in df2 from start_period and end_period, with values like:

[
  datetime.datetime(2021,1,1),
  datetime.datetime(2021,1,2),
  datetime.datetime(2021,1,3),
  datetime.datetime(2021,1,4),
  datetime.datetime(2021,1,5),
  datetime.datetime(2021,1,6),
  datetime.datetime(2021,1,7),
  datetime.datetime(2021,1,8),
  datetime.datetime(2021,1,9),
  datetime.datetime(2021,1,10)
]

After that you can explode this column and get a row for every date in the list. Finally, you can do an ordinary join between df1 and df2.

I did not check whether there is a built-in (non-UDF) function to create the list of dates from the interval.
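
For what it's worth, Spark 2.4+ ships a built-in sequence function that can generate the list of dates without a Python UDF. A minimal sketch of this explode-and-join approach, assuming day granularity is enough for event_date:

from pyspark.sql import functions as F

# Expand each period into one row per day (F.sequence needs Spark 2.4+;
# with date arguments it defaults to a one-day step)
df2_days = df2.withColumn(
    'event_date',
    F.explode(F.sequence(F.to_date('start_period'), F.to_date('end_period')))
).drop('start_period', 'end_period')

# Cast df1's timestamp to a date so the types line up, then do an ordinary equi-join
result = df1.withColumn('event_date', F.to_date('event_date')) \
    .join(df2_days, on=['id', 'event_date'], how='left')

As the comment below notes, this multiplies df2 by the period length, so the range-join condition from the other answer scales better on large data.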

HadoopMarc
  • You're completely right, that's what I was doing on a small amount of data. Here I have 6M rows for df1 and 260k rows for df2, and creating every serialized date would lead to a huge df2. I was looking for a way to say "ok, bring me the information of df2 if df1 is between the period". – Alex Germain Oct 24 '21 at 08:07