How do I run dplyr functions on a Spark DataFrame in Databricks? No matter how I modify my code, it fails with the same error, just triggered by a different dplyr function each time.
HDEF_df_test is a Spark DataFrame with the following structure:
ds       TICKER  y
1/5/22   HDEF    23.87
1/6/22   HDEF    23.90
1/7/22   HDEF    24.20
1/10/22  HDEF    24.20
1/11/22  HDEF    24.45
1/12/22  HDEF    24.60
Code below:
# One-time setup to get prophet installed on the cluster
Sys.setenv(DOWNLOAD_STATIC_LIBV8 = 1)
remotes::install_github("jeroen/V8")
devtools::install_version("rstantools", version = "2.0.0")
install.packages("prophet")

forecast <- function(x) {
  library(prophet)
  library(dplyr)
  preds <- x %>%
    dplyr::group_by(TICKER) %>%
    # drop weekend rows before fitting
    dplyr::mutate(weekdays = weekdays(ds)) %>%
    dplyr::filter(weekdays != "Saturday" & weekdays != "Sunday") %>%
    # fit one prophet model per ticker, then predict 14 days ahead,
    # again excluding weekends from the future dates
    dplyr::do({
      m <- prophet(., daily.seasonality = TRUE, yearly.seasonality = TRUE)
      future <- make_future_dataframe(m, periods = 14)
      future <- filter(future, weekdays(ds) != "Saturday" & weekdays(ds) != "Sunday")
      predict(m, future)
    }) %>%
    dplyr::select(ds, TICKER, yhat)
  preds
}
forecast(HDEF_df_test)
Error below:
Error in UseMethod("group_by") :
no applicable method for 'group_by' applied to an object of class "SparkDataFrame"
Some(<code style = 'font-size:10pt'> Error in UseMethod("do"): no applicable method for 'group_by' applied to an object of class "SparkDataFrame" </code
The error goes away when I first collect the SparkDataFrame into a local data.frame:

test <- SparkR::collect(HDEF_df_test)
forecast(test)
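(The collected object is an ordinary data.frame, which is presumably why dplyr's methods apply again:)

class(test)
# [1] "data.frame"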
However, the gapply call that comes next doesn't accept the local data.frame, which is why I need to keep the data as a SparkDataFrame.
output_schema <- SparkR::structType(
structField("ds", "date"),
structField("TICKER", "string"),
structField("yhat", "double")
)
results <- SparkR::gapply(x = test,
cols = "TICKER",
func = forecast,
schema = output_schema)
Error below:
Error in (function (classes, fdef, mtable) :
  unable to find an inherited method for function ‘gapply’ for signature ‘"data.frame"’
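From the error it looks like gapply dispatches on SparkDataFrame, not data.frame. My reading of the SparkR docs is that the call should look roughly like the sketch below, with x being the SparkDataFrame itself and func taking two arguments, the grouping key and a local data.frame for that group (which my one-argument forecast does not match):

# Sketch based on my reading of the SparkR docs, not verified:
results <- SparkR::gapply(x = HDEF_df_test,
                          cols = "TICKER",
                          func = function(key, pdf) {
                            # pdf arrives as an ordinary R data.frame per group
                            forecast(pdf)
                          },
                          schema = output_schema)

Is that the right approach, or is there a way to run dplyr verbs directly on the SparkDataFrame?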