I have a PySpark DataFrame that I group on one column in order to eliminate, within each group, the records that have a certain value in another column. For instance, the table looks like this:

colA    colB
'a'     1
'b'     1
'a'     0
'c'     0

What I'd like is to remove the records where colA is duplicated and colB is 0, to obtain:

colA    colB
'a'     1
'b'     1
'c'     0

The row for 'c' remains because I want to remove the 0s only for rows that are duplicated on colA.

I can't think of a way to achieve this because I'm not sure how to use agg after a groupBy when the expression is not one of "avg", "max", etc.

zero323
mar tin

1 Answer

How about a simple max?

from pyspark.sql.functions import max as max_

df = sc.parallelize([
  ('a', 1), ('b', 1), ('a', 0), ('c', 0)
]).toDF(('colA', 'colB'))

df.groupBy('colA').agg(max_('colB')).show()
## +----+---------+
## |colA|max(colB)|
## +----+---------+
## |   a|        1|
## |   b|        1|
## |   c|        0|
## +----+---------+

This approach should work for any column that supports ordering and uses binary labels; depending on which label you want to keep, adjust the aggregate function (min / max).
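To see why max is enough for binary labels, here is a minimal plain-Python sketch of what the aggregation computes on the sample rows. It is not PySpark; `max_per_group` is a hypothetical helper that only emulates the semantics of `groupBy('colA').agg(max('colB'))`, assuming the data fits in memory.

```python
# Sample rows from the question: (colA, colB)
rows = [("a", 1), ("b", 1), ("a", 0), ("c", 0)]


def max_per_group(rows):
    """Hypothetical helper: emulate groupBy('colA').agg(max('colB'))."""
    best = {}
    for key, value in rows:
        # Keep the largest colB seen so far for this colA
        if key not in best or value > best[key]:
            best[key] = value
    # Sort by key only to make the output deterministic
    return sorted(best.items())


result = max_per_group(rows)
print(result)  # [('a', 1), ('b', 1), ('c', 0)]
```

Note that an aggregation returns exactly one row per key, so a group holding several non-zero rows collapses to a single row. If those repeats must survive, a filter that keeps individual rows is needed instead.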

It is possible to implement more advanced rules using window functions, but it will be more expensive.

Nevertheless, here is an example:

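from pyspark.sql.functions import col, sum as sum_
from pyspark.sql import Window
import sys

# Frame covering every row in the colA partition
# (Spark 2.1+ also offers Window.unboundedPreceding / Window.unboundedFollowing)
w = Window.partitionBy("colA").rowsBetween(-sys.maxsize, sys.maxsize)

# True when this row's colB is non-zero
this_non_zero = col("colB") != 0
# True when at least one row in the same colA group is non-zero
any_non_zero = sum_(this_non_zero.cast("long")).over(w) != 0

(df
    .withColumn("this_non_zero", this_non_zero)
    .withColumn("any_non_zero", any_non_zero)
    # Keep non-zero rows, plus all rows of groups that contain only zeros
    .where(
        (col("this_non_zero") & col("any_non_zero")) |
        ~col("any_non_zero")
    ))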
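
The rule encoded by the window-function filter above can be checked without a cluster. The following is a plain-Python sketch, not PySpark: `drop_zeros_in_mixed_groups` is a hypothetical name, and the two passes stand in for the windowed sum and the `where` predicate respectively.

```python
from collections import defaultdict

# Sample rows from the question: (colA, colB)
rows = [("a", 1), ("b", 1), ("a", 0), ("c", 0)]


def drop_zeros_in_mixed_groups(rows):
    """Hypothetical helper mirroring the this_non_zero / any_non_zero logic."""
    # First pass: count non-zero colB values per colA (the windowed sum)
    non_zero_count = defaultdict(int)
    for key, value in rows:
        non_zero_count[key] += int(value != 0)

    # Second pass: apply the same predicate as the where(...) clause
    kept = []
    for key, value in rows:
        this_non_zero = value != 0
        any_non_zero = non_zero_count[key] != 0
        if (this_non_zero and any_non_zero) or not any_non_zero:
            kept.append((key, value))
    return kept


filtered = drop_zeros_in_mixed_groups(rows)
print(filtered)  # [('a', 1), ('b', 1), ('c', 0)]
```

Unlike an aggregation, this keeps every surviving row, so a group with two non-zero rows and one zero row yields two rows rather than one.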
zero323
  • You can also check http://stackoverflow.com/q/35218882/1560062 and http://stackoverflow.com/q/33878370/1560062 for some other ideas. But here I wouldn't bother. – zero323 Mar 09 '16 at 12:50
  • Don't :) `max` / `min` should work directly but if you want to be explicit use something like `when(col("colA") == "foo", 0).otherwise(1)` – zero323 Mar 09 '16 at 14:01
  • This is great! Can I ask how exactly max and min work, then? Do they compute the max/min of the length of the string? With max I am getting the shortest of the strings for a, and with min the longest (think of 1 and 0 becoming 'saved' and 'hidden' in your example: I get 'saved' for max and 'hidden' for min, so I'm confused). – mar tin Mar 09 '16 at 14:19
  • Assuming no mixed-case strings and no localized strings, it will just be alphabetical order. So `h` comes before `s` :) – zero323 Mar 09 '16 at 14:27
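
The ordering described in the last comment can be illustrated with plain Python strings, which also compare character by character rather than by length. This is only an analogy under that assumption: it uses Python's built-in `max` / `min`, not Spark's, and makes no claim about how Spark orders non-ASCII text.

```python
labels = ["saved", "hidden"]

# Comparison is lexicographic: 'h' sorts before 's', so length is irrelevant
largest = max(labels)
smallest = min(labels)
print(largest, smallest)  # saved hidden

# Mixed case changes the picture: uppercase ASCII letters sort before
# lowercase ones, so 'Saved' is now smaller than 'hidden'
mixed_largest = max(["Saved", "hidden"])
print(mixed_largest)  # hidden
```

This is why the comment restricts the claim to strings without mixed case.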