I've read several questions and tutorials on the subject but failed to resolve my problem, so I decided to ask here myself.

I have a large collection of big files of types A, B and C, and I need to left join B and C onto A on some conditions. I work on a remote server with 64 CPUs and 240 GB of RAM, so naturally I'd like to use its power and process the files in parallel. A crucial piece of knowledge I have is that a file a_i can only be joined successfully with b_i and b_(i+1) from B, and the same holds for C.

My initial attempt was to write a 'join_i' function for each 'a_i' file and run those in parallel (I have 448 files). However, there was no significant speed-up, and in fact, as I watched the performance monitor, the CPUs were sadly running at very low utilization. As far as I could dig into the issue, I think the bottleneck is IO, especially because all the files are big. Is that a valid hypothesis?

In any case, on a second try I decided to go through the files sequentially but exploit parallelism within each iteration. However, after numerous attempts I had no luck here either. I tried to build a minimal example below in which the parallel version is much, much slower (and on my real data it freezes). What is wrong here? Is it a coding mistake or some deeper misunderstanding of how parallel processing works in R? I also tried multidplyr and mclapply, with no luck in either case.

I would also point out that reading the files takes longer than the join itself: within one iteration, reading takes about 30 s (I use fread, unzipping inside it through a cmd call) while the join takes about 10 s. Given all this, what is the best strategy here? Thanks in advance!

library(dplyr)

# toy data: 10 chunks (X2 = 1..10) of 10^5 rows each, keyed by X3
A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)


# join chunk i of A with the matching chunks of B and C on the key X3
chunk_join=function(i, A, B, C)
{
  A_i=A %>% filter(X2==i)
  B_i=B %>% filter(X2==i) %>% select(X1, X3)
  C_i=C %>% filter(X2==i) %>% select(X1, X3)
  A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}

library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)

# not parallel 

s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind', 
                .packages=c('dplyr'), 
                .export=c('chunk_join','A', 'B', 'C')) %do%
                {
                  join_i=chunk_join(j, A, B, C)
                }
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))

# parallel 
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind', 
                .packages=c('dplyr'), 
                .export=c('chunk_join','A', 'B', 'C')) %dopar%
                {
                  join_i=chunk_join(j, A, B, C)
                }
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))

R=rbind(r1, r2)
T=rbind(t1, t2)

R
T

On my server this gives around 5 s for %do% and over 1 min for %dopar%. Note that this is for the join itself, without even taking into account the time for making the cluster. By the way, can someone also comment on how many cluster workers I should use? Say I partition the data into X even-sized chunks and have Y CPUs available: should I use Y (as many as possible), X, or some other number of workers?

Evgeny
  • More generally, instead of doing an expensive filter operation everywhere, where you have to send the entire data.frames to all 10 workers, split the data.frames first. – Axeman Apr 10 '19 at 20:51
  • thanks for the suggestion! you mean create a list of 10 filtered data frames beforehand instead of filter on each iteration? what is the best way for doing so for further processing? I tried multidplyr's partition but in the end gave it up. but don't think this is the main part where parallel fails.. – Evgeny Apr 10 '19 at 21:00
  • Perhaps something like `join3 <- clusterMap( cl, fun = function(a, b, c) { a %>% left_join(b, by = 'X3') %>% left_join(c, by = 'X3') }, a = split(A, A$X2), b = split(B, B$X2), c = split(C, C$X2) ) %>% bind_rows()` – Axeman Apr 10 '19 at 21:03
  • have you made cl through parallel or something else? I have 'not a valid cluster' error while running your suggestion – Evgeny Apr 10 '19 at 21:20
  • If you are on a unix system (mac or linux) you should prefer `mclapply` since it doesn't do expensive data copying for each core. – thc Apr 10 '19 at 21:21

1 Answer


There are two reasons why your multithreading is slow:

1) Data transfer to the new threads
2) Data transfer from the new threads back to the main thread

Issue #1 is avoided entirely by mclapply, which on unix systems forks the session and does not copy the data unless it is modified (copy-on-write). makeCluster, by contrast, creates socket workers by default and has to transfer the data to them explicitly.
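As a side note, if you want to keep the foreach/doParallel workflow from your question, a fork-based cluster sidesteps the same copying on unix systems. A minimal sketch of that alternative (join_fork is just an illustrative name):

cl <- parallel::makeCluster(10, type = "FORK")  # forked workers share A, B, C copy-on-write
doParallel::registerDoParallel(cl)

# no .export or .packages needed: forked workers inherit the session's objects and loaded packages
join_fork <- foreach(j = 1:10, .combine = 'rbind') %dopar% {
  chunk_join(j, A, B, C)
}
stopCluster(cl)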

Issue #2 cannot be avoided with mclapply, but what you can do is minimize the amount of data you transfer back to the main thread.

Naive mclapply:

library(data.table)  # for rbindlist

join3 = mclapply(1:10, function(j) {
  chunk_join(j, A, B, C)
}, mc.cores=4) %>% rbindlist

Slightly smarter mclapply:

# same as chunk_join, but return only the joined columns,
# so that far less data is sent back to the main thread
chunk_join2=function(i, A, B, C)
{
  A_i=A %>% filter(X2==i)
  B_i=B %>% filter(X2==i) %>% select(X1, X3)
  C_i=C %>% filter(X2==i) %>% select(X1, X3)
  join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
  join_i[,c(-1,-2,-3)]
}
# sort A into the same chunk order as the 1:10 loop
# (X2 is character in this example, hence the as.numeric)
A <- arrange(A, as.numeric(X2))
join5 = mclapply(1:10, function(j) {
  chunk_join2(j, A, B, C)
}, mc.cores=4) %>% rbindlist
join5 <- cbind(A, join5)

Benchmarks:

Single threaded: 4.014 s

Naive mclapply: 1.860 s

Slightly smarter mclapply: 1.363 s

If your data has a lot of columns, you can see how Issue #2 would completely bog down the system. You can do even better by, for example, returning only the match indices into B and C instead of whole data.frame subsets.
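For instance, here is a minimal sketch of the index idea under the assumptions of the toy example above (a single key column X3, A already sorted into chunk order; chunk_match and join6 are just illustrative names). Each worker sends back only integer row indices into B and C, and the joined columns are assembled in the main process:

# return, for each row of A's chunk i, the matching row number in the full B and C
# (NA where there is no match); integer vectors are far cheaper to transfer back
# than joined data.frames
chunk_match = function(i, A, B, C)
{
  A_i = A %>% filter(X2 == i)
  list(b = which(B$X2 == i)[match(A_i$X3, B$X3[B$X2 == i])],
       c = which(C$X2 == i)[match(A_i$X3, C$X3[C$X2 == i])])
}

A <- arrange(A, as.numeric(X2))   # same chunk order as the loop below
idx = mclapply(1:10, chunk_match, A, B, C, mc.cores = 4)
join6 = cbind(A,
              joinedB = B$X1[unlist(lapply(idx, `[[`, 'b'))],
              joinedC = C$X1[unlist(lapply(idx, `[[`, 'c'))])

On the full data you would index whichever columns of B and C you actually need by these row numbers, rather than B$X1 and C$X1.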

thc
  • Yes! This looks very much like a cause of my problem as A-files indeed have 100+ columns. Thanks for pointing this out! One thing: is last cbind safe? i.e. can one be sure that rbindlist will order chunks always in the same order as they were in A regardless of if say chunk 3 finished before chunk 2? – Evgeny Apr 11 '19 at 08:26
  • No it's not safe in general, unless you order your dataset. You can see the extra `A <- arrange(A, X2)` command in the 2nd approach makes it in the same order as the lapply loop (it is included in the benchmark). For your full dataset, I would try to develop a solution that returns matching indices of A and B/C, etc. – thc Apr 11 '19 at 16:37
  • yes, I understood the 'arrange'. in any case, as I have 100+ columns, leaving only one column with key improve the situation a lot! great point to leave not necessary columns before mclapply and add them after, thanks! one more question: what do I do if I want to do mclapply for (f)reading in parallel - load in R list several those large files simultaneously? As I observe, reading itself is indeed going in parallel but then it 'freezes' (only 1 CPU is active) for a long time - is it the same reason of expensive transfer from threads to main? Any workaround/advice here? thanks in advance – Evgeny Apr 11 '19 at 17:53
  • The 1 CPU part is where it's doing the unzipping. You could test out whether mclapply is faster. But if you load in the data a lot, I would suggest saving it in another format. – thc Apr 11 '19 at 19:23
  • thank you for your help! I think there is still a room for improvement in reading data part (it takes some 85% of all processing time), but your suggestions did help a lot! thanks! – Evgeny Apr 12 '19 at 15:07
  • Indeed. One simple improvement is using parallel gzip (`pigz`) instead of normal gzip to decompress your files. But again, reading gzip text files is inherently slow and inefficient. – thc Apr 12 '19 at 16:36
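For reference, a minimal sketch of that route with data.table::fread, assuming the files are gzipped CSVs and pigz is installed on the server (the file path is only a placeholder):

library(data.table)

# decompress with parallel gzip and stream the output into fread;
# 'data/a_001.csv.gz' is a placeholder path
a_i <- fread(cmd = "pigz -dc data/a_001.csv.gz")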