
I'm struggling with programming the following formula using Pyspark:

[image: the formula, not reproduced here]

I tried it in pandas first; it looked like this (I'm still not 100% sure it's correct):

win_len = 5000

df['U_rms'] = df.Current1.pow(2).rolling(min_periods=1, window=win_len).apply(lambda x: np.sqrt(x.mean()))

In PySpark I tried several approaches:

  1. This one didn't work at all:
win_len = 5000
    
df = df.select("Current1").withColumn("U_rms", (F.pow("Current1", 2).Window( windowDuration=win_len).apply(lambda x: F.sqrt(x.mean()))))
  2. Using @udf still doesn't seem to give me a correct result:
import numpy as np
from pyspark.sql.types import FloatType
from pyspark.sql import functions as F

    @udf(returnType=FloatType())
    def MyFunc(value):
      win_len=5000
      return float(
        np.sqrt(
          np.sum([np.square(value) for i in range(1, win_len+1)]) / win_len
        )
      )
    df = df.withColumn("u_rms1", MyFunc(F.col("Current1")))

In this case I would also prefer not to have null values at the beginning, since the windowing only starts after the first 5000 samples. Thank you in advance.

Egorsky
  • Your UDF is definitely not taking the next 5000 elements. It is calculating on `range(1, 5001)` – s510 Aug 24 '22 at 12:59
  • @derFotik Okay so how should it be? Should I use something like `collect()`, `select()`, or `Iterrows()`? – Egorsky Aug 24 '22 at 13:01
  • Any data sample? – Lamanus Aug 24 '22 at 13:25
  • [this](https://stackoverflow.com/a/37861832/8279585) has a great explanation – samkart Aug 24 '22 at 13:26
  • @Lamanus well, I have more than 200,000,000 rows. They're just values that were obtained for the current. The thing is that I need a rolling mean of the square of the values in one particular column – Egorsky Aug 24 '22 at 13:29
  • @samkart this example is based on python only but still even with `@udf` this won't be possible to do since it doesn't take into account windowing and just calculates that for given set of values. – Egorsky Aug 24 '22 at 13:30
  • looking at the mathematical notation, if we take `m=3` it resolves to `sqrt(((x1_square * 1) + (x2_square * 2) + (x3_square * 3)) / 3)` -- correct? are these `x1, x2, x3` values from the `current` field that you want to use as a look forward window? – samkart Aug 24 '22 at 13:34
  • @samkart yes. We iterate over a fixed number of samples: it takes 5000 samples, computes `sqrt((x1 ** 2 + x2**2 + x3**2 + ... + x5000**2)/5000)`, and this process repeats until it reaches the end of the dataset – Egorsky Aug 24 '22 at 13:42
  • BTW, can you please share the link to the resource that you've referred the math notation from? there must be some helpful explanation on the variables – samkart Aug 24 '22 at 13:48
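
A small plain-Python sketch of the point made in the first comment: a UDF receives one value per row, so the loop in the question adds the same square `win_len` times and the result collapses to the absolute value of the input. `udf_body` is a hypothetical stand-in that mirrors the arithmetic of `MyFunc` without NumPy or Spark; it is only meant to show why no windowing happens.

```python
import math

def udf_body(value, win_len=5000):
    """Same arithmetic as the UDF in the question (hypothetical stand-in)."""
    # The loop variable is never used, so value**2 is summed win_len times.
    total = sum(value ** 2 for _ in range(1, win_len + 1))
    # (win_len * value**2) / win_len == value**2, so this is just abs(value).
    return math.sqrt(total / win_len)

for v in (3.0, -4.0, 0.5):
    print(v, udf_body(v), abs(v))
```

Each printed row should show the UDF result matching `abs(v)`, which is why the column looked wrong: neighbouring rows never enter the calculation.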

3 Answers


I think you can use a window function to get a rolling rmse.

from pyspark.sql import functions as f
from pyspark.sql import Window

df = spark.createDataFrame([[1], [4], [3], [5]], ['value'])
df.show()
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
+-----+

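# Frame: from the current row to the next row; use rowsBetween(0, 4999) for a 5000-row window.
# Note: without an orderBy, Spark does not guarantee which row counts as "next".
w = Window.rowsBetween(0, 1)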
df.withColumn('count', f.count(f.lit(1)).over(w)) \
  .withColumn('square', f.col('value') * f.col('value')) \
  .withColumn('mse', f.sum(f.col('square')).over(w) / f.col('count')) \
  .withColumn('rmse', f.sqrt(f.col('mse'))) \
  .show(truncate=False)
+-----+-----+------+----+------------------+
|value|count|square|mse |rmse              |
+-----+-----+------+----+------------------+
|1    |2    |1     |8.5 |2.9154759474226504|
|4    |2    |16    |12.5|3.5355339059327378|
|3    |2    |9     |17.0|4.123105625617661 |
|5    |1    |25    |25.0|5.0               |
+-----+-----+------+----+------------------+
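
To make the frame concrete without a Spark session, here is a minimal plain-Python sketch of what a `rowsBetween(start, end)` frame computes for each row. `rms_rows_between` is a hypothetical helper, not a Spark API, and it assumes the frame always contains the current row (`start <= 0 <= end`). With offsets `(0, 1)` it should reproduce the `rmse` column above.

```python
import math

def rms_rows_between(values, start, end):
    """RMS per row over the row offsets [start, end], clipped at the data edges.

    Hypothetical helper mimicking a window frame in plain Python.
    Assumes start <= 0 <= end, so every frame contains the current row.
    """
    result = []
    for i in range(len(values)):
        lo = max(0, i + start)               # clip the frame at the first row
        hi = min(len(values), i + end + 1)   # clip the frame at the last row
        frame = values[lo:hi]
        # Divide by the actual frame size, like the 'count' column in the answer.
        result.append(math.sqrt(sum(v * v for v in frame) / len(frame)))
    return result

# Current row plus the next row, as in Window.rowsBetween(0, 1).
print(rms_rows_between([1, 4, 3, 5], 0, 1))
```

Because the divisor is the clipped frame size, the last row is divided by 1 rather than 2, which matches the `count` column in the output above.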

If you want a single value over all rows, aggregate instead of using a window (take the mean of the squares, then the square root):

df.withColumn('square', f.col('value') * f.col('value')) \
  .agg(f.avg(f.col('square')).alias('mse')) \
  .withColumn('rmse', f.sqrt(f.col('mse'))) \
  .show(truncate=False)
+-----+-----------------+
|mse  |rmse             |
+-----+-----------------+
|12.75|3.570714214271425|
+-----+-----------------+
Lamanus
  • But what will happen with first 5000 samples over which the calculation won't start? – Egorsky Aug 24 '22 at 13:44
  • In my example, I calculate the rmse over the current row plus the next one, so it starts right away, and you can see the last row is still calculated, but only from the last row itself. If you want to calculate over the preceding 4999 rows (and the current one), change the window to `Window.rowsBetween(-4999, Window.currentRow)`; the first row will then be calculated from its own value only. – Lamanus Aug 24 '22 at 13:48
  • but shouldn't it be divided not by 2 (`.withColumn('mse', f.sum(f.col('square')).over(w) / 2)`) but by the number of samples? – Egorsky Aug 24 '22 at 14:14
  • in this example, the last row will have only 1 value in the window. however, it will be divided by the static `2`. – samkart Aug 24 '22 at 14:14
  • You could add the count for the window and use that column. The answer is updated for that. – Lamanus Aug 24 '22 at 14:15
  • @Lamanus thank you. But what if I want to go through whole dataframe from top to the bottom? – Egorsky Aug 24 '22 at 14:43
  • Then it is better to do the groupby. – Lamanus Aug 24 '22 at 15:29
  • @Lamanus I don't really get why I have only one value if I want to do windowing, which should calculate the rmse for each `window_len`. In this particular case I just get one overall value. Is it possible to make this windowing "dynamic", so to speak? Sorry if my question wasn't clear from the very beginning – Egorsky Aug 25 '22 at 07:33
  • You can simply vary the numbers in `w`. For example, 0 to n for n+1 window. – Lamanus Aug 25 '22 at 20:42
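
For the trailing variant discussed in the comments above (the current row plus up to `win_len - 1` preceding rows), the arithmetic can be sketched in plain Python with a running sum of squares. `trailing_rms` is a hypothetical name used for illustration; this is a way to check the expected numbers on a small list, not a replacement for the Spark window. Dividing by the number of samples actually in the window means the first rows get a value instead of a null.

```python
import math
from collections import deque

def trailing_rms(samples, win_len):
    """Yield the RMS of each sample together with up to win_len - 1 preceding samples.

    Hypothetical helper for checking results by hand.
    """
    window = deque()
    sum_sq = 0.0
    for x in samples:
        window.append(x)
        sum_sq += x * x
        if len(window) > win_len:
            old = window.popleft()      # drop the sample that left the window
            sum_sq -= old * old
        sum_sq = max(sum_sq, 0.0)       # guard against tiny negative rounding error
        # Early rows use a shorter window, so there are no nulls at the start.
        yield math.sqrt(sum_sq / len(window))

print(list(trailing_rms([1, 4, 3, 5], win_len=2)))
```

With `win_len=2` the first value is computed from the first sample alone, and every later value from the current sample and the one before it.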

A variation to the given answers - we can create arrays of the required amount of data and use that for calculations.

Note that I've added an id column that has the dataframe's row order (spark does not retain row order) to keep the sequence intact. You can remove it if not required, or create it using monotonically_increasing_id function.

from pyspark.sql import functions as func
from pyspark.sql import Window as wd

spark.createDataFrame([[1, 1], [2, 4], [3, 3], [4, 5], [5, 9]], ['id', 'data']). \
    withColumn('data_sq', func.pow('data', 2).cast('double')). \
    withColumn('list_of_points', 
               func.collect_list('data_sq').over(wd.orderBy('id').rowsBetween(0, 1))
               ). \
    withColumn('list_of_points_sum', 
               func.expr('aggregate(list_of_points, cast(0 as double), (x, y) -> x + y)')
               ). \
    withColumn('rmse', 
               func.sqrt(func.col('list_of_points_sum') / func.size('list_of_points'))
               ). \
    show(5, False)

# +---+----+-------+--------------+------------------+------------------+
# |id |data|data_sq|list_of_points|list_of_points_sum|rmse              |
# +---+----+-------+--------------+------------------+------------------+
# |1  |1   |1.0    |[1.0, 16.0]   |17.0              |2.9154759474226504|
# |2  |4   |16.0   |[16.0, 9.0]   |25.0              |3.5355339059327378|
# |3  |3   |9.0    |[9.0, 25.0]   |34.0              |4.123105625617661 |
# |4  |5   |25.0   |[25.0, 81.0]  |106.0             |7.280109889280518 |
# |5  |9   |81.0   |[81.0]        |81.0              |9.0               |
# +---+----+-------+--------------+------------------+------------------+

I've used .rowsBetween(0, 1) to consider current row (0) and next row (1). You can update the 1 to 4999 for 5k rows.

samkart
  • But how can I apply windowing function to that? Especially if I want to move with the window of 5000 entries through whole dataset? – Egorsky Aug 25 '22 at 12:28
  • the `list_of_points` creates the list of data points based on the window you provide. it's using a window in this example (`wd.orderBy()...`) – samkart Aug 25 '22 at 13:04

The best solution for me is still the pandas API on Spark, which gives exactly the result I want:

import numpy as np
import pyspark.pandas as ps

ps.set_option('compute.ops_on_diff_frames', True)

win_len = 5000

dfp = dfp.pandas_api()

dfp['u_rmse1'] = dfp.Current1.pow(2).rolling(min_periods=1, window=win_len).mean().apply(np.sqrt)

dfp.to_spark()

This code is easy to follow for anyone who programs in Python, and with the pandas API it's also pretty fast.

Egorsky