
I have a schema:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

schema = StructType([
        StructField('title', StringType(), True),
        StructField('author', ArrayType(StringType()), True),
        StructField('year', IntegerType(), True),
        StructField('url', StringType(), True)])
article = sqlContext.read.format('com.databricks.spark.xml') \
        .options(rowTag='article', excludeAttribute=True, charset='utf-8') \
        .load('source.xml', schema=schema)

where author contains the names of several authors.

I can filter on a name inside author with array_contains, like:

from pyspark.sql.functions import array_contains

name = 'Tom Cat'
article.filter(array_contains(article.author, name)).show()

However, I wonder if there's a way to filter on a name while ignoring case, something like:

name = 'tom cat'
article.filter(array_contains(article.author, name, CASE_INSENSITIVE)).show()

so that I get the same result as the previous query.

SXKDZ
    Re duplicate mark: the linked question references Scala, while this one references Python. And while the technique may be similar, there are differences both in implementation and other factors, e.g. cost of UDF in Scala vs. Python. So, IMO there's enough merit here (which is why I took the time to answer). – etov Jan 16 '18 at 16:19

1 Answer


Here are two similar options, differing in their performance tradeoffs - both should work, but if performance matters you may want to measure on your own data (if you do, please post results!).

Option 1: Custom UDF

As suggested in the comments, you can write a UDF that lowercases both needle and haystack (assuming the text is plain ASCII), something like:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as func
from pyspark.sql.types import BooleanType

sc = SparkContext.getOrCreate()
sql_sc = SQLContext(sc)

df = sql_sc.createDataFrame([(["Mike", "John"], '1940'), (["Jill", "Marry"], '1950')], ("author", "year"))

def case_insensitive_array_contains(needle, haystackArr):
    # True if needle occurs (ignoring case) as a substring of any element of haystackArr
    lower_needle = needle.lower()
    for haystack in haystackArr:
        if lower_needle in haystack.lower():
            return True

    return False


udf_cis = func.udf(case_insensitive_array_contains, BooleanType())


name = "John"
df.rdd.filter(lambda x: case_insensitive_array_contains(name, x['author'])).take(5)

[Row(author=['Mike', 'John'], year='1940')]
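The udf_cis registered above can also be applied directly through the DataFrame API rather than going through the RDD; a minimal sketch, assuming the same df and name as above:

df.filter(udf_cis(func.lit(name), func.col('author'))).show()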

However, there's a non-trivial cost in PySpark associated with custom UDFs (unlike the referenced question which uses Scala), so a similar non-UDF option would be:

Option 2: Explode

(Shown via DF interface)

df.select(['author', 'year', func.explode('author').alias('single_author')]) \
    .where(func.lower(func.col('single_author')) == name.lower()).show()

This doesn't use a custom UDF, but clearly, if arrays tend to have many values, it would probably be less performant (currently, on my local machine, option 2 runs faster; but it may well be very different for a large data set and/or in a distributed environment).

You could also use a regex case-insensitive search instead of lower() - I speculate it'll be slower, though.
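For reference, a minimal sketch of that regex variant, reusing the exploded single_author column from Option 2 (the re.escape is just a precaution in case name contains regex metacharacters):

import re

# (?i) turns on case-insensitive matching in the regex engine used by rlike
pattern = '(?i)^' + re.escape(name) + '$'
df.select(['author', 'year', func.explode('author').alias('single_author')]) \
    .where(func.col('single_author').rlike(pattern)).show()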

etov