Overview
This problem can be efficiently solved in two parts.
The first part consists in finding the matching rows of `df_a` and `df_b`, as well as the corresponding range of rows of `df_c` based on `ts`. This can be done very quickly using a parallel Numba implementation (while consuming only a fraction of the memory of the input datasets).
The second part consists in applying the user-defined functions, which are possibly Pandas ones. This latter operation is inherently slow and memory expensive. Indeed, Pandas functions operate mainly on dataframes/series, which are not efficient here. Iterating over a Pandas dataframe containing generic pure-Python objects is known to be painfully slow. Building many small dataframes is slow (there is a pretty high overhead to even create an empty dataframe) but memory efficient. Creating one big dataframe is significantly faster, but it is clearly not memory efficient since it forces many rows to be replicated (dozens or even hundreds of times, due to the number of items of `df_c` to extract per id of `df_a`/`df_b`). In the end, the fastest Pandas solution will be far slower than the optimal time (by at least one order of magnitude). Also note that parallelism will barely help here: the GIL prevents multithreaded code from being fast, and pickling prevents multiprocessing from being fast. In addition, tools like Numba or Cython cannot help with user-defined generic pure-Python functions. AFAIK, the only way to make this part really fast and memory efficient is simply not to apply generic Pandas functions on huge dataframes, nor generic pure-Python functions.
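For reference, the kind of generic Pandas solution described above would look like the following sketch (the column names `id`/`ts`, the closed timestamp interval, and the placeholder `user_fn` are assumptions used only for illustration):

# Naive, generic Pandas baseline (slow and memory hungry): merge on the ids,
# then slice `df_c` and call the user-defined function match by match.
# `user_fn` is a placeholder for the actual user-defined function.
def naive_solution(df_a, df_b, df_c, user_fn):
    merged = df_a.merge(df_b, on='id', suffixes=('_a', '_b'))
    results = []
    for row in merged.itertuples(index=False):
        ts_min = min(row.ts_a, row.ts_b)
        ts_max = max(row.ts_a, row.ts_b)
        sub_c = df_c[(df_c['ts'] >= ts_min) & (df_c['ts'] <= ts_max)]
        results.append(user_fn(row, sub_c))
    return results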
Part 1: Extracting dataframe rows
The first part can be done using parallel Numba (JIT compiler) code. While Numba does not support Pandas directly, Pandas mainly uses Numpy internally, which is well supported by Numba. The computation can be split into many chunks computed efficiently in parallel. The main idea is to build a fast index of `df_b` so as to merge `df_a` and `df_b` in linear time, and to use a binary search to find the ranges of matching rows in `df_c` (this assumes `df_c` is sorted by `ts`). The resulting code is very fast. The catch is that the output format is not very efficient for part 2. Here is the code:
import numba as nb
import numpy as np
import pandas as pd

# Feel free to change the signature based on the actual type of your dataframe.
# Smaller types take less memory and tend to be faster because of that.
@nb.njit('(int64[::1], int64[::1], int64[::1], int64[::1], int64[::1])', parallel=True)
def find_matching_rows(df_a_id, df_a_ts, df_b_id, df_b_ts, df_c_ts):
    # Build an index of `df_b` IDs
    b_index = {df_b_id[i]: i for i in range(df_b_id.size)}

    # Mark the `df_a` rows found in `df_b` (parallel)
    found = np.empty(df_a_id.size, np.bool_)
    for a_row in nb.prange(df_a_id.size):
        a_id = df_a_id[a_row]
        found[a_row] = a_id in b_index

    # Count the number of valid rows (parallel)
    count = 0
    for a_row in nb.prange(df_a_id.size):
        count += found[a_row]

    # Count the number of valid items per chunk and
    # the offsets of the output of each chunk (mainly parallel)
    chunk_size = 32768
    chunk_count = (found.size + chunk_size - 1) // chunk_size
    count_by_chunk = np.empty(chunk_count, np.int32)
    for i in nb.prange(chunk_count):
        count_by_chunk[i] = np.sum(found[i*chunk_size:(i+1)*chunk_size])
    out_offsets = np.zeros(chunk_count + 1, np.int32)
    for i in range(chunk_count):
        out_offsets[i+1] = out_offsets[i] + count_by_chunk[i]
    assert out_offsets[chunk_count] == count

    # Main chunk-based computation (parallel)
    a_rows = np.empty(count, np.int32)       # `df_a` indices
    b_rows = np.empty(count, np.int32)       # `df_b` indices
    c_rows = np.empty((count, 2), np.int32)  # Start/end indices in `df_c`
    for chunk_id in nb.prange(chunk_count):
        a_row_start = chunk_id * chunk_size
        a_row_end = min(df_a_id.size, a_row_start + chunk_size)
        offset = out_offsets[chunk_id]
        for a_row in range(a_row_start, a_row_end):
            # Discard ids of `df_a` not in `df_b`
            if not found[a_row]:
                continue
            a_id = df_a_id[a_row]
            b_row = b_index[a_id]
            ts_a, ts_b = df_a_ts[a_row], df_b_ts[b_row]
            ts_min, ts_max = min(ts_a, ts_b), max(ts_a, ts_b)
            c_start_row = np.searchsorted(df_c_ts, ts_min, 'left')  # Included
            c_end_row = np.searchsorted(df_c_ts, ts_max, 'right')   # Excluded
            # If there is no row found in `df_c`
            if c_start_row >= c_end_row:
                c_start_row = c_end_row = -1  # Not discarded (may be useful)
            # Save results
            a_rows[offset] = a_row
            b_rows[offset] = b_row
            c_rows[offset, 0] = c_start_row
            c_rows[offset, 1] = c_end_row
            offset += 1

    return (a_rows, b_rows, c_rows)
Here is the way to call the function:
a_rows, b_rows, c_rows = find_matching_rows(
    df_a['id'].values, df_a['ts'].values,
    df_b['id'].values, df_b['ts'].values,
    df_c['ts'].values
)
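If the actual columns are not already `int64` (e.g. `datetime64[ns]` timestamps), one option is to convert them to their `int64` representation beforehand so they match the signature above. A sketch, assuming nanosecond-precision timestamps and integer ids:

# Convert datetime64[ns] timestamps to their underlying int64 (nanosecond)
# representation and make sure the ids are int64, matching the Numba signature.
a_rows, b_rows, c_rows = find_matching_rows(
    df_a['id'].values.astype(np.int64), df_a['ts'].values.astype(np.int64),
    df_b['id'].values.astype(np.int64), df_b['ts'].values.astype(np.int64),
    df_c['ts'].values.astype(np.int64)
)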
Part 2: Dataframes & user-defined functions
As seen before, generic approaches are inherently inefficient (for both speed and memory usage). One solution is to tweak your operations so they can be applied directly in the previous Numba code. This would make the overall implementation both very fast (i.e. parallel and JIT-compiled) and memory efficient (i.e. computed on the fly, with no need for a huge temporary dataframe). That being said, Numba supports neither generic pure-Python object types nor Pandas functions, so this can require some non-trivial code rework depending on your actual dataframes.
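For instance, if a user-defined function boils down to a simple reduction over a numeric `df_c` column, it can be computed on the fly from the compact output of `find_matching_rows`. The following is only a minimal sketch, assuming a hypothetical float64 column `val` in `df_c` and a mean reduction (neither is part of the original problem):

# Minimal sketch: mean of a hypothetical numeric `df_c` column `val` over each
# [start, end) range produced by `find_matching_rows`, computed on the fly
# without building any temporary dataframe. Ranges marked with -1 give NaN.
@nb.njit('(int32[:,::1], float64[::1])', parallel=True)
def mean_per_match(c_rows, df_c_val):
    out = np.full(c_rows.shape[0], np.nan)
    for i in nb.prange(c_rows.shape[0]):
        c_start, c_end = c_rows[i, 0], c_rows[i, 1]
        if c_start >= 0:
            out[i] = np.mean(df_c_val[c_start:c_end])
    return out

# Hypothetical usage (assuming `df_c['val']` is float64):
# means = mean_per_match(c_rows, df_c['val'].values)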
The inefficient alternative is to create a big temporary dataframe from the index-based arrays previously created by `find_matching_rows`. Here is an example of Numba code to do that:
@nb.njit('(int32[::1], int32[::1], int32[:,::1])')
def build_df_index(a_rows, b_rows, c_rows):
    n = a_rows.size

    # Count the total number of `df_c` rows to be extracted
    count = 0
    for i in range(n):
        count += c_rows[i, 1] - c_rows[i, 0]

    # Build the replicated per-row index arrays
    new_a_rows = np.empty(count, np.int32)
    new_b_rows = np.empty(count, np.int32)
    new_c_rows = np.empty(count, np.int32)
    offset = 0
    for i in range(n):
        for j in range(c_rows[i, 1] - c_rows[i, 0]):
            new_a_rows[offset] = a_rows[i]
            new_b_rows[offset] = b_rows[i]
            new_c_rows[offset] = c_rows[i, 0] + j
            offset += 1

    return (new_a_rows, new_b_rows, new_c_rows)
The resulting index arrays can then be used to create the final dataframe with `df_a.iloc[new_a_rows]`, `df_b.iloc[new_b_rows]` and `df_c.iloc[new_c_rows]`, for example. If your actual dataframes contain only uniform types, or types that Numba supports, then you can directly generate this temporary dataframe with Numba (significantly faster than Pandas `iloc`, especially if done in parallel).
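For completeness, here is one possible way to assemble the temporary dataframe from these index arrays (a sketch only; the `_a`/`_b`/`_c` suffixes are merely an assumed way to avoid column-name collisions):

# Expand the per-match [start, end) ranges into flat per-row index arrays
new_a_rows, new_b_rows, new_c_rows = build_df_index(a_rows, b_rows, c_rows)

# Assemble the (large) temporary dataframe; the suffixes avoid column-name
# collisions between the three input dataframes.
df_final = pd.concat([
    df_a.iloc[new_a_rows].reset_index(drop=True).add_suffix('_a'),
    df_b.iloc[new_b_rows].reset_index(drop=True).add_suffix('_b'),
    df_c.iloc[new_c_rows].reset_index(drop=True).add_suffix('_c'),
], axis=1)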