0

This question is essentially a duplicated of this question, except I am working in R. The pyspark solution looks solid, but I haven't been able to figure out how to apply collect_list over a window function in the same way in sparklyr.

I have a Spark DataFrame with the following structure:

------------------------------
userid |     date     | city
------------------------------
   1   |  2018-08-02  |   A
   1   |  2018-08-03  |   B
   1   |  2018-08-04  |   C
   2   |  2018-08-17  |   G
   2   |  2018-08-20  |   E
   2   |  2018-08-23  |   F

I am trying to group the DataFrame by userid, order each group by date, and collapse the city column into a concatenation of its values. Desired output:

------------------
userid | cities
------------------
   1   |  A, B, C
   2   |  G, E, F

The trouble is that each method I've tried to do this with has resulted in some users (appx. 3% on a test of 5000 users) not having their "cities" column in the correct order.


Attempt 1: using dplyr and collect_list.

my_sdf %>%
  dplyr::group_by(userid) %>%
  dplyr::arrange(date) %>%
  dplyr::summarise(cities = paste(collect_list(city), sep = ", ")))

Attempt 2: using replyr::gapply since the operation fits the description of "Grouped-Order-Apply".

get_cities <- . %>%
   summarise(cities = paste(collect_list(city), sep = ", "))

my_sdf %>%
  replyr::gapply(gcolumn = "userid",
                 f = get_cities,
                 ocolumn = "date",
                 partitionMethod = "group_by")

Attempt 3: write as a SQL window function.

my_sdf %>% 
  spark_session(sc) %>%
  sparklyr::invoke("sql", 
                   "SELECT userid, CONCAT_WS(', ', collect_list(city)) AS cities
                   OVER (PARTITION BY userid
                         ORDER BY date)
                   FROM my_sdf") %>%
  sparklyr::sdf_register() %>%
  sparklyr::sdf_copy_to(sc, ., "my_sdf", overwrite = T)

^ throws the following error:

Error: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'OVER' expecting <EOF>(line 2, pos 19)

== SQL ==
SELECT userid, conversion_location, CONCAT_WS(' > ', collect_list(channel)) AS path
                   OVER (PARTITION BY userid, conversion_location
-------------------^^^
                         ORDER BY occurred_at)
                   FROM paths_model
zack
  • 370
  • 1
  • 2
  • 13

2 Answers2

0

Solved! I misunderstood how collect_list() and Spark SQL could work together. I didn't realize a list could be returned, I thought that the concatenation had to take place within the query. The following produces the desired result:

spark_output <- spark_session(sc) %>%
  sparklyr::invoke("sql", 
                   "SELECT userid, collect_list(city)
                   OVER (PARTITION BY userid
                         ORDER BY date
                         ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
                   AS cities
                   FROM my_sdf") %>%
  sdf_register() %>%
  group_by(userid) %>%
  filter(row_number(userid) == 1) %>%
  ungroup() %>%
  mutate(cities = paste(cities, sep = " > ")) %>%
  sdf_register()
zack
  • 370
  • 1
  • 2
  • 13
  • link to sparklyr issue I opened on the topic: https://github.com/rstudio/sparklyr/issues/2009 – zack May 15 '19 at 20:12
-1

Ok: so I admit that the following solution is not at all efficient (it uses a for loop and is actually a lot of code for what seems like it could be a simple task), but I believe this should work:

#install.packages("tidyverse") # if needed
library(tidyverse)

df <- tribble(
  ~userid, ~date, ~city,
  1   ,  "2018-08-02"  ,   "A",
  1   ,  "2018-08-03"  ,   "B",
  1   ,  "2018-08-04"  ,   "C",
  2   ,  "2018-08-17"  ,   "G",
  2   ,  "2018-08-20"  ,   "E",
  2   ,  "2018-08-23"  ,   "F"
)

cityPerId <- df %>% 
  spread(key = date, value = city) 

toMutate <- NA
for (i in 1:nrow(cityPerId)) {
  cities <- cityPerId[i,][2:ncol(cityPerId)] %>% t() %>%
    as.vector() %>% 
    na.omit()
  collapsedCities <- paste(cities, collapse = ",")
  toMutate <- c(toMutate, collapsedCities)
}
toMutate <- toMutate[2:length(toMutate)]

final <- cityPerId %>% 
  mutate(cities = toMutate) %>% 
  select(userid, cities)

Felix T.
  • 520
  • 3
  • 11
  • Thanks for the idea -- this works fine for data frames that are stored locally, but `spread` and bracket subsetting doesn't work on Spark DataFrames. – zack May 10 '19 at 20:28
  • Got it; have you tried these analogous functions? `reshape::cast()`, `spreadsheets::pviot()`, or `databases::unfold()` for spreading the data? – Felix T. May 10 '19 at 20:31
  • Doesn't look like `reshape` is compatible with Spark DataFrames, and I couldn't locate the other two functions/packages. – zack May 10 '19 at 21:26