
I have the following DataFrame:

>>> df.show(50)

+--------------------+-------------+----------------+----+
|        User Hash ID|         Word|sum(Total Count)|rank|
+--------------------+-------------+----------------+----+
|00095808cdc611fb5...|       errors|               5|   1|
|00095808cdc611fb5...|         text|               3|   2|
|00095808cdc611fb5...|  information|               3|   3|
|00095808cdc611fb5...|   department|               2|   4|
|00095808cdc611fb5...|        error|               2|   5|
|00095808cdc611fb5...|         data|               2|   6|
|00095808cdc611fb5...|          web|               2|   7|
|00095808cdc611fb5...|         list|               2|   8|
|00095808cdc611fb5...|  recognition|               2|   9|
|00095808cdc611fb5...|     pipeline|               2|  10|
|000ac87bf9c1623ee...|consciousness|              14|   1|
|000ac87bf9c1623ee...|         book|               3|   2|
|000ac87bf9c1623ee...|        place|               2|   3|
|000ac87bf9c1623ee...|      mystery|               2|   4|
|000ac87bf9c1623ee...|       mental|               2|   5|
|000ac87bf9c1623ee...|     flanagan|               2|   6|
|000ac87bf9c1623ee...|      account|               2|   7|
|000ac87bf9c1623ee...|        world|               2|   8|
|000ac87bf9c1623ee...|      problem|               2|   9|
|000ac87bf9c1623ee...|       theory|               2|  10|
+--------------------+-------------+----------------+----+

This shows, for each user, the 10 most frequent words they read. I would like to create a dictionary, which can then be saved to a file, in the following format:

User : <top 1 word>, <top 2 word>, ..., <top 10 word>

To achieve this, I thought it might be more efficient to cut the df down as much as possible before converting it. Thus, I tried:

>>> df.groupBy("User Hash ID").agg(collect_list("Word")).show(20)
+--------------------+--------------------+
|        User Hash ID|  collect_list(Word)|
+--------------------+--------------------+
|00095808cdc611fb5...|[errors, text, in...|
|000ac87bf9c1623ee...|[consciousness, b...|
|0038ccf6e16121e7c...|[potentials, orga...|
|0042bfbafc6646f47...|[fuel, car, consu...|
|00a19396b7bb52e40...|[face, recognitio...|
|00cec95a2c007b650...|[force, energy, m...|
|00df9406cbab4575e...|[food, history, w...|
|00e6e2c361f477e1c...|[image, based, al...|
|01636d715de360576...|[functional, lang...|
|01a778c390e44a8c3...|[trna, genes, pro...|
|01ab9ade07743d66b...|[packaging, car, ...|
|01bdceea066ec01c6...|[anthropology, de...|
|020c643162f2d581b...|[laser, electron,...|
|0211604d339d0b3db...|[food, school, ve...|
|0211e8f09720c7f47...|[privacy, securit...|
|021435b2c4523dd31...|[life, rna, origi...|
|0239620aa740f1514...|[method, image, d...|
|023ad5d85a948edfc...|[web, user, servi...|
|02416836b01461574...|[parts, based, ad...|
|0290152add79ae1d8...|[data, score, de,...|
+--------------------+--------------------+

From here, it should be more straightforward to generate that dictionary. However, I cannot be sure that this agg function guarantees the words are in the correct order! That is why I am hesitant and wanted to get some feedback on possibly better options.

kklaw

2 Answers


First of all, if you go from a DataFrame to a dictionary, you may run into memory issues, as you will bring all the content of the DataFrame to your driver (a dictionary is a Python object, not a Spark object).

You are not that far from a working solution. I'd do it this way:

from pyspark.sql import functions as F

df.groupBy("User Hash ID").agg(
    F.collect_list(F.struct("Word", "sum(Total Count)", "rank")).alias("data")
)

This will create a data column containing your 3 fields, aggregated by user ID.

Then, to go from a DataFrame to a dict object, you can use, for example, toJSON or the Row method asDict().
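For instance, a minimal sketch combining the two steps (the names grouped and top_words are illustrative; the sort by rank is added here explicitly, since collect_list alone does not guarantee order):

from pyspark.sql import functions as F

grouped = df.groupBy("User Hash ID").agg(
    F.collect_list(F.struct("Word", "sum(Total Count)", "rank")).alias("data")
)

# Build {user: [top words]} on the driver. asDict(recursive=True) turns the
# nested struct Rows into dicts; sorting by rank restores the top-10 order.
top_words = {}
for row in grouped.collect():
    d = row.asDict(recursive=True)
    top_words[d["User Hash ID"]] = [
        s["Word"] for s in sorted(d["data"], key=lambda s: s["rank"])
    ]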

Steven
  • Actually, it is not necessary to go from the DataFrame to a dict. I just need a file that has the form `, ` for each line. – kklaw May 30 '22 at 17:33

Based on the answers provided here: collect_list by preserving order based on another variable

you can write the query below to make sure you have the top 10 in the correct order:

import pyspark.sql.functions as F

grouped_df = (
    df.groupBy("User Hash ID")
    # sort_array orders the structs by their first field, i.e. rank
    .agg(F.sort_array(F.collect_list(F.struct("rank", "Word"))).alias("collected_list"))
    # keep only the words, which are already in rank order
    .withColumn("sorted_list", F.slice(F.col("collected_list.Word"), start=1, length=10))
    .drop("collected_list")
)
grouped_df.show(truncate=False)
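If, per the comment above, the goal is simply a file with one "user : word1, ..., word10" line per user rather than a dictionary, a minimal sketch building on grouped_df (the file name is illustrative):

# Collect to the driver (mind memory on very large data) and write one
# "user : word1, ..., word10" line per user.
with open("top_words_per_user.txt", "w") as f:
    for row in grouped_df.collect():
        f.write(f'{row["User Hash ID"]} : {", ".join(row["sorted_list"])}\n')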
Anjaneya Tripathi