I am dealing with Spark DataFrames with very long columns that represent time-domain signals (many millions of rows).
I need to perform signal processing on these using some functions from SciPy, which require the columns to be passed in as NumPy arrays. Specifically, I am trying to use these two:
- https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.stft.html
- https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html
My current approach is to turn the column into a NumPy array using collect(), as follows:
x = np.array(df.select("column_name").collect()).reshape(-1)
Then I feed the result to SciPy.
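For reference, this is roughly how that array then gets passed to the two SciPy functions (the sampling rate and the window/filter parameters below are only placeholders, not my real values):

```python
import numpy as np
from scipy.signal import stft, savgol_filter

# df is the Spark DataFrame described above; this collect() is the step I want to replace
x = np.array(df.select("column_name").collect()).reshape(-1)

fs = 10_000  # placeholder sampling rate, not my real value

# Short-time Fourier transform of the full signal
f, t, Zxx = stft(x, fs=fs, nperseg=1024)

# Savitzky-Golay smoothing of the full signal
x_smooth = savgol_filter(x, window_length=101, polyorder=3)
```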
However:
- This is very slow
- collect() loads all the data onto the driver, so this does not scale
Please, could somebody help me find the most performant way to do this?
I found this somewhat old post, but it does not reach a conclusion:
How to convert a pyspark dataframe column to numpy array
Objections to that post:
- It is not clear to me that Dask arrays are compatible with SciPy, because Dask apparently implements only a subset of the NumPy API.
- The problem of how to convert from a PySpark DataFrame to an array in the first place (currently done with collect()) would not be solved by Dask; see the sketch after this list.
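To make that second objection concrete, here is a rough sketch of what a Dask-based route would still look like, as far as I can tell (dask.array is used purely for illustration and the chunk size is an arbitrary placeholder):

```python
import numpy as np
import dask.array as da

# Even if Dask arrays were fully SciPy-compatible, I would still have to
# materialise the Spark column on the driver first, which is exactly the
# collect() step I am trying to avoid:
x = np.array(df.select("column_name").collect()).reshape(-1)

# Only after that could the data be wrapped into a Dask array,
# so the bottleneck above remains untouched.
dx = da.from_array(x, chunks=1_000_000)  # chunk size chosen arbitrarily for illustration
```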
Help with this is much needed and would be greatly appreciated.
Many thanks in advance.