
The following example shows that, using dplyr with sparklyr, you can't calculate the number of distinct values without aggregating the rows.

Is there a workaround that doesn't break the chain of commands?

More generally, how can you use SQL-like window functions on sparklyr data frames?

## Generating a data set

set.seed(.328)
df <- data.frame(
  ids = floor(runif(10, 1, 10)),
  cats = sample(letters[1:3], 10, replace = TRUE),
  vals = rnorm(10)
)



## Copying to Spark
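The snippets below assume a live Spark connection named `sc`. A minimal setup sketch (the local master is an assumption, not part of the original question):

library(sparklyr)
library(dplyr)

# Assumed setup: connect to a local Spark instance; the copy_to()
# call below expects this connection object to be named `sc`.
sc <- spark_connect(master = "local")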

df.spark <- copy_to(sc, df, "df_spark", overwrite = TRUE)

# Source:   table<df_spark> [?? x 3]
# Database: spark_connection
#   ids  cats       vals
# <dbl> <chr>      <dbl>
#  9     a      0.7635935
#  3     a     -0.7990092
#  4     a     -1.1476570
#  6     c     -0.2894616
#  9     b     -0.2992151
#  2     c     -0.4115108
#  9     b      0.2522234
#  9     c     -0.8919211
#  6     c      0.4356833
#  6     b     -1.2375384
# # ... with more rows

# Using the regular data frame

df %>% mutate(n_ids = n_distinct(ids))

# ids cats       vals n_ids
# 9    a  0.7635935     5
# 3    a -0.7990092     5
# 4    a -1.1476570     5
# 6    c -0.2894616     5
# 9    b -0.2992151     5
# 2    c -0.4115108     5
# 9    b  0.2522234     5
# 9    c -0.8919211     5
# 6    c  0.4356833     5
# 6    b -1.2375384     5


# Using the sparklyr data frame

df.spark %>% mutate(n_ids = n_distinct(ids))

Error: Window function `distinct()` is not supported by this database
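For contrast, ordinary aggregates do translate into window functions on a `tbl_spark`; it is specifically `DISTINCT` inside a window that is rejected. A minimal sketch (not from the original post):

# A grouped mutate translates to AVG(vals) OVER (PARTITION BY cats),
# which Spark supports; only DISTINCT-style window aggregates fail.
df.spark %>%
  group_by(cats) %>%
  mutate(mean_val = mean(vals, na.rm = TRUE)) %>%
  ungroup()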
Comments:
  • can't you use `length(unique(ids))` – Seymour Mar 28 '18 at 15:57
  • unfortunately no, it's not a `data.frame` object, it's a `tbl_spark` object. – Mouad_Seridi Mar 28 '18 at 16:03
  • So basically you want to filter duplicated records and for each of them count how many duplicated they have, right? – Seymour Mar 28 '18 at 16:05
  • no, i want to count the number of unique id's to use it as a denominator for a subsequent calculation. – Mouad_Seridi Mar 28 '18 at 16:08
  • I don't have Spark installed on this pc but I think about one hint and one possible approach. Hint: `df.spark %>% spark_apply(function(e) nrow(e), names = "n")` . Possible approach: `df.spark %>% spark_apply(nrow, group_by = "ids")` – Seymour Mar 28 '18 at 16:08
  • Concerning the question, to use SQL-like queries on sparklyr data.frame you might check http://spark.rstudio.com in the section **Using SQL** – Seymour Mar 28 '18 at 16:11
  • my goal is to have the value repeated for all columns, then when I want to use is as a denominator I can do `max(n_ids)` , a trick to carry over an aggregation while keeping the rows intact. – Mouad_Seridi Mar 28 '18 at 16:15
  • Can you give the equivalent SQL query that you want to achieve? – cakraww Sep 21 '19 at 03:45
  • @cakraww `select count(distinct id) over()` – Mouad_Seridi Sep 21 '19 at 10:44

## 2 Answers

**Answer by zero323** (score: 6)

The best approach here is to compute the count separately, either with an exact distinct count:

# Exact: select distinct ids, count them, and collect the scalar into R
n_ids <- df.spark %>%
   select(ids) %>% distinct() %>% count() %>% collect() %>%
   unlist %>% as.vector

# Inline the scalar as a constant column on every row
df.spark %>% mutate(n_ids = n_ids)
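A more compact way to pull the exact count back as a scalar (a sketch; it runs `COUNT(DISTINCT ids)` as a plain aggregate, which dbplyr does translate):

# n_distinct() is fine inside summarise() -- it only fails in a window;
# pull() collects the single value back into R.
n_ids <- df.spark %>%
  summarise(n_ids = n_distinct(ids)) %>%
  pull(n_ids)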

or approximately with `approx_count_distinct`:

# Approximate: Spark's HyperLogLog++-based distinct count, cheaper at scale
n_ids_approx <- df.spark %>%
   select(ids) %>% summarise(approx_count_distinct(ids)) %>% collect() %>%
   unlist %>% as.vector

df.spark %>% mutate(n_ids = n_ids_approx)

It is a bit verbose, but the window-function approach used by dplyr is a dead end anyway if you want a global unbounded frame.

If you want exact results you can also:

df.spark %>%
    spark_dataframe() %>%                  # drop down to the underlying Java object
    invoke("selectExpr", list("COUNT(DISTINCT ids) as cnt_unique_ids")) %>%
    sdf_register()                         # register the result back as a tbl
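Since the stated goal was to carry the count on every row, one way to attach the one-row result back to the data is a cross join on a constant key (my sketch, not part of the original answer):

# Register the one-row distinct count, then broadcast it onto every
# row of df.spark via a dummy join key.
cnt <- df.spark %>%
  spark_dataframe() %>%
  invoke("selectExpr", list("COUNT(DISTINCT ids) AS n_ids")) %>%
  sdf_register()

df.spark %>%
  mutate(dummy = 1) %>%
  inner_join(cnt %>% mutate(dummy = 1), by = "dummy") %>%
  select(-dummy)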
Comments:
  • has anything changed on this front? it seems quite odd that sparklyr does not implement such an elementary sql function. – Mouad_Seridi Oct 02 '18 at 13:21
  • @Mouad_S `sparklyr` depends on a fairly naive SQL translation layer. Even if it "worked", the actual performance cost wouldn't be acceptable (a mutate with an aggregate is translated: `dbplyr::translate_sql(n_distinct(ids))` -> `COUNT(DISTINCT "ids") OVER ()`). – zero323 Oct 02 '18 at 14:45
**Answer by edog429** (score: 0)

I want to link this thread, which answers this for sparklyr.

I think using `approx_count_distinct` is the best solution. In my experience, dbplyr doesn't translate this function when it is used in a window, so it is better to write the SQL yourself.

mtcars_spk <- copy_to(sc, mtcars, "mtcars_spk", overwrite = TRUE)
mtcars_spk2 <- mtcars_spk %>%
                dplyr::mutate(test = paste0(gear, " ", carb)) %>%
                # hand-written window SQL: dbplyr won't translate
                # approx_count_distinct() inside a window on its own
                dplyr::mutate(discnt = sql("approx_count_distinct(test) OVER (PARTITION BY cyl)"))
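The same trick with an empty `OVER ()` frame gives the single global count the question asked for (a sketch under the same assumptions; unlike an exact `COUNT(DISTINCT ...)`, Spark accepts `approx_count_distinct` in a window):

# Global unbounded frame: every row gets the same approximate
# distinct count of gear values.
mtcars_spk %>%
  dplyr::mutate(n_gears = sql("approx_count_distinct(gear) OVER ()"))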

This thread approaches the problem more generally and discusses `countDistinct` vs. `approxCountDistinct`.
