1

Let's say I have following pandas dataframe contains value over time or date:

import pandas as pd

pdf = pd.DataFrame(data={'date':['2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23','2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03','2020-11-04','2020-11-05','2020-11-06','2020-11-07','2020-11-08','2020-11-09','2020-11-10','2020-11-11','2020-11-12','2020-11-13','2020-11-14','2020-11-15'],
                        'value':[161967, 161270, 148508, 152442, 157504, 157118, 155674, 134522, 213384, 163242, 217415, 221502, 146267, 143621, 145875, 139488, 104466, 94825, 143686, 151952, 161074, 161417, 135042, 148768, 131428, 127816, 151905, 180498, 177899, 193950, 12]})
pdf

or I have following Spark dataframe with similar data:

import pyspark.sql.types
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, DateType

dict  = [ ('2020-10-16', 161967),
          ('2020-10-17', 161270),
          ('2020-10-18', 148508),
          ('2020-10-19', 152442),
          ('2020-10-20', 157504),
          ('2020-10-21', 157118),
          ('2020-10-22', 155674),
          ('2020-10-23', 134522),
          ('2020-10-24', 213384),
          ('2020-10-25', 163242),
          ('2020-10-26', 217415),
          ('2020-10-27', 221502),
          ('2020-10-28', 146267),
          ('2020-10-29', 143621),
          ('2020-10-30', 145875),
          ('2020-10-31', 139488),
          ('2020-11-01', 104466),
          ('2020-11-02', 94825),
          ('2020-11-03', 143686),
          ('2020-11-04', 151952),
          ('2020-11-05', 161074),
          ('2020-11-06', 161417),
          ('2020-11-07', 135042),
          ('2020-11-08', 148768),
          ('2020-11-09', 131428),
          ('2020-11-10', 127816),
          ('2020-11-11', 151905),
          ('2020-11-12', 180498),
          ('2020-11-13', 177899),
          ('2020-11-14', 193950),
          ('2020-11-15', 12),

  ]

schema = StructType([ 
    StructField("date",        StringType(),    True), \
    StructField("value",       IntegerType(),   True), \
  ])
 
#create a Spark dataframe
sc= SparkContext()
sqlContext = SQLContext(sc)
sdf = sqlContext.createDataFrame(data=dict,schema=schema)
sdf.printSchema()
sdf.sort('date').show(truncate = False)

I inspired from this answer to detect peaks and valleys via below code:

from scipy.signal import find_peaks
import numpy as np
import matplotlib.pyplot as plt

# Input signal from Pandas dataframe
t = pdf.date
x = pdf.value

# Set thresholds
# std calculated on 10-90 percentile data, without outliers is used for threshold
thresh_top    = np.median(x) + 1 * np.std(x)
thresh_bottom = np.median(x) - 1 * np.std(x)


# Find indices of peaks & of valleys (from inverting the signal)
peak_idx, _   = find_peaks(x,  height =  thresh_top)
valley_idx, _ = find_peaks(-x, height = -thresh_bottom)

# Plot signal
plt.figure(figsize=(14,12))
plt.plot(t, x   , color='b', label='data')
plt.scatter(t, x, s=10,c='b',label='value')

# Plot threshold
plt.plot([min(t), max(t)], [thresh_top, thresh_top],       '--',  color='r', label='peaks-threshold')
plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--',  color='g', label='valleys-threshold')

# Plot peaks (red) and valleys (blue)
plt.plot(t[peak_idx],   x[peak_idx],   "x", color='r', label='peaks')
plt.plot(t[valley_idx], x[valley_idx], "x", color='g', label='valleys')


plt.xticks(rotation=45)
plt.ylabel('value')
plt.xlabel('timestamp')
plt.title(f'data over time')
plt.legend( loc='lower left')
plt.gcf().autofmt_xdate()
plt.show()

and it works and plot it successfully when I read data from Pandas dataframe pdf before creating Spark dataframe. once I created Spark dataframe sdf even if you run the same cell on notebook within reading and input signal from pandas pdf doesn't work anymore and gives following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-12-1c79b34272c5> in <module>()
     23 
     24 # Plot threshold
---> 25 plt.plot([min(t), max(t)], [thresh_top, thresh_top],       '--',  color='r', label='peaks-threshold')
     26 plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--',  color='g', label='valleys-threshold')
     27 

2 frames
/content/spark-3.1.2-bin-hadoop2.7/python/pyspark/sql/column.py in _to_java_column(col)
     47             "{0} of type {1}. "
     48             "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
---> 49             "function.".format(col, type(col)))
     50     return jcol
     51 

TypeError: Invalid argument, not a string or column: 0     2020-10-16
1     2020-10-17
2     2020-10-18
3     2020-10-19
4     2020-10-20
5     2020-10-21
6     2020-10-22
7     2020-10-23
8     2020-10-24
9     2020-10-25
10    2020-10-26
11    2020-10-27
12    2020-10-28
13    2020-10-29
14    2020-10-30
15    2020-10-31
16    2020-11-01
17    2020-11-02
18    2020-11-03
19    2020-11-04
20    2020-11-05
21    2020-11-06
22    2020-11-07
23    2020-11-08
24    2020-11-09
25    2020-11-10
26    2020-11-11
27    2020-11-12
28    2020-11-13
29    2020-11-14
30    2020-11-15
Name: date, dtype: object of type <class 'pandas.core.series.Series'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

when I try it directly to read data from spark dataframe using:

# Input signal from Spark dataframe
t = [val.date  for val in sdf.select('date').collect()]
x = [val.value for val in sdf.select('value').collect()]

sadly the plotting code doesn't work and throw out following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-14-9cfede77d0fb> in <module>()
     31 # Find indices of peaks & of valleys (from inverting the signal)
     32 peak_idx, _   = find_peaks(x,  height =  thresh_top)
---> 33 valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
     34 
     35 

TypeError: bad operand type for unary -: 'list'

I spent lots of time but I couldn't fixed this bug. I also open to other non-find_peaks() solutions to plot spike detection like this numpythonic answer if I can adapt to the code and apply on Spark dataframe. I have tried many things you could check in this google Colab Notebook and feel free to run/test/edit it for quick debugging.

Mario
  • 1,631
  • 2
  • 21
  • 51
  • I realized that also there are some issues to use SciPy libraries in Scala based on this [post](https://stackoverflow.com/questions/40370759/using-python-scipy-from-spark-scala) but I'm not sure if it is the case in PySpark. Hopefully there is a non-SciPy solution for this problem. – Mario Nov 17 '21 at 19:47

1 Answers1

2

The problem is that you are not working with the same objects.

When you work with pandas and you get x = pdf.value you actually get Series object. This object can take - in front and it knows that it has to convert the values in it to negative.

But when you work with PySpark and you collect values, you get list object and if you put - in front you get error:

TypeError: bad operand type for unary -: 'list'

Which tells you that it doesn't know how to deal with it.

  • So the first thing to do, instead of:
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)

You have to convert values to negative, for example:

valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
  • Next, find_peaks will return ndarray which again cannot be used with list in:
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")

So you'll have to do it manually, for example:

plt.plot(
        [t[i] for i in peak_idx],
        [x[i] for i in peak_idx],
        "x",
        color="r",
        label="peaks",
    )

I've reproduced your plot with the following code (+ calculating median and std_dev in PySpark as an example):

# data is the same
# ...

# create a Spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=data, schema=schema)
std_dev = sdf.select(F.stddev(F.col("value")).alias("std")).collect()[0]["std"]
median = (
    sdf.groupBy("value")
    .agg(F.expr("percentile_approx(value, 0.5)").alias("med"))
    .collect()[0]["med"]
)
thresh_top = median + 1 * std_dev
thresh_bottom = median - 1 * std_dev
t = sdf.select("date").rdd.flatMap(lambda x: x).collect()
x = sdf.select("value").rdd.flatMap(lambda x: x).collect()
peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)

plt.figure(figsize=(14, 12))
plt.plot(t, x, color="b", label="data")
plt.scatter(t, x, s=10, c="b", label="value")

# Plot threshold
plt.plot(
    [min(t), max(t)],
    [thresh_top, thresh_top],
    "--",
    color="r",
    label="peaks-threshold",
)
plt.plot(
    [min(t), max(t)],
    [thresh_bottom, thresh_bottom],
    "--",
    color="g",
    label="valleys-threshold",
)

# Plot peaks (red) and valleys (blue)
plt.plot(
    [t[i] for i in peak_idx],
    [x[i] for i in peak_idx],
    "x",
    color="r",
    label="peaks",
)
plt.plot(
    [t[i] for i in valley_idx],
    [x[i] for i in valley_idx],
    "x",
    color="g",
    label="valleys",
)

# ...
vladsiv
  • 2,718
  • 1
  • 11
  • 21
  • numpy array to list conversion is better using `tolist()` method: https://numpy.org/doc/stable/reference/generated/numpy.ndarray.tolist.html – dankal444 Nov 17 '21 at 21:51
  • Thanks for your input. I tried to reproduce plot using your solution but on [Colab Notebook](https://colab.research.google.com/drive/13Fz__TUJSWpwVVTDVPRJeyvPsremKnfM?usp=sharing) if you check it out, but I faced `TypeError: Invalid argument, not a string or column: ['2020-10-16', ..., '2020-11-15'] of type . For column literals, use 'lit', 'array', 'struct' or 'create_map' function.` Would you run your solution on the Notebook to debug the issue completely? – Mario Nov 18 '21 at 00:39
  • I couldn't reach the plot and I already faced the current error when I tried to cast the `date` column before `collect()` using `.cast(DateType())` unsuccessfully and comment it in the last cell. The cell before the last cell in Colab Notebook is your Solution. – Mario Nov 18 '21 at 00:45
  • @Mario Please remove `from pyspark.sql.functions import *` from the top and try again, it clashes the namespace for functions `max`, `min`. Always use `from pyspark.sql import functions as F` and use `F.` so you don't have similar problems. – vladsiv Nov 18 '21 at 08:06
  • Thanks to your tips, I just could fix and reproduce the plot but surprisingly the thresholds are not the same when I compared the plots out of pandas dataframe & plot out of Spark dataframe. You can see them on Notebook or this [screenshot](https://i.imgur.com/fNwlGGF.png). I printed them on the legend at the left bottom of the plots nevertheless you can distinguish it on plots as well. I noticed that you didn't use NumPy in threshold calculations `np.median(x) + 1 * np.std(x)`. Is there any special reason? – Mario Nov 18 '21 at 10:20
  • Oh, interesting. Could you try again with `np.median(x) + 1 * np.std(x)` just use `x` from pyspark `collect`? Nope, no special reason, you can use `numpy` if you want. Just wanted to show you different options. – vladsiv Nov 18 '21 at 10:20
  • Great, and many thanks for your contribution to fixing this issue; you made my day. – Mario Nov 18 '21 at 10:21