I have a simple pandas function for handling time series. My problem is that the number of time series I want to apply it to is very large, so I want to use PySpark to scale it out.
I face 2 problems:
- The schema must be passed explicitly. Can this be made smoother, so that it is passed implicitly?
- The code fails with:
Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: 2 Actual: 3
-- How can I ensure that the schema is matched automatically? Note: ideally, I do not want to specify it manually. In the worst case, could I apply my function to a single time series and have the dtypes of the resulting pandas DataFrame converted to Spark's expected schema?
pandas function
import pandas as pd
from pandas import Timestamp
df = pd.DataFrame({'time':['2020-01-01 00:00', '2020-01-01 03:00', '2020-01-01 04:00', '2020-01-06 00:00'], 'category':['1','1','1','1'], 'value':[5, 8, 7, 2]})
df['time'] = pd.to_datetime(df['time'])
display(df)
print(df.shape)
def my_complex_function(pdf):
    # fill missing hours
    pdf = pdf.set_index('time').resample('H').mean().fillna(0)
    # some complex computation/business logic which is adding columns/simplified here:
    pdf['foo'] = 'bar'
    pdf['baz'] = 123
    return pdf
print(df.groupby(['category']).apply(my_complex_function).reset_index().shape)
pyspark function
Note: the Spark version is v3.0.1.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("anomlydetection").master("local[4]").config("spark.driver.memory", "2G").getOrCreate()
sdf = spark.createDataFrame(df)
sdf.printSchema()
def my_complex_function_spark(pdf: pd.DataFrame) -> pd.DataFrame:
    # fill missing hours
    pdf = pdf.set_index('time').resample('H').mean().fillna(0)
    # some complex computation/business logic which is adding columns/simplified here:
    pdf['foo'] = 'bar'
    pdf['baz'] = 123
    return pdf
# 1) can I somehow implicitly get a reference to the schema?
# 2) this function fails due to schema mismatch
sdf.groupby("category").applyInPandas(my_complex_function_spark, schema=df.schema).show()
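To make the worst-case idea from the question concrete, here is a minimal sketch of what I have in mind: run the function on one sample group in plain pandas and derive a DDL-style schema string from the dtypes of the result. The names `spark_ddl_type`, `ddl_schema_from_pandas`, and `my_complex_function_sketch` are hypothetical helpers made up for this illustration, and the dtype mapping is an assumption that only covers the simple cases here. The sketch variant also selects the numeric column explicitly and calls `reset_index()` so that `time` comes back as a regular column.

```python
import pandas as pd
from pandas.api import types as pdt


def spark_ddl_type(dtype):
    """Map a pandas dtype to a Spark SQL type name (assumed, simplified mapping)."""
    if pdt.is_datetime64_any_dtype(dtype):
        return "timestamp"
    if pdt.is_bool_dtype(dtype):
        return "boolean"
    if pdt.is_integer_dtype(dtype):
        return "bigint"  # simplification: every integer width becomes bigint
    if pdt.is_float_dtype(dtype):
        return "double"
    return "string"  # fallback for object/string columns


def ddl_schema_from_pandas(pdf):
    """Build a DDL-formatted schema string from the columns of a pandas DataFrame."""
    return ", ".join(
        f"`{name}` {spark_ddl_type(dtype)}" for name, dtype in pdf.dtypes.items()
    )


def my_complex_function_sketch(pdf):
    """Hypothetical variant of the function above that returns plain columns only."""
    category = pdf["category"].iloc[0]
    # fill missing hours, using only the numeric column
    out = pdf.set_index("time")[["value"]].resample("60min").mean().fillna(0)
    out["foo"] = "bar"
    out["baz"] = 123
    out["category"] = category
    # move 'time' from the index back into a regular column
    return out.reset_index()


df = pd.DataFrame({
    "time": pd.to_datetime(["2020-01-01 00:00", "2020-01-01 03:00",
                            "2020-01-01 04:00", "2020-01-06 00:00"]),
    "category": ["1", "1", "1", "1"],
    "value": [5, 8, 7, 2],
})

# Run the function once on a single sample group to discover the output layout.
sample_out = my_complex_function_sketch(df[df["category"] == "1"])
schema = ddl_schema_from_pandas(sample_out)
print(schema)
```

As far as I understand, `applyInPandas` accepts a DDL-formatted string for `schema`, so the resulting string could presumably be passed as `sdf.groupby("category").applyInPandas(my_complex_function_sketch, schema=schema)`. I have not verified how this behaves for more exotic dtypes (nullable integers, categoricals, timezone-aware timestamps), which the mapping above does not handle.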