2

I have a dataset that has Id, Value and Timestamp columns. Id and Value columns are strings. Sample:

Id Value Timestamp
Id1 100 1658919600
Id1 200 1658919602
Id1 300 1658919601
Id2 433 1658919677

I want to concatenate Values that belong to the same Id, and order them by Timestamp. E.g. for rows with Id1 the result would look like:

Id Values
Id1 100;300;200

Some pseudo code would be:

res = SELECT Id,
      STRING_AGG(Value,";") WITHING GROUP ORDER BY Timestamp AS Values
      FROM table
      GROUP BY Id

Can someone help me write this in Databricks? PySpark and SQL are both fine.

AdibP
  • 2,819
  • 1
  • 10
  • 24
Alcibiades
  • 335
  • 5
  • 16

3 Answers3

4

You can collect lists of struct ofTimestamp and Value (in that order) for each Id, sort them (sort_array will sort by the first value of struct, i.e Timestamp) and combine Value's values into string using concat_ws.

PySpark (Spark 3.1.2)

import pyspark.sql.functions as F

(df
 .groupBy("Id")
 .agg(F.expr("concat_ws(';', sort_array(collect_list(struct(Timestamp, Value))).Value) as Values"))
).show(truncate=False)

# +---+-----------+
# |Id |Values     |
# +---+-----------+
# |Id1|100;300;200|
# |Id2|433        |
# +---+-----------+

in SparkSQL

SELECT Id, concat_ws(';', sort_array(collect_list(struct(Timestamp, Value))).Value) as Values
FROM table
GROUP BY Id
AdibP
  • 2,819
  • 1
  • 10
  • 24
3

This is a beautiful question!! This is a perfect use case for Fugue which can port Python and Pandas code to PySpark. I think this is something that is hard to express in Spark but easy to express in native Python or Pandas.

Let's just concern ourselves with 1 ID first. For one ID, using pure native Python, it would look like below. Assume the Timestamps are already sorted when this is applied.

import pandas as pd

df = pd.DataFrame({"Id": ["Id1", "Id1", "Id1", "Id2","Id2","Id2"],
                   "Value": [100,200,300,433, 500,600],
                   "Timestamp": [1658919600, 1658919602, 1658919601, 1658919677, 1658919670, 1658919672]})

from typing import Iterable, List, Dict, Any

def logic(df: List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    _id = df[0]['Id']
    items = []
    for row in df:
        items.append(row['Value'])
    yield {"Id": _id, "Values": items}

Now we can call Fugue with one line of code to run this on Pandas. Fugue uses the type annotation from the logic function to handle conversions for you as it enters the function. We can run this for 1 ID (not sorted yet).

from fugue import transform
transform(df.loc[df["Id"] == "Id1"], logic, schema="Id:str,Values:[int]")

and that generates this:

    Id  Values
0   Id1 [100, 200, 300]

Now we are ready to bring it to Spark. All we need to do is add the engine and partitioning strategy to the transform call.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sdf = transform(df, 
                logic, 
                schema="Id:str,Values:[int]", 
                partition={"by": "Id", "presort": "Timestamp asc"}, 
                engine=spark)
sdf.show()

Because we passed in the SparkSession, this code will run on Spark.sdf is a SparkDataFrame so we need .show() because it evaluates lazily. Schema is a requirement for Spark so we need it too on Fugue but it's significantly simplified. The partitioning strategy will run logic on each Id, and will sort the items by Timestamp for each partition.

For the FugueSQL version, you can do:

from fugue_sql import fsql

fsql(
"""
SELECT * 
FROM df
TRANSFORM PREPARTITION BY Id PRESORT Timestamp ASC USING logic SCHEMA Id:str,Values:[int]
PRINT
"""
).run(spark)
Kevin Kho
  • 679
  • 4
  • 14
  • This will run on Databricks no problem btw because it uses the SparkSession available. Just need to install Fugue on the cluster. – Kevin Kho Aug 04 '22 at 03:11
0

Easiest Solution :

df1=df.sort(asc('Timestamp')).groupBy("id").agg(collect_list('Value').alias('newcol'))

+---+---------------+
| id|         newcol|
+---+---------------+
|Id1|[100, 300, 200]|
|Id2|          [433]|
+---+---------------+

df1.withColumn('newcol',concat_ws(";",col("newcol"))).show()

+---+-----------+
| id|     newcol|
+---+-----------+
|Id1|100;300;200|
|Id2|        433|
+---+-----------+
Sachin Tiwari
  • 309
  • 2
  • 7
  • 1
    Are you sure sorted order is preserved after the groupby? Really curious. See https://stackoverflow.com/questions/56167750/sort-and-then-groupby-a-dataframe-is-the-sorting-order-preserved-after-the-gro – Kevin Kho Aug 04 '22 at 06:13