
I am working with the R programming language.

Suppose I have the following two data frames:

set.seed(123)

df_1 <- data.frame(
  name_1 = c("john", "david", "alex", "kevin", "trevor", "xavier", "tom", "michael", "troy", "kelly", "chris", "henry", "taylor", "ryan", "peter"),
  lon = rnorm(15, mean = -74.0060, sd = 0.01),
  lat = rnorm(15, mean = 40.7128, sd = 0.01)
)

df_2 <- data.frame(
  name_2 = c("matthew", "tyler", "sebastian", "julie", "anna", "tim", "david", "nigel", "sarah", "steph", "sylvia", "boris", "theo", "malcolm"),
  lon = rnorm(14, mean = -74.0060, sd = 0.01),
  lat = rnorm(14, mean = 40.7128, sd = 0.01)
)

My Problem: For each person in df_1, I am trying to find the 5 closest people (by haversine distance) to this person in df_2 and record various distance statistics (e.g. mean, median, max, min, standard deviation).

Here is my own attempt at solving this problem.

First I defined a function that calculates the distance between each pair of points:

## PART 1
library(geosphere)

haversine_distance <- function(lon1, lat1, lon2, lat2) {
  distHaversine(c(lon1, lat1), c(lon2, lat2))
}

Then, I used a loop to calculate all comparisons:

## PART 2
# Create a matrix to store results
distances <- matrix(nrow = nrow(df_1), ncol = nrow(df_2))

# calculate the distances
for (i in 1:nrow(df_1)) {
    for (j in 1:nrow(df_2)) {
        distances[i, j] <- haversine_distance(df_1$lon[i], df_1$lat[i], df_2$lon[j], df_2$lat[j])
    }
}

# Create final data frame pairing each person in df_1 with each person in df_2
final <- data.frame(
    name_1 = rep(df_1$name_1, each = nrow(df_2)),
    lon_1 = rep(df_1$lon, each = nrow(df_2)),
    lat_1 = rep(df_1$lat, each = nrow(df_2)),
    name_2 = rep(df_2$name_2, nrow(df_1)),
    lon_2 = rep(df_2$lon, nrow(df_1)),
    lat_2 = rep(df_2$lat, nrow(df_1)),
    distance = c(distances)
)

Finally, I kept the 5 minimum distances per person:

## PART 3
# Keep only first 5 rows for each unique value of final$name_1
final <- final[order(final$name_1, final$distance), ]
final <- final[ave(final$distance, final$name_1, FUN = seq_along) <= 5, ]


# Calculate summary statistics for each unique person in final$name_1
final_summary <- aggregate(distance ~ name_1,
                           data = final,
                           FUN = function(x) c(min = min(x),
                                               max = max(x),
                                               mean = mean(x),
                                               median = median(x),
                                               sd = sd(x)))
final_summary <- do.call(data.frame, final_summary)
names(final_summary)[-(1)] <- c("min_distance", "max_distance", "mean_distance", "median_distance", "sd_distance")


final_summary$closest_people <- tapply(final$name_2,
                                       final$name_1,
                                       FUN = function(x) paste(sort(x), collapse = ", "))


# break closest_people column into multiple columns
n <- 5
closest_people_split <- strsplit(final_summary$closest_people, ", ")
final_summary[paste0("closest_", seq_len(n))] <- do.call(rbind, closest_people_split)

My Question: The above code seems to work, but I am interested in trying to improve the speed of this code (i.e. PART 2) when df_1 and df_2 become very large in size. As such, I am looking into options involving parallel computing using functionalities such as doParallel, parLapply, SNOW, etc.

As I am not overly familiar with this, I tried to look into an option with mclapply from the parallel library (https://www.rdocumentation.org/packages/parallel/versions/3.4.1/topics/mclapply):

library(parallel)

distances <- matrix(nrow = nrow(df_1), ncol = nrow(df_2))

# calculate the distances
distances <- mclapply(1:nrow(df_1), function(i) {
  sapply(1:nrow(df_2), function(j) {
    haversine_distance(df_1$lon[i], df_1$lat[i], df_2$lon[j], df_2$lat[j])
  })
})

The code seems to have run - but I am not sure if what I have done is correct and if this is actually improving the speed of this code.

Can someone please show me how to do this? Is my parallel attempt correct?

Thanks!

stats_noob
  • For some situations it might make sense to convert this into two steps, one allocating each observation into a sector, and then another step that only searches within adjacent sectors, since for a sufficiently large sample, the vast majority of distance calculations can be skipped. – Jon Spring May 21 '23 at 22:53
  • @ Jon Spring: Thank you so much for your reply! If you have time, could you please show me how to do this? – stats_noob May 22 '23 at 06:18
  • @ Jon Spring: In general, what do you think of the approach I wrote in my own question using "mclapply"? Is this a valid approach? Thank you so much! – stats_noob May 22 '23 at 06:19
  • If you are working with really a lot of data I would recommend looking into kd-trees. If I have time I'll whip up an example but it is a bit more complicated with haversine distances – JonasV May 24 '23 at 13:02
  • Have a look at this answer https://stackoverflow.com/a/59915415/2761575 for a fast and memory-efficient implementation in RCpp for a similar problem (generating a complete distance matrix from a single set of points). That answer could be adapted to your case pretty easily. – dww May 25 '23 at 19:15
  • @ dww: thank you for your reply! I will read more about this! – stats_noob May 25 '23 at 19:37
  • @stats_noob so, didn't you find an answer? – MrSmithGoesToWashington Jun 14 '23 at 07:41

4 Answers


Although still quadratic, the following:

haversine_distance <- function(lon1, lat1, lon2, lat2) {
  distHaversine(c(lon1, lat1), c(lon2, lat2))
}

# calculate the distances
for (i in 1:nrow(df_1)) {
    for (j in 1:nrow(df_2)) {
        distances[i, j] <- haversine_distance(df_1$lon[i], df_1$lat[i], df_2$lon[j], df_2$lat[j])
    }
}

can be rewritten in vectorised form like so:

haversine_distance <- function(lon1, lat1, lon2, lat2) {
  distHaversine(cbind(lon1, lat1), cbind(lon2, lat2))
}

# calculate the distances
distances <- expand.grid(i = seq_len(nrow(df_1)), j = seq_len(nrow(df_2)))
# Do not compare to self
distances <- distances[distances$i != distances$j, ]
distances$dist <- haversine_distance(
  df_1$lon[distances$i], df_1$lat[distances$i],
  df_2$lon[distances$j], df_2$lat[distances$j]
)

This will be a lot faster because the code is now vectorised (the example below shows a factor of 100 speedup on my machine):

df_1 <- df_1[sample(nrow(df_1), 5E2, replace=TRUE), ]
df_2 <- df_2[sample(nrow(df_2), 5E2, replace=TRUE), ]

system.time({
## PART 2
# Create a matrix to store results
distances <- matrix(nrow = nrow(df_1), ncol = nrow(df_2))
# calculate the distances
for (i in 1:nrow(df_1)) {
    for (j in 1:nrow(df_2)) {
        distances[i, j] <- haversine_distance(df_1$lon[i], df_1$lat[i], df_2$lon[j], df_2$lat[j])
    }
}
})

##   user  system elapsed 
## 17.427   0.015  17.479  

system.time({
# calculate the distances
distances <- expand.grid(i = seq_len(nrow(df_1)), j = seq_len(nrow(df_2)))
# Do not compare to self
distances <- distances[distances$i != distances$j, ]
distances$dist <- haversine_distance(
  df_1$lon[distances$i], df_1$lat[distances$i],
  df_2$lon[distances$j], df_2$lat[distances$j]
)
})
##   user  system elapsed 
##  0.058   0.024   0.082 

The form of distances also makes it easier to do the next step (selecting the top 5; using the original solution from the OP):

distances <- distances[order(distances$i, distances$dist), ]
distances <- distances[ave(distances$dist, distances$i, FUN = seq_along) <= 5, ]
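
For completeness, the summary statistics can then be computed directly from this distances data frame. A minimal sketch, mirroring the OP's PART 3 and assuming the df_1 and distances objects from above:

# summary statistics of the 5 nearest distances per row of df_1
final_summary <- aggregate(dist ~ i, data = distances,
                           FUN = function(x) c(min = min(x), max = max(x),
                                               mean = mean(x), median = median(x),
                                               sd = sd(x)))
final_summary <- do.call(data.frame, final_summary)
# map the df_1 row index back to the person's name
final_summary$name_1 <- df_1$name_1[final_summary$i]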

Parallel version

On my system, with the example dataset above (500 records each), this is slower than running on a single CPU because of the time needed to start the calculations and copy the data to the nodes.

# calculate the distances
distances <- expand.grid(i = seq_len(nrow(df_1)), j = seq_len(nrow(df_2)))
# Do not compare to self
distances <- distances[distances$i != distances$j, ]
# Start the cluster
cl <- parallel::makeCluster(4)
# Load libraries on cluster nodes
parallel::clusterEvalQ(cl, library(geosphere))
# Split into groups
distances$node <- floor(seq(0, 4-0.0001, length.out = nrow(distances)))
distances <- split(distances, distances$node)
# Run the computation on the nodes:
distances <- parallel::parLapply(cl, distances, function(distances, df_1, df_2, haversine_distance) {
  distances$dist <- haversine_distance(
    df_1$lon[distances$i], df_1$lat[distances$i],
    df_2$lon[distances$j], df_2$lat[distances$j]
  )
  distances
}, df_1, df_2, haversine_distance = haversine_distance)
# Combine the results
distances <- do.call(rbind, distances)
# Close the cluster
parallel::stopCluster(cl)
Jan van der Laan
  • @ Jan van der Laan: Thank you so much for your answer! As such, what do you think of the approach I suggested in my question? Is my approach even performing parallel computing? As such, how is your approach an improvement over mine? thank you so much! – stats_noob May 21 '23 at 20:00
  • @stats_noob Sorry for the delay. I focussed mainly on the double for-loop you have in your example: in general you want to avoid looping over your data. A lot of functions in R work a lot faster vectorised. This also goes for `haversine_distance`. I added a benchmark comparing the two solutions. – Jan van der Laan May 22 '23 at 08:57
  • @stats_noob I also added a parallel version. The original version was not parallel. It could have been if `distHaversine` had implemented this. – Jan van der Laan May 22 '23 at 09:14
  • Minimizing communication in the parallel version could make it more efficient. Pushing the summarisation of the top 5 results to the cluster nodes, I was able to get decent scaling of a 2000-by-2000 record scenario from 3.3 s in the main process to 1.3 s in a 4 node cluster. – Mikko Marttila May 24 '23 at 18:09
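
To illustrate the comment above (Mikko Marttila's suggestion), here is a rough sketch that pushes the top-5 selection onto the worker nodes, so only a few rows per person travel back to the main process. It reuses the vectorised haversine_distance and the df_1/df_2 data frames from this answer; details such as the number of nodes are assumptions.

distances <- expand.grid(i = seq_len(nrow(df_1)), j = seq_len(nrow(df_2)))
cl <- parallel::makeCluster(4)
parallel::clusterEvalQ(cl, library(geosphere))
# split the pairs into one chunk per node
chunks <- split(distances, floor(seq(0, 4 - 0.0001, length.out = nrow(distances))))
top5 <- parallel::parLapply(cl, chunks, function(chunk, df_1, df_2, haversine_distance) {
  chunk$dist <- haversine_distance(df_1$lon[chunk$i], df_1$lat[chunk$i],
                                   df_2$lon[chunk$j], df_2$lat[chunk$j])
  # keep only the 5 smallest distances per i within this chunk
  chunk <- chunk[order(chunk$i, chunk$dist), ]
  chunk[ave(chunk$dist, chunk$i, FUN = seq_along) <= 5, ]
}, df_1, df_2, haversine_distance = haversine_distance)
parallel::stopCluster(cl)
# combine and filter again, since a given i can be spread over several chunks
top5 <- do.call(rbind, top5)
top5 <- top5[order(top5$i, top5$dist), ]
top5 <- top5[ave(top5$dist, top5$i, FUN = seq_along) <= 5, ]

Only the five smallest distances per person per chunk cross the connection back to the main process, which is what reduces the communication overhead the comment refers to.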

You could use apply:

# for each row of df_1: compute all distances to df_2, keep the 5 smallest,
# and summarise them
cbind(df_1[1],
      t(apply(df_1[-1], 1, \(y)
        c(min = min(x <- sort(geosphere::distHaversine(y, df_2[-1]))[1:5]),
          max = max(x),
          mean = mean(x),
          median = median(x),
          sd = sd(x)))))

    name_1       min       max      mean    median        sd
1     john  423.1875 1948.9521 1106.4374 1052.8789 674.69139
2    david  602.9369  941.3102  752.1558  715.3872 159.37550
3     alex 1765.7750 2428.5429 2013.7843 1828.6055 294.37805
4    kevin  638.9259  834.5504  715.5252  644.2898 102.23793
5   trevor  520.1834  650.9167  609.4363  631.9494  52.96026
6   xavier  972.9730 1767.1953 1369.5604 1396.8569 371.03190
7      tom  243.6729  530.4778  426.2490  447.8639 110.26649
8  michael  581.9209 1504.5642 1057.1773 1012.5247 378.81712
9     troy  549.4500 1035.0599  782.8799  828.5550 220.72034
10   kelly  491.6430 1130.9239  717.7716  658.7015 248.96974
11   chris 1389.1659 2106.7084 1644.0448 1455.8430 316.31565
12   henry  394.8684  894.5358  647.1996  670.9220 236.69562
13  taylor  170.5171  746.6206  470.0857  439.8022 227.39141
14    ryan  342.8375 1243.7473  970.0721 1052.6759 367.08513
15   peter  195.4891 1455.0204  834.2543  830.2758 539.69009
Onyambu
  • @ Onyambu: Thank you so much for your answer! As such, what do you think of the approach I suggested in my question? Is my approach even performing parallel computing? As such, how is your approach an improvement over mine? thank you so much! – stats_noob May 21 '23 at 20:00

Regarding speed, I think your approach with mclapply is fine and should indeed help speed up your calculations (although you don't really need to initialize an empty matrix beforehand). In case you want to know more about parallelization in general, maybe also check this answer.
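
To make that point concrete, here is a minimal sketch of the same mclapply call without the pre-allocated matrix, binding the list result back into a matrix afterwards (assuming the haversine_distance function and data frames from your question; the mc.cores value is just an example):

library(parallel)

distances <- mclapply(seq_len(nrow(df_1)), function(i) {
  sapply(seq_len(nrow(df_2)), function(j) {
    haversine_distance(df_1$lon[i], df_1$lat[i], df_2$lon[j], df_2$lat[j])
  })
}, mc.cores = 2)

# mclapply returns a list with one vector per row of df_1; bind it into a matrix
distances <- do.call(rbind, distances)

Note that mclapply relies on forking and therefore only parallelises on Unix-alikes; on Windows mc.cores must be 1 (or use parLapply / a SNOW cluster instead).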

I've seen other questions mentioning the Haversine distance in the past, and you can squeeze more performance if you optimize the calculations, either in R or C++ (and I link again to the proxy package, which can potentially facilitate your work).
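
As an example of the R-level route, the standard haversine formula can be written in vectorised form directly (a sketch; radius in metres, matching the value used in the C++ code below):

haversine_vec <- function(lon1, lat1, lon2, lat2, r = 6378137) {
  to_rad <- pi / 180
  d_lon <- (lon2 - lon1) * to_rad
  d_lat <- (lat2 - lat1) * to_rad
  a <- sin(d_lat / 2)^2 + cos(lat1 * to_rad) * cos(lat2 * to_rad) * sin(d_lon / 2)^2
  2 * r * atan2(sqrt(a), sqrt(1 - a))
}

Because all arguments are vectorised, it can be called once on whole columns (e.g. on the expand.grid indices shown in the other answer) instead of once per pair.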

One thing left to optimize is space (memory) utilization. If you're interested in k nearest neighbors and your data frames become really big, it can make a big difference to store only the distances you need instead of the whole distance matrix. Since I had already done some groundwork in the answer I linked above, I adapted the Rcpp code to your specific use case, although I didn't add a lot of edge-case handling (e.g. specifying more desired neighbors than there are rows in df_2):

// [[Rcpp::depends(RcppParallel,RcppThread)]]

#include <algorithm> // lower_bound
#include <cstddef> // size_t
#include <math.h> // sin, cos, sqrt, atan2, pow
#include <limits>
#include <vector>

#include <RcppThread.h>
#include <Rcpp.h>
#include <RcppParallel.h>

using namespace std;
using namespace Rcpp;
using namespace RcppParallel;

class HaversineCalculator : public Worker
{
public:
  HaversineCalculator(const DataFrame& df_1,
                      const DataFrame& df_2,
                      const int k,
                      const NumericVector& distances)
    : lon1_(as<NumericVector>(df_1["lon"]))
    , lat1_(as<NumericVector>(df_1["lat"]))
    , lon2_(as<NumericVector>(df_2["lon"]))
    , lat2_(as<NumericVector>(df_2["lat"]))
    , k_(k)
    , dist_(distances)
    , cos_lat1_(df_1.nrows())
    , cos_lat2_(df_2.nrows())
    , neighbors_(distances.length(), numeric_limits<size_t>::max())
  {
    // terms for distance calculation
    for (size_t i = 0; i < cos_lat1_.size(); ++i) {
      cos_lat1_[i] = cos(lat1_[i] * to_rad);
    }
    for (size_t i = 0; i < cos_lat2_.size(); ++i) {
      cos_lat2_[i] = cos(lat2_[i] * to_rad);
    }
  }

  vector<size_t> neighbors_;

  void operator()(size_t begin, size_t end) {
    for (size_t i = begin; i < end; ++i) { // iterate over our chunk of df_1
      if (RcppThread::isInterrupted()) return;

      auto const distances_begin = dist_.begin() + i * k_;
      auto const distances_end = distances_begin + k_;
      auto const neighbors_begin = neighbors_.begin() + i * k_;

      for (size_t j = 0; j < lon2_.size(); ++j) { // iterate over all df_2 entries
        // haversine distance
        double d_lon = (lon2_[j] - lon1_[i]) * to_rad;
        double d_lat = (lat2_[j] - lat1_[i]) * to_rad;
        double d_hav = pow(sin(d_lat / 2), 2) + cos_lat1_[i] * cos_lat2_[j] * pow(sin(d_lon / 2), 2);
        if (d_hav > 1) d_hav = 1;
        d_hav = 2 * atan2(sqrt(d_hav), sqrt(1 - d_hav)) * 6378137;

        auto dist_index = lower_bound(distances_begin, distances_end, d_hav); // std
        if (dist_index < distances_end) {
          for (auto k = distances_end - 1; k > dist_index; --k) {
            // shift potentially valid neighbors and their distances
            auto offset = k - distances_begin;
            *(neighbors_begin + offset) = *(neighbors_begin + offset - 1);
            *k = *(k - 1);
          }

          *dist_index = d_hav;
          *(neighbors_begin + (dist_index - distances_begin)) = j;
        }
      }
    }
  }

private:
  static double to_rad;

  const RVector<double> lon1_;
  const RVector<double> lat1_;
  const RVector<double> lon2_;
  const RVector<double> lat2_;

  const int k_;
  RVector<double> dist_;

  vector<double> cos_lat1_;
  vector<double> cos_lat2_;
};

double HaversineCalculator::to_rad = 3.1415926535897 / 180;

// [[Rcpp::export]]
DataFrame haversine_nn(const DataFrame& df_1, const DataFrame& df_2, const int k) {
  NumericVector distances(k * df_1.nrows(), numeric_limits<double>::max());
  HaversineCalculator hc(df_1, df_2, k, distances);

  // you could play around with operations_per_call
  // see also:
  // - https://rcppcore.github.io/RcppParallel/#grain_size
  // - https://stackoverflow.com/a/14878734/5793905
  unsigned int operations_per_call = 100;
  unsigned int grain = operations_per_call / df_2.nrows() + (operations_per_call % df_2.nrows() != 0);
  Rcout << "Processing " << grain << " row(s) from df_1 on each thread call.\n";
  parallelFor(0, df_1.nrows(), hc, grain);
  RcppThread::checkUserInterrupt();

  CharacterVector names_1(k * df_1.nrows());
  CharacterVector neighbors(k * df_1.nrows());

  CharacterVector name_1 = df_1["name_1"];
  CharacterVector name_2 = df_2["name_2"];

  for (size_t i = 0; i < df_1.nrows(); ++i) {
    for (size_t j = 0; j < k; ++j) {
      size_t offset = (i * k) + j;
      names_1[offset] = name_1[i];

      size_t name_2_index = hc.neighbors_[offset];
      if (name_2_index < df_2.nrows()) {
        neighbors[offset] = name_2[name_2_index];
      } else {
        neighbors[offset] = "logic_error";
      }
    }
  }

  return DataFrame::create(_["name_1"] = names_1, _["name_2"] = neighbors, _["distance"] = distances);
}

I saved that in haversine_nn.cpp and then ran:

library(Rcpp)
library(RcppParallel)
library(RcppThread)

sourceCpp("haversine_nn.cpp")
haversine_nn(df_1, df_2, 5L)

I believe somewhere in your code you have an issue detecting the top 5 neighbors, at least my results seem to differ, but even looking at your distances matrix from part 2, there seems to be something off.

And a few explicit notes about the C++ code:

Note that the distance column of the returned data frame is allocated beforehand and used directly to avoid concatenating things in the end, using offsets to figure out which indices are in the valid range depending on the row from df_1 that is being processed.

There's no synchronization because each chunk is completely disjoint from the others, so if the logic is correct, each thread will only write to parts of the vector that are never going to be accessed by the other threads.

Be careful with the handling of size_t because it's unsigned, so even comparing variables of that type to negative values can give you weird results (it certainly happened to me).

The grain and operations_per_call have a similar goal as mc.preschedule in mclapply, although the former provide a bit finer control. It's not easy to know what's optimal; you'd probably have to profile with your own data, ideally different data sets.
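
For reference, at the R level that knob is just an argument to mclapply; a tiny sketch (process_row is a placeholder for whatever per-element work you do):

# mc.preschedule = FALSE dispatches one element at a time instead of
# pre-splitting the input into mc.cores chunks up front
res <- mclapply(seq_len(nrow(df_1)), process_row, mc.cores = 4, mc.preschedule = FALSE)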

And for completeness, this still has df_1.nrows() * df_2.nrows() runtime complexity; I don't think there's a way to avoid that, since you have to calculate all distances even if you only care about a subset of them. It just has lower space complexity.

Alexis
  • I think the logic problem in the OP's code is `distance = c(distances)` when first creating the `final` data frame: that doesn't "flatten" as assumed; it concatenates columns, not rows. – Alexis May 26 '23 at 21:43

This is an answer I already gave, with absolutely no success, here. You may adapt it for your purpose.

For very heavy data, I would use something like this (but I don't know how this solution behaves compared to the other solutions provided):

rm(list=ls())
gc()

#  packages needed
for (package in c('data.table', 'stringr', 'stringdist')) {
  if (!require(package, character.only = TRUE, quietly = TRUE)) {
    install.packages(package, dependencies = TRUE)
    library(package, character.only = TRUE)
  }
}

# Data
tbl.data <- data.table(name = c("CANON PVT. LTD", "Antila,Thomas", "Greg", "St.Luke's Hospital", 
                              "Z_SANDSTONE COOLING LTD", "St.Luke's Hospital", 
                              "CANON PVT. LTD.",
                              "SANDSTONE COOLING LTD",
                              "Greg", "ANTILA,THOMAS"))

# A string with just letters
tbl.data[, string_ := str_replace_all(tolower(str_replace_all(name, "[^[:graph:]]", "")),"[[:punct:][:digit:]]", "")]

# Create a table for comparisons ----
################################ -
tbl.a <- tbl.data[, .(string_1 = string_, names1 = name, one = 1, id1 = .I)]
tbl.b <- tbl.data[, .(string_2 = string_, names2 = name, one = 1, id2 = .I)]
  # avoid unneeded lines:
tbl.a <- unique(tbl.a)
tbl.b <- unique(tbl.b)
  # the table of all pairs:
tbl.dedoubl <- merge(tbl.a, tbl.b, by = "one", allow.cartesian = TRUE)
  # avoid unneeded comparisons:
tbl.dedoubl <- tbl.dedoubl[id1 < id2]

# Distance calculations : ----
################################ -
tbl.dedoubl[, distance := stringdist(string_1, string_2, method = "lv")]

# Lines suspected to be duplicates: ----
################################### -
tbl.dedoubl[distance <= 2]
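
Adapting the same cartesian-join pattern to the haversine problem in the question might look roughly like this (a sketch, assuming the df_1 and df_2 from the question and the geosphere package; column names such as lon1/lat1 are chosen here purely for illustration):

library(data.table)
library(geosphere)

dt1 <- data.table(name_1 = df_1$name_1, lon1 = df_1$lon, lat1 = df_1$lat, one = 1)
dt2 <- data.table(name_2 = df_2$name_2, lon2 = df_2$lon, lat2 = df_2$lat, one = 1)

# all pairs via a cartesian join on a dummy key
pairs <- merge(dt1, dt2, by = "one", allow.cartesian = TRUE)
pairs[, distance := distHaversine(cbind(lon1, lat1), cbind(lon2, lat2))]

# keep the 5 nearest people in df_2 for each person in df_1, then summarise
nearest <- pairs[order(distance), head(.SD, 5), by = name_1]
nearest[, .(min_distance = min(distance), max_distance = max(distance),
            mean_distance = mean(distance), median_distance = median(distance),
            sd_distance = sd(distance)), by = name_1]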