
I am new to PySpark.

I have a dataset that looks like this (just a snapshot of a few columns):

data description

I want to group my data by key. My key is:

CONCAT(a.div_nbr,a.cust_nbr)

My ultimate goal is to convert the data into JSON, formatted like this:

k1[{v1,v2,....},{v1,v2,....}], k2[{v1,v2,....},{v1,v2,....}],....

e.g.

248138339 [{ PRECIMA_ID:SCP 00248 0000138339, PROD_NBR:5553505, PROD_DESC:Shot and a Beer Battered Onion Rings (5553505 and 9285840) , PROD_BRND:Molly's Kitchen,PACK_SIZE:4/2.5 LB, QTY_UOM:CA } , 
        { PRECIMA_ID:SCP 00248 0000138339 , PROD_NBR:6659079 , PROD_DESC:Beef Chuck Short Rib Slices, PROD_BRND:Stockyards , PACK_SIZE:12 LBA , QTY_UOM:CA} ,{...,...,} ],

1384611034793[{},{},{}],....

I have created a DataFrame by joining two tables to get some more fields:

joinstmt = sqlContext.sql("""
    SELECT a.precima_id, CONCAT(a.div_nbr, a.cust_nbr) AS key,
           a.prod_nbr, a.prod_desc, a.prod_brnd, a.pack_size, a.qty_uom,
           a.sales_opp, a.prc_guidance, a.pim_mrch_ctgry_desc, a.pim_mrch_ctgry_id,
           b.start_date, b.end_date
    FROM scoop_dtl a JOIN scoop_hdr b ON (a.precima_id = b.precima_id)
""")

Now, to get the result above, I need to group the joined data by key. I did the following:

groupbydf = joinstmt.groupBy("key")

This resulted in a GroupedData object, and after some reading I learned that I cannot use it directly: I need to convert it back into a DataFrame to store it.

I am new to this and need some help converting it back into a DataFrame. I would also appreciate hearing about any other approaches.

ZygD
jeetu

2 Answers


If your joined dataframe looks like this:

gender  age
M   5
F   50
M   10
M   10
F   10

You can then use the code below to get the desired output:

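from pyspark.sql.functions import collect_list

# write.json creates a directory of part files at the given path
joinedDF.groupBy("gender") \
    .agg(collect_list("age").alias("ages")) \
    .write.json("jsonOutput.txt")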

The output would look like this:

{"gender":"F","ages":[50,10]}
{"gender":"M","ages":[5,10,10]}

If you have multiple columns, such as name and salary, you can add them like this:

df.groupBy("gender") \
    .agg(collect_list("age").alias("ages"), collect_list("name").alias("names"))

Your output would look like:

{"gender":"F","ages":[50,10],"names":["ankit","abhay"]}
{"gender":"M","ages":[5,10,10],"names":["snchit","mohit","rohit"]}
Sanchit Grover
  • Thank you -- per the OP's question, how may we extend your solution to data with more fields? E.g. if joinedDF contains [{'gender': 'M', 'name': 'kelly', 'age': 20}, {'gender': 'M', 'name': 'bob', 'age': 41}], then on grouping by 'gender' we achieve: {'gender': 'M', 'names': ['kelly', 'bob'], 'ages': [20, 41]} – Quetzalcoatl Jan 12 '18 at 20:24
  • Updated my answer. Hope that helps. – Sanchit Grover Jan 16 '18 at 05:00
  • But are the collected list items ordered? E.g., does age 50 correspond to ankit and age 10 to abhay in your examples? – user238607 Aug 02 '18 at 13:47

You cannot use GroupedData directly; it has to be aggregated first. Aggregation with built-in functions like collect_list gets you part of the way, but it is simply not possible to achieve the desired output, with values used as keys, using DataFrameWriter.

You can try something like this instead:

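import json
from pyspark.sql import Row
from pyspark.sql.functions import struct

def make_json(kvs):
    # kvs is a (key struct, iterable of value dicts) pair produced by groupByKey
    k, vs = kvs
    return json.dumps({k[0]: list(vs)})

# keys: list of key column names; values: a struct column holding the value fields
(df.select(struct(*keys), values)
    .rdd
    .mapValues(Row.asDict)
    .groupByKey()
    .map(make_json))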

and then call saveAsTextFile on the result.
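To see the shape of the output without a cluster, the same grouping logic can be mimicked in plain Python. This is only a local illustration of what grouping by key and then serializing each group yields; the sample rows and the helper name `group_to_json_lines` are hypothetical and not part of any Spark API.

```python
import json
from collections import defaultdict

# Hypothetical sample records; "key" plays the role of CONCAT(div_nbr, cust_nbr).
rows = [
    {"key": "248138339", "prod_nbr": "5553505", "qty_uom": "CA"},
    {"key": "248138339", "prod_nbr": "6659079", "qty_uom": "CA"},
    {"key": "1384611034793", "prod_nbr": "1234567", "qty_uom": "EA"},
]

def group_to_json_lines(records, key_field):
    """Group records by key_field and return one JSON string per key."""
    groups = defaultdict(list)
    for rec in records:
        rec = dict(rec)            # copy so the input is not mutated
        k = rec.pop(key_field)     # the key becomes the JSON object's name
        groups[k].append(rec)
    return [json.dumps({k: vs}) for k, vs in groups.items()]

lines = group_to_json_lines(rows, "key")
for line in lines:
    print(line)
```

Each printed line is one JSON object whose single field name is the key and whose value is the list of records for that key, which matches the layout asked for in the question.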

zero323