1

My input dataframe looks like this:

+----------+-------+-------+
| timestamp| weight|     id| 
+----------+-------+-------+
|01-01-2022|    123| abc123|
|02-02-2022|    456| def456|
|03-03-2022|    789| ghi789|
+----------+-------+-------+

The goal is to write the records of this dataframe to a .json file in the following format:

{"summaries":[{"id":"abc123","timestamp":"01-01-2022","weight":123},{"id":"def456","timestamp":"02-02-2022","weight":456},{"id":"ghi789","timestamp":"03-03-2022","weight":789}],"status":200}
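To make the target shape concrete, here is a minimal plain-Python sketch (no Spark involved) that builds the same document from three hard-coded rows. The variable names are illustrative only; it is meant to pin down the nesting and key order, not to suggest a way of processing large data.

```python
import json

# The three input records, with keys in the order used by the target file.
rows = [
    {"id": "abc123", "timestamp": "01-01-2022", "weight": 123},
    {"id": "def456", "timestamp": "02-02-2022", "weight": 456},
    {"id": "ghi789", "timestamp": "03-03-2022", "weight": 789},
]

# One top-level object: an array of records plus a constant status field.
document = {"summaries": rows, "status": 200}

# Compact separators drop the spaces after "," and ":" to match the target layout.
text = json.dumps(document, separators=(",", ":"))
print(text)
```

Note that weight stays a JSON number and that all records sit in a single array under "summaries".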

So, before writing to the JSON file, I want the dataframe to look like this:

+----------------------------------------------------------+------+
|                                                 summaries|status|
+----------------------------------------------------------+------+
|[{"timestamp":"01-01-2022", "weight":123, "id":"abc123"}, |      |
| {"timestamp":"02-02-2022", "weight":456, "id":"def456"}, |      |
| {"timestamp":"03-03-2022", "weight":789, "id":"ghi789"}] |   200|
+----------------------------------------------------------+------+

I've created a starting point of my dataframe:

data = [('01-01-2022', 123, 'abc123'), ('02-02-2022', 456, 'def456'), ('03-03-2022', 789, 'ghi789')]
columns = ["timestamp", "weight", "id"]

df = spark.createDataFrame(data, columns)

I have tried two strategies:

1.

dfConvert = (df.withColumn("summaries", struct("timestamp", "weight", "id")))

However, from there I don't know how to combine all the records into a single record or how to add the 'status' column.

2.

# Collect the dataframe rows as a list of dictionaries
rows = df.rdd.map(lambda row: row.asDict()).collect()
# print(rows)

dfConvert = spark.createDataFrame([(rows, "200")],["summaries", "status"])

However, this strategy collects every row into the driver's memory, which I want to avoid: later in the process I will have large data sets, and this code performs worse than the withColumn method.

NOTE: The groupBy method won't work, because I will have duplicate values in each of the columns.

The write itself then succeeds:

dfConvert.write.format('json').mode("overwrite").save("MyDocuments/write_path")
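One thing to keep in mind about this write: Spark treats the save path as a directory and writes one or more part files into it in JSON Lines format (one JSON object per line), rather than producing a single .json file. Below is a minimal stdlib sketch for loading such a directory back for inspection. The helper name read_spark_json_dir is hypothetical, and the part-*.json pattern assumes Spark's default, uncompressed part-file naming.

```python
import json
from pathlib import Path


def read_spark_json_dir(path):
    """Load every JSON Lines record from the part files in a Spark output directory.

    Hypothetical helper: assumes uncompressed part files named part-*.json.
    """
    records = []
    # Sort for a stable file order; marker files such as _SUCCESS are ignored by the glob.
    for part in sorted(Path(path).glob("part-*.json")):
        with part.open(encoding="utf-8") as handle:
            # Each non-empty line is one complete JSON object.
            records.extend(json.loads(line) for line in handle if line.strip())
    return records
```

For example, read_spark_json_dir("MyDocuments/write_path") should return a list with one dictionary per written row.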
samkart
Meeldurb

2 Answers

1

You can achieve this with a combination of create_map and map_concat.

The solution can be broken down into the following steps:

  • Build a single-entry map per column with create_map, using F.lit(column_name) as the key and the column's value as the value
  • Merge those per-column maps into one map per row with map_concat

Data Preparation

data = [('01-01-2022', 123, 'abc123'), ('02-02-2022', 456, 'def456'), ('03-03-2022', 789, 'ghi789')]
columns = ["timestamp", "weight", "id"]

sparkDF = sql.createDataFrame(data, columns)

sparkDF.show()

+----------+------+------+
| timestamp|weight|    id|
+----------+------+------+
|01-01-2022|   123|abc123|
|02-02-2022|   456|def456|
|03-03-2022|   789|ghi789|
+----------+------+------+

Create Map

sparkDF.select(*[F.create_map(F.lit(c), F.col(c)) for c in columns]).show(truncate=False)

+-------------------------+-------------------+--------------+
|map(timestamp, timestamp)|map(weight, weight)|map(id, id)   |
+-------------------------+-------------------+--------------+
|{timestamp -> 01-01-2022}|{weight -> 123}    |{id -> abc123}|
|{timestamp -> 02-02-2022}|{weight -> 456}    |{id -> def456}|
|{timestamp -> 03-03-2022}|{weight -> 789}    |{id -> ghi789}|
+-------------------------+-------------------+--------------+

Map Concat

sparkDF = sparkDF.select(F.map_concat([F.create_map(F.lit(c), F.col(c)) for c in columns]).alias('summaries'))


sparkDF.show(truncate=False)

+------------------------------------------------------+
|summaries                                             |
+------------------------------------------------------+
|{timestamp -> 01-01-2022, weight -> 123, id -> abc123}|
|{timestamp -> 02-02-2022, weight -> 456, id -> def456}|
|{timestamp -> 03-03-2022, weight -> 789, id -> ghi789}|
+------------------------------------------------------+

Alternatively, the two steps above can be combined into one; for reference, see "pyspark: Create MapType Column from existing columns".

Final Result Set

sparkDF = sparkDF.withColumn('status',F.lit(200))\
       .groupBy('status')\
       .agg(F.collect_list(F.col('summaries')).alias('summaries'))

sparkDF.show(truncate=False)


+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|status|summaries                                                                                                                                                               |
+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|200   |[{timestamp -> 01-01-2022, weight -> 123, id -> abc123}, {timestamp -> 02-02-2022, weight -> 456, id -> def456}, {timestamp -> 03-03-2022, weight -> 789, id -> ghi789}]|
+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Vaebhav
0

Working solution (for me):

I've created a starting point of my dataframe:

data = [('01-01-2022', 123, 'abc123'), ('02-02-2022', 456, 'def456'), ('03-03-2022', 789, 'ghi789')]
columns = ["timestamp", "weight", "id"]

df = spark.createDataFrame(data, columns)

Then I pack the columns into a single struct column:

from pyspark.sql.functions import col, collect_list, lit, struct

dfConvert = df.withColumn("summaries", struct("timestamp", "weight", "id"))

And I use collect_list to combine them into one record:

dfCollect = dfConvert.withColumn('status', lit(200))\
       .groupBy('status')\
       .agg(collect_list(col('summaries')).alias('summaries'))
Meeldurb