
I was referring to this question here; however, that approach works for `collect_list` but not for `collect_set`.

I have a DataFrame like this:

    data = [(("ID1", 9)), 
            (("ID1", 9)),
            (("ID1", 8)),
            (("ID1", 7)),
            (("ID1", 5)),
            (("ID1", 5))]
df = spark.createDataFrame(data, ["ID", "Values"])
df.show()

+---+------+
| ID|Values|
+---+------+
|ID1|     9|
|ID1|     9|
|ID1|     8|
|ID1|     7|
|ID1|     5|
|ID1|     5|
+---+------+

I am trying to create a new column, collecting the values as a set:

    from pyspark.sql.functions import collect_set

    df = df.groupBy('ID').agg(collect_set('Values').alias('Value_set'))
    df.show()

+---+------------+
| ID|   Value_set|
+---+------------+
|ID1|[9, 5, 7, 8]|
+---+------------+

But the order is not maintained; my expected order is [9, 8, 7, 5].

Hardik Gupta
  • I think you will have to sort the set, because of the [order of unordered Python sets](https://stackoverflow.com/questions/12165200/order-of-unordered-python-sets) – pissall Oct 10 '19 at 08:33
  • What about calling `df.dropDuplicates()` first? Then you could use `collect_list` (sketched after these comments). – Gelerion Oct 10 '19 at 08:35
  • @Gelerion OP wants to know the difference in implementation and why `collect_set` won't work as `collect_list`. – pissall Oct 10 '19 at 08:38
  • @pissall As @cph_sto pointed me in my previous answer: Are you sure that he is not asking of the order in which they appear in the original `DF`? I don't think he is asking about the difference between set and list. – Gelerion Oct 10 '19 at 08:43
  • you can use pyspark.sql.functions.array_sort function if you use spark 2.4 – firsni Oct 10 '19 at 08:43
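A sketch of that `dropDuplicates` suggestion (my illustration, not part of the comment; as discussed above, row order is still not guaranteed after a shuffle):

    from pyspark.sql.functions import collect_list

    # drop duplicate (ID, Values) pairs first, then collect as a list
    df_dedup = (df.dropDuplicates(["ID", "Values"])
                  .groupBy("ID")
                  .agg(collect_list("Values").alias("Value_set")))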

4 Answers


I solved it like this:

    from pyspark.sql.functions import collect_list, udf
    from pyspark.sql.types import ArrayType, IntegerType

    df = df.groupby('ID').agg(collect_list('Values').alias('Values_List'))

    # dict.fromkeys keeps the first occurrence of each key, so duplicates
    # are dropped while the original order is preserved
    def my_function(x):
        return list(dict.fromkeys(x))

    udf_set = udf(my_function, ArrayType(IntegerType()))
    df = df.withColumn("Values_Set", udf_set("Values_List"))

    df.show(truncate=False)

+---+------------------+------------+
|ID |Values_List       |Values_Set  |
+---+------------------+------------+
|ID1|[9, 9, 8, 7, 5, 5]|[9, 8, 7, 5]|
+---+------------------+------------+
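A side note (my addition, not part of this answer): on Spark 2.4+ the Python UDF can be avoided, because the built-in `array_distinct` also keeps the first occurrence of each element:

    from pyspark.sql.functions import array_distinct, collect_list

    # array_distinct keeps first occurrences, so the collect_list order
    # is preserved minus duplicates (requires Spark 2.4+)
    df = df.groupby('ID').agg(
        array_distinct(collect_list('Values')).alias('Values_Set'))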
Hardik Gupta

From the PySpark source code, the documentation for `collect_set`:

_collect_set_doc = """
    Aggregate function: returns a set of objects with duplicate elements eliminated.

    .. note:: The function is non-deterministic because the order of collected results depends
        on order of rows which may be non-deterministic after a shuffle.

    >>> df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
    >>> df2.agg(collect_set('age')).collect()
    [Row(collect_set(age)=[5, 2])]
    """

This means you will get unordered sets, which are backed by a hash table; see [order of unordered Python sets](https://stackoverflow.com/questions/12165200/order-of-unordered-python-sets) for more information.
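The same effect is easy to demonstrate in plain Python (the actual aggregation runs on the JVM, but the hash-set behaviour is analogous):

    values = [9, 9, 8, 7, 5, 5]
    # iteration follows hash-table slot order, not insertion order;
    # the exact output depends on the hashing, e.g. [8, 9, 5, 7]
    print(list(set(values)))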

pissall

If your data is relatively small, you can coalesce it to a single partition and then sort it before using collect_set().

E.g., given columns `name` and `ind`:

    name,ind
    cook,3
    jone,1
    sam,7
    zack,4
    tim,2
    singh,9
    ambani,5
    ram,8
    jack,0
    nike,6

    df.coalesce(1).sort("ind").agg(collect_list("name").alias("names_list")).show(truncate=False)

    +------------------------------------------------------------+
    |names_list                                                  |
    +------------------------------------------------------------+
    |[jack, jone, tim, cook, zack, ambani, nike, sam, ram, singh]|
    +------------------------------------------------------------+
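A runnable version of this idea (a sketch, assuming the `name`/`ind` columns shown above):

    from pyspark.sql.functions import collect_list

    data = [("cook", 3), ("jone", 1), ("sam", 7), ("zack", 4), ("tim", 2),
            ("singh", 9), ("ambani", 5), ("ram", 8), ("jack", 0), ("nike", 6)]
    df = spark.createDataFrame(data, ["name", "ind"])

    # coalesce(1) forces a single partition, so the sorted order is exactly
    # what the aggregation sees; this only makes sense for small data
    df.coalesce(1).sort("ind") \
      .agg(collect_list("name").alias("names_list")) \
      .show(truncate=False)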
Rinin

You can apply the `array_sort()` function to your column if you use Spark 2.4 or above:
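A minimal sketch of this suggestion (my illustration; note that `array_sort` only sorts ascending, while `sort_array(..., asc=False)` yields the descending `[9, 8, 7, 5]` the question asks for):

    from pyspark.sql.functions import collect_set, sort_array

    # sort_array with asc=False sorts the de-duplicated values descending,
    # regardless of the order in which collect_set gathered them
    df = df.groupBy('ID').agg(
        sort_array(collect_set('Values'), asc=False).alias('Value_set'))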

firsni
  • I want to maintain the order, so here 12 could be lower in the rank, if I do array_sort it will not work – Hardik Gupta Oct 10 '19 at 08:54
  • I see. The problem is that there is no order in a DataFrame by default. In this case I think you need to add a rank column before the groupBy to preserve the order you want. Maybe you can use this function to add an index (see the sketch after these comments): https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#monotonically_increasing_id – firsni Oct 10 '19 at 09:06
  • I posted my solution – Hardik Gupta Oct 10 '19 at 09:42
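Following up on these comments, a sketch of the index-based approach (my illustration; it assumes Spark 2.4+ for `transform` and `array_distinct`, and that the desired order is the DataFrame's current row order):

    from pyspark.sql import functions as F

    # tag each row with an increasing id before the groupBy, so the original
    # row order can be reconstructed after the shuffle
    df2 = df.withColumn("idx", F.monotonically_increasing_id())

    result = (df2.groupBy("ID")
                 # structs sort by their first field, here idx
                 .agg(F.sort_array(F.collect_list(F.struct("idx", "Values")))
                       .alias("pairs"))
                 # drop the idx again and de-duplicate, keeping first occurrences
                 .select("ID", F.expr(
                     "array_distinct(transform(pairs, p -> p.Values))"
                 ).alias("Value_set")))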