
I have a dataframe and I'd like to filter some rows out based on one column. My conditions are quite complex and will require a separate function; it's not something I can do in a single expression or where clause.

My plan was to return True or False according to whether to keep or filter out that row:

from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf

def my_filter(col1):
    # I will have more conditions here, but for simplicity...
    if col1 is null:
        return True

my_filter_udf = udf(my_filter, BooleanType())

df = table1_df \
    .select( \
        'col1' \
    ) \
    .filter(my_filter_udf('col1')) \
    .show()

I get the error "SparkException: Exception thrown in Future.get".

What's weird is in the function, if I just use:

def my_filter(col1):
  return True

It works. But if I reference the col1 value in the function, I get this error. So the code is reaching and parsing the function, it just doesn't seem to like what I'm passing to it.

Any idea where I'm going wrong here?

user43107
  • Please provide a sample data set and expected output. Refer to this guideline on how to ask a question https://stackoverflow.com/help/minimal-reproducible-example – wwnde Jul 16 '21 at 10:49
  • Your code works for me if you replace `null` with `None` in `my_filter`. – werner Jul 16 '21 at 16:24
  • @werner that's interesting. I read in another Stack Overflow question (https://stackoverflow.com/questions/65638003/pyspark-error-sparkexception-exception-thrown-in-future-get) that someone encountered this error because of a type mismatch. But that didn't seem to be the case for me. Out of curiosity, did you get the same error as I did when you used `null` instead of `None`? – user43107 Jul 16 '21 at 22:35
  • @user43107 I am not sure if I understand what you mean with [null](https://stackoverflow.com/a/3289606/2129801). Is this some kind of constant or variable? – werner Jul 17 '21 at 11:40
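
A minimal sketch of werner's suggestion, in case it helps. Python has no `null` name, so `col1 is null` raises a `NameError` as soon as that line runs, while a body that only does `return True` never reaches it. Spark passes a SQL NULL to a Python UDF as `None`. The names `keep_row` and `make_filter_udf` are hypothetical, and the pyspark import is deferred only so the plain predicate can be checked without a Spark session.

```python
def keep_row(col1):
    """Hypothetical plain-Python predicate: True keeps the row, False drops it."""
    # Spark hands SQL NULL to a Python UDF as None; `null` is not a Python name.
    if col1 is None:
        return True
    # Return an explicit False so the UDF never yields None (SQL NULL).
    return False


def make_filter_udf():
    """Hypothetical helper: wrap keep_row as a Spark UDF (needs pyspark installed)."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType
    return udf(keep_row, BooleanType())


# The predicate can be exercised on its own before involving Spark.
print(keep_row(None))   # True
print(keep_row("abc"))  # False

# Usage, assuming `table1_df` from the question exists:
# table1_df.select('col1').filter(make_filter_udf()('col1')).show()
```

Testing the predicate as ordinary Python first tends to surface mistakes like an undefined name with a readable traceback, rather than one wrapped in a Spark exception.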

1 Answer


The idea of a UDF is that Spark applies it to every row, so the logic inside the UDF executes once per row. Here you are calling filter() with it; use withColumn() instead.

Some Sample Code

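from pyspark.sql import functions as F

@F.udf("boolean")
def my_filter(col1):
  # Returns True for "NIF"; any other value falls through and yields None (null)
  if col1 == "NIF":
    return True

# Add the UDF result as a new column instead of calling filter() directly
df = df.withColumn("filter_col", my_filter("EVENT"))
df.show()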

+----+-----+----------+
|  ID|EVENT|filter_col|
+----+-----+----------+
|id_1|   ST|      null|
|id_1|   ST|      null|
|id_1|   ST|      null|
|id_1|   ST|      null|
|id_1|  NIF|      true|
|id_1|   ST|      null|
|id_1|   SB|      null|
|id_2|  NIF|      true|
|id_2|  NIF|      true|
|id_2|  NIF|      true|
|id_2|  NIF|      true|
|id_2|  NIF|      true|
|id_2|  NIF|      true|
|id_2|  NIF|      true|
|id_3|   AB|      null|
|id_3|  NIF|      true|
|id_3|   DR|      null|
|id_3|  NIF|      true|
|id_3|   ST|      null|
|id_3|  NIF|      true|
+----+-----+----------+ 
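
The `null` entries in `filter_col` above come from the function falling off the end without a `return`, which gives `None` in Python. A small plain-Python sketch of the difference follows; `is_nif_implicit` and `is_nif` are hypothetical names used only for illustration, and no Spark session is needed to run it.

```python
def is_nif_implicit(event):
    # Mirrors the function above: nothing is returned on the non-matching path.
    if event == "NIF":
        return True


def is_nif(event):
    # Hypothetical variant: always returns a real bool, never None.
    return event == "NIF"


events = ["ST", "NIF", None]
print([is_nif_implicit(e) for e in events])  # [None, True, None]
print([is_nif(e) for e in events])           # [False, True, False]
```

Wrapping `is_nif` with `F.udf("boolean")` should then produce `true`/`false` in `filter_col` rather than `true`/`null`. Either way, `df.filter("filter_col")` keeps only the `true` rows, since Spark's `filter` drops rows where the condition is null.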
dsk
  • Thanks for your comment. I believe the problem was that the query was too big, and the server basically freaked out. I found ways to filter as much as I could without a UDF, just trimming the df down with multiple filters. I was able to run the more complex bits in the UDF once it was smaller. – user43107 Jul 16 '21 at 22:33