
This question is a follow-up to this thread.

I'd like to perform three actions on a disk.frame:

  1. Count the distinct values of the field id grouped by two columns (key_a and key_b)
  2. Count the distinct values of the field id grouped by the first of two columns (key_a)
  3. Add a column with the distinct values for the first column divided by the distinct values across both columns

This is my code:

      my_df <-
        data.frame(
          key_a = rep(letters, 384),
          key_b = rep(rev(letters), 384),
          id = sample(1:10^6, 9984)
        )
      
      my_df %>% 
        select(key_a, key_b, id) %>% 
        chunk_group_by(key_a, key_b) %>% 
        # stage one
        chunk_summarize(count = n_distinct(id)) %>% 
        collect %>% 
        group_by(key_a, key_b) %>% 
        # stage two
        mutate(count_summed = sum(count)) %>%
        group_by(key_a) %>% 
        mutate(count_all = sum(count)) %>% 
        ungroup() %>% 
        mutate(percent_of_total = count_summed / count_all)

My data is in a disk.frame, not a data.frame, and it has 100M rows and 8 columns.

I'm following the two-step instructions described in this documentation.

I'm concerned that the `collect` will crash my machine, since it brings everything into RAM.

Do I have to use `collect` in order to use dplyr group-bys in disk.frame?

Cauder

1 Answer


You should always use `srckeep` to load only the columns you need into memory.

my_df %>% 
        srckeep(c("key_a", "key_b", "id")) %>%
        # select(key_a, key_b, id) %>% # no need if you use srckeep
        chunk_group_by(key_a, key_b) %>% 
        # stage one
        chunk_summarize(count = n_distinct(id)) %>% 
        collect %>% 
        group_by(key_a, key_b) %>% 
        # stage two
        mutate(count_summed = sum(count)) %>%
        group_by(key_a) %>% 
        mutate(count_all = sum(count)) %>% 
        ungroup() %>% 
        mutate(percent_of_total = count_summed / count_all)

`collect` will only bring the results of the `chunk_group_by` and `chunk_summarize` computation into RAM. It shouldn't crash your machine.

You must use `collect`, just like in other systems such as Spark.
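
To make that concrete, here is a rough sketch (assuming the small example data.frame above has been converted to a disk.frame with `as.disk.frame()`; `my_dff` and `partial` are illustrative names, not part of the original code): each chunk contributes at most one row per (key_a, key_b) group, so what `collect` pulls into RAM is a small summary table, not the raw rows.

    # Sketch only: build a toy disk.frame from the example data.frame above and
    # check how few rows the first stage actually returns to RAM.
    library(disk.frame)
    library(dplyr)
    setup_disk.frame()

    my_dff <- as.disk.frame(my_df)   # illustrative disk.frame built from the example data

    partial <- my_dff %>%
      srckeep(c("key_a", "key_b", "id")) %>%
      chunk_group_by(key_a, key_b) %>%
      chunk_summarize(count = n_distinct(id)) %>%
      collect

    # At most nchunks(my_dff) rows per (key_a, key_b) pair come back,
    # which is tiny even when the source has 100M rows.
    nrow(partial)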

But if you are computing `n_distinct`, that can be done in one stage anyway:

 my_df %>% 
        srckeep(c("key_a", "key_b", "id")) %>%
        #select(key_a, key_b, id) %>% 
        group_by(key_a, key_b) %>% 
        # stage one
        summarize(count = n_distinct(id)) %>% 
        collect

If you are really concerned about RAM usage, you can reduce the number of workers to 1:

setup_disk.frame(workers=1)
my_df %>% 
        srckeep(c("key_a", "key_b", "id")) %>%
        #select(key_a, key_b, id) %>% 
        group_by(key_a, key_b) %>% 
        # stage one
        summarize(count = n_distinct(id)) %>% 
        collect

setup_disk.frame() # restore the default set-up (default number of workers)
xiaodai
  • Can you speak more to why fewer workers reduces RAM usage? – Cauder Sep 21 '20 at 07:32
  • I replaced my `select` with `srckeep` and it gave me this error: Error in fst::read_fst(filename, columns = keep, as.data.table = TRUE, : Selected column not found. The code worked with the example data, but not my disk frame – Cauder Sep 21 '20 at 07:38
  • I tried another column and it didn't give me an error, but the number of columns remains unchanged – Cauder Sep 21 '20 at 07:43
  • `srckeep(c("col", "col1"))` works! Make sure you include the `c()` and that you don't have misspelled column names (a quick check for this is sketched below). – xiaodai Sep 21 '20 at 11:50
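
A quick way to catch that error up front is to verify the column names stored in the disk.frame before calling `srckeep`. A minimal sketch, assuming `my_df` is the disk.frame and `keep_cols` is just an illustrative name:

    # Sketch: fail fast on a misspelled column name before srckeep()
    # hands it to fst::read_fst().
    keep_cols <- c("key_a", "key_b", "id")
    stopifnot(all(keep_cols %in% colnames(my_df)))   # colnames() lists the columns stored on disk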