
I am trying to merge multiple rows into one column as valid JSON in a Spark DataFrame (Spark 1.6.1), and then store the result in a MySQL table.

My original Spark DataFrame looks like this:

| user_id | product_id | price |
|---------|------------|-------|
| A       | p1         | 3000  |
| A       | p2         | 1500  |
| B       | P1         | 3000  |
| B       | P3         | 2000  |

I want to convert the table above into this:

| user_id | contents_json |
|---------|---------------|
| A       | {(product_id:p1, price:3000), (product_id:p2, price:1500)} |
| B       | {(product_id:p1, price:3000), (product_id:p3, price:2000)} |

and then store this table in MySQL.

It is essentially the opposite of explode, but I can't find the right way to do it.

1 Answer


I assume you are looking for the JSON output shown below.

import pyspark.sql.functions as psf
from pyspark.sql.functions import col, collect_list, struct
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

# Sample data matching the question
df = sc.parallelize([('A','P1',3000), ('A','P2',1500),
                     ('B','P1',3000), ('B','P3',2000)]).toDF(["user_id", "product_id", "price"])

Spark 2.0:

# Spark 2.x: collect_list accepts struct columns directly
df1 = df.groupBy("user_id") \
    .agg(collect_list(struct(col("product_id"), col("price"))).alias("contents_json"))
df1.show()
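
Since the end goal is a MySQL table, one way to finish the Spark 2.x path is to serialize the struct array to a JSON string and write it over JDBC. A minimal sketch, assuming a Spark version recent enough for to_json to handle arrays of structs (roughly 2.2+); the JDBC URL, table name, and credentials are placeholders:

from pyspark.sql.functions import to_json

# Serialize the array<struct> column into one JSON string per row
df2 = df1.withColumn("contents_json", to_json(col("contents_json")))

# Placeholder connection details -- replace with your own
df2.write.jdbc(
    url="jdbc:mysql://localhost:3306/mydb",
    table="user_contents",
    mode="append",
    properties={"user": "root", "password": "secret",
                "driver": "com.mysql.jdbc.Driver"}
)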

Spark 1.6 (collect_list here only accepts primitive column types, as the AnalysisException quoted in the comments shows, so collect the two columns separately and zip them back together with a UDF):

# Zip the two collected lists back into a single array of structs
zipCols = psf.udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust the field types to reflect your data
        StructField("product_id", StringType()),
        StructField("price", IntegerType())
    ]))
)

df1 = df.groupBy("user_id").agg(
    zipCols(
        collect_list(col("product_id")),
        collect_list(col("price"))
    ).alias("contents_json")
)

for row in df1.toJSON().collect():
    print(row)

Output is:

{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}
  • Thanks for the answer. I tried your way but I got an error message. I think it is a Spark version issue; my Spark version is 1.6.1. Do you know how to avoid the error message below? – JH.Lee Sep 09 '17 at 00:06
  • `AnalysisException: u'No handler for Hive udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Only primitive type arguments are accepted but struct was passed as parameter 1..;'` – JH.Lee Sep 09 '17 at 00:09
  • I succeeded in making an array for each column, but I am still trying to combine these two columns as key-value pairs. `df1 = df \ .groupBy("user_id") \ .agg(collect_list(col("product_id")).alias("product_id"), collect_list(col("price")).alias("price"))` – JH.Lee Sep 09 '17 at 01:08
  • @JH.Lee I tested it on 2.2.0. I think you may build the key value pair manually using this [link](https://stackoverflow.com/questions/31450846/concatenate-columns-in-apache-spark-dataframe) instead of using `struct`. e.g. `df.withColumn("contents_json_tmp", concat(lit('{"product_id":'), col("product_id"), lit(', "price":'), col("price"), lit('}')))` and then use collect_list to make list of `contents_json_tmp` in `contents_json` column. Hope it solves the issue. – Prem Sep 09 '17 at 06:20
  • Thanks @Prem. But it showed a different `AnalysisException` error message. `AnalysisException: u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"` – JH.Lee Sep 12 '17 at 00:48
  • @Marie - can you please help JH.Lee resolve the v1.6 error above? – Prem Sep 12 '17 at 04:28
  • It seems that you've added extra computations (a column `pythonUDF` that you refer to after the `groupBy` or within it); can you post what you did differently? – MaFF Sep 12 '17 at 05:50
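
For completeness, here is a minimal sketch of the workaround Prem suggests in the comments for the 1.6 error: build each row's JSON object as a plain string with concat and lit (quotes added around product_id and price cast to string so the JSON stays valid), then collect those strings and join them into one JSON array per user. This avoids passing a struct to collect_list; the column names are the ones from the question, and the sketch is untested on 1.6:

from pyspark.sql.functions import concat, concat_ws, lit

# Build each row's JSON object as a plain string (no struct reaches collect_list)
df_tmp = df.withColumn(
    "contents_json_tmp",
    concat(lit('{"product_id":"'), col("product_id"),
           lit('","price":'), col("price").cast("string"), lit('}'))
)

# Collect the per-row strings and wrap them into one JSON array per user
df1 = df_tmp.groupBy("user_id").agg(
    concat_ws(",", collect_list(col("contents_json_tmp"))).alias("contents_json")
)
df1 = df1.withColumn("contents_json",
                     concat(lit("["), col("contents_json"), lit("]")))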