
I use the memoise package to cache queries to an arrow dataset, but I sometimes get hash collisions and therefore the wrong values are returned from the cache.

I have isolated the problem and reproduced it in the MWE below. The issue is that rlang::hash() (which memoise uses) of an arrow query that first filters and then summarises does not depend on the filter.

My question is: is this something I can fix (because I used it incorrectly), or is this a bug in one of the packages (I am happy to create an issue)? If so, should it be reported to arrow, rlang, or even R6?

MWE

For example, all three queries below have the same hash even though they should be different (and, as the collected results show, the queries clearly differ).

library(arrow)
library(dplyr)

ds_file <- file.path(tempdir(), "mtcars")

write_dataset(mtcars, ds_file)
ds <- open_dataset(ds_file)

# 1) Create three different queries =======

# Query 1 with mpg > 25 ----
query1 <- ds |> 
  filter(mpg > 25) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg))

# Query 2 with mpg > 0 ----
query2 <- ds |> 
  filter(mpg > 0) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg))

# Query 3 with filter on cyl ----
query3 <- ds |> 
  filter(cyl == 4) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg))


# 2) Let's compare the hashes: the main issue ======
rlang::hash(query1)
#> [1] "f505339fd65df6ef53728fcc4b0e55f7"
rlang::hash(query2)
#> [1] "f505339fd65df6ef53728fcc4b0e55f7"
rlang::hash(query3)
#> [1] "f505339fd65df6ef53728fcc4b0e55f7"
# ERROR HERE: they should be different as the queries are different!

# 3) Let's also compare the results: clearly different =====
query1 |> collect()
#> # A tibble: 2 × 3
#>      vs     n mean_mpg
#>   <dbl> <int>    <dbl>
#> 1     1     5     30.9
#> 2     0     1     26

query2 |> collect()
#> # A tibble: 2 × 3
#>      vs     n mean_mpg
#>   <dbl> <int>    <dbl>
#> 1     0    18     16.6
#> 2     1    14     24.6

query3 |> collect()
#> # A tibble: 2 × 3
#>      vs     n mean_mpg
#>   <dbl> <int>    <dbl>
#> 1     1    10     26.7
#> 2     0     1     26

Note that the same error happens when I use the digest package instead of rlang::hash().
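
A minimal sketch of that check (hash values omitted): digest::digest() serialises the object by default, so it collides in the same way.

# sketch: all three calls return one and the same hash value
digest::digest(query1)
digest::digest(query2)
digest::digest(query3)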

When I print the queries, they are printed as if they were identical (I have reported this printing bug to arrow).

query1
#> FileSystemDataset (query)
#> vs: double
#> n: int32
#> mean_mpg: double
#> 
#> See $.data for the source Arrow object

query2
#> FileSystemDataset (query)
#> vs: double
#> n: int32
#> mean_mpg: double
#> 
#> See $.data for the source Arrow object

query3
#> FileSystemDataset (query)
#> vs: double
#> n: int32
#> mean_mpg: double
#> 
#> See $.data for the source Arrow object

But when I inspect the $.data field of each query, I see that they are in fact different:

query1$.data
#> FileSystemDataset (query)
#> mpg: double
#> vs: double
#> 
#> * Aggregations:
#> n: sum(1)
#> mean_mpg: mean(mpg)
#> * Filter: (mpg > 25)            #<=========
#> * Grouped by vs
#> See $.data for the source Arrow object

query2$.data
#> FileSystemDataset (query)
#> mpg: double
#> vs: double
#> 
#> * Aggregations:
#> n: sum(1)
#> mean_mpg: mean(mpg)
#> * Filter: (mpg > 0)             #<=========
#> * Grouped by vs
#> See $.data for the source Arrow object

query3$.data
#> FileSystemDataset (query)
#> mpg: double
#> vs: double
#> 
#> * Aggregations:
#> n: sum(1)
#> mean_mpg: mean(mpg)
#> * Filter: (cyl == 4)            #<=========
#> * Grouped by vs
#> See $.data for the source Arrow object

But again, rlang::hash() does not detect any difference:

rlang::hash(query1$.data)
#> [1] "b7f743cd635f7dc06356b827a6974df8"
rlang::hash(query2$.data)
#> [1] "b7f743cd635f7dc06356b827a6974df8"
rlang::hash(query3$.data)
#> [1] "b7f743cd635f7dc06356b827a6974df8"

If it helps, the query objects are R6 objects with class arrow_dplyr_query (see also the class's source code in apache/arrow).

Memoise use case

For completeness' sake and to put the problem into perspective, this is how I cache the results; the two calls below should return different values (see above) but don't!

library(arrow)
library(memoise)
library(dplyr)

ds_file <- file.path(tempdir(), "mtcars")

write_dataset(mtcars, ds_file)
ds <- open_dataset(ds_file)

collect_cached <- memoise::memoise(dplyr::collect,
                                   cache = cachem::cache_mem(logfile = stdout()))

# Query 1 with mpg > 25 ----
ds |> 
  filter(mpg > 25) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg)) |> 
  collect_cached()
#> [2022-11-25 09:16:28.586] cache_mem get: key "2edd901226498414056dcc54eaa49415"
#> [2022-11-25 09:16:28.586] cache_mem get: key "2edd901226498414056dcc54eaa49415" is missing
#> [2022-11-25 09:16:28.705] cache_mem set: key "2edd901226498414056dcc54eaa49415"
#> [2022-11-25 09:16:28.706] cache_mem prune
#> # A tibble: 2 × 3
#>      vs     n mean_mpg
#>   <dbl> <int>    <dbl>
#> 1     1     5     30.9
#> 2     0     1     26

# Query 2 with mpg > 0 ----
# this is wrongly matched to the first query and returns wrong results... 
ds |> 
  filter(mpg > 0) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg)) |> 
  collect_cached()
#> [2022-11-25 09:16:28.820] cache_mem get: key "2edd901226498414056dcc54eaa49415"
#> [2022-11-25 09:16:28.820] cache_mem get: key "2edd901226498414056dcc54eaa49415" found    #< ERROR HERE! as the hash is identical
#> # A tibble: 2 × 3
#>      vs     n mean_mpg
#>   <dbl> <int>    <dbl>
#> 1     1     5     30.9
#> 2     0     1     26

Note that we get the same result even though the queries are different, because their hashes are identical (hence this question).

David

2 Answers


This is very much a hack ... but perhaps it will be enough? I was able to get something unique enough about the intermediate query, including its filter component, by capturing the output of show_query() in a custom function and passing that function as the hash= argument to memoise::memoise():

hashfun <- function(x) {
  x$x <- capture.output(show_query(x$x))
  rlang::hash(x)
}
collect_cached <- memoise::memoise(
  dplyr::collect,
  cache = cachem::cache_mem(logfile = stdout()),
  hash = hashfun)

ds |> 
  filter(mpg > 25) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg)) |> 
  collect_cached()
# [2022-11-25 08:14:56.596] cache_mem get: key "e6184e282e05875139e8afd2a071f329"
# [2022-11-25 08:14:56.596] cache_mem get: key "e6184e282e05875139e8afd2a071f329" is missing
# [2022-11-25 08:14:56.616] cache_mem set: key "e6184e282e05875139e8afd2a071f329"
# [2022-11-25 08:14:56.616] cache_mem prune
# # A tibble: 2 x 3
#      vs     n mean_mpg
#   <dbl> <int>    <dbl>
# 1     1     5     30.9
# 2     0     1     26  

#### different filter, should be a "miss"
ds |> 
  filter(mpg > 0) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg)) |> 
  collect_cached()
# [2022-11-25 08:15:06.745] cache_mem get: key "88312b31b29050ff029900f4dfc58a9f"
# [2022-11-25 08:15:06.745] cache_mem get: key "88312b31b29050ff029900f4dfc58a9f" is missing
# [2022-11-25 08:15:06.767] cache_mem set: key "88312b31b29050ff029900f4dfc58a9f"
# [2022-11-25 08:15:06.767] cache_mem prune
# # A tibble: 2 x 3
#      vs     n mean_mpg
#   <dbl> <int>    <dbl>
# 1     0    18     16.6
# 2     1    14     24.6

#### repeat of filter `mpg > 0`, should be a "hit"
ds |> 
  filter(mpg > 0) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg)) |> 
  collect_cached()
# [2022-11-25 08:15:24.825] cache_mem get: key "88312b31b29050ff029900f4dfc58a9f"
# [2022-11-25 08:15:24.825] cache_mem get: key "88312b31b29050ff029900f4dfc58a9f" found
# # A tibble: 2 x 3
#      vs     n mean_mpg
#   <dbl> <int>    <dbl>
# 1     0    18     16.6
# 2     1    14     24.6

The object passed to hashfun is a list: its first element appears to be a checksum or salt of sorts (we'll ignore it), and all remaining elements (named or otherwise) are determined by the formals of the memoised function. In our case, since we are caching collect(), those are x= (which we see) and ... (which we don't):

debugonce(hashfun)
ds |> 
  filter(mpg > 0) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg)) |> 
  collect_cached()
# debugging in: encl$`_hash`(c(encl$`_f_hash`, args, lapply(encl$`_additional`, 
#     function(x) eval(x[[2L]], environment(x)))))
# debug at #1: {
#     x$x <- capture.output(show_query(x$x))
#     rlang::hash(x)
# }

x
# [[1]]
# [1] "1e4b92a7ebe8b4bcb1afbd44c9a72a72"
# 
# $x
# FileSystemDataset (query)
# vs: double
# n: int32
# mean_mpg: double
# 
# See $.data for the source Arrow object

show_query(x$x)
# ExecPlan with 6 nodes:
# 5:SinkNode{}
#   4:ProjectNode{projection=[vs, n, mean_mpg]}
#     3:GroupByNode{keys=["vs"], aggregates=[
#       hash_sum(n, {skip_nulls=true, min_count=1}),
#       hash_mean(mean_mpg, {skip_nulls=false, min_count=0}),
#     ]}
#       2:ProjectNode{projection=["n": 1, "mean_mpg": mpg, vs]}
#         1:FilterNode{filter=(mpg > 0)}
#           0:SourceNode{}

Just replacing x$x with the return value of show_query(x$x) did not work, since some details appear only in the printed form and are not readily available to rlang::hash(), so I chose capture.output().

r2evans
  • I found that this will not depend on the file or the filename. I included either a `tools::md5sum()` or a call to `gsub(getwd(), "", x$x$files)` to get the filenames (without the path to the project) included in the hash calculation. Otherwise different files with the same structure will collide in the cache. – David Nov 29 '22 at 07:50
  • Unfortunately, in my testing, `ds$files` exists, but it goes away the moment we "do something" (such as `filter(..)`). We can sometimes use `.. %>% mutate(f = arrow::add_filename()) %>% distinct(f)` to get at the filenames, but that doesn't work after an aggregating call (i.e., `summarize`). I see no way to get to the filename in a way that is robust to the type of filter/summarize done previously in the pipeline. Sorry @David, I don't know how best to add it atm. I'll continue to think about it, though ... but you might need to look into other caching methods. – r2evans Nov 29 '22 at 12:19
  • Thanks for pointing that error out! That is indeed the case for me as well! – David Nov 30 '22 at 08:55
  • What worked for me (tbh, a very stupid hack) is to add a unique no-op filter to each dataset, e.g. `ds1 |> filter(1 == 1)`, `ds2 |> filter(2 == 2)` and so on, to distinguish ds1 and ds2. – David Nov 30 '22 at 10:29

Edit: see the comments below; this does not work!

I have modified your MWE below to call $.data or explain() on the query. Neither evaluates the query (pretty sure for $.data, 100% for explain()), but they seem to change the R6 object enough to create unique hashes. As you have opened an issue for the underlying problem over at GitHub, this should be a pretty simple workaround with no performance hit.

library(arrow)
library(dplyr)

ds_file <- file.path(tempdir(), "mtcars")

write_dataset(mtcars, ds_file)
ds <- open_dataset(ds_file)

# 1) Create three different queries =======

# Query 1 with mpg > 25 ----
query1 <- ds |> 
  filter(mpg > 25) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg))

# Query 2 with mpg > 0 ----
query2 <- ds |> 
  filter(mpg > 0) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg))

# Query 3 with filter on cyl ----
query3 <- ds |> 
  filter(cyl == 4) |> 
  group_by(vs) |> 
  summarise(n = n(), mean_mpg = mean(mpg))

query1$.data
#> FileSystemDataset (query)
#> mpg: double
#> vs: double
#> 
#> * Aggregations:
#> n: sum(1)
#> mean_mpg: mean(mpg)
#> * Filter: (mpg > 25)
#> * Grouped by vs
#> See $.data for the source Arrow object
explain(query2)
#> ExecPlan with 6 nodes:
#> 5:SinkNode{}
#>   4:ProjectNode{projection=[vs, n, mean_mpg]}
#>     3:GroupByNode{keys=["vs"], aggregates=[
#>      hash_sum(n, {skip_nulls=true, min_count=1}),
#>      hash_mean(mean_mpg, {skip_nulls=false, min_count=0}),
#>     ]}
#>       2:ProjectNode{projection=["n": 1, "mean_mpg": mpg, vs]}
#>         1:FilterNode{filter=(mpg > 0)}
#>           0:SourceNode{}

# 2) Let's compare the hashes: the main issue ======
rlang::hash(query1)
#> [1] "8bbf29208ccbc95fc1bc46f2f2dfe10d"
rlang::hash(query2)
#> [1] "ae5c80b8ed0cc884df40926f3a985b27"
rlang::hash(query3)
#> [1] "3826d824e4c9be046ac5f09dcb60959d"
assignUser
  • `explain(.)` doesn't work; if you define all three `query[123]` and then run `explain(query1); explain(query2); explain(query3)`, you'll see that `rlang::hash(query1)`, `rlang::hash(query2)` and `rlang::hash(query3)` have changed but are still identical to one another. Similarly, if you run `query1$.data; query2$.data; query3$.data`, the hash changes but is still the same for all three. It's a good thought, though! – r2evans Nov 25 '22 at 14:13
  • (The reason you see three different hashes at the end of your code is that you call `$.data` on one, `explain(.)` on another, and leave the third untouched.) – r2evans Nov 25 '22 at 14:13
  • 1
    Interesting, thanks for testing. The issue is with `serialize` which `rlang::hash` uses internally. I am guessing that the issue is that the R6 object is just a collection of pointers to arrow-cpp which is then updated by calling `explain` etc.. (unserializing a query does not work with `external pointer to null`) So this will likley be fixed by correcting the print method for queries... – assignUser Nov 25 '22 at 14:52
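
A minimal sketch of that diagnosis (an untested assumption): rlang::hash() serialises the object before hashing, and if the Arrow-side state (e.g. the filter Expression) lives only behind external pointers that serialise to a null pointer, all three queries should serialise to identical byte streams.

# sketch: compare the raw serialised streams of two different queries
identical(serialize(query1, connection = NULL),
          serialize(query2, connection = NULL))
#> expected: TRUE, which would explain the identical hashes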