
I have a set of meetings and a participation table. Each meeting has a start and an end time, and each row of the participation table corresponds to a participant joining a meeting. The participation table includes only joins that actually happened. For each participation row, I would like to find K other meetings that the participant could have joined instead, i.e., K other meetings that were in progress at the time the participant joined.

Here is an example:

Given the two tables below, meeting availability:

| Meeting | Start time           | End time              |
| ------- | -------------------- | --------------------- |
| M1      | Nov 01, 2021 8:00 AM | Nov 01, 2021 9:00 AM  |
| M2      | Nov 01, 2021 8:00 AM | Nov 01, 2021 8:45 AM  |
| M3      | Nov 01, 2021 8:15 AM | Nov 01, 2021 8:45 AM  |
| M4      | Nov 01, 2021 8:15 AM | Nov 01, 2021 9:00 AM  |
| M5      | Nov 01, 2021 9:00 AM | Nov 01, 2021 10:00 AM |
| M6      | Nov 01, 2021 9:00 AM | Nov 01, 2021 9:45 AM  |
| M7      | Nov 01, 2021 9:15 AM | Nov 01, 2021 9:45 AM  |
| M8      | Nov 01, 2021 9:15 AM | Nov 01, 2021 10:00 AM |

and participation:

| User | Meeting | Joined time          |
|------|---------|----------------------|
| U1   | M1      | Nov 01, 2021 8:01 AM |
| U2   | M3      | Nov 01, 2021 8:16 AM |
| U3   | M7      | Nov 01, 2021 9:16 AM |

I'd like the output to look like this (assuming K=2):

| User | Meeting | Joined time          | K other meetings possible |
|------|---------|----------------------|---------------------------|
| U1   | M1      | Nov 01, 2021 8:01 AM | [M2]                      |
| U2   | M3      | Nov 01, 2021 8:16 AM | [M4, M2]                  |
| U3   | M7      | Nov 01, 2021 9:16 AM | [M8, M5]                  |

Basically, I would like to compute a new column containing K random meetings that were in progress (at minute granularity) at the time the participant joined a meeting.

One idea is to create a per-minute meeting availability table (similar to this solution), join it with the participation table on datetime, and then use a UDF to pick K random meetings. But that might cause out-of-memory issues. Any other ideas are appreciated. Assuming the per-minute availability table exists, is there any option other than joining and filtering?

  • Does this answer your question? [How to join two dataframes for which column values are within a certain range?](https://stackoverflow.com/questions/46525786/how-to-join-two-dataframes-for-which-column-values-are-within-a-certain-range) – alparslan mimaroğlu Nov 30 '21 at 08:56

1 Answer

I'm not sure how you would get a combination of two tables without joining them, but I'll provide a solution which doesn't have to add a bunch of extra per-minute data.

Starting with creating the example data:

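from datetime import datetime

import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# `sc` is assumed to be a SparkSession here (a SparkContext has no createDataFrame)
sc = SparkSession.builder.getOrCreate()

# create the example dataframes
meeting_df = sc.createDataFrame(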
    # using pandas to initialize the example df
    pd.DataFrame(
        {
            "Meeting": [f"M{i}" for i in range(1, 9)],
            "Start time": [
                datetime(2021, 11, 1, 8),
                datetime(2021, 11, 1, 8),
                datetime(2021, 11, 1, 8, 15),
                datetime(2021, 11, 1, 8, 15),
                datetime(2021, 11, 1, 9),
                datetime(2021, 11, 1, 9),
                datetime(2021, 11, 1, 9, 15),
                datetime(2021, 11, 1, 9, 15),
            ],
            "End time": [
                datetime(2021, 11, 1, 9),
                datetime(2021, 11, 1, 8, 45),
                datetime(2021, 11, 1, 8, 45),
                datetime(2021, 11, 1, 9),
                datetime(2021, 11, 1, 10),
                datetime(2021, 11, 1, 9, 45),
                datetime(2021, 11, 1, 9, 45),
                datetime(2021, 11, 1, 10),
            ],
        }
    )
)
participation_df = sc.createDataFrame(
    pd.DataFrame(
        {
            "User": ["U1", "U2", "U3"],
            "Meeting": ["M1", "M3", "M7"],
            "Joined time": [
                datetime(2021, 11, 1, 8, 1),
                datetime(2021, 11, 1, 8, 16),
                datetime(2021, 11, 1, 9, 16),
            ],
        }
    )
)

Then, join the tables to get all the possible meetings for each join time besides the meeting that was actually joined.

all_possible_meetings = participation_df.join(
    meeting_df,
    on=(
        participation_df["Joined time"].between(meeting_df["Start time"], meeting_df["End time"])
        & (participation_df["Meeting"] != meeting_df["Meeting"])
    ),
).select(
    participation_df["User"],
    participation_df["Meeting"],
    participation_df["Joined time"],
    meeting_df["Meeting"].alias("Other meeting"),
)
all_possible_meetings.show()

+----+-------+-------------------+-------------+
|User|Meeting|        Joined time|Other meeting|
+----+-------+-------------------+-------------+
|  U1|     M1|2021-11-01 08:01:00|           M2|
|  U2|     M3|2021-11-01 08:16:00|           M1|
|  U2|     M3|2021-11-01 08:16:00|           M2|
|  U2|     M3|2021-11-01 08:16:00|           M4|
|  U3|     M7|2021-11-01 09:16:00|           M5|
|  U3|     M7|2021-11-01 09:16:00|           M6|
|  U3|     M7|2021-11-01 09:16:00|           M8|
+----+-------+-------------------+-------------+
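The join condition can be sanity-checked without a cluster. The following is a minimal plain-Python sketch (not part of the Spark pipeline) that applies the same predicate to the example data; the names `meetings`, `participation`, and `other_meetings` are made up for this illustration. Like `Column.between`, the comparison is inclusive on both ends, so a join at exactly a meeting's end time would still count as a match.

```python
from datetime import datetime

# Example data copied from the tables in the question: (meeting, start, end).
meetings = [
    ("M1", datetime(2021, 11, 1, 8, 0), datetime(2021, 11, 1, 9, 0)),
    ("M2", datetime(2021, 11, 1, 8, 0), datetime(2021, 11, 1, 8, 45)),
    ("M3", datetime(2021, 11, 1, 8, 15), datetime(2021, 11, 1, 8, 45)),
    ("M4", datetime(2021, 11, 1, 8, 15), datetime(2021, 11, 1, 9, 0)),
    ("M5", datetime(2021, 11, 1, 9, 0), datetime(2021, 11, 1, 10, 0)),
    ("M6", datetime(2021, 11, 1, 9, 0), datetime(2021, 11, 1, 9, 45)),
    ("M7", datetime(2021, 11, 1, 9, 15), datetime(2021, 11, 1, 9, 45)),
    ("M8", datetime(2021, 11, 1, 9, 15), datetime(2021, 11, 1, 10, 0)),
]
# (user, joined meeting, joined time)
participation = [
    ("U1", "M1", datetime(2021, 11, 1, 8, 1)),
    ("U2", "M3", datetime(2021, 11, 1, 8, 16)),
    ("U3", "M7", datetime(2021, 11, 1, 9, 16)),
]


def other_meetings(joined_meeting, joined_time):
    """Meetings in progress at joined_time (inclusive bounds), minus the joined one."""
    return [
        name
        for name, start, end in meetings
        if start <= joined_time <= end and name != joined_meeting
    ]


candidates = {user: other_meetings(m, t) for user, m, t in participation}
print(candidates)
# {'U1': ['M2'], 'U2': ['M1', 'M2', 'M4'], 'U3': ['M5', 'M6', 'M8']}
```

These candidate lists agree with the `Other meeting` column in the joined dataframe `all_possible_meetings` shown above.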

Add row numbers to this dataframe based on a randomly generated number per row so we can select the top K per user. We partition by the user, meeting, and join time so this numbering happens for each user join time.

all_possible_meetings_numbered = all_possible_meetings.withColumn(
    "row",
    F.row_number().over(
        Window.partitionBy("User", "Meeting", "Joined time").orderBy(F.rand())
    )
)
all_possible_meetings_numbered.show()

+----+-------+-------------------+-------------+---+
|User|Meeting|        Joined time|Other meeting|row|
+----+-------+-------------------+-------------+---+
|  U1|     M1|2021-11-01 08:01:00|           M2|  1|
|  U2|     M3|2021-11-01 08:16:00|           M2|  1|
|  U2|     M3|2021-11-01 08:16:00|           M4|  2|
|  U2|     M3|2021-11-01 08:16:00|           M1|  3|
|  U3|     M7|2021-11-01 09:16:00|           M6|  1|
|  U3|     M7|2021-11-01 09:16:00|           M8|  2|
|  U3|     M7|2021-11-01 09:16:00|           M5|  3|
+----+-------+-------------------+-------------+---+

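Then select only the top K rows per user/meeting/join time and collect the results as a list. In this case, I set K = 2. Note that the ordering is random and nothing is cached, so the sample can change between actions; that is why the lists below do not match the first two rows of the numbering shown above.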

K = 2  # number of other meetings to keep per participation row
result_df = all_possible_meetings_numbered.filter(
    F.col("row") <= K
).groupBy(
    "User", "Meeting", "Joined time"
).agg(
    F.collect_list("Other meeting").alias(f"{K} other meetings possible")
)
result_df.show()

+----+-------+-------------------+-------------------------+
|User|Meeting|        Joined time|2 other meetings possible|
+----+-------+-------------------+-------------------------+
|  U1|     M1|2021-11-01 08:01:00|                     [M2]|
|  U2|     M3|2021-11-01 08:16:00|                 [M2, M1]|
|  U3|     M7|2021-11-01 09:16:00|                 [M6, M5]|
+----+-------+-------------------+-------------------------+
rchome
  • This is an elegant solution and works for when the scale of the problem is relatively low. For my case, however, number of interactions and number of concurrent meetings are pretty high, so joining and finding all the possible interactions are inefficient (I would have to use a huge cluster), since I'll be filtering down to small Ks (<10). In a way, I need a custom join that stops the join after finding enough matches for each row. – Hossein Mousavi Dec 02 '21 at 00:11
  • On the same note, would Spark be smart enough to combine join and filter in the same run? Since, a separate join will create a huge dataframe with a need for a large total memory, but the combination will only create a dataframe with a size <= K*size(original participation dataframe). – Hossein Mousavi Dec 02 '21 at 00:14
  • I'm not too familiar with the internals of Spark's query optimization. You would need to analyze the query plan with `explain()` to see if there are any inefficiencies. One thing off the top of my head is you could change the `row_number()` to be based on an existing column instead of `rand()` to take advantage of an existing partitioning so Spark can avoid computing a new column and sorting the data based on that column. This might bias the results a bit but it might not matter depending on your use case. – rchome Dec 02 '21 at 00:41
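One way to picture the "keep at most K matches per row" idea from the comments is reservoir sampling. The following is a plain-Python sketch, not a Spark API: `sample_k_concurrent` and its parameters are hypothetical names, and the meetings are assumed to be available as an in-memory list of `(name, start, end)` tuples. It still scans every meeting for each participation row, but it never holds more than K candidates at once, and each matching meeting is equally likely to end up in the sample. Stopping at the first K matches would be cheaper, but it would return the first K rather than a random K.

```python
import random
from datetime import datetime


def sample_k_concurrent(meetings, joined_meeting, joined_time, k, rng=random):
    """Reservoir-sample up to k meetings in progress at joined_time.

    meetings: iterable of (name, start, end) tuples (assumed layout).
    The joined meeting itself is excluded; bounds are inclusive.
    """
    reservoir = []
    seen = 0  # number of matching meetings encountered so far
    for name, start, end in meetings:
        if name == joined_meeting or not (start <= joined_time <= end):
            continue
        seen += 1
        if len(reservoir) < k:
            reservoir.append(name)
        else:
            # Replace a random slot with probability k / seen.
            j = rng.randrange(seen)
            if j < k:
                reservoir[j] = name
    return reservoir


# A subset of the example meetings from the question.
example_meetings = [
    ("M1", datetime(2021, 11, 1, 8, 0), datetime(2021, 11, 1, 9, 0)),
    ("M2", datetime(2021, 11, 1, 8, 0), datetime(2021, 11, 1, 8, 45)),
    ("M3", datetime(2021, 11, 1, 8, 15), datetime(2021, 11, 1, 8, 45)),
    ("M4", datetime(2021, 11, 1, 8, 15), datetime(2021, 11, 1, 9, 0)),
]

# U2 joined M3 at 8:16; the candidates are M1, M2, and M4.
picked = sample_k_concurrent(
    example_meetings, "M3", datetime(2021, 11, 1, 8, 16), k=2, rng=random.Random(0)
)
print(picked)
```

If the meetings table is small enough to ship to every executor, a function like this could in principle be applied per participation row instead of materializing the full join. Whether that is faster than the window-based approach on a given cluster is an open question that would need measuring.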