You can use collect_list or collect_set:
library(sparklyr)
library(dplyr)

# sc is an existing Spark connection, e.g. sc <- spark_connect(master = "local")
set.seed(1)
df <- copy_to(
  sc, tibble(group = rep(c("a", "b"), 3), value = runif(6)),
  name = "df"
)
result <- df %>% group_by(group) %>% summarise(values = collect_list(value))
result
# Source:   lazy query [?? x 2]
# Database: spark_connection
  group values
  <chr> <list>
1 b     <list [3]>
2 a     <list [3]>
which is translated to the following query:
result %>% show_query()
<SQL>
SELECT `group`, COLLECT_LIST(`value`) AS `values`
FROM `df`
GROUP BY `group`
with corresponding execution plan (optimizedPlan() is not a sparklyr function; see the helper sketch after the plan):
result %>% optimizedPlan()
<jobj[213]>
org.apache.spark.sql.catalyst.plans.logical.Aggregate
Aggregate [group#259], [group#259, collect_list(value#260, 0, 0) AS values#345]
+- InMemoryRelation [group#259, value#260], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
+- Scan ExistingRDD[group#259,value#260]
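optimizedPlan() above is assumed to be a small user-defined helper, not part of the sparklyr API. A minimal sketch using sparklyr's invoke interface (queryExecution and optimizedPlan are methods on the underlying Spark Dataset and its QueryExecution) could look like this:

optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%         # underlying Spark Dataset (jobj)
    invoke("queryExecution") %>%  # its QueryExecution
    invoke("optimizedPlan")       # the optimized logical plan
}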
The schema (with an array<...> column) looks like this:
root
 |-- group: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: double (containsNull = true)
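For reference, a schema tree like the one above can be printed from R with the same invoke interface (a sketch; schema and treeString are methods on Spark's Dataset and StructType respectively):

result %>%
  spark_dataframe() %>%     # underlying Spark Dataset
  invoke("schema") %>%      # its StructType
  invoke("treeString") %>%  # human-readable tree
  cat()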
Please keep in mind that:
- An operation like this one is very expensive in a distributed system: all values for a group have to be brought together and held in memory on a single machine.
- Depending on the data distribution, it might not be feasible at all (think of a single very large group).
- Complex types are somewhat hard to handle in Spark in general, and sparklyr, with its tidy-data focus, doesn't make things easier. To process the result efficiently you may need a Scala extension.
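If you only need the values back in long form on the Spark side, one possible workaround (a sketch, not from the answer above; explode is a Spark SQL generator that dbplyr passes through verbatim) is to flatten the array again:

# Undo the aggregation: one row per array element
result %>% mutate(value = explode(values))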