
I am doing several data processing operations over a big dataset (approx. 50M rows by 14 columns, some of which are strings of no more than 14 characters) using arrow. I opened the data with `open_dataset()` and manipulate it through arrow's dplyr interface. After several operations, I call `compute()` on the final dataset that I then use for some grouping and summarising operations inside a for loop. Based on this dataset, the code is the following:

Note: `summarise_numeric()` is a custom function that summarises several variables.
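
For context, here is a minimal sketch of the setup the code below assumes; the dataset path and the body of `summarise_numeric()` are illustrative placeholders, not my actual code:

library(arrow)
library(dplyr)

# Open the on-disk dataset lazily; nothing is read into memory at this point
sales <- open_dataset("path/to/sales_dataset")

# Illustrative stand-in only: the real summarise_numeric() aggregates
# several variables of the sales data
summarise_numeric <- function(.data) {
  .data %>% summarise(across(where(is.numeric), sum))
}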

geo_groupings <- list(national = "", 
                      region = "region",
                      zip_code = "Zip Code")
var_groupings <- list(unsegmented = "",
                      no_gender = c("Age"),
                      no_age = c("Gender"),
                      all_variables = c("Gender", "Age"))

sales <- sales %>% compute()
# 3.2 Variable creation:
for (geo_group in names(geo_groupings)){
  for (var_group in names(var_groupings)){
    groups <- c("date", "customer")
    if (var_group != "unsegmented") groups <- c(groups, var_groupings[[var_group]])
    if (geo_group != "national") groups <- c(groups, geo_groupings[[geo_group]])
    groups <- rlang::syms(groups)
    
    stacked <- sales %>% 
      group_by(!!!groups[1:length(groups)]) %>% 
      summarise_numeric() %>% 
      mutate(Segment = tolower(paste(!!!groups[2:length(groups)], sep = "_"))) %>% 
      relocate(Segment, .after = "date") %>% 
      # We collect because there is not an implementation of tidyr::pivot_wider
      # for the arrow package yet
      collect()

    # ... the rest of the per-iteration processing (e.g. pivot_wider) goes here
  }
}

Then I get the following error:

#> Error in `compute.arrow_dplyr_query()`:
#> ! Out of memory: realloc of size 8589934656 failed

The code otherwise works fine; the only issue is the memory reallocation. I need to call `collect()` because I then apply `tidyr::pivot_wider()` to `stacked`, and `pivot_wider()` is not implemented in arrow as of version 11.0.0.3.
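
For reference, the step that follows `collect()` looks roughly like this; the `names_from`/`values_from` choices below are simplified placeholders rather than my exact columns:

library(tidyr)

# Spread the collected, stacked result into one column per segment;
# values_from is a simplified stand-in for the summarised numeric columns
wide <- stacked %>% 
  pivot_wider(names_from = Segment, 
              values_from = where(is.numeric))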

An example of how I would approach this with `arrow::map_batches()` in order to circumvent the memory issue is the following:

# iris_dt is an Arrow Dataset holding iris, built e.g. by writing iris to
# Parquet with write_dataset() and re-opening that path with open_dataset()
map_batches(iris_dt, function(batch){
  batch %>% 
    group_by(Species) %>% 
    summarise(across(where(is.numeric), sum))  
}) %>% 
  collect()

For this particular example, it produces the same result as

iris %>% 
  group_by(Species) %>% 
  summarise(across(where(is.numeric), sum)) %>% 
  collect()

However, when I run the same code on my big dataset:

stacked <- arrow::map_batches(sales, function(batch){
  batch %>%
    group_by(!!!groups[1:length(groups)]) %>% 
    summarise_numeric() %>% 
    mutate(customer = tolower(paste(!!!groups[2:length(groups)], sep = "_"))) %>% 
    relocate(customer, .after = "date") 
}) %>% 
  collect()

The result I get is not fully aggregated the way you would expect from a single group_by() and summarise() operation; it still requires a further aggregation pass after collecting. I am wondering whether using the `.lazy = FALSE` argument would solve this issue.
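
To illustrate what I mean by "not fully aggregated", a check along the following lines on the collected result would reveal group keys that show up in the output of more than one batch:

# Count how often each (date, customer) key appears in the collected result;
# counts greater than 1 mean the same group was summarised in more than one batch
stacked %>% 
  count(date, customer) %>% 
  filter(n > 1)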

  • If you have a determinate set of columns to spread to, you can use `arrow::map_batches()` as an alternative to `collect()`, which can let you iterate over RecordBatches instead of collecting the full dataset at once. – alistaire Apr 10 '23 at 16:51
  • @alistaire Could you please provide an example of how you would, for instance, create a batch for each relevant column in the data in my particular example? – Alberto Agudo Dominguez Apr 11 '23 at 12:12
  • RecordBatches are batches of records (rows) with all columns. In a lot of regards they behave like Arrow Tables/dataframes, though they're not, exactly—they're slightly lower-level. [Here are some docs.](https://arrow.apache.org/docs/r/reference/RecordBatch.html) `map_batches()` iterates over the RecordBatch instances in your Dataset, so if you can transform each batch consistently to whatever you want, you can transform the whole dataset without collecting. For an answer, you'll need to create [a reproducible example](https://stackoverflow.com/q/5963269/4497050) with sample data. – alistaire Apr 11 '23 at 16:52
  • Thank you for the response @alistaire. I am sorry but the data that I'm processing here is a bit sensitive so I am not able to reproduce the exact example here. However, I will edit the question including an example with the `iris` dataset so that you can see what I have done and what my question is – Alberto Agudo Dominguez Apr 12 '23 at 08:20
  • @alistaire My main question is whether using `map_batches` allows me to process the whole dataset in chunks, since the results that I got with my big dataset unexpectedly still require further aggregation – Alberto Agudo Dominguez Apr 12 '23 at 08:29

0 Answers