This question is essentially a duplicate of this question, except I am working in R. The pyspark solution looks solid, but I haven't been able to figure out how to apply collect_list over a window function in the same way in sparklyr.
I have a Spark DataFrame with the following structure:
------------------------------
userid | date       | city
------------------------------
1      | 2018-08-02 | A
1      | 2018-08-03 | B
1      | 2018-08-04 | C
2      | 2018-08-17 | G
2      | 2018-08-20 | E
2      | 2018-08-23 | F
I am trying to group the DataFrame by userid, order each group by date, and collapse the city column into a comma-separated concatenation of its values. Desired output:
------------------
userid | cities
------------------
1      | A, B, C
2      | G, E, F
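(For anyone who wants to reproduce this: a minimal way to build the sample table, assuming an existing connection sc from spark_connect():)

library(sparklyr)
library(dplyr)

# Assumes an existing connection, e.g. sc <- spark_connect(master = "local")
local_df <- tibble::tribble(
  ~userid, ~date,        ~city,
  1L,      "2018-08-02", "A",
  1L,      "2018-08-03", "B",
  1L,      "2018-08-04", "C",
  2L,      "2018-08-17", "G",
  2L,      "2018-08-20", "E",
  2L,      "2018-08-23", "F"
)
my_sdf <- sdf_copy_to(sc, local_df, name = "my_sdf", overwrite = TRUE)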
The trouble is that every method I've tried so far leaves some users (approx. 3% in a test of 5000 users) with their cities value in the wrong order.
Attempt 1: using dplyr and collect_list.
my_sdf %>%
  dplyr::group_by(userid) %>%
  dplyr::arrange(date) %>%
  dplyr::summarise(cities = paste(collect_list(city), sep = ", "))
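For what it's worth, the SQL that dplyr generates can be inspected with show_query() (re-exported from dbplyr; it works on a tbl_spark). As far as I can tell, the arrange() just becomes an ORDER BY in a subquery, and Spark makes no guarantee that collect_list consumes rows in that order once group_by triggers a shuffle:

# Inspect the translated SQL rather than executing it; the arrange()
# appears as an ORDER BY in a subquery, which Spark is free to reshuffle
# before the aggregation runs.
my_sdf %>%
  dplyr::group_by(userid) %>%
  dplyr::arrange(date) %>%
  dplyr::summarise(cities = paste(collect_list(city), sep = ", ")) %>%
  dplyr::show_query()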
Attempt 2: using replyr::gapply, since the operation fits the description of "Grouped-Order-Apply".
get_cities <- . %>%
  summarise(cities = paste(collect_list(city), sep = ", "))

my_sdf %>%
  replyr::gapply(gcolumn = "userid",
                 f = get_cities,
                 ocolumn = "date",
                 partitionMethod = "group_by")
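(A grouped spark_apply() would presumably get the order right, since each group is sorted inside R, but shipping every group out to an R process seems far too slow for real data. Sketch below, untested at scale:)

# Sketch only: spark_apply() with group_by runs the function once per
# userid group in an R process, so the sort order is explicit in R code,
# but serializing every group out to R is likely very slow on real data.
my_sdf %>%
  sparklyr::spark_apply(
    function(df) {
      data.frame(cities = paste(df$city[order(df$date)], collapse = ", "),
                 stringsAsFactors = FALSE)
    },
    group_by = "userid"
  )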
Attempt 3: writing it as a SQL window function.
spark_session(sc) %>%
  sparklyr::invoke("sql",
                   "SELECT userid, CONCAT_WS(', ', collect_list(city)) AS cities
                    OVER (PARTITION BY userid
                          ORDER BY date)
                    FROM my_sdf") %>%
  sparklyr::sdf_register() %>%
  sparklyr::sdf_copy_to(sc, ., "my_sdf", overwrite = TRUE)
^ throws the following error:
Error: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'OVER' expecting <EOF>(line 2, pos 19)
== SQL ==
SELECT userid, CONCAT_WS(', ', collect_list(city)) AS cities
OVER (PARTITION BY userid
-------------------^^^
ORDER BY date)
FROM my_sdf
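I suspect the parser is complaining because OVER has to be attached directly to the window function call, not trail the select list. A syntactically valid version of what I was going for might be the sketch below (untested; note that a windowed collect_list emits a running list for every input row, hence the explicit frame plus DISTINCT, and DBI::dbGetQuery() collects the result locally):

# Sketch: OVER must follow the function it modifies. With the
# full-partition frame, every row in a group carries the same complete
# list, and DISTINCT collapses the duplicates. dbGetQuery() works on a
# sparklyr connection but pulls the result into local R memory.
DBI::dbGetQuery(sc, "
  SELECT DISTINCT
    userid,
    CONCAT_WS(', ',
      collect_list(city) OVER (PARTITION BY userid
                               ORDER BY date
                               ROWS BETWEEN UNBOUNDED PRECEDING
                                        AND UNBOUNDED FOLLOWING)) AS cities
  FROM my_sdf")

Even if this parses, I'd still like to know the idiomatic sparklyr/dplyr way to do this, and whether the ordering is actually guaranteed.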