
I need to enrich my PySpark SQL DataFrame with a language attribute that gives the language of the paper title in each row, so that I can keep only the English papers. I have tens of millions of papers, so this has to run in parallel.

I have registered a UDF that uses a Python library called langdetect (https://pypi.org/project/langdetect/), after installing the library on the cluster. I'm using the following code:

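from langdetect import detect
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def lang_detector(_s):
  # detect() raises on empty or non-text input; map any failure to 'null'
  try:
    lan = detect(_s)
  except Exception:
    lan = 'null'
  return lan

detect2 = udf(lang_detector, StringType())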

papers_abs_fos_en = papers_abs \
.join(papersFos_L1, "PaperId") \
.withColumn("Lang", detect2(col("PaperTitle"))) \
.filter("Lang =='en'") \
.select("PaperId", "Rank", "PaperTitle", "RefCount", "CitCount", "FoSList")

It works, but it takes forever even on about 10M titles. I am not sure whether this is due to langdetect, to UDFs, or to something I'm doing wrong, but I'd be grateful for any suggestions!

Thanks a lot! Paolo

Paolt
  • The UDF is the main issue here and I think there is not much you can do. Read this [post](https://stackoverflow.com/a/38297050/6664872) for more information. We once used langdetect for 160000 texts and it took around 45 minutes. – cronoik Apr 12 '19 at 23:15

1 Answer


Thank you, cronoik, for confirming this. I ended up with a different solution that took 6+ minutes for 9.5M documents. Basically, I build a set of all the words in the Brown corpus from NLTK and distribute it to the nodes as a broadcast variable. Then, for each document in the DataFrame, I compute the fraction of its words that occur in that set. If it is > 75%, I heuristically conclude that the title must be English. This is the code, embedded in a UDF:

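import re

from nltk.corpus import brown
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

# Vocabulary of the Brown corpus, broadcast once to every executor
bwn = set(x.lower() for x in brown.words())
bc_brown = sc.broadcast(bwn)

def is_en(_s):
  # Fraction of distinct tokens in the title that occur in the Brown vocabulary
  tok = set(re.findall(r"\w+", (_s or "").lower()))
  if not tok:
    return 0.0  # null or empty title: avoid dividing by zero
  return len(tok & bc_brown.value) / float(len(tok))

# Declare the return type; udf() defaults to StringType otherwise
isEn = udf(is_en, DoubleType())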

papers_abs_fos_en = papers_abs \
.join(papersFos_L1, "PaperId") \
.filter(isEn(col("PaperTitle")) > 0.75) \
.select("PaperId", "Rank", "PaperTitle", "RefCount", "CitCount", "FoSList")
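
To sanity-check the 75% threshold without a cluster, the same overlap computation can be sketched in plain Python. This is only an illustration under some assumptions: SAMPLE_VOCAB, english_fraction and looks_english are hypothetical names, the tiny word set stands in for the real Brown vocabulary, and the guard for empty or missing titles is my own addition.

```python
import re

# Hypothetical stand-in for the Brown vocabulary; on the cluster the real set
# would be built from nltk.corpus.brown.words() and broadcast to the executors.
SAMPLE_VOCAB = {
    "a", "the", "of", "for", "in", "and",
    "deep", "learning", "survey", "methods", "image", "analysis",
}

def english_fraction(title, vocab):
    """Return the fraction of distinct word tokens in `title` found in `vocab`."""
    if not title:
        return 0.0  # None or empty string: nothing to score
    tokens = set(re.findall(r"\w+", title.lower()))
    if not tokens:
        return 0.0  # e.g. punctuation only: avoid dividing by zero
    return len(tokens & vocab) / float(len(tokens))

def looks_english(title, vocab, threshold=0.75):
    """Apply the heuristic: strictly more than `threshold` of the tokens are known."""
    return english_fraction(title, vocab) > threshold

if __name__ == "__main__":
    for t in ["A Survey of Deep Learning Methods",
              "Eine Analyse der Bildverarbeitung",
              "Deep learning for Bildanalyse"]:
        print(t, english_fraction(t, SAMPLE_VOCAB), looks_english(t, SAMPLE_VOCAB))
```

Note that the comparison is strict, so a title scoring exactly 0.75 is rejected. Titles dominated by technical terms or proper names missing from the vocabulary will also score low, so it may be worth checking a sample of rejected titles before settling on the threshold.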
Paolt