There are a couple of earlier related questions, but none of them solves the issue for me.

My use case is the following: I have a large database of data that needs to be plotted. Each plot takes a few seconds to create because of some necessary pre-processing of the data and the plotting itself (ggplot2), and I need to produce a large number of plots. My plan is to connect to the database via dplyr without downloading all the data into memory, and to have a function fetch the subset of the data needed for each plot. This approach works fine single-threaded, but when I try to use parallel processing I run into SQL errors related to the connection: "MySQL server has gone away".

Now, I recently solved the same issue working in Python, in which case the solution was simply to kill the current connection inside the function, which forced the establishment of a new connection. I did this using connection.close() where connection is from Django's django.db.

My problem is that I cannot find an R equivalent of this approach. I thought I had found the solution when I found the pool package for R:

This package enables the creation of object pools for various types of objects in R, to make it less computationally expensive to fetch one. Currently the only supported pooled objects are DBI connections (see the DBI package for more info), which can be used to query a database either directly through DBI or through dplyr. However, the Pool class is general enough to allow for pooling of any R objects, provided that someone implements the backend appropriately (creating the object factory class and all the required methods) -- a vignette with instructions on how to do so will be coming soon.

My code is too large to post here, but essentially, it looks like this:

#libraries loaded as necessary

#connect to the db in some kind of way
#with dplyr
db = src_mysql(db_database, username = db_username, password = db_password) 
#with RMySQL directly
db = dbConnect(RMySQL::MySQL(), dbname = db_database, username = db_username, password = db_password)
#with pool
db = pool::dbPool(RMySQL::MySQL(), 
                  dbname = db_database, 
                  username = db_username, 
                  password = db_password, 
                  minSize = 4)
#I tried all 3

#connect to a table
some_large_table = tbl(db, 'table')

#define the function
some_function = function(some_id) {
    #fetch data from table
    subtable = some_large_table %>% filter(id == some_id) %>% collect()

    #do something with the data
    something(subtable)
}

#parallel process
mclapply(vector_of_ids,
         FUN = some_function,
         mc.cores = num_of_threads)
CoderGuy123

1 Answer

The code you have above is not the equivalent of your Python code, and that is the key difference. What you did in Python is totally possible in R (see MWE below). However, the code you have above is not:

"kill[ing] the current connection inside the function, which forced the establishment of a new connection."

What it is trying (and failing) to do is to make a database connection travel from the parent process to each child process opened by the call to mclapply. This is not possible. A database connection belongs to the process that opened it and cannot be safely used across process boundaries.

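This is an example of the more general "rule" that a child process cannot affect the state of the parent process, period. For example, a child process cannot write to the parent's memory: each forked child works on its own copy, so assignments made in a child are not visible to the parent, and results only come back through the return value. Nor can you plot (to the parent process’s graphics device) from those child processes.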
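
The one-way nature of forking can be seen in a small sketch. It assumes a Unix-like system, since mclapply relies on fork (on Windows, or with mc.cores = 1, the work runs in the parent process and the result would differ). The variable name counter is made up for illustration. Each child modifies its own copy of counter, and the parent's copy stays untouched:

```r
library(parallel)

# A variable living in the parent process.
counter <- 0

# Each child increments its *own* copy of `counter` and returns the value it sees.
child_values <- mclapply(1:4, function(i) {
  counter <<- counter + 1
  counter
}, mc.cores = 2)

# Results come back only through the return value of mclapply ...
print(unlist(child_values))

# ... while the parent's variable was never modified by the children.
print(counter)
```

The same isolation applies to a connection object created in the parent: the children cannot hand a repaired or reopened connection back, which is why each child has to create its own.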

In order to do the same thing you did in Python, you need to open a new connection inside of the function FUN (the second argument to mclapply) if you want it to be truly parallel. I.e. you have to make sure that the dbConnect call happens inside the child process.

This eliminates the point of pool (though it’s perfectly safe to use), since pool is useful when you reuse connections within a single process and want them to be easily accessible. For your parallel use case it brings no benefit: since connections can't cross process boundaries, you will always need to open and close a connection in each new process, so you might as well skip pool entirely.

Here's the correct "translation" of your Python solution to R:

library(dplyr)

getById <- function(id) {
  # create a connection and close it on exit
  conn <- DBI::dbConnect(
    drv = RMySQL::MySQL(),
    dbname = "shinydemo",
    host = "shiny-demo.csa7qlmguqrf.us-east-1.rds.amazonaws.com",
    username = "guest",
    password = "guest"
  )
  on.exit(DBI::dbDisconnect(conn))

  # get a specific row based on ID
  conn %>% tbl("City") %>% filter(ID == id) %>% collect()
}

parallel::mclapply(1:10, getById, mc.cores = 12)
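
The function above opens and closes a connection for every ID. If connection setup becomes a noticeable cost, one possible variation is to hand each worker a chunk of IDs and open a single connection per chunk. This is only a sketch under assumptions: it reuses the demo database from the example above, the helper names chunk_ids, getByIds and run_all are hypothetical, and it needs a fork-capable (non-Windows) system to actually run in parallel:

```r
library(dplyr)

# Hypothetical helper: split the IDs into at most n chunks, one per worker.
chunk_ids <- function(ids, n) {
  unname(split(ids, rep_len(seq_len(n), length(ids))))
}

# Hypothetical helper: runs inside a child process, so the connection is
# created (and closed) there and never crosses a process boundary.
getByIds <- function(ids) {
  conn <- DBI::dbConnect(
    drv = RMySQL::MySQL(),
    dbname = "shinydemo",
    host = "shiny-demo.csa7qlmguqrf.us-east-1.rds.amazonaws.com",
    username = "guest",
    password = "guest"
  )
  on.exit(DBI::dbDisconnect(conn))

  # reuse the same connection for every ID in this chunk;
  # `!!id` forces the local value of `id` into the query
  lapply(ids, function(id) {
    conn %>% tbl("City") %>% filter(ID == !!id) %>% collect()
  })
}

# Hypothetical wrapper: one chunk (and hence one connection) per worker.
run_all <- function(ids, n_workers) {
  chunks <- chunk_ids(ids, n_workers)
  parallel::mclapply(chunks, getByIds, mc.cores = n_workers)
}
```

Calling run_all(1:10, n_workers = 2) would return a list with one element per chunk, each holding the rows fetched for that chunk's IDs. Note that the chunks are assigned round-robin, so the results do not come back in the original ID order.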
Bárbara Borges