If you're using spark 3.1+, you can use percentile_approx
to calculate the quantiles, and do rest of the calculations in pyspark. In case your spark version does not have that function, we can use an UDF that uses numpy.quantile
for the quantile calculation. I've shown both in the code.
data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['pressure', 'ts']). \
withColumn('ts', func.col('ts').cast('timestamp')). \
withColumn('dt_hr', func.date_format('ts', 'yyyyMMddHH'))
# +--------+-------------------+----------+
# |pressure| ts| dt_hr|
# +--------+-------------------+----------+
# | 358.64|2022-01-01 00:00:00|2022010100|
# | 354.98|2022-01-01 00:10:00|2022010100|
# | 350.34|2022-01-01 00:20:00|2022010100|
# | 429.69|2022-01-01 00:30:00|2022010100|
# | 420.41|2022-01-01 00:40:00|2022010100|
# | 413.82|2022-01-01 00:50:00|2022010100|
# | 409.42|2022-01-01 01:00:00|2022010101|
# | 409.67|2022-01-01 01:10:00|2022010101|
# | 413.33|2022-01-01 01:20:00|2022010101|
# | 405.03|2022-01-01 01:30:00|2022010101|
# | 1209.42|2022-01-01 01:40:00|2022010101|
# | 405.03|2022-01-01 01:50:00|2022010101|
# +--------+-------------------+----------+
getting the quantiles (showing both methods; use whichever is available)
# spark 3.1+ has percentile_approx
pressure_quantile_sdf = data_sdf. \
groupBy('dt_hr'). \
agg(func.percentile_approx('pressure', [0.2, 0.8]).alias('quantile_20_80'))
# +----------+----------------+
# | dt_hr| quantile_20_80|
# +----------+----------------+
# |2022010100|[354.98, 420.41]|
# |2022010101|[405.03, 413.33]|
# +----------+----------------+
# lower versions use UDF
def numpy_quantile_20_80(list_col):
import numpy as np
q_20 = np.quantile(list_col, 0.2)
q_80 = np.quantile(list_col, 0.8)
return [float(q_20), float(q_80)]
numpy_quantile_20_80_udf = func.udf(numpy_quantile_20_80, ArrayType(FloatType()))
pressure_quantile_sdf = data_sdf. \
groupBy('dt_hr'). \
agg(func.collect_list('pressure').alias('pressure_list')). \
withColumn('quantile_20_80', numpy_quantile_20_80_udf(func.col('pressure_list')))
# +----------+--------------------+----------------+
# | dt_hr| pressure_list| quantile_20_80|
# +----------+--------------------+----------------+
# |2022010100|[358.64, 354.98, ...|[354.98, 420.41]|
# |2022010101|[409.42, 409.67, ...|[405.03, 413.33]|
# +----------+--------------------+----------------+
outlier calculation would be easy with the quantile info
pressure_quantile_sdf = pressure_quantile_sdf. \
withColumn('quantile_20', func.col('quantile_20_80')[0]). \
withColumn('quantile_80', func.col('quantile_20_80')[1]). \
withColumn('min_q_20', func.col('quantile_20') - 1.5 * (func.col('quantile_80') - func.col('quantile_20'))). \
withColumn('max_q_80', func.col('quantile_80') + 1.5 * (func.col('quantile_80') - func.col('quantile_20'))). \
select('dt_hr', 'min_q_20', 'max_q_80')
# +----------+------------------+------------------+
# | dt_hr| min_q_20| max_q_80|
# +----------+------------------+------------------+
# |2022010100|256.83502197265625| 518.5549926757812|
# |2022010101|392.58001708984375|425.77996826171875|
# +----------+------------------+------------------+
# outlier calc -- select columns that are required
data_sdf. \
join(pressure_quantile_sdf, 'dt_hr', 'left'). \
withColumn('is_outlier', ((func.col('pressure') > func.col('max_q_80')) | (func.col('pressure') < func.col('min_q_20'))).cast('int')). \
show()
# +----------+--------+-------------------+------------------+------------------+----------+
# | dt_hr|pressure| ts| min_q_20| max_q_80|is_outlier|
# +----------+--------+-------------------+------------------+------------------+----------+
# |2022010100| 358.64|2022-01-01 00:00:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 354.98|2022-01-01 00:10:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 350.34|2022-01-01 00:20:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 429.69|2022-01-01 00:30:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 420.41|2022-01-01 00:40:00|256.83502197265625| 518.5549926757812| 0|
# |2022010100| 413.82|2022-01-01 00:50:00|256.83502197265625| 518.5549926757812| 0|
# |2022010101| 409.42|2022-01-01 01:00:00|392.58001708984375|425.77996826171875| 0|
# |2022010101| 409.67|2022-01-01 01:10:00|392.58001708984375|425.77996826171875| 0|
# |2022010101| 413.33|2022-01-01 01:20:00|392.58001708984375|425.77996826171875| 0|
# |2022010101| 405.03|2022-01-01 01:30:00|392.58001708984375|425.77996826171875| 0|
# |2022010101| 1209.42|2022-01-01 01:40:00|392.58001708984375|425.77996826171875| 1|
# |2022010101| 405.03|2022-01-01 01:50:00|392.58001708984375|425.77996826171875| 0|
# +----------+--------+-------------------+------------------+------------------+----------+