
I'm doing a SQL-style join of two large, fixed-size (lat, long) coordinate datasets by nearest-neighbor search. Currently I'm using dplyr and data.table to do this. How can I optimize and parallelize my code for absolute runtime?

Previous attempts include native Python, pandas, and multiprocessing, all of which ended up being very slow. My current solution, which uses data.table to construct a table of nearest neighbors and dplyr to join based on that table, is the quickest, but it is still too slow.

library(dplyr)
library(data.table)
library(geosphere)

source <- data.table(lat = runif(1e3), long = runif(1e3)) %>% mutate(nrow = row_number())
dest <- data.table(lat = runif(5e4), long = runif(5e4)) %>% mutate(ind = row_number())
dest_mat <- as.matrix(dest[, c('long', 'lat')])
setDT(source)
# function that returns the index of the closest matching point in dest
mindist_ind <- function(source_lat, source_long) {
  which.min(distHaversine(c(source_long, source_lat), dest_mat))
}


nn_inds <- source[, j = list(ind = mindist_ind(lat, long)), by = 1:nrow(source)] # slowest line, gets index of nearest match in dest
nn_matches <- inner_join(nn_inds, dest, by = 'ind') # join final back to dest in order to get all nearest matches
sourcedest_matches <- inner_join(source, nn_matches, by = 'nrow') # join nearest matches to source by index

The full source file is ~89 million rows, and dest is roughly ~50k rows. Current timings for various source sizes are as follows:

  • 1000 rows -> 46 seconds
  • 10000 rows -> 270 seconds
  • 100000 rows -> 2580 seconds
  • 1000000 rows -> 17172 seconds

While this is the quickest I've been able to get, for the full 89 million row source file it would take an estimated 17-18 days to run, which is far too long. I'm running this on an r4.16xlarge AWS EC2 instance with 488 GB RAM, 32 cores, and 64 vCPUs. How can I optimize/parallelize this code to run quicker?

  • "On my machine it takes 0 seconds, likely because I have 0 rows of data." Perhaps you could provide some reproducible sample data with which we can test/benchmark things? – r2evans Mar 28 '19 at 15:19
  • When you time the three steps, which one takes the longest? Btw, overwriting `final` makes it hard to follow the logic. If the set of rows stays the same, you can "update" the table in place rather than making a new table on each line, as explained in the intro materials for data.table. – Frank Mar 28 '19 at 15:21
  • @r2evans Good suggestion - I've edited the code to generate sample data. – africabytoto Mar 28 '19 at 16:50
  • @Frank By far the most costly step is constructing nn_inds, the data table containing the indices of the nearest matches in dest. In the code I actually run I modify the tables in place; I've separated and named them differently here for clarity - good point. I'm hoping to either optimize this portion to run faster or utilize all available cores by splitting the source file, since dest is only read in the 3 processing steps. – africabytoto Mar 28 '19 at 16:51
  • Possible duplicate of [R: Distm for big data? Calculating minimum distances between two matrices](https://stackoverflow.com/questions/49863185/r-distm-for-big-data-calculating-minimum-distances-between-two-matrices) – F. Privé Mar 28 '19 at 18:02
  • I'm pretty sure `distHaversine` calculates pairwise distances, and for uneven matrices it just recycles. Are you sure that's what you want? I imagine you'd want the distance between each source point and every dest point, for all source points. This probably means that your existing timings vastly underestimate how long this will take. https://stackoverflow.com/questions/23615063/calculating-great-circle-distance-matrix – Michael Mar 28 '19 at 21:43
  • Consider that the following returns a vector of 4 distances, not a 2x4 matrix of distances: `distHaversine(data.frame(c(0,0),c(0,0)), data.frame(c(90,85,84,80),c(90,85,84,80)))` – Michael Mar 28 '19 at 21:44
  • Have you thought of adding a spatial index to dest to speed up the nearest-neighbour find? – Ian Turton Mar 29 '19 at 15:51
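
(The spatial-index idea in the last comment can be sketched with a k-d tree: project each (lat, long) pair to 3D unit-sphere coordinates and query with RANN::nn2. Chord distance is monotone in great-circle distance, so the nearest neighbour by chord length is also the nearest by great-circle distance. This is only a sketch; the RANN package and the to_xyz helper are assumptions, not something used elsewhere in this thread.)

library(RANN)        # k-d tree nearest-neighbour search
library(data.table)

# convert degrees to 3D unit-sphere coordinates
to_xyz <- function(lat, long) {
  lat  <- lat * pi / 180
  long <- long * pi / 180
  cbind(cos(lat) * cos(long), cos(lat) * sin(long), sin(lat))
}

# index of the nearest dest point for every source point, without building a full distance matrix
nn <- nn2(to_xyz(dest$lat, dest$long), to_xyz(source$lat, source$long), k = 1)
nn_inds_kd <- data.table(nrow = seq_len(nrow(source)), ind = nn$nn.idx[, 1])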

1 Answer


I'm assuming the code you've provided in your question isn't actually what you want. Your code calculates the distance between pairwise rows of source and dest, recycling source to match the length of dest.

What you probably want, and what this answer provides, is to find the nearest point in dest for every point in source. (see my comments on your question)

Calculating distance matrices is computationally intensive. Assuming the R packages are all roughly equally efficient at calculating distance matrices, really the only way to speed this up is to parallelize the distance matrix calculation. It's unfortunate that the matrix with more rows is the reference points, because parallelization can only occur over subsets of the source points (i.e., you need to consider all dest points to find the nearest dest point to any given source point).

library(parallel)
library(sp)
# non-parallel version
x2 <- copy(source)
# full source-by-dest great-circle distance matrix (rows = source, columns = dest)
temp <- spDists(as.matrix(x2[, .(long, lat)]), dest_mat, longlat = TRUE)
# for each source row, attach the coordinates of its nearest dest point
system.time(
  final2 <- x2[, c("long_dest", "lat_dest") :=
                 as.data.table(dest_mat[apply(temp, 1, which.min), , drop = FALSE])]
)

#parallel version

# split the example's 1,000 source rows into 10 chunks of 100 rows each
source_l <- split(source, rep(1:10, each = 100))

cl <- makeCluster(getOption("cl.cores", 4))
clusterExport(cl, "dest_mat") #this is slow but I don't think there's a way to get around this

system.time(
  v <- parLapply(cl=cl, X=source_l, fun=function(x){
    library(sp)
    library(data.table)
    temp <- spDists(as.matrix(x[, .(long, lat)]), dest_mat, longlat = TRUE)
    x[, c("long_dest", "lat_dest") :=
        as.data.table(dest_mat[apply(temp, 1, which.min), , drop = FALSE])]
    x
  })
)

stopCluster(cl)

final <- rbindlist(v)
# sanity check: the parallel result should match the non-parallel result
identical(final[order(nrow)], final2)

You'll need to experiment with whether using more than 32 processes actually speeds things up; hyperthreading can be a mixed bag, and it's not always easy to predict whether it will help. Unfortunately, there's no guarantee you'll have enough RAM to run the optimal number of processes: this approach is not only slow but also memory-intensive. If you get errors indicating that you've run out of memory, you'll need to decrease the number of processes or rent an EC2 machine with more memory.
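
As a rough sketch (the chunk size and the use of detectCores here are my own assumptions, not values from the timings above), you can size the chunks so that each worker's chunk-by-dest distance matrix fits in memory. With ~50k dest rows, a 10,000-row chunk gives a 1e4 x 5e4 double matrix, about 4 GB per in-flight worker, so 32 workers need on the order of 128 GB of the instance's 488 GB:

library(parallel)
library(data.table)

chunk_size <- 1e4                            # ~4 GB distance matrix per chunk (1e4 * 5e4 * 8 bytes)
n_workers  <- detectCores(logical = FALSE)   # physical cores (32 on an r4.16xlarge)

# assign each source row to a chunk and split
chunk_id <- ceiling(seq_len(nrow(source)) / chunk_size)
source_l <- split(source, chunk_id)

cl <- makeCluster(n_workers)
clusterExport(cl, "dest_mat")
# ... then run the parLapply call from above and stopCluster(cl)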

Finally, I'll note that which.min returns the index of the first minimum if there are ties, so the results will depend on the order of rows in dest_mat.
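
For example, with tied minima:

which.min(c(3, 1, 1, 2))
#> [1] 2   # the first of the two tied minima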

Michael