I have a table in Hadoop containing 7 billion strings, which can themselves contain anything. I need to remove every name from the column containing those strings. An example string would be 'John went to the park', and I'd need to remove 'John' from it, ideally replacing it with '[NAME]'.

In the case of 'John and Mary went to market', the output would be '[NAME] and [NAME] went to market'.

To support this, I have an ordered list of the 20k most frequently occurring names.

I have access to Hue (Hive, Impala) and Zeppelin (Spark, Python & libraries) to execute this.

I've tried this in the DB, but being unable to update columns or iterate over a variable made it a non-starter, so using Python and PySpark seems to be the best option, especially considering the number of calculations (20k names * 7 billion input strings):

# nameList contains ['John', 'Emma', ...]
import re

def removeNames(line, nameList):
    str_line = line[0]
    for name in nameList:
        rx = f"(^| |[[:^alpha:]])({name})( |$|[[:^alpha:]])"
        str_line = re.sub(rx, '[NAME]', str_line)
    return (str_line,)

df = session.sql("select free_text from table")
rdd = df.rdd.map(lambda line: removeNames(line, nameList))
rdd.toDF().show()

The code is executing, but it's taking an hour and a half even if I limit the input text to 1000 lines (which is nothing for Spark), and the lines aren't actually being replaced in the final output.

What I'm wondering is: Why isn't map actually updating the lines of the RDD, and how could I make this more efficient so it executes in a reasonable amount of time?

This is my first time posting so if there's essential info missing, I'll fill in as much as I can.

Thank you!

Tundra123
  • You can try `pyspark.sql.functions.regexp_replace()` instead of using a `udf`: [Spark functions vs UDF performance?](https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance) – pault May 29 '19 at 22:03
  • Hi pault, thanks for commenting. I tried using regexp_replace in the first instance and it did have the required outcome, but I can't see how to use map with it and parallelise the operation. I was iterating through the name list, running regexp_replace on the whole dataset, then moving on to the next name. That was 20k iterations, which led to a stack overflow due to lineage. Checkpointing and hacking through the lineage worked, but the performance was still too slow. – Tundra123 May 30 '19 at 04:52
  • Regex matching is a heavy operation. Since both solutions, the `udf` and `regexp_replace`, were pretty slow, maybe you could try to increase the job's parallelization. To answer more precisely, it would help to know some characteristics of the cluster the job is being executed on: how many nodes does your cluster currently have, and what are the available resources on each node? – abiratsis May 30 '19 at 13:40
  • You could try creating a pattern that is an OR of all of the values in your list. Something like `rx = "(^| |[[:^alpha:]])({name})( |$|[[:^alpha:]])".format(name="|".join(nameList))` and then doing one call to `regexp_replace` instead of iterating (see the sketch just below this thread). – pault May 30 '19 at 16:06
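
A minimal sketch of that combined-pattern idea, assuming nameList holds plain alphabetic names:

import re

# Escape each name, then collapse the list into a single alternation so the
# text is scanned once rather than 20,000 times. \b word boundaries stand in
# for the POSIX [[:^alpha:]] classes, which neither Python's re module nor
# the Java regex engine behind Spark's regexp_replace supports (this is also
# why the re.sub() loop in the question never matches anything).
pattern = r"\b(" + "|".join(re.escape(name) for name in nameList) + r")\b"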

1 Answer

In case you're still curious about this: with the udf (your removeNames function), Spark has to serialize every row out of the JVM, run it through a Python worker process, and serialize the result back, which largely defeats the point of using Spark to do this operation in a distributed fashion. As suggested in the comments, if you go with the regexp_replace() method instead, Spark keeps all of the data inside the JVM on the executors, so everything stays distributed and performance improves.
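
For what it's worth, a minimal sketch of that approach, assuming a SparkSession bound to session and the nameList from the question (the pattern is built the same way as in the comment sketch above):

import re
from pyspark.sql import functions as F

# One alternation over the whole name list, anchored on \b word boundaries
# (Java regex, which regexp_replace uses, rejects POSIX [[:^alpha:]] classes).
pattern = r"\b(" + "|".join(re.escape(name) for name in nameList) + r")\b"

df = session.sql("select free_text from table")

# regexp_replace executes inside the JVM on each executor, so no rows are
# shipped to Python workers and the scan stays fully distributed.
cleaned = df.withColumn("free_text", F.regexp_replace("free_text", pattern, "[NAME]"))
cleaned.show(truncate=False)

Applying one compiled pattern in a single pass also avoids the 20k-step lineage that caused the stack overflow mentioned in the comments.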

Adam