
I am trying to parallelise a pipe. The pipe contains a tidyr command (tidyr::complete), which breaks when the code is run in parallel because the object class is not recognised.

Is there an alternative in dplyr to complete?

library(dplyr)
library(tidyr)
library(zoo)


test <- tibble(year=c(1,2,3,4,5,5,1,4,5),
               var_1=c(1,1,1,1,1,1,2,2,2), 
               var_2=c(1,1,1,1,1,2,3,3,3), 
               var_3=c(0,5,NA,15,20,NA,1,NA,NA))

max_year <- max(test$year,na.rm = T)
min_year <- min(test$year,na.rm = T)

SERIAL


test_serial <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1, year = seq(min_year,max_year)) %>%
  mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))


PARALLEL (THIS FAILS)

devtools::install_github("hadley/multidplyr")
library(multidplyr)

cl <- new_cluster(2)
cluster_copy(cl, c("test","max_year","min_year"))
cluster_library(cl, c("dplyr","tidyr","zoo"))

test_parallel <- test %>% group_by(var_1,var_2) %>% partition(cl)
test_parallel <- test_parallel %>% 
  dplyr::group_by(var_1,var_2) %>% 
  tidyr::complete(var_1, year = seq(min_year,max_year)) %>%
  dplyr::mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) %>% 
  collect()

This is the error message:

Error in UseMethod("complete_") : 
  no applicable method for 'complete_' applied to an object of class "multidplyr_party_df"
MCS
1 Answer


Multidplyr allows you to:

  1. split the data up using partition()
  2. process each partition on a dedicated node
  3. collect() the results (a short example follows)
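For example, a purely group-wise summary fits this model. Here is a minimal sketch reusing the test tibble from the question; the summary columns n_obs and mean_var_3 are invented for illustration:

library(multidplyr)
library(dplyr)

cl <- new_cluster(2)
cluster_library(cl, "dplyr")

test %>%
  group_by(var_1, var_2) %>%
  partition(cl) %>%                        # 1. split the groups across the nodes
  summarise(n_obs = n(),                   # 2. each node summarises only its own groups
            mean_var_3 = mean(var_3, na.rm = TRUE)) %>%
  collect()                                # 3. gather the per-node results back

Each group can be summarised without looking at any other group, which is exactly what this workflow requires.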

Not all data processing tasks are suitable for this workflow, however.

In particular, complete needs to know all the possible values in the input data in order to create the missing rows, which means the operation as a whole can't be split across nodes; that's why no applicable method is available for the partitioned data.

In the example you provide, each node would receive a single var_1, var_2 pair without knowing what the other nodes got, so the expected result can't be achieved in parallel this way.
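As a rough illustration, a plain filter() can stand in for what one node would hold; an ungrouped complete() that crosses var_1 with the year range can then only see the var_1 values present on that node:

# roughly what a node holding only the var_1 == 2, var_2 == 3 rows would see
test %>%
  filter(var_1 == 2, var_2 == 3) %>%
  complete(var_1, year = seq(min_year, max_year))   # only var_1 == 2 is completed

The rows for var_1 == 1 live on other nodes, so no single node can rebuild the full grid on its own.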

However, as you already know that year = seq(min_year,max_year), you could parallelize the complete step for this variable only, splitting the work by var_1, for example with the furrr package:

library(furrr)
plan(multiprocess)
test_parallel <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1) %>% 
  split(.$var_1) %>% 
  furrr::future_map(~ {
    complete(.x, year = seq(min_year, max_year)) %>%
      dplyr::mutate(
        var_3 = na.approx(var_3, na.rm = FALSE),
        var_3 = if (all(is.na(var_3))) NA else na.spline(var_3, na.rm = FALSE))
  }) %>% 
  bind_rows()

> identical(c(test_serial$var_1,test_serial$var_2,test_serial$var_3,test_serial$year),
+           c(test_parallel$var_1,test_parallel$var_2,test_parallel$var_3,test_parallel$year))
[1] TRUE

This should be tested on a larger dataset to measure the potential performance improvement.
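A sketch of how such a test could look; the larger dataset below (200 × 20 groups, years 1 to 9, NAs injected at years 2 and 7) is invented for illustration, and plan(multisession) is used instead of the deprecated multiprocess:

library(dplyr)
library(tidyr)
library(zoo)
library(furrr)

set.seed(1)
big <- crossing(var_1 = 1:200, var_2 = 1:20, year = c(1, 2, 4, 7, 9)) %>%
  mutate(var_3 = if_else(year %in% c(2, 7), NA_real_, rnorm(n())))   # gaps to interpolate

min_year <- min(big$year, na.rm = TRUE)
max_year <- max(big$year, na.rm = TRUE)

# serial reference
system.time(
  big %>%
    group_by(var_1, var_2) %>%
    complete(var_1, year = seq(min_year, max_year)) %>%
    mutate(
      var_3 = na.approx(var_3, na.rm = FALSE),
      var_3 = if (all(is.na(var_3))) NA else na.spline(var_3, na.rm = FALSE))
)

# parallel version from above
plan(multisession)
system.time(
  big %>%
    group_by(var_1, var_2) %>%
    complete(var_1) %>%
    split(.$var_1) %>%
    future_map(~ complete(.x, year = seq(min_year, max_year)) %>%
                 mutate(
                   var_3 = na.approx(var_3, na.rm = FALSE),
                   var_3 = if (all(is.na(var_3))) NA else na.spline(var_3, na.rm = FALSE))) %>%
    bind_rows()
)

Comparing the two elapsed times shows whether the parallel overhead pays off at this size.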

Waldi
  • But if I were able to assign each node a whole group to complete (say var_1), would I then be able to use complete? – MCS Jun 29 '20 at 15:50
  • I got the following warning: #Warning message: [ONE-TIME WARNING] Forked processing ('multicore') is disabled in future (>= 1.13.0) when running R from RStudio, because it is considered unstable. Because of this, plan("multicore") will fall back to plan("sequential"), and plan("multiprocess") will fall back to plan("multisession") - not plan("multicore") as in the past. For more details, how to control forked processing or not, and how to silence this warning in future R sessions, see ?future::supportsMulticore – MCS Jun 29 '20 at 16:46
  • As proof, system.time() revealed that the furrr solution was 4X slower than the standard one – MCS Jun 29 '20 at 16:46
  • Multitasking is most of the time slower on a small example dataset because you need to open the tasks, transmit the data to them, and collect the results back, which for 15 rows takes much more time than processing the data directly, see my last sentence ;-) – Waldi Jun 29 '20 at 16:50