
How can I read a CSV file in chunks (it is too large to read into memory at once) and process each chunk with the parallel package? Let's say I want to calculate the average of a column chunk by chunk.

Without parallel I would use something like this:

library(readr)

# per-chunk callback: compute the mean of the mpg column
f <- function(df_chunk, pos) mean(df_chunk$mpg)

res <- read_csv_chunked(readr_example("mtcars.csv"), DataFrameCallback$new(f), chunk_size = 10)

The result is:

> res
      [,1]
[1,] 20.37
[2,] 19.89
[3,] 20.39
[4,] 18.20
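
(Side note: mtcars has 32 rows, so the last chunk contains only 2 rows, and a plain mean of these four values would not equal the overall mean of mpg. If the overall mean were the goal, something like this should work instead, returning per-chunk sums and counts and combining them at the end:

g <- function(df_chunk, pos) c(sum = sum(df_chunk$mpg), n = nrow(df_chunk))
parts <- read_csv_chunked(readr_example("mtcars.csv"), DataFrameCallback$new(g), chunk_size = 10)
sum(parts[, "sum"]) / sum(parts[, "n"])  # exact overall mean

Here, though, I really do want the four per-chunk values.)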

So I want these four average values to be calculated by child processes. The code to achieve this would look something like this:

library(foreach)
library(doParallel)

registerDoParallel(6)

calc_avg <- function(iterable_df_chunks) {
  foreach(df_chunk = iterable_df_chunks, .combine = c) %dopar%
    mean(df_chunk$mpg)
}

calc_avg(< some code with read_csv_chunked() >)

stopImplicitCluster()

Thank you!


1 Answer


OK, I found a working solution. The load_CPU function just does some CPU-intensive work (it builds a vector of the first n primes) so that I can check that the child processes really are doing the job:

load_CPU <- function(n) {
  # busy-work: collect the first n primes by trial division
  i <- 3
  v <- c(2)
  while (length(v) < n) {
    # i is prime if no already-found prime up to sqrt(i) divides it
    if (all(i %% v[v <= sqrt(i)] != 0)) {
      v <- c(v, i)
    }
    i <- i + 2
  }
  return(v)
}
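
As a quick check that the helper does what it claims, the first ten values should be the first ten primes:

> load_CPU(10)
 [1]  2  3  5  7 11 13 17 19 23 29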

calc_avg <- function(path) {
  # ListCallback collects the chunks into a list that foreach can iterate over
  foreach(y = read_csv_chunked(path, ListCallback$new(function(x, pos) x), chunk_size = 10),
          .combine = rbind, .export = c("load_CPU")) %dopar% {
    load_CPU(10000)  # CPU-intensive busy-work, just to see the workers sweat
    mean(y$mpg)
  }
}

calc_avg(readr_example("mtcars.csv"))

The result is:

          [,1]
result.1 20.37
result.2 19.89
result.3 20.39
result.4 18.20
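
One caveat: ListCallback$new(function(x, pos) x) collects all chunks into memory before foreach starts iterating, so this variant no longer keeps memory bounded. If the file really is too large to hold at once, a rough sketch of an alternative (assuming a plain CSV with a header row) is to let every worker read its own slice via skip/n_max:

library(readr)
library(foreach)
library(doParallel)

registerDoParallel(6)

path <- readr_example("mtcars.csv")
chunk_size <- 10
n_rows <- length(count_fields(path, tokenizer_csv())) - 1  # minus the header line
starts <- seq(1, n_rows, by = chunk_size)                  # first data row of each chunk
col_names <- names(read_csv(path, n_max = 0))              # grab the header only

res <- foreach(s = starts, .combine = rbind, .packages = "readr") %dopar% {
  chunk <- read_csv(path, skip = s, n_max = chunk_size, col_names = col_names)
  mean(chunk$mpg)
}

stopImplicitCluster()

Each worker re-opens the file and reads only its chunk_size rows, so peak memory per worker stays at one chunk (column types are re-guessed per chunk, so for mixed-type files it is safer to pass an explicit col_types).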