I'm struggling to implement the following formula (a rolling root-mean-square) in PySpark:
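U_rms = sqrt( (1/N) * Σ_{i=1}^{N} x_i² ), computed over a rolling window of the last N = 5000 samples of Current1.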
I tried to do it in Python with pandas, and it looked like the following (I'm still not 100% sure it's correct):
import numpy as np

win_len = 5000
df['U_rms'] = df.Current1.pow(2).rolling(min_periods=1, window=win_len).apply(lambda x: np.sqrt(x.mean()))
In PySpark I tried several approaches:
- This one didn't work at all (a sketch of what I was aiming for is at the end of the post):
win_len = 5000
df = df.select("Current1").withColumn("U_rms", F.pow("Current1", 2).Window(windowDuration=win_len).apply(lambda x: F.sqrt(x.mean())))
- Using a @udf still doesn't give me a correct result:
import numpy as np
from pyspark.sql.types import FloatType
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

@udf(returnType=FloatType())
def MyFunc(value):
    # "value" is a single scalar from one row, not a window of rows,
    # so this sums the same squared value win_len times and the result
    # collapses to abs(value)
    win_len = 5000
    return float(
        np.sqrt(
            np.sum([np.square(value) for i in range(1, win_len + 1)]) / win_len
        )
    )
df = df.withColumn("u_rms1", MyFunc(F.col("Current1")))
In this case I'd also prefer not to have null values at the beginning: in pandas, min_periods=1 gives partial-window results for the first 5000 samples instead of waiting for the window to fill up, and I'd like the same behaviour here.
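For reference, here is a sketch of how I think this could be expressed with a window function instead of a UDF. It's untested on my side, and the ordering column "ts" is a hypothetical placeholder for whatever column defines the sample order in the real data:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

win_len = 5000

# Frame covering the current row plus the (win_len - 1) rows before it.
# "ts" is a hypothetical ordering column; replace it with the column
# that defines the sample order in the actual DataFrame.
w = Window.orderBy("ts").rowsBetween(-(win_len - 1), 0)

# RMS = sqrt(mean of squares) over the window frame.
df = df.withColumn("U_rms", F.sqrt(F.avg(F.pow(F.col("Current1"), 2)).over(w)))

Since F.avg only aggregates the rows actually present in the frame, the first rows would get an RMS over a partial window (like min_periods=1 in pandas) instead of nulls. One caveat I'm aware of: a window with orderBy but no partitionBy moves all rows into a single partition, which can be slow for large data. Is this the right direction? Thank you in advance.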