If your wrangling is simple enough, you can do it with SQL in the pipeline via `ft_sql_transformer()`. For example, if you want to modify a column in the pipeline, you can do:
# Illustrative pipeline: a dplyr-derived stage, a SQL stage that adds a
# derived column (`example`), two feature transformers, an R formula
# stage, and a logistic regression estimator. Nothing runs until the
# pipeline is fitted.
flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(tbl = df) %>%
  ft_sql_transformer(
    "select *, distance + 47 as example from __THIS__"
  ) %>%
  ft_binarizer(
    input_col = "dep_delay",   # 1 when dep_delay > threshold, else 0
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col = "hours",      # bucket index for each 4-hour window
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>%
  ml_logistic_regression()
There are some limitations on the kind of SQL code you can run, but I hope this works for you. Here is the full example I tested. Note the modified column (`example`) in the final table.
library(nycflights13)
library(sparklyr)
library(dplyr)
# Connect to a local Spark instance, pinning the Spark version used in
# this worked example.
sc <- spark_connect(master = "local", spark_version = "2.2.0")
## * Using Spark: 2.2.0
# Copy the nycflights13 `flights` data frame into Spark as a Spark DataFrame.
spark_flights <- sdf_copy_to(sc, flights)
# Build the wrangled dataset: drop rows with a missing departure delay,
# prefix month/day so they read as categorical labels, and keep only the
# columns the pipeline needs. This is a lazy dplyr query against the
# Spark table -- nothing is computed yet.
df <- spark_flights %>%
  filter(!is.na(dep_delay)) %>%
  mutate(month = paste0("m", month), day = paste0("d", day)) %>%
  select(dep_delay, sched_dep_time, month, day, distance)
# Inspect the SQL transformer that ft_dplyr_transformer() generates from
# the dplyr query, then extract the underlying SQL statement itself.
ft_dplyr_transformer(sc, df)
ml_param(ft_dplyr_transformer(sc, df), "statement")
# Assemble the full pipeline: dplyr-derived stage, SQL stage that adds
# the derived `example` column, binarizer and bucketizer feature stages,
# an R formula stage, and the logistic regression estimator.
flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(tbl = df) %>%
  ft_sql_transformer("select *, distance + 47 as example from __THIS__") %>%
  ft_binarizer(
    input_col = "dep_delay", output_col = "delayed", threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "sched_dep_time", output_col = "hours",
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>%
  ml_logistic_regression()

# Print the (unfitted) pipeline specification.
flights_pipeline
# Split the Spark data into small training/testing partitions (the
# remaining 98% is set aside) so the example fits quickly.
partitioned_flights <- sdf_partition(
  spark_flights,
  training = 0.01, testing = 0.01, rest = 0.98
)

# Fit every stage of the pipeline on the training partition.
fitted_pipeline <- ml_fit(flights_pipeline, partitioned_flights$training)

# Print the fitted pipeline model.
fitted_pipeline

# Apply the fitted pipeline to held-out data; the result carries the
# engineered columns plus the model's prediction columns.
predictions <- ml_transform(fitted_pipeline, partitioned_flights$testing)