I'm trying to figure out if there is a function that would check whether a column of a Spark DataFrame contains any of the values in a list:

# define a dataframe
rdd = sc.parallelize([(0,100), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [1]

# filter out records by scores by list l
records = df.filter(~df.score.contains(l))

# expected: (0,100), (0,1), (1,10), (3,18)

I get an error when running this code:

java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [1]

Is there a way to do this or do we have to loop through the list to pass contains?
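
To clarify what I mean by looping: something like the sketch below is what I was hoping to avoid writing by hand (just a rough idea, I have not confirmed it is the right approach):

from functools import reduce

# build one contains() check per value in l and OR them together
condition = reduce(lambda a, b: a | b, [df.score.contains(str(v)) for v in l])
records = df.filter(condition)
records.show()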

  • Can you explain the logic behind this? Why is `(0, 1)` in the result but not `(0,2)`? – Psidom Feb 19 '19 at 05:28
  • @Psidom I am trying to find whether the score contains the value 1, so (0, 1) has a score of 1 and (0,2) has a score of 2, which is why (0,2) is not included. It is like looping over each of the values, but I need a contains check rather than an equality check. Does that make sense? – E B Feb 19 '19 at 05:33
  • Why are 100, 10 and 18 included then? – Psidom Feb 19 '19 at 06:05

2 Answers


If I understand you correctly, you have a list of elements (in your case just 1) and want to check whether each of those elements appears in the score. In that case it is easier to work with strings rather than numbers directly.

You can do this with a custom filter function applied via a udf (direct application resulted in some strange behavior and only worked sometimes).

Find the code below:

from pyspark.sql.functions import col, udf

rdd = sc.parallelize([(0,100), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])
l = [1]

def filter_list(score, l):
    found = True
    for e in l:
        if str(e) not in str(score):  # check whether element e appears
            found = False             # in the string form of score
    return found                      # True only if all elements were found

def udf_filter(l):
    # wrap filter_list in a udf so it can be applied to a column
    return udf(lambda score: filter_list(score, l))

# apply the udf, keep only the rows that passed the filter, then drop the helper column
df.withColumn("filtered", udf_filter(l)(col("score"))).filter(col("filtered") == True).drop("filtered").show()

Output:

+---+-----+
| id|score|
+---+-----+
|  0|  100|
|  0|    1|
|  1|   10|
|  3|   18|
|  3|   18|
|  3|   18|
+---+-----+

I see some ways to do this without using a udf.

You could use a list comprehension with pyspark.sql.functions.regexp_extract, exploiting the fact that an empty string is returned if there is no match.

Try to extract all of the values in the list l and concatenate the results. If the resulting concatenated string is an empty string, that means none of the values matched.

For example:

from pyspark.sql.functions import concat, regexp_extract

records = df.where(concat(*[regexp_extract("score", str(val), 0) for val in l]) != "")
records.show()
#+---+-----+
#| id|score|
#+---+-----+
#|  0|  100|
#|  0|    1|
#|  1|   10|
#|  3|   18|
#|  3|   18|
#|  3|   18|
#+---+-----+

If you take a look at the execution plan, you'll see that it's smart enough to cast the score column to string implicitly:

records.explain()
#== Physical Plan ==
#*Filter NOT (concat(regexp_extract(cast(score#11L as string), 1, 0)) = )
#+- Scan ExistingRDD[id#10L,score#11L]

Another way is to use pyspark.sql.Column.like (or similarly with rlike):

from functools import reduce
from pyspark.sql.functions import col

records = df.where(
    reduce(
        lambda a, b: a|b, 
        map(
            lambda val: col("score").like(val.join(["%", "%"])), 
            map(str, l)
        )
    )
)

This produces the same output as above and has the following execution plan:

#== Physical Plan ==
#*Filter Contains(cast(score#11L as string), 1)
#+- Scan ExistingRDD[id#10L,score#11L]
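
For completeness, here is a minimal sketch of the rlike variant mentioned above (assuming the values in l are plain digits, so no regex escaping is needed):

from functools import reduce
from pyspark.sql.functions import col

# one unanchored regex match per value in l, OR-ed together
records = df.where(
    reduce(
        lambda a, b: a | b,
        [col("score").rlike(str(val)) for val in l]
    )
)

Since rlike does an unanchored match, rlike("1") behaves just like a contains check here; if your values could include regex metacharacters you would want to escape them first.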

If you want only distinct records, you can do:

records.distinct().show()
#+---+-----+
#| id|score|
#+---+-----+
#|  0|    1|
#|  0|  100|
#|  3|   18|
#|  1|   10|
#+---+-----+