
I have a PySpark dataframe like this:

DOCTOR | PATIENT
JOHN   | SAM
JOHN   | PETER
JOHN   | ROBIN
BEN    | ROSE
BEN    | GRAY

and I need to concatenate the patient names across rows so that I get output like this:

DOCTOR | PATIENT
JOHN   | SAM, PETER, ROBIN
BEN    | ROSE, GRAY

Can anybody help me create this dataframe in PySpark?

Thanks in advance.

Prerit Saxena

3 Answers


The simplest way I can think of is to use collect_list:

import pyspark.sql.functions as f

# collect_list gathers col2's values for each group; concat_ws joins them into one string
df.groupby("col1").agg(f.concat_ws(", ", f.collect_list(df.col2)))
Assaf Mendelson
  • Thanks Assaf! Will this replace the existing column or create a new one? My intention is to create a new column. – Prerit Saxena Jan 22 '17 at 09:31
  • This will create a dataframe with only two columns: col1 and the aggregation of col2, since this is an aggregate function. – Assaf Mendelson Jan 22 '17 at 09:44
  • Hi @Assaf, thanks for the clarification. When I use df.col2 in the statement above, it does not retain the order of col2 while concatenating. For example, with the same data as in the question, if I need the result alphabetically sorted, i.e. JOHN | PETER, ROBIN, SAM and BEN | GRAY, ROSE, what changes should I make to the statement? Thanks in advance! – Prerit Saxena Jan 24 '17 at 17:10
  • If you need to sort inside a key, I would do just the collect_list part without concatenating, then use a UDF that takes the list, sorts it, and builds the string (see the sketch after these comments). It will be slower, though, and involve more than a single line. – Assaf Mendelson Jan 24 '17 at 18:25
  • The problem with this is that when you call `collect_list` on a single string, it splits the string by character. – frosty Jun 22 '19 at 03:11
  • @frosty I am not sure what you mean; collect_list collects values from different rows, so it should never be called on a single string. – Assaf Mendelson Jun 24 '19 at 04:51
  • Yeah, I ended up getting it to work, but the aggregated column is not formatted the same for every key. If a key has only one row after the group by, the row looks like `key1 value1`, but if the key has more than one value, it is wrapped in quotes like `key1 "value1,value2"`. – frosty Jun 24 '19 at 21:23
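For the ordering question in the comments, a minimal sketch that swaps in the built-in sort_array (available since Spark 1.5) for the UDF approach, reusing the df from the example above:

import pyspark.sql.functions as f

# sort_array orders each collected list before it is joined,
# so the concatenated names come out alphabetically
df.groupby("DOCTOR").agg(
    f.concat_ws(", ", f.sort_array(f.collect_list("PATIENT"))).alias("PATIENT")
)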
A complete, runnable example:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("groupby").getOrCreate()

data = [
  ("U_104", "food"),
  ("U_103", "cosmetics"),
  ("U_103", "children"),
  ("U_104", "groceries"),
  ("U_103", "food")
]
schema = StructType([
  StructField("user_id", StringType(), True),
  StructField("category", StringType(), True),
])
df = spark.createDataFrame(data, schema)

# Collect each user's categories into a list, then join the list into one string
group_df = df.groupBy(f.col("user_id")).agg(
  f.concat_ws(",", f.collect_list(f.col("category"))).alias("categories")
)
group_df.show()
+-------+--------------------+
|user_id|          categories|
+-------+--------------------+
|  U_104|      food,groceries|
|  U_103|cosmetics,childre...|
+-------+--------------------+
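Note that show() truncates long cell values by default, which is why cosmetics,childre... is cut off above; pass truncate=False to print the full strings:

group_df.show(truncate=False)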

The Spark documentation has more useful aggregation examples.


Using Spark SQL, this worked for me:

SELECT col1, col2, col3, REPLACE(REPLACE(CAST(collect_list(col4) AS string),"[",""),"]","")
FROM your_table
GROUP BY col1, col2, col3
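A cleaner Spark SQL equivalent is a sketch using the built-in concat_ws, which avoids the string surgery on the cast array (table and column names are from the answer above; the col4_concat alias is illustrative):

SELECT col1, col2, col3,
       concat_ws(', ', collect_list(col4)) AS col4_concat
FROM your_table
GROUP BY col1, col2, col3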
Konrad