
The format of the input data is as follows:

+--------------------+-------------+--------------------+
|           date     |       user  |           product  |
+--------------------+-------------+--------------------+
|        2016-10-01  |        Tom  |           computer |
+--------------------+-------------+--------------------+
|        2016-10-01  |        Tom  |           iphone   |
+--------------------+-------------+--------------------+
|        2016-10-01  |       Jhon  |             book   |
+--------------------+-------------+--------------------+
|        2016-10-02  |        Tom  |             pen    |
+--------------------+-------------+--------------------+
|        2016-10-02  |       Jhon  |             milk   |
+--------------------+-------------+--------------------+

And the format of the output is as follows:

+-----------+-----------------------+
|     user  |        products       |
+-----------+-----------------------+
|     Tom   |   computer,iphone,pen |
+-----------+-----------------------+
|     Jhon  |          book,milk    |
+-----------+-----------------------+

The output shows all products each user bought, ordered by date.

I want to process this data using Spark. Can you help me, please? Thank you.

StrongYoung
  • Possible duplicate of [Spark Dataframe groupby with agg performing list appending](http://stackoverflow.com/questions/34202997/spark-dataframe-groupby-with-agg-performing-list-appending) – mtoto Oct 13 '16 at 13:50
  • Possible duplicate of [Concatenate columns in apache spark dataframe](http://stackoverflow.com/questions/31450846/concatenate-columns-in-apache-spark-dataframe) – Alberto Bonsanto Oct 13 '16 at 13:58

3 Answers


It's better to use a map & reduceByKey() combination rather than groupBy. Also assuming the data doesn't have a header row:

// Alternatively, read the data from a file: val ordersRDD = sc.textFile("/file/path")
val ordersRDD = sc.parallelize(List(
    ("2016-10-01", "Tom", "computer"),
    ("2016-10-01", "Tom", "iphone"),
    ("2016-10-01", "Jhon", "book"),
    ("2016-10-02", "Tom", "pen"),
    ("2016-10-02", "Jhon", "milk")))

// Key each record by (user, date), sort by that key, then re-key by user
// and concatenate the products in date order
val dtusrGrpRDD = ordersRDD.map(rec => ((rec._2, rec._1), rec._3))
   .sortByKey()
   .map(x => (x._1._1, x._2))
   .reduceByKey((acc, v) => acc + "," + v)

// If needed, convert it to a DataFrame
scala> dtusrGrpRDD.toDF("user", "product").show()
+----+-------------------+
|user|            product|
+----+-------------------+
| Tom|computer,iphone,pen|
|Jhon|          book,milk|
+----+-------------------+
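
The snippet above is written for the spark-shell, where the SparkContext and the SQL implicits are already set up. Outside the shell, the implicit conversions behind sortByKey and toDF have to be brought into scope explicitly. A minimal self-contained sketch (assuming Spark 1.6 with a SQLContext; the object name and app name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits (only needed on Spark < 1.3)
import org.apache.spark.sql.SQLContext

object Orders {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("orders"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._        // enables rdd.toDF(...)

    val ordersRDD = sc.parallelize(List(
      ("2016-10-01", "Tom", "computer"),
      ("2016-10-01", "Tom", "iphone"),
      ("2016-10-01", "Jhon", "book"),
      ("2016-10-02", "Tom", "pen"),
      ("2016-10-02", "Jhon", "milk")))

    // Same pipeline as above: key by (user, date), sort, then concatenate per user
    val grouped = ordersRDD
      .map(rec => ((rec._2, rec._1), rec._3))
      .sortByKey()
      .map(x => (x._1._1, x._2))
      .reduceByKey((acc, v) => acc + "," + v)

    grouped.toDF("user", "products").show()
  }
}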
KiranM
  • Which Spark version are you using? I get an error here: cannot resolve symbol sortByKey. – StrongYoung Oct 14 '16 at 03:05
  • Using DataFrames instead of RDDs is generally preferred since you can let the execution engine take care of deciding the optimal execution plan, and it will automatically apply various optimizations (predicate push-down, code generation, etc). https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html – Ryan Widmaier Oct 17 '16 at 12:40

If you are using a HiveContext (which you should be):

Example using Python:

from pyspark.sql.functions import collect_set

df = ...  # load your DataFrame here
new_df = df.groupBy("user").agg(collect_set("product").alias("products"))

If you don't want the resulting products list deduplicated, you can use collect_list instead.

Ryan Widmaier

For DataFrames it is a two-liner:

import org.apache.spark.sql.functions.collect_list
// collect_set instead of collect_list if you don't want duplicates
val output = join.groupBy("user").agg(collect_list($"product"))

groupBy gives you a dataset grouped by user, on which you can then apply collect_list or collect_set to gather each user's products.
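
Note that collect_list and collect_set do not guarantee any particular ordering, so the date ordering asked for in the question is not preserved automatically. One way to keep it (a sketch, assuming Spark 1.6+ and an input DataFrame named orders with columns date, user and product) is to collect (date, product) pairs, sort them, and then keep only the product part:

import org.apache.spark.sql.functions.{col, collect_list, concat_ws, sort_array, struct}

// orders: DataFrame with columns date, user, product (names assumed for illustration)
val byDate = orders
  .groupBy("user")
  // pairs: array<struct<date,product>> sorted by date (structs compare field by field)
  .agg(sort_array(collect_list(struct(col("date"), col("product")))).as("pairs"))
  // keep only the product part of each pair and join with commas
  .select(col("user"), concat_ws(",", col("pairs.product")).as("products"))

byDate.show(truncate = false)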

Vinayak Mishra