
I am doing fuzzy string matching using MinHashLSH and approxSimilarityJoin on 500 billion pairs. It is too big for my current cluster setup, so I want to run it in batches.

I want to partition the data and run approxSimilarityJoin on each partition iteratively, so that my cluster can handle it.

My current function is:

matched_df = model.stages[-1].approxSimilarityJoin(df1, df2, 1.0, "confidence")

But I am stuck on how to combine repartition, foreachPartition and approxSimilarityJoin.

I think it should be something like:

df1.repartition(100).foreachPartition(batch : model.stages[-1].approxSimilarityJoin(batch, df2, 1.0, "confidence"))

but I have the wrong syntax. What is the correct syntax for foreachPartition?

Béatrice Moissinac
  • What do you mean by this set being too big for your cluster? Are you getting out-of-memory exceptions? Running this iteratively is not going to make a performance difference. – Daniel Mar 09 '20 at 19:58
  • Yes, I am running out of memory because of how approxSimilarityJoin() works. Given two lists, it creates all pairs from the elements of both lists and calculates a distance between the two elements of each pair. I don't need to calculate all pairs at once; I can do that iteratively in batches. – Béatrice Moissinac Mar 09 '20 at 20:20

1 Answer


I don't think you can achieve that using foreachPartition. foreachPartition takes a function that is run on the executors and passes it the actual data of each partition, not a DataFrame (it is an action that triggers processing, like .collect or .write, not just a transformation definition). And even if you wanted to recreate a DataFrame from the data passed in, that won't work either, because there is no Spark context available on the worker itself. Conceptually, a DataFrame is not a table but a lazily evaluated definition of a transformation.
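To illustrate the point (this sketch is mine, not part of the original answer): the function you pass to foreachPartition receives a plain Python iterator of Row objects on an executor, so there is nothing there you could hand to approxSimilarityJoin:

def handle_partition(rows):   # "handle_partition" is a made-up name for illustration
    for row in rows:          # plain Row objects, already materialised on the executor
        pass                  # no SparkSession here, so approxSimilarityJoin cannot be called

df1.repartition(100).foreachPartition(handle_partition)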

What you can do, however, is simply split df1 using Spark. If there is no key on which you can filter the DataFrame, you can do it with randomSplit, e.g.:

df.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)

The result of this operation is a list of DataFrames

[DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string]]

over which you can iterate using regular Python

dfs = df.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)
for batch_df in dfs:
    matched_df = model.stages[-1].approxSimilarityJoin(batch_df, df2, 1.0, "confidence")
    do_something_with(matched_df)
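If, on the other hand, df1 does have a key column you can filter on (the first option mentioned above), a bucket-based split is an alternative to randomSplit. A rough sketch, where some_key and bucket_id are made-up names, 100 buckets is just an example, and do_something_with is the same placeholder as above:

from pyspark.sql import functions as F

# Derive a bucket id (0-99) from a hypothetical key column, then process one bucket per iteration
df1_bucketed = df1.withColumn("bucket_id", F.abs(F.hash("some_key")) % 100)

for i in range(100):
    batch = df1_bucketed.filter(F.col("bucket_id") == i).drop("bucket_id")
    matched_df = model.stages[-1].approxSimilarityJoin(batch, df2, 1.0, "confidence")
    do_something_with(matched_df)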

Coming back to randomSplit: to split your dataset into 100 parts this way, you can generate the weights tuple:

df.randomSplit(tuple([0.01 for x in range(100)]), seed=42)
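Putting it together for the 100-batch case from the question — a sketch that assumes the model, df1, and df2 from above, and writes each batch's matches out so only one batch's pairs are materialised at a time (the Parquet output path and append mode are illustrative assumptions, not something from the question):

weights = [0.01] * 100                      # 100 equal weights summing to 1.0
parts = df1.randomSplit(weights, seed=42)

for part in parts:
    matched_df = model.stages[-1].approxSimilarityJoin(part, df2, 1.0, "confidence")
    matched_df.write.mode("append").parquet("/path/to/matched_batches")  # assumed output location

Since df2 is reused in every iteration, caching it up front (df2.cache()) may be worth considering, depending on its size.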
Daniel