I see some ways to do this without using a udf.
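(For the snippets below I'm assuming a DataFrame df with id and score columns and a Python list l of values to search for, roughly like this sketch; the exact data and the contents of l are assumptions consistent with the output shown, not part of the original question.)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# assumed list of values to look for inside the string form of score
l = [1]

# assumed sample data, chosen to match the output shown below
df = spark.createDataFrame(
    [(0, 100), (0, 1), (1, 10), (2, 5), (3, 18), (3, 18), (3, 18)],
    ["id", "score"]
)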
You could use a list comprehension with pyspark.sql.functions.regexp_extract, exploiting the fact that an empty string is returned if there is no match. Try to extract all of the values in the list l and concatenate the results. If the resulting concatenated string is empty, none of the values matched.
For example:
from pyspark.sql.functions import concat, regexp_extract

# Each regexp_extract returns "" when its pattern doesn't match, so the
# concatenation is empty only if every value in l failed to match.
records = df.where(concat(*[regexp_extract("score", str(val), 0) for val in l]) != "")
records.show()
#+---+-----+
#| id|score|
#+---+-----+
#| 0| 100|
#| 0| 1|
#| 1| 10|
#| 3| 18|
#| 3| 18|
#| 3| 18|
#+---+-----+
If you take a look at the execution plan, you'll see that it's smart enough to cast the score column to string implicitly:
records.explain()
#== Physical Plan ==
#*Filter NOT (concat(regexp_extract(cast(score#11L as string), 1, 0)) = )
#+- Scan ExistingRDD[id#10L,score#11L]
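If you'd rather not rely on that implicit cast, you could make it explicit yourself; this is just a sketch of the same filter with an explicit cast, equivalent in effect to the plan above:
from pyspark.sql.functions import col, concat, regexp_extract

# Same filter, but casting score to string explicitly instead of letting
# Spark insert the cast for us.
records = df.where(
    concat(*[regexp_extract(col("score").cast("string"), str(val), 0) for val in l]) != ""
)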
Another way is to use pyspark.sql.Column.like (or similarly with rlike):
from functools import reduce
from pyspark.sql.functions import col

# OR together one LIKE '%val%' condition per value in l
records = df.where(
    reduce(
        lambda a, b: a | b,
        map(
            lambda val: col("score").like(val.join(["%", "%"])),
            map(str, l)
        )
    )
)
This produces the same output as above and has the following execution plan:
#== Physical Plan ==
#*Filter Contains(cast(score#11L as string), 1)
#+- Scan ExistingRDD[id#10L,score#11L]
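For the rlike variant mentioned above, the idea is the same, just with a regex pattern instead of a SQL LIKE pattern; a rough sketch (not something shown in the plans above):
from functools import reduce
from pyspark.sql.functions import col

# OR together one regex condition per value in l using rlike
records = df.where(
    reduce(
        lambda a, b: a | b,
        (col("score").rlike(str(val)) for val in l)
    )
)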
If you want only distinct records, you can do:
records.distinct().show()
#+---+-----+
#| id|score|
#+---+-----+
#| 0| 1|
#| 0| 100|
#| 3| 18|
#| 1| 10|
#+---+-----+