
I want to optimize the following function. I'm using a broadcast inner join, which I suspect is not fast enough.

I have a DataFrame of intervals with the attributes timestamp_start and timestamp_end, and a time series DataFrame with the attributes timestamp and value.

The function returns all values that fall within one of the intervals:


from pyspark.sql.functions import broadcast

def filter_intervals(intervals, df):
    # Range join: keep every row whose timestamp falls inside an interval
    df = df.join(broadcast(intervals),
                 [df.timestamp >= intervals.timestamp_start,
                  df.timestamp <= intervals.timestamp_end],
                 how='inner')
    return df

How should I rewrite the function to make it more efficient?

bigruno
  • How big is `intervals` (in records, approximately)? Is it initially a DataFrame, or do you create a DataFrame from it just for the join? – Dmitry Y. Apr 26 '21 at 19:44
  • Let's assume the interval size is 500-2500 rows. Initially it's a DataFrame, but I might as well collect the data if that would help. – bigruno Apr 27 '21 at 10:34

1 Answer


If intervals is not very big, I would try to create a custom function is_in_interval(t): sort intervals by timestamp_start in advance, then use binary search to find the interval containing t (if there is one). Then, instead of joining the datasets, wrap that check in a UDF. Something like this:

from bisect import bisect_right

import pyspark.sql.functions as F
from pyspark.sql.types import BooleanType

def filter_intervals(sorted_intervals, df):
    # sorted_intervals: list of (timestamp_start, timestamp_end) tuples,
    # sorted by timestamp_start (this lookup assumes the intervals do not overlap)
    starts = [start for start, _ in sorted_intervals]

    def is_in_interval(t):
        # Binary search for the last interval starting at or before t,
        # then check whether t also lies before that interval's end
        i = bisect_right(starts, t)
        return i > 0 and t <= sorted_intervals[i - 1][1]

    is_in_interval_udf = F.udf(is_in_interval, BooleanType())

    return df.filter(is_in_interval_udf("timestamp"))

In this case you will not have to broadcast intervals, because each executor gets its own local copy of intervals captured by the UDF. But again, this will only be efficient if intervals is not too big and fits in the executors' memory.
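For completeness, here is a minimal sketch of how the caller could build sorted_intervals from the intervals DataFrame (assuming the column names from the question, and that intervals is small enough to collect to the driver):

# Hypothetical caller-side preparation (assumes `intervals` fits in driver memory)
sorted_intervals = sorted(
    (row["timestamp_start"], row["timestamp_end"])
    for row in intervals.select("timestamp_start", "timestamp_end").collect()
)

filtered_df = filter_intervals(sorted_intervals, df)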

This is a post about Binary Search in Python: Binary search (bisection) in Python
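As a quick standalone illustration of the bisect module used above (the example values are made up):

from bisect import bisect_right

starts = [10, 20, 30, 40]        # sorted interval start timestamps
print(bisect_right(starts, 25))  # 2: index of the first start greater than 25
print(bisect_right(starts, 30))  # 3: with equal values, the insertion point is to the right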

Dmitry Y.