I have a Spark DataFrame that contains multiple columns of free text. Separately, I have a dictionary mapping keys (country names, in this example) to compiled regular expressions.
For instance:
import re
from pyspark.sql import Row

df = spark.sparkContext.parallelize([
    Row(**{'primary_loc': 'USA', 'description': 'PyCon happens annually in the United States, with satellite events in India, Brazil and Tokyo'}),
    Row(**{'primary_loc': 'Canada', 'description': 'The annual hockey championship has some events occurring in the US'})
]).toDF()

keywords = {'united states': re.compile(r'\b(usa|us|united states|texas|washington|new york)\b', re.I),
            'india': re.compile(r'\b(india|bangalore|mumbai|delhi)\b', re.I),
            'canada': re.compile(r'\b(canada|winnipeg|toronto|ontario|vancouver)\b', re.I),
            'japan': re.compile(r'\b(japan|tokyo|kyoto)\b', re.I)}
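Every pattern is anchored with \b and compiled with re.I, so matching should be whole-word and case-insensitive. A quick check against the patterns above:

print(bool(keywords['united states'].search('some events occurring in the US')))  # True: "US" matches as a whole word
print(bool(keywords['united states'].search('Uses of regexes')))                  # False: \b stops "us" matching inside "Uses"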
I want to extract all countries from the dataframe, matching against a set of columns (primary_loc and description in this case). So in this case, I'd get an output somewhat like:
primary_loc | description | country
------------|-------------|--------------
USA         | PyCon...    | united states
USA         | PyCon...    | india
USA         | PyCon...    | brazil
USA         | PyCon...    | japan
Canada      | The ann...  | canada
Canada      | The ann...  | united states
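In other words, the output keeps the original columns and adds a country column, exploding each input row into one row per matched country. Schema-wise, the target is something like this (just a sketch; column names are taken from the table above):

from pyspark.sql.types import StructType, StructField, StringType

target_schema = StructType([
    StructField('primary_loc', StringType()),
    StructField('description', StringType()),
    StructField('country', StringType()),
])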
To get an idea of the scale of the problem, I have around 12-15k regexes and a dataframe with around 90 million rows.
I've tried a plain Python function applied with flatMap over the underlying RDD, which looks somewhat like:
def get_countries(row):
    rd = row.asDict()
    rows_out = []
    # Try every compiled pattern against both text columns of this row
    for country, pattern in keywords.items():
        if pattern.search(rd['primary_loc']) or pattern.search(rd['description']):
            rows_out.append(Row(**{'country': country, **rd}))
    return rows_out

newDF = df.rdd.flatMap(lambda row: get_countries(row)).toDF()
but this is excruciatingly slow, even when operating on a subset of 10k or so rows.
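As a sanity check, the per-row logic does do what I expect when run directly on the driver against the first toy row (reusing get_countries and keywords from above):

sample = Row(primary_loc='USA',
             description='PyCon happens annually in the United States, with satellite events in India, Brazil and Tokyo')
for r in get_countries(sample):
    print(r['country'])
# prints 'united states', 'india' and 'japan' -- one row per pattern that hits either column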
If it matters, I'm using PySpark via Databricks on Azure.