4

I am trying to optimize the code below (PySpark UDF).

It gives me the desired result (based on my data set), but it's too slow on very large datasets (approx. 180 million rows).

The results (accuracy) are better than available Python modules (e.g. geotext, hdx-python-country). So I'm not looking for another module.

DataFrame:

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],   
  ["Kalverstraat Amsterdam","Mary"],   
  ["Kalverstraat Amsterdam, Netherlands","Lex"] 
]).toDF("address","name")

regex.csv:

iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\bNetherlands\b
NL;\bNL$

......<many, many more>

Creating a Pandas DataFrame from regex.csv, grouping by iso2 and joining the keywords (\bArizona\b|\bTexas\b|\bFlorida\b|\bUS$).

import re
import pandas as pd

df_keywords = pd.read_csv('regex.csv', sep=';')
df_regex = df_keywords.groupby('iso2').agg({'keywords': '|'.join}).reset_index()

Function:

def get_iso2(x): 
 
    iso2={}
    
    for j, row in df_regex.iterrows():
 
        regex = re.compile(row['keywords'],re.I|re.M)         
        matches = re.finditer(regex, x)
        
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
            
    return [key for key, value in iso2.items() for _ in range(value)]

PySpark UDF:

from pyspark.sql import functions as F, types as T

get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))

Create new column:

df_new = df.withColumn('iso2', get_iso2_udf('address'))
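
As a side note (also suggested in the comments below), the per-call re.compile can be hoisted out of the UDF. A minimal sketch of the same logic with the patterns compiled once, reusing df_regex from above:

import re

# Compile one pattern per ISO2 code once, on the driver; compiled patterns are
# picklable, so they travel to the executors inside the UDF's closure.
patterns = {row['iso2']: re.compile(row['keywords'], re.I | re.M)
            for _, row in df_regex.iterrows()}

def get_iso2_precompiled(x):
    iso2 = {}
    for code, pattern in patterns.items():
        for _ in pattern.finditer(x):
            iso2[code] = iso2.get(code, 0) + 1
    return [code for code, count in iso2.items() for _ in range(count)]

get_iso2_precompiled_udf = F.udf(get_iso2_precompiled, T.ArrayType(T.StringType()))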

Expected sample output:

[US,US,NL]
[CA]
[BE,BE,AU]

Some place names occur in more than one country (the input is an address column containing city, province, state, country, ...).

Sample:

3030 Whispering Pines Circle, Prosper Texas, US -> [US,US,US]
Kalverstraat Amsterdam -> [US,NL]
Kalverstraat Amsterdam, Netherlands -> [US, NL, NL]

Maybe using Scala UDFs in PySpark is an option, but I have no idea how.
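
For reference, a compiled Scala/Java UDF shipped in a jar can be hooked up from PySpark roughly like this; the class name below is hypothetical, the class would implement org.apache.spark.sql.api.java.UDF1, and the jar must be on the cluster's classpath:

from pyspark.sql import types as T

# Hypothetical sketch: register a JVM UDF class from PySpark and call it in a SQL expression.
spark.udf.registerJavaFunction("get_iso2_scala", "com.example.GetIso2",
                               T.ArrayType(T.StringType()))
df_new = df.selectExpr("address", "name", "get_iso2_scala(address) AS iso2")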

Your optimisation recommendations are highly appreciated!

John Doe
  • @anky Edited my question. I'm running a Jupyter Notebook on a Spark cluster using PySpark (Spark DataFrame). – John Doe Aug 12 '20 at 10:51
  • @anky The regex file pretty much does the trick because it is optimised for my dataset and the result is validated with high accuracy. – John Doe Aug 12 '20 at 11:02
  • Sounds like you are effectively joining two tables using a regexp-like predicate, then grouping on the ID of the first table. You can use `df = df_addresses.crossJoin(df_regex)` to join the two dataframes, then `df.filter(df('address').rlike(df('keywords')))` and then group on the ID column inherited from `df_addresses`. This will run entirely in Spark without marshalling data to and from the Python helper processes. – Hristo Iliev Aug 12 '20 at 11:13
  • @HristoIliev : I'm not joining tables. Just creating a new column in my spark DataFrame calling the UDF. The Pandas DataFrame is only used for joining the keywords from the csv-file creating a long regex-OR for every ISO2. If there's a match the iso2 related to the match(es) is added. – John Doe Aug 12 '20 at 11:21
  • I said "effectively joining two tables." The first table contains the addresses. The second table contains the ISO code and the corresponding regexp that matches any city/country name for that ISO code. Joining the two and then filtering the rows where the regexp column matches the content address column, then grouping on the address (or the address ID) gives you the list of ISO codes. – Hristo Iliev Aug 12 '20 at 11:41
  • Actually, you can also join and filter in a single operation like `df_addresses.join(df_regex, df_addresses('address').rlike(df_regex('keywords')), 'cross')`. – Hristo Iliev Aug 12 '20 at 11:46
  • @HristoIliev : Sorry for the misunderstanding. Your intention was lost in translation. I now understand what you mean and will try. Marshalling data to and from the Python helper processes is indeed the slowing factor. – John Doe Aug 12 '20 at 11:47
  • @JohnDoe - it'd be a lot easier to help if you provided code snippets that could be copy / pasted into a PySpark console. That way I wouldn't need to try to reverse engineer the question into a createDataFrame snippet to create a sample data set ;) – Powers Aug 12 '20 at 12:42
  • @JohnDoe - it's a great question and you can make it even better if you add the "I have the following DataFrame" ... "which can be created with this code" [as described here](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples). – Powers Aug 12 '20 at 12:54
  • @HristoIliev: When I try this `df_addresses.join(df_regex, df_addresses('address').rlike(df_regex('keywords')), 'cross')` I get an error `'DataFrame' object is not callable`. Can you please help? – John Doe Aug 17 '20 at 13:58
  • Sorry, I've mixed it up with the Scala syntax. Try replacing `df('foo')` with `df.foo`. It will work if there aren't any special or punctuation symbols in `foo` that are not allowed in Python identifiers. Otherwise, use `df['foo']`. – Hristo Iliev Aug 17 '20 at 15:59
  • @HristoIliev : `pyspark.sql.Column.rlike()` unfortunately only takes a string pattern, not another column, as the pattern. – John Doe Aug 18 '20 at 17:00
  • Yes, I just realised that. In that case, Pandas UDFs are the second best option after Scala UDFs. – Hristo Iliev Aug 18 '20 at 20:56
  • (not spark related) Don't compile the regex on each loop. Put it outside the loop or provide it as an argument to the function instead. Since the regex is quite big, this should save some time parsing, building, and throwing away a state machine for just one use. – Iñigo González Aug 24 '20 at 08:56
  • @Iñigo : I thought about that, but how do I get it done? Can you give an example? – John Doe Aug 24 '20 at 16:45
  • @JohnDoe - I don't like compiling regexes inside loops. For a simple program maybe moving it to the startup code might do the trick (gist.github.com/igponce/8c083447d38f4715874f3a1a4569fd40). Since you're using Spark, maybe you need to do it in two steps: one job for making the dataframe, compiling the expression, and pickling to a file, and another to unpickle at start and maybe broadcast the df. Also, consider sorting the DF with the regexes by ISO2 with the most frequent ones first. – Iñigo González Aug 24 '20 at 17:02
  • @JohnDoe, you can try: `df_regex = df.groupby('iso2').agg({'keywords':lambda x: re.compile('(?im)' + '|'.join(x))}).reset_index()`. You can do the same to other Python-based functions like pandas_udf, but do not add this if you use Scala-based functions `split`, `regexp_replace` etc. – jxc Aug 24 '20 at 20:46
  • Thank you very much for the additions and examples. I'm going to see if that makes it faster. – John Doe Aug 26 '20 at 03:52

3 Answers

7

IIUC, you can try the following steps without using a UDF:

from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],
  ["Kalverstraat Amsterdam","Mary"],
  ["Kalverstraat Amsterdam, Netherlands","Lex"],
  ["xvcv", "ddd"]
]).toDF("address","name")

Step-1: convert df_regex to a Spark dataframe df1 and add a unique id to df.

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

# upper-case the keywords, except characters preceded by a backslash (so escapes like \b keep their meaning):
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(\\.|^)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()

df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords                                                                         |
+----+---------------------------------------------------------------------------------+
|CA  |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$                             |
|NL  |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$                                          |
|US  |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+

df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address                                        |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0  |
|Kalverstraat Amsterdam                         |Mary|1  |
|Kalverstraat Amsterdam, Netherlands            |Lex |2  |
|xvcv                                           |ddd |3  |
+-----------------------------------------------+----+---+

Step-2: left join df1 to df using rlike

df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
|             address|name| id|iso2|            keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|
|                xvcv| ddd|  3|null|                null|
+--------------------+----+---+----+--------------------+

Step-3: count the number of matches of d2.keywords in d1.address by splitting d1.address on d2.keywords and then reducing the size of the resulting array by 1:

df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|         -2|
+--------------------+----+---+----+--------------------+-----------+

Step-4: use array_repeat to repeat the value of iso2 num_matches times (requires Spark 2.4+):

df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
|             address|name| id|        iso2|            keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John|  0|[US, US, US]|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|        [NL]|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|        [US]|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|    [NL, NL]|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|        [US]|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|          []|                null|         -2|
+--------------------+----+---+------------+--------------------+-----------+

Step-5: groupby and do the aggregation:

df_new = df4 \
    .groupby('id') \
    .agg(
      first('address').alias('address'),
      first('name').alias('name'),
      flatten(collect_list('iso2')).alias('countries')
)
+---+--------------------+----+------------+
| id|             address|name|   countries|
+---+--------------------+----+------------+
|  0|3030 Whispering P...|John|[US, US, US]|
|  1|Kalverstraat Amst...|Mary|    [NL, US]|
|  3|                xvcv| ddd|          []|
|  2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+

Alternative: Step-3 can also be handled by Pandas UDF:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re

@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])

df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|          0|
+--------------------+----+---+----+--------------------+-----------+

Notes:

  1. Since case-insensitive pattern matching is expensive, we converted all chars of the keywords (except anchors or escaped chars like \b, \B, \A, \z) to upper case; see the short illustration after these notes.
  2. Just a reminder: the patterns used in rlike and regexp_replace are Java regexes, while pandas_udf uses Python's re module, so there might be slight syntax differences to account for when setting up the patterns in regex.csv.
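
For note 1, the keyword transformation from Step-1 behaves like this (a standalone illustration only, not part of the pipeline):

import re

# Upper-case everything except the character that immediately follows a
# backslash, so escapes such as \b keep their meaning.
def upper_keep_escapes(kw):
    return re.sub(r'(\\.|^)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), kw)

upper_keep_escapes(r'\bTexas\b')        # '\bTEXAS\b'
upper_keep_escapes(r'\bNova Scotia\b')  # '\bNOVA SCOTIA\b'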

Method-2: using pandas_udf

Because join and groupby trigger data shuffling, the above method could be slow. Here is one more option for your testing:

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

df_ptn = spark.sparkContext.broadcast(
    df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}

# REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten

def __get_iso2(addr, ptn):   
   return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()]) for s in addr ])

get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+

Or return an array of arrays in pandas_udf (w/o reduce and iconcat) and do flatten with Spark:

def __get_iso2_2(addr, ptn):
    return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])

get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)

df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()

Update: to find unique countries, do the following:

def __get_iso2_3(addr, ptn):
  return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])

get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
|             address|name|    iso2|
+--------------------+----+--------+
|3030 Whispering P...|John|    [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
|                xvcv| ddd|      []|
+--------------------+----+--------+

Method-3: use a list comprehension:

Similar to @CronosNull's method: in case the list in regex.csv is manageable, you can handle this with a list comprehension:

from pyspark.sql.functions import size, split, upper, col, array, expr, flatten

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()

df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])

df1.select(*df.columns, flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+
jxc
  • I'll give it a try. Looks promising, and with explanations! – John Doe Aug 21 '20 at 06:04
  • Step 2: I don't get the expected matches with `df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("d1.address rlike d2.keywords"), "left")`. I changed it to `df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("d1.address rlike d2.keywords"), "cross")` and it already looks much better. I'll continue testing and keep you updated. – John Doe Aug 21 '20 at 08:33
  • I forgot to add the (?im) flags to the beginning of the pattern in rlike: `d1.address rlike '(?im)'||d2.keywords`. Ideally, this should be handled in df1. Another suggestion: case-insensitive matches using the `i` flag are often expensive, so it is better to convert all chars in keywords into UPPER case in df1, remove the `i` flag, and do the regexp operations on `UPPER(address)` instead. – jxc Aug 21 '20 at 09:02
  • made some adjustments based on the comments, see Step-1 and Notes at the end. – jxc Aug 21 '20 at 12:31
  • I've made the adjustments and it's running. Is it useful to do a repartition ? – John Doe Aug 21 '20 at 17:03
  • it depends on your cluster resources and the data size; if the default of 200 shuffle partitions is not enough, just adjust it at the beginning of your code, i.e. `spark.conf.set("spark.sql.shuffle.partitions", 1000)`. IMO there is no point in doing a repartition in the code. Also bear in mind that a regex pattern with a large list of alternations can itself be very slow; sorting the sub-patterns so that the most frequently seen ones sit at the front of the final pattern is usually good practice. I suggest you test on a smaller dataset to verify the method before moving to the large one – jxc Aug 21 '20 at 18:19
  • `regexp_replace` in Step-3 is redundant, this can be handled directly using `split` alone. check the updated code with df3 – jxc Aug 22 '20 at 01:55
  • I've made the adjustments and it's running on a smaller set (100_000) – John Doe Aug 22 '20 at 03:38
  • Due to technical problems I have to postpone the tests. I appreciate your help and hope to be able to start testing again as soon as possible. – John Doe Aug 22 '20 at 05:39
  • Used a set of 2000000 records (same test table). My solution:`Wall time:15min 52s` Your solution :`Wall time:15min 53s` – John Doe Aug 22 '20 at 12:32
  • @JohnDoe, this is most likely an issue from data shuffling in both Step-2 and Step-5 (which might be worse than data SerDes between Python and Java interpreters), especially in a network environment which is inevitable for most big data projects. In the current case, can you set `spark.sql.shuffle.partitions` to a smaller number (i.e. `10` ) for testing. – jxc Aug 22 '20 at 15:25
  • I added one more solution using pandas_udf under `Method-2` section, can you check if that makes some improvement to the plain udf. – jxc Aug 22 '20 at 15:36
  • Yes, I'll check that one as well. – John Doe Aug 22 '20 at 15:38
  • `sc.broadcast` is giving me an error. Already tried `from pyspark import SparkContext; sc = SparkContext()` – John Doe Aug 22 '20 at 15:54
  • if you have spark Session, can you try `sc = spark.sparkContext` – jxc Aug 22 '20 at 15:57
  • sorry, made a mistake with a capital 's' for sparkContext. – John Doe Aug 22 '20 at 16:01
  • Very nice ;-) `Wall time: 7mins 45s` – John Doe Aug 22 '20 at 16:43
  • I'll check your method-3 as well. It will be difficult to decide which answer to accept. I really appreciate your patience and contribution. Because of the extra explanation, this is also very educational for me. – John Doe Aug 23 '20 at 04:21
  • I've accepted your answer because of the more extensive explanation and the more flexible method. It was a difficult choice because @CronosNull also helped me very well and his solution is sometimes even a bit faster. Thank you again for the time and patience! – John Doe Aug 24 '20 at 05:49
  • How can I get unique values for Method-2? I also use this method for another regex and now I need unique values. – John Doe Aug 30 '20 at 05:04
  • @JohnDoe, updated, check the `Update` section at the end of Method-2 – jxc Aug 30 '20 at 12:17
2

Note: Edited based on the comments

I like @jxc's approach. I took a slightly different route, still without using UDFs and without the need to broadcast the regex dataset (it is only used in the driver).

Set up the scenario

import re
from io import StringIO
from pyspark.sql.functions import (
    split,
    regexp_replace,
    regexp_extract,
    col,
    size,
    concat,
    lit,
    when,
    array,
    expr,
    array_repeat,
    array_join,
)
from pyspark.sql import DataFrame
import pandas as pd
df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],   
  ["Kalverstraat Amsterdam","Mary"],   
  ["Kalverstraat Amsterdam, Netherlands","Lex"] 
]).toDF("address","name")

sample_data = r"""iso2;keywords
US;\bArizona\b
US:\bTexas\b
US:\bFlorida\b
US;\bChicago\b
US:\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA:\bAlberta\b
CA:\bNova Scotia\b
CA:\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL:\bAmsterdam\b
NL:\bNetherlands\b
NL;\bNL$"""
replace_pd = pd.read_csv(StringIO(sample_data),delimiter='[;:]', engine='python')
#Resample to have a similar number of rows
replace_pd = replace_pd.append([replace_pd]*10000)

Create a new column for each iso2 group of the regex dictionary

def replace_dict(df: DataFrame, column: str, replace_pd: pd.DataFrame)->DataFrame:
    """
    returns a dataframe with the required transformations 
    to have a list of iso2 codes, and its number of repeats, based on the column (e.g. address) selected
    """
    _df = (
        df.withColumn("words", col(column))
    )
    # For each iso2 group in the csv, create a new column that contains
    # the number of keyword matches found in the original column.
    cols = []
    # group the keywords by iso2 code into one pattern per country
    grouped_df = replace_pd.groupby('iso2').agg({'keywords':lambda x: '(?im)' + '|'.join(x)}).reset_index()
    for index, row in grouped_df.iterrows():
        key = row.keywords
        value = row.iso2
        _df = _df.withColumn(value, size(split(col("words"), f"({key})")) - 1)
        cols.append(value)
    # Combine the aux columns into a single array of "iso2:count" entries, dropping nulls for countries without matches.
    _df = _df.withColumn("iso2", array(*[when(col(x)>0,concat(lit(x),lit(":"),col(x))) for x in cols])).withColumn(
        "iso2", expr(r"filter( iso2, x->x NOT rlike '^\s*$')")
    )
    _df = _df.drop("words",*cols) #drop the aux columns.
    return _df

Run the test

replace_dict(df,'address', replace_pd).show(truncate=False)

This gives you the following result:

+--------------------+----+------------+
|             address|name|        iso2|
+--------------------+----+------------+
|3030 Whispering P...|John|      [US:3]|
|Kalverstraat Amst...|Mary|[NL:1, US:1]|
|Kalverstraat Amst...| Lex|[NL:2, US:1]|
+--------------------+----+------------+

It should be faster than other alternatives (all transformations are narrow), but it depends on the size of your regex.csv file (as it creates lots of sparse columns).

CronosNull
  • Many thanks. I continue testing @jxc's approach and then I start with yours. The regex.csv is quite large (all countries) – John Doe Aug 22 '20 at 03:41
  • `regex.csv` separator is all semicolon. Well noticed !! – John Doe Aug 22 '20 at 03:48
  • Due to technical problems I have to postpone the tests. I appreciate your help and hope to be able to start testing again as soon as possible – John Doe Aug 22 '20 at 05:39
  • It's been running for a while now but I don't see any job in Spark UI. Only 1 active driver and 1 dead executor. Regex.csv contains 164.059 lines – John Doe Aug 22 '20 at 13:17
  • Iterating over the pandas data frame is very costly... if we combine this with @jxc's idea of grouping the expressions we will get way better results. I'll test and come back :) – CronosNull Aug 22 '20 at 13:58
  • Ok, using https://gist.github.com/cronosnull/c6f243b0f2d717103b628e4b41342653 we get a way better result, but you may need to modify it to have the duplicates count if you need it (I guess you do). – CronosNull Aug 22 '20 at 14:10
  • Need to find an illegal character in my regex... a Python / Java issue? I don't have this error in my script or @jxc's. – John Doe Aug 22 '20 at 14:26
  • Why are you replacing `\b` ? `keywords_wob = keywords.replace(r"\b", "")` – John Doe Aug 22 '20 at 14:38
  • It was because I had tokenized the string by any non-word character, but actually those steps are not necessary (they were part of an early attempt). I just updated the gist; let me know if you still have the problem with the regex: https://gist.github.com/cronosnull/c6f243b0f2d717103b628e4b41342653 – CronosNull Aug 22 '20 at 14:59
  • Finished without errors `Wall time: 6mins 38s` For all tests I added a repartition of 2000. Without repartition, I only had a limited number of executors and it was much slower. – John Doe Aug 22 '20 at 15:37
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/220269/discussion-between-cronosnull-and-john-doe). – CronosNull Aug 22 '20 at 15:43
  • I've accepted @jxc's answer because of the more extensive explanation and the more flexible method. It was a difficult choice because you also helped me very well and your solution is sometimes even a bit faster. Thank you again for the time and patience! – John Doe Aug 24 '20 at 05:50
  • Totally agree, I really like how he rewrote the alternative using a list comprehension. – CronosNull Aug 24 '20 at 07:27
1

You'll want to broadcast df_regex to all nodes in the cluster, so each core can process the data in parallel.

df_regex_b = spark.sparkContext.broadcast(df_regex)

Update get_iso2 to use the broadcasted variable:

def get_iso2(x, df_regex_b): 
 
    iso2={}
    
    for j, row in df_regex_b.value.iterrows():
 
        regex = re.compile(row['keywords'],re.I|re.M)         
        matches = re.finditer(regex, x)
        
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
            
    return [key for key, value in iso2.items() for _ in range(value)]

Define the UDF with a nested function:

def get_iso2_udf(mapping):
    def f(x):
        return get_iso2(x, mapping)
    return F.udf(f)
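
For completeness, applying it would look roughly like this (not part of the original answer; as noted in the comments below, add T.ArrayType(T.StringType()) as the return type inside get_iso2_udf to get an array column back):

# Sketch: build the UDF from the broadcast variable and apply it to the address column.
df_new = df.withColumn('iso2', get_iso2_udf(df_regex_b)('address'))
df_new.show(truncate=False)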
Powers
  • Many thanks. I currently have a job running, but as soon as it is ready I'll give it a try. Indeed I want to broadcast `df_regex` to all nodes in the cluster. I was under the impression that this was already happening. – John Doe Aug 12 '20 at 13:11
  • It's running. I've assumed that I have to change `mapping` to `df_regex_b` and `return F.udf(f)` to `return F.udf(f, T.ArrayType(T.StringType()))` – John Doe Aug 13 '20 at 07:30
  • Unfortunately it's not faster. – John Doe Aug 15 '20 at 11:06
  • It is slow because the data in each partition is marshalled from the JVM to a helper Python process and there a `for` loop is performed. Python loops aren't the fastest thing around, which is why one should aim to write Pandas UDFs. But even Pandas UDFs are slow compared to doing everything inside the JVM. – Hristo Iliev Aug 17 '20 at 15:49