I am trying to implement forecast model in my pyspark analytics and we are getting as below error. kindly help me where exactly, we may have to apply changes or fix the issue.
Error:
PythonException: 'ValueError: Dataframe has less than 2 non-NaN rows.', from , line 17. Full traceback below: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 367.0 failed 4 times, most recent failure: Lost task 0.3 in stage 367.0 (TID 541) (172.26.145.6 executor 0): org.apache.spark.api.python.PythonException: 'ValueError: Dataframe has less than 2 non-NaN rows.', from , line 17. Full traceback below: Traceback (most recent call last): File "", line 17, in pd_apply_forecast File "/databricks/python/lib/python3.8/site-packages/prophet/forecaster.py", line 1113, in fit raise ValueError('Dataframe has less than 2 non-NaN rows.') ValueError: Dataframe has less than 2 non-NaN rows.
My code :
import re
import pandas as pd
import pyspark.pandas as ps
from prophet import Prophet
def run_row_outlier_check(df: DataFrame, min_date, start_date, groupby_cols, job_id) -> DataFrame:
"""
| Generate dataframe containing prophet model forecasting of row counts
"""
pd_schema = StructType([
StructField(groupby_col, StringType(), True),
StructField("ds", DateType(), True),
StructField("y", IntegerType(), True),
StructField("yhat", FloatType(), True),
StructField("yhat_lower", FloatType(), True),
StructField("yhat_upper", FloatType(), True),
StructField("trend", FloatType(), True),
StructField("trend_lower", FloatType(), True),
StructField("trend_upper", FloatType(), True),
StructField("additive_terms", FloatType(), True),
StructField("additive_terms_lower", FloatType(), True),
StructField("additive_terms_upper", FloatType(), True),
StructField("weekly", FloatType(), True),
StructField("weekly_lower", FloatType(), True),
StructField("weekly_upper", FloatType(), True),
StructField("yearly", FloatType(), True),
StructField("yearly_lower", FloatType(), True),
StructField("yearly_upper", FloatType(), True),
StructField("multiplicative_terms", FloatType(), True),
StructField("multiplicative_terms_lower", FloatType(), True),
StructField("multiplicative_terms_upper", FloatType(), True)
])
# dataframe of consecutive dates
df_rundates = (ps.DataFrame({'date':pd.date_range(start=min_date, end=(date.today() - timedelta(days=1)))})).to_spark()
# combine + explode to create row for each date and grouped col (e.g. business segment)
df_bizlist = (
df.filter(f"{date_col} >= coalesce(date_sub(date 'today', {num_days_check}), '{start_date}')")
.groupBy(groupby_col)
.count()
.orderBy(col("count").desc())
)
df_rundates_bus = (
df_rundates
.join(df_bizlist, how='full')
.select(df_bizlist[groupby_col], df_rundates["date"].alias("ds"))
)
# create input dataframe for prophet forecast
df_grouped_cnt = df.groupBy(groupby_cols).count()
df_input = (
df_rundates_bus.selectExpr(f"{groupby_col}", "to_date(ds) as ds")
.join(df_grouped_cnt.selectExpr(f"{groupby_col}", f"{date_col} as ds", "count as y"), on=['ds',f'{groupby_col}'], how='left')
.withColumn("y", coalesce("y", lit(0)))
.repartition(sc.defaultParallelism, "ds")
)
# cache dataframe to improve performance
# df_input.cache()
# .repartition(sc.defaultParallelism, "ds")
# forecast
df_forecast = (
df_input
.groupBy(groupby_col)
.applyInPandas(pd_apply_forecast, schema=pd_schema)
)
# filter forecast with outlier scores
df_rowoutliers = (
df_forecast
.filter("y > 0 AND (y > yhat_upper OR y < array_max(array(yhat_lower,0)))")
.withColumn("check_type", lit("row_count"))
.withColumn("deduct_score", expr("round(sqrt(pow(y-yhat, 2) / pow(yhat_lower - yhat_upper,2)))").cast('int'))
.select(
col("check_type"),
col("ds").alias("ref_date"),
col(groupby_col).alias("ref_dimension"),
col("y").cast('int').alias("actual"),
col("deduct_score"),
col("yhat").alias("forecast"),
col("yhat_lower").alias("forecast_lower"),
col("yhat_upper").alias("forecast_upper")
)
)
return add_metadata_columns(df_forecast, job_id), add_metadata_columns(df_rowoutliers, job_id)
def pd_apply_forecast(pd_history: pd.DataFrame) -> pd.DataFrame:
# remove missing values and filter out null business segments
pd_history = (pd_history[pd_history[groupby_col].notnull()]
.dropna())
# instantiate the model, configure the parameters
model = Prophet(
growth='linear',
yearly_seasonality='auto', # default: auto
weekly_seasonality='auto', # default: auto
daily_seasonality=False, # default: auto
seasonality_mode='additive'
)
# fit the model
model.fit(pd_history)
# configure predictions
pd_future = model.make_future_dataframe(
periods=365,
freq='d',
include_history=True
)
# make predictions
pd_forecast = model.predict(pd_future)
# ASSEMBLE EXPECTED RESULT SET
# --------------------------------------
# get relevant fields from forecast
pd_f = pd_forecast[ ['ds', 'yhat', 'yhat_lower', 'yhat_upper', 'trend', 'trend_lower', 'trend_upper', 'additive_terms', 'additive_terms_lower', 'additive_terms_upper', 'weekly', 'weekly_lower', 'weekly_upper', 'yearly', 'yearly_lower', 'yearly_upper', 'multiplicative_terms', 'multiplicative_terms_lower', 'multiplicative_terms_upper'] ].set_index('ds')
# get relevant fields from history
pd_h = pd_history[ ['ds', groupby_col, 'y'] ].set_index('ds')
# join history and forecast
pd_results = pd_f.join(pd_h, how='left')
pd_results.reset_index(level=0, inplace=True)
# filter out null dimensions
pd_results = pd_results[pd_results[groupby_col].notnull()]
# return predictions
return pd_results[ [groupby_col, 'ds', 'y', 'yhat', 'yhat_lower', 'yhat_upper', 'trend', 'trend_lower', 'trend_upper', 'additive_terms', 'additive_terms_lower', 'additive_terms_upper', 'weekly', 'weekly_lower', 'weekly_upper', 'yearly', 'yearly_lower', 'yearly_upper', 'multiplicative_terms', 'multiplicative_terms_lower', 'multiplicative_terms_upper'] ]