
There are a number of questions asking precisely the same thing but none within the context of a sparklyr environment. How does one group by a column and then concatenate the values of some other column as a list?

For example, the following produces the desired output in a local R environment.

mtcars %>%
  distinct(gear, cyl) %>%
  group_by(gear) %>%
  summarize(test_list = paste0(cyl, collapse = ";")) %>% 
  select(gear, test_list) %>%
  as.data.frame() %>%
  print()


   gear test_list
1    3     6;8;4
2    4       6;4
3    5     4;8;6

But registering that same table in Spark and running the same code fails with a SQL parse error on the summarize (see code below), presumably because dbplyr translates paste0 into Spark SQL, which has no equivalent of R's collapse argument. I know PySpark and Spark SQL have a collect_set() function that achieves the desired effect; is there something analogous in sparklyr?

sdf_copy_to(sc, x = mtcars, name = "mtcars_test")

tbl(sc, "mtcars_test") %>%
  distinct(gear, cyl) %>%
  group_by(gear) %>%
  summarize(test_list = paste0(cyl, collapse = ";"))

Error:

Error : org.apache.spark.sql.catalyst.parser.ParseException: 

In PySpark, the following approach is similar (except the concatenated column is an array, which can then be collapsed into a string).

from pyspark.sql.functions import collect_set

df2 = spark.table("mtcars_test")
df2.groupby("gear").agg(collect_set('cyl')).createOrReplaceTempView("mtcars_test_cont")

display(spark.table("mtcars_test_cont"))

gear collect_set(cyl)
 3   [8, 4, 6]
 4   [4, 6]
 5   [8, 4, 6]
Vivek Atal
Cyrus Mohammadian

1 Answer


Instead of using R functions, you can use Spark SQL syntax directly by wrapping it in the sql() function from dbplyr. Below is an example script that produces the desired output:

sdf_copy_to(sc, x = mtcars, name = "mtcars_test")

tbl(sc, "mtcars_test") %>%
  group_by(gear) %>%
  summarize(test_list = sql("array_join(collect_set(cast(cyl as int)), ';')"))

#>   gear test_list
#>  <dbl> <chr>    
#>     4  6;4      
#>     3  6;4;8    
#>     5  6;4;8 

I only changed the last line of your code, where you used the paste0 function.
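As a side note, dbplyr passes functions it does not recognize straight through to Spark SQL, so you may also be able to call Spark's own aggregate functions directly inside summarize() without wrapping them in sql(). A sketch of that variant (untested here; it assumes an active sparklyr connection sc and the registered mtcars_test table from the question):

```r
library(sparklyr)
library(dplyr)

# collect_set() and concat_ws() are not R functions; dbplyr should
# forward them verbatim to Spark SQL. as.integer() is translated to
# CAST(... AS INT) by dbplyr's standard translations.
tbl(sc, "mtcars_test") %>%
  group_by(gear) %>%
  summarize(test_list = concat_ws(";", collect_set(as.integer(cyl))))
```

This keeps the whole pipeline in dplyr syntax, at the cost of relying on the passthrough behavior rather than an explicit SQL string.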

This is one reason why I prefer SparkR over sparklyr: almost all PySpark syntax works the same way.

SparkR::agg(
  SparkR::group_by(SparkR::createDataFrame(mtcars), SparkR::column("gear")),
  test_list = SparkR::array_join(
    SparkR::collect_set(SparkR::cast(SparkR::column("cyl"), "integer")),
    ";"
  )
) %>% 
  SparkR::collect()

#>  gear test_list
#>    4       6;4
#>    3     6;4;8
#>    5     6;4;8