2

I have this dataset over here (e.g. students wrote an exam many times over a period of years and either pass or failed - I am interested in studying the effect of the previous test on the next test):

   id = sample.int(10000, 100000, replace = TRUE)
res = c(1,0)
results = sample(res, 100000, replace = TRUE)
date_exam_taken = sample(seq(as.Date('1999/01/01'), as.Date('2020/01/01'), by="day"), 100000, replace = TRUE)


my_data = data.frame(id, results, date_exam_taken)
my_data <- my_data[order(my_data$id, my_data$date_exam_taken),]

my_data$general_id = 1:nrow(my_data)
my_data$exam_number = ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id = NULL

     id results date_exam_taken exam_number
7992   1       1      2004-04-23           1
24837  1       0      2004-12-10           2
12331  1       1      2007-01-19           3
34396  1       0      2007-02-21           4
85250  1       0      2007-09-26           5
11254  1       1      2009-12-20           6

I wrote this standard FOR LOOP and everything seems to work fine:

my_list = list()

for (i in 1:length(unique(my_data$id)))
    
{ 
    {tryCatch({
        
        start_i = my_data[my_data$id == i,]
        
        pairs_i =  data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
        frame_i =  as.data.frame(table(pairs_i))
        frame_i$id = i
        print(frame_i)
        my_list[[i]] = frame_i
    }, error = function(e){})
    }}


 final_a = do.call(rbind.data.frame, my_list)

Now, I am trying to "optimize" this loop by using "doParallel" libraries in R.

Using this post (How to transform a "for loop" in a "foreach" loop in R?) as a tutorial, I tried to convert my loop as follows:

# does this mean I should set makeCluster() to makeCluster(8)???
 > detectCores()
[1] 8

my_list = list()
max = length(unique(my_data$id))

library(doParallel)
registerDoParallel(cl <- makeCluster(3))

# note: for some reason, this loop isn't printing?

test = foreach(i = 1:max, .combine = "rbind") %dopar% {

    {tryCatch({
        
        start_i = my_data[my_data$id == i,]
        
        pairs_i =  data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
        frame_i =  as.data.frame(table(pairs_i))
        frame_i$id = i
        print(frame_i)
        my_list[[i]] = frame_i
    }, error = function(e){})
    }}


 final_b = do.call(rbind.data.frame, test)

Based on this - I have the following questions:

  • Have I correctly used the "doParallel" functionalities as they are intended to be used?

  • Is there yet a better way to do this?

  • Note: I am looking to run this code on a dataset with around 10 million unique ID's

user438383
  • 5,716
  • 8
  • 28
  • 43
stats_noob
  • 5,401
  • 4
  • 27
  • 83
  • (Disclaimer: I'm the author) You might find the JottR blog post 'Parallelize a For-Loop by Rewriting it as an Lapply Call ' (2019-01-11) useful – HenrikB Dec 11 '22 at 19:16

1 Answers1

2

Here is a way with the parallel code written as a function.
I split the data by id beforehand, instead of comparing each id with the current index i. This saves some time. It also saves time to extract the results vector only once.

I don't know why, I haven't found any errors in my parallel code but the final data.frame is not equal to the sequential output final_a, it has more rows.

This is system dependent but as you can see in the timings, the 6 cores run is the fastest.

library(parallel)
library(doParallel)
#> Loading required package: foreach
#> Loading required package: iterators

parFun <- function(my_data, ncores) {
  split_data <- split(my_data, my_data$id)
  
  registerDoParallel(cl <- makeCluster(ncores))
  on.exit(stopCluster(cl))
  
  test <- foreach(i = seq_along(split_data)) %dopar% {
    start_i_results <- split_data[[i]]$results
    n <- length(start_i_results)
    if(n > 1L) {
      tryCatch({
        pairs_i <- data.frame(first = start_i_results[-n], 
                              second = start_i_results[-1L])
        frame_i <- as.data.frame(table(pairs_i))
        frame_i$id <- i
        frame_i
      }, error = function(e) {e})
    } else NULL
  }
  final_b <- do.call(rbind.data.frame, test)
  final_b
}

set.seed(2022)
id <- sample.int(10000, 100000, replace = TRUE)
res <- c(1,0)
results <- sample(res, 100000, replace = TRUE)
date_exam_taken <- sample(seq(as.Date('1999/01/01'), as.Date('2020/01/01'), by="day"), 100000, replace = TRUE)
my_data <- data.frame(id, results, date_exam_taken)

my_data <- my_data[order(my_data$id, my_data$date_exam_taken),]

my_data$general_id = 1:nrow(my_data)
my_data$exam_number = ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id = NULL

t0 <- system.time({
  my_list = list()
  
  for (i in 1:length(unique(my_data$id)))
    
  { 
    {tryCatch({
      
      start_i = my_data[my_data$id == i,]
      
      pairs_i =  data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
      frame_i =  as.data.frame(table(pairs_i))
      frame_i$id = i
      # print(frame_i)
      my_list[[i]] = frame_i
    }, error = function(e){})
    }}
  final_a = do.call(rbind.data.frame, my_list)
})

ncores <- detectCores()

# run with 3 cores
t3 <- system.time(parFun(my_data, 3L))
# run with 6 cores and save the result in `res6`
t6 <- system.time(res6 <- parFun(my_data, ncores - 2L))
rbind(t0, t3, t6)[,1:3]
#>    user.self sys.self elapsed
#> t0     12.86     1.00   15.37
#> t3      3.50     0.22    8.37
#> t6      3.61     0.46    7.65

head(final_a, 10)
#>    first second Freq id
#> 1      0      0    2  1
#> 2      1      0    3  1
#> 3      0      1    4  1
#> 4      1      1    0  1
#> 5      0      0    5  2
#> 6      1      0    3  2
#> 7      0      1    2  2
#> 8      1      1    0  2
#> 9      0      0    0  3
#> 10     1      0    1  3
head(res6, 10)
#>    first second Freq id
#> 1      0      0    2  1
#> 2      1      0    3  1
#> 3      0      1    4  1
#> 4      1      1    0  1
#> 5      0      0    5  2
#> 6      1      0    3  2
#> 7      0      1    2  2
#> 8      1      1    0  2
#> 9      0      0    0  3
#> 10     1      0    1  3

str(final_a)
#> 'data.frame':    38945 obs. of  4 variables:
#>  $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#>  $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#>  $ Freq  : int  2 3 4 0 5 3 2 0 0 1 ...
#>  $ id    : int  1 1 1 1 2 2 2 2 3 3 ...
str(res6)
#> 'data.frame':    38949 obs. of  4 variables:
#>  $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#>  $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#>  $ Freq  : int  2 3 4 0 5 3 2 0 0 1 ...
#>  $ id    : int  1 1 1 1 2 2 2 2 3 3 ...

Created on 2022-12-11 with reprex v2.0.2


Edit

The following version seems much faster.

parFun2 <- function(my_data, ncores) {
  registerDoParallel(cl <- makeCluster(ncores))
  on.exit(stopCluster(cl))
  
  results_list <- split(my_data$results, my_data$id)
  
  test <- foreach(i = seq_along(results_list)) %dopar% {
    start_i_results <- results_list[[i]]
    n <- length(start_i_results)
    if(n > 1L) {
      tbl <- table(first = start_i_results[-n], 
                   second = start_i_results[-1L])
      frame_i <- as.data.frame(tbl)
      frame_i$id <- i
      frame_i
    } else NULL
  }
  data.table::rbindlist(test)
}
Rui Barradas
  • 70,273
  • 8
  • 34
  • 66
  • @ Rui Barradas: Thank you so much for your answer! Suppose I have 10 million unique ID's in my dataset - will this code " split_data <- split(my_data, my_data$id)" that will split the data into individual chunks still be a good idea? – stats_noob Dec 11 '22 at 18:26
  • 1
    @stats_noob That's a though question. Maybe the answer is no, not anymore. Can you try it and give feedback on what happened? – Rui Barradas Dec 11 '22 at 18:30
  • @ Rui Barradas : Sure! I will try that! – stats_noob Dec 11 '22 at 18:31
  • @ Rui Barradas: Is it possible to add a print statement to your code? I have been trying to do something like this https://stackoverflow.com/questions/74759716/r-printing-individual-iterations-from-loops ... but so far I could not find a direct way to do this? – stats_noob Dec 11 '22 at 18:40
  • 1
    For near-live progress updates when running in parallel, but also relaying of "print statements", message and warnings, see – HenrikB Dec 11 '22 at 19:17
  • @ Rui Barradas: do you think "doSNOW" and "makeClusterPSOCK()" might be useful in this question? Or is it irrelevant given that we are using "doParallel" and "foreach"? – stats_noob Dec 11 '22 at 20:44
  • @ Henrik B: thank you for your reply! I will check it out! – stats_noob Dec 11 '22 at 20:45
  • @stats_noob No, I don't believe that doSNOW will be better than `foreach/dopar`. – Rui Barradas Dec 11 '22 at 23:01
  • Thank you! makeClusterPSOCK also wont be any better? – stats_noob Dec 11 '22 at 23:04
  • @ Rui Barradas: Did I correctly vectorize this function here? https://stackoverflow.com/questions/74792478/vectorize-functions-in-r I tried to take this question and vectorize my code - is it correct in your opinion ? thanks! – stats_noob Dec 14 '22 at 07:36