If you have millions of lines, you’re better off optimizing the complexity (big-O) of your algorithm before parallelizing. Here you’re comparing all a items with all b ranges, and what’s more with the slow method .apply() and manual iteration over rows; let’s replace all that with pandas/numpy primitives. The free bonus is that you no longer need to handle parallelism yourself: numpy will do it under the hood for you. See for example this guide under the “Use parallel primitives” part.
For these optimisations, I’ll suppose you have N values in file_a and M distinct intervals in file_b.
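All the snippets below assume the usual imports and some file_b DataFrame df of inclusive [start, end] intervals. The exact intervals here are my own made-up example, chosen so they coalesce to the outputs shown below; substitute your real data:

>>> import numpy as np
>>> import pandas as pd
>>> # Hypothetical overlapping intervals with inclusive bounds
>>> df = pd.DataFrame({'start': [0, 15, 38, 41, 50, 87],
...                    'end': [20, 35, 39, 60, 85, 102]})

For reference, the O(N×M) baseline being replaced presumably looks something like file_a['val'].apply(lambda v: ((df['start'] <= v) & (v <= df['end'])).any()) (file_a as built in part 2 below), which checks every value against every interval.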
1. We can start by coalescing all your b intervals into the minimum number of distinct intervals
>>> starts = df['start'].value_counts().sort_index()
>>> ends = df['end'].value_counts().sort_index()
>>> ends.index += 1
>>> idx = ends.index.join(starts.index, how='outer')
>>> depth = (starts.reindex(idx, fill_value=0) - ends.reindex(idx, fill_value=0)).cumsum()
>>> depth[depth.eq(0) | depth.eq(0).shift(fill_value=True)]
0      1
36     0
38     1
40     0
41     1
86     0
87     1
103    0
dtype: int64
>>> min_intervals = pd.DataFrame({
... 'start': idx[depth.eq(0).shift(fill_value=True)],
... 'end': idx[depth.eq(0)] - 1
... })
>>> min_intervals
   start  end
0      0   35
1     38   39
2     41   85
3     87  102
The main subtlety here is the +1 and -1 applied to the end bound: the bounds are inclusive, and we want the union of contiguous intervals to be computed correctly, i.e. [3, 5] and [6, 8] should become [3, 8] (demonstrated below).
Of course, if you know your intervals are already non-overlapping, then you can skip all that and just do min_intervals = df[['start', 'end']].sort_values('start').
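To make the contiguity handling concrete, here is the same recipe run on [3, 5] and [6, 8] (a toy example of my own):

>>> toy = pd.DataFrame({'start': [3, 6], 'end': [5, 8]})
>>> s = toy['start'].value_counts().sort_index()
>>> e = toy['end'].value_counts().sort_index()
>>> e.index += 1  # inclusive ends -> exclusive ends
>>> i = e.index.join(s.index, how='outer')
>>> d = (s.reindex(i, fill_value=0) - e.reindex(i, fill_value=0)).cumsum()
>>> pd.DataFrame({'start': i[d.eq(0).shift(fill_value=True)], 'end': i[d.eq(0)] - 1})
   start  end
0      3    8

Without the index shift, the depth would drop to zero between 5 and 6 and the two intervals would stay separate.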
2. Compare values with the sorted bounds, O(N×M) -> O(N×log(M))
Note that min_intervals is now sorted, and that if we stack it, we get the bounds in sorted order. Now let’s add back the +1 to the end bounds, and we can use pd.Series.searchsorted() to find out where in this sequence of bounds a number should be inserted to preserve the order:
>>> bounds = min_intervals.stack()
>>> bounds.loc[slice(None), ['end']] += 1
>>> bounds
0  start      0
   end       36
1  start     38
   end       40
2  start     41
   end       86
3  start     87
   end      103
dtype: int64
>>> bounds.searchsorted([0, 1, 35, 36, 100, 86], 'right')
array([1, 1, 1, 2, 7, 6])
As you can see, numbers that fall inside an interval (0, 1 and 35 in [0, 35], 100 in [87, 103]) return an odd number, and numbers that fall outside every interval (36, 86) return an even number.
>>> file_a = pd.DataFrame({'val': [0, 1, 35, 36, 100, 86]})
>>> contained = pd.Series(bounds.searchsorted(file_a['val'], 'right'), index=file_a.index).mod(2).eq(1)
>>> file_a[contained]
   val
0    0
1    1
2   35
4  100
>>> file_a[~contained]
   val
3   36
5   86
This works well if you don’t want to modify file_a. Of course, if you’re willing to sort file_a, you can get even faster results (supposing N > M).
3. Compare sorted values with interval bounds, O(N×log(M)) -> O(log(N)×M)
>>> file_a.sort_values('val', inplace=True)
From there we can now use searchsorted on file_a, e.g. to compute the rows of values that are contained:
>>> rows = pd.Series(file_a['val'].searchsorted(bounds, 'left'), index=bounds.index)
>>> rows
0  start    0
   end      3
1  start    4
   end      4
2  start    4
   end      4
3  start    5
   end      6
dtype: int64
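>>> # each interval k yields a (start_row, end_row) pair; slice those rows out of the sorted file_a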
>>> pd.concat([file_a.iloc[slice(*v)] for k, v in rows.groupby(level=0)])
   val
0    0
1    1
2   35
4  100
Or we can construct a boolean indexer with the same start/end counting trick we used to coalesce the intervals in step 1:
>>> contained = rows.xs('start', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained -= rows.xs('end', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained = contained.cumsum().reindex(np.arange(len(file_a))).ffill().eq(1)
>>> contained.index = file_a.index # Index was row numbers, use file_a’s index
>>> contained
0     True
1     True
2     True
3    False
5    False
4     True
dtype: bool
>>> file_a[contained]
val
0 0
1 1
2 35
4 100
>>> file_a[~contained]
val
3 36
5 86
The other upside of this strategy is that no per-row looping is ever done in code we have written, so we don’t need to care about parallelizing beyond allowing numpy to use it. If you really want to add parallel optimisation yourself, this other question will be of interest.
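For illustration, a chunked-parallel version of the part-2 lookup could look like this. It is only a sketch of my own: the chunk count is arbitrary, and for small inputs the process start-up cost will outweigh any gain:

>>> from concurrent.futures import ProcessPoolExecutor
>>> from functools import partial
>>> chunks = np.array_split(file_a['val'].to_numpy(), 4)  # arbitrary chunk count
>>> locate = partial(np.searchsorted, bounds.to_numpy(), side='right')
>>> with ProcessPoolExecutor() as pool:
...     positions = np.concatenate(list(pool.map(locate, chunks)))
...
>>> contained = pd.Series(positions, index=file_a.index).mod(2).eq(1)

This reuses the parity trick from part 2: an odd insertion position means the value falls inside an interval.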