
I am trying to parallelize the extraction of data stored in some HTML documents and to store it in data.frames (a few million documents, hence the usefulness of parallelization).

In a first step, on the machine where I register the queue, I select a subset of the HTML files and apply the read_html function to them (from the rvest package; I have also tried the similar function from the XML package, but I was getting memory leak problems) to obtain a single list storing the content of many HTML pages.

Then I use an iterator on this list to obtain smaller chunks of it to be fed to the foreach.

Inside the foreach I build the data.frame(s) (using the html_table function and some basic data manipulation) and return a list whose elements are the cleaned data.frames.

I have tried both the doSNOW backend on Windows 8 and the doRedis one on Ubuntu 16.04.

In the first case a list of empty lists is returned, while in the second a memory-mapping error is thrown; you can find the traceback at the very bottom of the question.

To my understanding, the (chunks of) lists that I am sending to the cores are not behaving as I expect. I have gathered that the list object may be just a set of pointers, but I have not been able to confirm it; maybe this could be the issue? Is there an alternative to the "list way" of "encapsulating" the data of multiple HTML pages?

Below you can find some code reproducing the issue. I am completely new to Stack Overflow, new to parallel programming, and fairly new to R programming: any advice for improvement is welcome. Thank you all in advance.

library(rvest)
library(foreach)
library(itertools) #idiv() and nextElem() are used by the custom iterator below

#Wikipedia pages of Olympic medalists between 1992 and 2016 are
# downloaded for reproducibility
for(i in seq(1992, 2016, by=4)){

  html = paste("https://en.wikipedia.org/wiki/List_of_", i, "_Summer_Olympics_medal_winners", sep="")
  con = url(html)
  htmlCode = readLines(con)
  writeLines(htmlCode, con=paste(i, "medalists", sep="_"))
  close(con)

}

#declaring the redis backend (doSNOW code is also included below)

#note that I am using the package from 
#devtools::install_github("bwlewis/doRedis") due to a "nodelay error"
#(more info on that here: https://github.com/bwlewis/doRedis/issues/24)
# if it is not your case please drop the nodelay and timeout options

#Registering cores ---Ubuntu---
cores=2
library('doRedis')
options('redis:num'=TRUE)
registerDoRedis("jobs", nodelay=FALSE)
startLocalWorkers(n=cores, "jobs", timeout=2, nodelay=FALSE)
foreachOpt <- list(preschedule=FALSE)


#Registering cores ---Win---
#cores=2
#library("doSNOW")
#registerDoSNOW(makeCluster(cores, type = "SOCK"))


#defining the iterator (splits the list into chunks, e.g. of chunkSize elements)
iterator <- function(x, ...) {
  i <- 1
  it <- idiv(length(x), ...)

  nextEl <- function() {
    n <- nextElem(it)
    ix <- seq(i, length=n)
    i <<- i + n
    x[ix]
  }

  obj <- list(nextElem=nextEl)
  class(obj) <- c('ivector', 'abstractiter', 'iter')
  obj
}

#reading files (only the "*_medalists" files downloaded above)
names_files <- list.files(pattern="medalists")
html_list <- lapply(names_files, read_html)

#creating iterator
ChunkSize_html_list<-2
iter<-iterator(html_list, chunkSize=ChunkSize_html_list)

#defining expanding list (thanks StackOverflow and many thanks to
#JanKanis's answer : http://stackoverflow.com/questions/2436688/append-an-object-to-a-list-in-r-in-amortized-constant-time-o1  )
expanding_list <- function(capacity = 10) {
  buffer <- vector('list', capacity)
  length <- 0

  methods <- list()

  methods$double.size <- function() {
    buffer <<- c(buffer, vector('list', capacity))
    capacity <<- capacity * 2
  }

  methods$add <- function(val) {
    if(length == capacity) {
      methods$double.size()
    }

    length <<- length + 1
    buffer[[length]] <<- val
  }

  methods$as.list <- function() {
    b <- buffer[0:length]
    return(b)
  }

  methods
}
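
A quick standalone check of the expanding list (illustrative only, not part of the pipeline):

el <- expanding_list(capacity = 2)
el$add(data.frame(x = 1))
el$add(data.frame(x = 2))
el$add(data.frame(x = 3))   # exceeds the initial capacity, triggers double.size()
length(el$as.list())        # 3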

#parallelized part
clean_data <- foreach(ite=iter, .packages=c("itertools", "rvest"), .combine=c,
                      .options.multicore=foreachOpt, .options.redis=list(chunkSize=1)) %dopar% {

  temp_tot <- expanding_list()
  for(g in 1:length(ite)){

    #extraction of the tables from the g-th page
    tables <- html_table(ite[[g]], fill=TRUE, header=TRUE)

    for(i in 1:length(tables)){

      #just some basic data manipulation: keep the last row of the i-th table
      temp <- tables[[i]][nrow(tables[[i]]), ]
      temp_tot$add(temp)
      rm(temp)
      gc(verbose=FALSE)
    }
  }
  #returning the list of cleaned data.frames to the foreach
  temp_tot$as.list()
}

Error thrown when using the redis backend:

*** caught segfault ***
address 0x60, cause 'memory not mapped'


Traceback:
 1: .Call("xml2_doc_namespaces", PACKAGE = "xml2", doc)
 2: doc_namespaces(doc)
 3: xml_ns.xml_document(x)
 4: xml_ns(x)
 5: xpath_search(x$node, x$doc, xpath = xpath, nsMap = ns, num_results = Inf)
 6: xml_find_all.xml_node(x, ".//table")
 7: xml2::xml_find_all(x, ".//table")
 8: html_table.xml_document(ite[[g]], fill = T, header = T)
 9: html_table(ite[[g]], fill = T, header = T)
10: eval(expr, envir, enclos)
11: eval(.doRedisGlobals$expr, envir = .doRedisGlobals$exportenv)
12: doTryCatch(return(expr), name, parentenv, handler)
13: tryCatchOne(expr, names, parentenv, handlers[[1L]])
14: tryCatchList(expr, classes, parentenv, handlers)
15: tryCatch({    lapply(names(args), function(n) assign(n, args[[n]], pos = .doRedisGlobals$exportenv))    if (exists(".Random.seed", envir = .doRedisGlobals$exportenv)) {        assign(".Random.seed", .doRedisGlobals$exportenv$.Random.seed,             envir = globalenv())    }    tryCatch({        if (exists("set.seed.worker", envir = .doRedisGlobals$exportenv))             do.call("set.seed.worker", list(0), envir = .doRedisGlobals$exportenv)    }, error = function(e) cat(as.character(e), "\n"))    eval(.doRedisGlobals$expr, envir = .doRedisGlobals$exportenv)}, error = function(e) e)
16: FUN(X[[i]], ...)
17: lapply(work[[1]]$argsList, .evalWrapper)
18: redisWorker(queue = "jobs", host = "localhost", port = 6379,     iter = Inf, linger = 30, log = stdout(), timeout = 2, nodelay = FALSE)
aborting ...
dgdi
    Congrats for your first question and welcome on stackoverflow. – Tensibai Sep 01 '16 at 08:27
  • I think you are trying to be too clever here. I don't see a reason to use a closure. Why do you need this "expanding list"? Apparently, you know how large the lists need to be, so simply pre-allocate them using `vector(mode = "list", length = length(tables))`. – Roland Sep 01 '16 at 10:52
  • Hi Roland, I have two objections to your suggestion. The first, just to clarify the problem, is that "temp_tot" gathers all the (final rows of) all the tables (the i-loop) from all the pages (the g-loop), and the # of tables for every page is not known. I see that this could be solved by c()-ing (at the end of the g-loop) the lists created in the i-loop (using your code). The second objection, which led me to prefer expanding_list(), is the nested structure arising from the "c() way" (first index inherited from the g-index, second one from the i-index) that expanding_list() avoids; see the sketch below. – dgdi Sep 01 '16 at 13:30
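
A sketch of the alternative discussed in these comments (my wording, not code from either commenter), assuming html_list from the question's code: pre-allocate one list per page, then flatten once to remove the page/table nesting:

# Pre-allocate a list per page, then flatten once at the end instead of
# growing an expanding list.
per_page <- lapply(html_list, function(page) {
  tables <- html_table(page, fill = TRUE, header = TRUE)
  out <- vector(mode = "list", length = length(tables))
  for (i in seq_along(tables)) out[[i]] <- tables[[i]][nrow(tables[[i]]), ]
  out
})
# unlist(recursive = FALSE) drops the page/table nesting mentioned above
flat <- unlist(per_page, recursive = FALSE)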

1 Answer


I think the problem is that you're creating the XML/HTML document objects on the master by calling "read_html", and then processing them on the workers. I've tried some experiments, and it looks like that doesn't work, probably because those objects can't be serialized, sent to the workers, and then deserialized correctly. I think the objects are corrupt, causing the workers to segfault when they try to operate on them using the "html_table" function.
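
A small sketch of why this happens, assuming the xml2 internals shown in the traceback below (the document handle is stored in $doc and is an external pointer into C memory, which a serialize/unserialize round trip, essentially what happens when arguments are shipped to workers, cannot preserve):

library(xml2)

doc <- read_xml("<foo>1</foo>")
typeof(doc$doc)   # "externalptr": the parsed tree lives in C memory, outside R

# The external pointer survives a serialize/unserialize round trip as an R
# object, but the memory it pointed to does not come with it:
doc2 <- unserialize(serialize(doc, NULL))
# xml_path(doc2)  # left commented out: operating on the stale pointer
                  # misbehaves or segfaults, as in the question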

I suggest that you modify your code to iterate over the file names so that the workers can call "read_html" themselves, thus avoiding serializing the XML document objects.
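
For example, a minimal sketch of that approach with doSNOW (assuming the "*_medalists" files from the question are in the working directory; the chunking and the expanding list are left out for brevity):

library(foreach)
library(doSNOW)

cl <- makeCluster(2, type = "SOCK")
registerDoSNOW(cl)

files <- list.files(pattern = "medalists")

# Each worker parses its own files, so no xml_document crosses the
# master/worker boundary; only plain data.frames come back.
clean_data <- foreach(f = files, .packages = "rvest", .combine = c) %dopar% {
  page   <- read_html(f)
  tables <- html_table(page, fill = TRUE, header = TRUE)
  lapply(tables, function(d) d[nrow(d), ])
}

stopCluster(cl)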


Here is some of the test code that I experimented with:

library(xml2)
library(snow)
cl <- makeSOCKcluster(3)
clusterEvalQ(cl, library(xml2))

# Create XML documents on the master
docs <- lapply(1:10,
      function(i) read_xml(paste0("<foo>", i, "</foo>")))

# Call xml_path on XML documents created on master
r1 <- lapply(docs, xml_path)            # correct results
r2 <- clusterApply(cl, docs, xml_path)  # incorrect results

# This seems to work...
docs2 <- clusterApply(cl, 1:10,
      function(i) read_xml(paste0("<foo>", i, "</foo>")))

# But this causes a segfault on the master
print(docs2)

I used snow functions directly to verify that the problem wasn't in foreach or doSNOW.
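
Building on that test code, a sketch of the safe pattern: extract what is needed on the workers and return only plain R values, so no external pointers travel back to the master:

# Extract on the worker, return plain character vectors (safe to print)
r3 <- clusterApply(cl, 1:10, function(i) {
  doc <- read_xml(paste0("<foo>", i, "</foo>"))
  xml_path(doc)
})
print(r3)

stopCluster(cl)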

Steve Weston
  • Hi Steve, many thanks for your time, your answer indeed is a major step forward and I can now start to clean. Using `clusterApply(cl, 1:10, function(i) html_table(read_xml(paste0("", i, "")), fill=T))` (with rvest exported to the cluster) an accessible list of lists of data.frames is returned (great!). I too got an error with your docs2, maybe an issue with printing xml? However, in the question I was "encapsulating" the pages' content by reading on the master to allow other machines to join the job (in the "doRedis" case) while, please correct me if I am wrong, this is not possible with your strategy – dgdi Sep 02 '16 at 08:44
  • @dgdi I thought that moving the read_html function from the master to the workers didn't fundamentally change the way work was scheduled, but perhaps I'm missing something. – Steve Weston Sep 03 '16 at 13:41