
This example is taken from the sparklyr documentation

https://spark.rstudio.com/guides/pipelines/

```r
flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = df
  ) %>%
  ft_binarizer(
    input_col = "dep_delay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col = "hours",
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>%
  ml_logistic_regression()
```

The pipeline above is linear: it uses only sparklyr's built-in transformers, plus dplyr functions, to manipulate the data.

Is there a way to include a custom transformer (for example, a user-defined function containing a for loop) in a sparklyr pipeline?

zero323
  • Yes, you can try `mutate` or `spark_apply` from sparklyr. You can also use `invoke` for native Spark functions or UDFs. – strawberryBeef Oct 19 '18 at 15:55
  • I used spark_apply to do some transformations and got the desired result: `myfunction <- function(){ **** }` and `df <- spark_apply(df, myfunction)`. But can spark_apply() be added to the pipeline, like `ml_pipeline %>% myfunction()`, instead of the standard transformers such as ft_dplyr_transformer that sparklyr has? – Ajay kumar reddy gopireddy Oct 19 '18 at 16:28
  • It is possible, but in general you'll have to implement it in Scala (see, for example, [How to create a custom Transformer from a UDF?](https://stackoverflow.com/q/35180527) for the most basic case) and then add R bindings. That is a bit too much for an SO answer if you're looking for a full solution. – zero323 Oct 19 '18 at 20:26
  • https://github.com/rstudio/sparklyr/issues/479 looks like the relevant issue for R, @user6910411. Please correct me if I am wrong. – Ajay kumar reddy gopireddy Oct 19 '18 at 21:19
  • https://github.com/javierluraschi/sparkhello – Ajay kumar reddy gopireddy Oct 19 '18 at 21:52
  • Yeah, that's a good start. Also, if you're not familiar with the internals, you might find [my answer to this question](https://stackoverflow.com/q/50534224/6910411) useful. – zero323 Oct 19 '18 at 22:14

1 Answer


If your wrangling is simple enough, you can do it in SQL inside the pipeline via ft_sql_transformer(). For example, to add a modified column in the pipeline you can do:

```r
flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = df
  ) %>%
  # Custom SQL step; __THIS__ refers to the stage's input table
  ft_sql_transformer(
    "select *, distance + 47 as example from __THIS__") %>%
  ft_binarizer(
    input_col = "dep_delay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col = "hours",
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>%
  ml_logistic_regression()
```
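In sparklyr, an `ft_*` function called on an `ml_pipeline` returns that pipeline with the new stage appended, so the SQL step can be wrapped in an ordinary R function. That gives the `ml_pipeline(sc) %>% myfunction()` style asked about in the comments. A minimal sketch, assuming the custom logic can be written as a Spark SQL statement; `distance_offset_sql()` and `add_distance_offset()` are hypothetical names, not part of sparklyr:

```r
# Hypothetical helpers (not part of sparklyr). Assumes the custom logic can be
# expressed as a Spark SQL statement over the placeholder table __THIS__.

# Build the SQL statement for the custom step
distance_offset_sql <- function(offset = 47) {
  sprintf("SELECT *, distance + %s AS example FROM __THIS__", offset)
}

# Pipeline-friendly wrapper: like the built-in ft_* functions, `x` may be a
# spark_connection, an ml_pipeline or a tbl_spark. When `x` is a pipeline,
# ft_sql_transformer() appends an SQLTransformer stage and returns the pipeline.
add_distance_offset <- function(x, offset = 47) {
  sparklyr::ft_sql_transformer(x, statement = distance_offset_sql(offset))
}

# Usage (requires a live connection `sc` and the `df` from the example):
# flights_pipeline <- ml_pipeline(sc) %>%
#   ft_dplyr_transformer(tbl = df) %>%
#   add_distance_offset() %>%
#   ft_binarizer(input_col = "dep_delay", output_col = "delayed", threshold = 15)
```

The SQL string is built in a separate function so it can be checked without a Spark connection; `sparklyr::` is resolved only when the wrapper is called.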

There are some limitations on the kind of SQL you can run (the statement has to select from the placeholder table `__THIS__`), but I hope this works for you. Here is the full example I tested. Note the new `example` column in the final table.

```r
library(nycflights13)
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local", spark_version = "2.2.0")
## * Using Spark: 2.2.0

spark_flights <- sdf_copy_to(sc, flights)

# dplyr wrangling that ft_dplyr_transformer() captures as a SQL statement
df <- spark_flights %>%
  filter(!is.na(dep_delay)) %>%
  mutate(
    month = paste0("m", month),
    day = paste0("d", day)
  ) %>%
  select(dep_delay, sched_dep_time, month, day, distance)

# Inspect the transformer and the SQL it generated from `df`
ft_dplyr_transformer(sc, df)

ft_dplyr_transformer(sc, df) %>%
  ml_param("statement")

flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = df
  ) %>%
  # Custom SQL step: adds the derived column "example"
  ft_sql_transformer(
    "select *, distance + 47 as example from __THIS__") %>%
  ft_binarizer(
    input_col = "dep_delay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col = "hours",
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>%
  ml_logistic_regression()

flights_pipeline

# Small training/testing samples keep the example fast
partitioned_flights <- sdf_partition(
  spark_flights,
  training = 0.01,
  testing = 0.01,
  rest = 0.98
)

fitted_pipeline <- ml_fit(
  flights_pipeline,
  partitioned_flights$training
)

fitted_pipeline

# The "example" column is carried through to the predictions table
predictions <- ml_transform(
  fitted_pipeline,
  partitioned_flights$testing
)
```
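For the for-loop case in the question, one option is to run the loop in R while the pipeline is being built and have it generate the SQL for ft_sql_transformer(); Spark then only executes the resulting statement. A rough sketch, assuming the per-row logic can be expressed as a SQL `CASE WHEN`; `case_when_sql()` is a hypothetical helper, similar in spirit to ft_bucketizer() but not a drop-in replacement for it (for example, it treats the last split as an exclusive upper bound and returns NULL outside the splits):

```r
# Hypothetical helper (not part of sparklyr): an R for loop generates one
# CASE WHEN branch per interval [splits[i], splits[i + 1]), numbered from 0.
# The loop runs in R on the driver; Spark only sees the final SQL string.
case_when_sql <- function(input_col, output_col, splits) {
  branches <- character(0)
  for (i in seq_len(length(splits) - 1)) {
    branches <- c(branches, sprintf(
      "WHEN %s >= %s AND %s < %s THEN %d",
      input_col, splits[i], input_col, splits[i + 1], i - 1L
    ))
  }
  sprintf(
    "SELECT *, CASE %s ELSE NULL END AS %s FROM __THIS__",
    paste(branches, collapse = " "), output_col
  )
}

# Usage inside a pipeline (requires a live connection `sc`):
# ml_pipeline(sc) %>%
#   ft_dplyr_transformer(tbl = df) %>%
#   ft_sql_transformer(
#     statement = case_when_sql("sched_dep_time", "dep_bucket",
#                               c(400, 800, 1200, 1600, 2000, 2400)))
```

This only covers logic that can be rewritten as SQL. Code that genuinely needs arbitrary R per row (what spark_apply() offers) does not fit this pattern; as noted in the comments, that requires a custom Transformer written in Scala plus R bindings.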
strawberryBeef