
How can I get the first non-null values from a group by? I tried using first with coalesce, F.first(F.coalesce("code")), but I don't get the desired behavior (I seem to get the first row, nulls included).

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

I tried:

(df
  .groupby("id")
  .agg(F.first(F.coalesce("code")),
       F.first(F.coalesce("name")))
  .collect())

DESIRED OUTPUT

[Row(id='a', code='code1', name='name2')]
Kamil Sindi

3 Answers


For Spark 1.3 - 1.5, this could do the trick:

from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()

+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
|  a|      code1|      name2|
+---+-----------+-----------+

Edit

Apparently, in version 1.6 the way the first aggregate function is processed was changed. Now, the underlying class First is constructed with a second argument, the ignoreNullsExpr parameter, which is not yet used by the first aggregate function (as can be seen here). However, in Spark 2.0 it will be possible to call agg(F.first(col, True)) to ignore nulls (as can be checked here).
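As a sketch of that call, assuming Spark 2.0+ and the same df (columns id, code, name) from the question:

from pyspark.sql import functions as F

# Spark 2.0+: the second argument to first() tells it to skip nulls
(df
  .groupby("id")
  .agg(F.first("code", True).alias("code"),
       F.first("name", True).alias("name"))
  .collect())
# should give: [Row(id='a', code='code1', name='name2')]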

Therefore, for Spark 1.6 the approach must be different and, unfortunately, a little less efficient. One idea is the following:

from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()

+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
|  a|        code1|        name2|
+---+-------------+-------------+

Maybe there is a better option. I'll edit the answer if I find one.

Daniel de Paula

Because I only had one non-null value for every grouping, using min / max in 1.6 worked for my purposes:

(df
  .groupby("id")
  .agg(F.min("code"),
       F.min("name"))
  .show())

+---+---------+---------+
| id|min(code)|min(name)|
+---+---------+---------+
|  a|    code1|    name2|
+---+---------+---------+
Kamil Sindi

The first function accepts an argument ignorenulls, which can be set to True:

Python:

df.groupby("id").agg(first(col("code"), ignorenulls=True).alias("code"))

Scala:

df.groupBy("id").agg(first(col("code"), ignoreNulls = true).alias("code"))
Abdennacer Lachiheb