
I have a Spark DataFrame consisting of, let's say, 10K IDs. Each row of the DataFrame holds a pair of IDs and their Euclidean distance (each ID represents a document). The DataFrame looks like this:

ID_source | ID_destination | Euclidean Distance
1         | 1              | 0.0
1         | 2              | 1.3777
1         | 3              | 1.38
...       | ...            | ...
2         | 1              | 0.5555
2         | 2              | 0.0
...       | ...            | ...

For each ID_source I want the top 10 ID_destination values by Euclidean distance. In Spark I managed to do this with the following lines of code. The DataFrame described above is named similarity_join.

from pyspark.sql import Window
from pyspark.sql.functions import col, rank

window = Window.orderBy(col("id_source")).partitionBy(col("id_source")).orderBy(col("EuclideanDistance").asc())
df_filtered = similarity_join.select('*', rank().over(window).alias('rank')).orderBy(col("id_source").asc()).filter(col('rank') <= 10)
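
To rule out the ranking step itself, a quick sanity check (a sketch, using the names above) is to compare the number of distinct id_source values before and after the window/filter; since rank() starts at 1 within every partition, no id_source should disappear here:

# Sanity check: the rank <= 10 filter should keep every id_source,
# because rank() always yields at least one row (rank 1) per partition.
n_before = similarity_join.select("id_source").distinct().count()
n_after = df_filtered.select("id_source").distinct().count()
print(n_before, n_after)  # the two counts should match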

The issue happens when I want to write the result to CSV.

import datetime

date_now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
model_filename = "description_dataframe_" + date_now
df_filtered.write.csv(DESCRIPTION_MODEL_PATH + model_filename)

I have missing IDs in the final CSV (obtained after compacting the output part files in Hadoop). With a small sample (10-500 IDs) all the IDs are present, but with a 5,000-ID sample a lot of IDs are missing from the CSV. It looks as if some partitions are not written to disk. Even when I use coalesce(1), I have the same issue. Any help please? I am using 5 machines (1 master, 4 workers), and I intend to go up to 10 million IDs, so I will have 10 million window partitions.
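
To narrow down whether the write itself is dropping data, one option (a sketch, assuming a SparkSession named spark and that the CSV is written without a header, so the first column comes back as _c0) is to read the output directory back and compare ID coverage:

# Read the freshly written output back and compare distinct IDs
# (assumes no header row, so id_source comes back as _c0).
written = spark.read.csv(DESCRIPTION_MODEL_PATH + model_filename)
ids_written = written.select("_c0").distinct().count()
ids_expected = df_filtered.select("id_source").distinct().count()
print(ids_expected, ids_written)  # a mismatch here would implicate the write step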

Queezzy
  • I think the first thing you should do is sort out your `orderBy`s in the `Window` definition. – mazaneicha Feb 22 '20 at 15:37
  • Thank you @mazaneicha, but I believe that in Spark, sort is an alias of orderBy. https://stackoverflow.com/questions/40603202/what-is-the-difference-between-sort-and-orderby-functions-in-spark – Queezzy Feb 23 '20 at 14:49

1 Answer


In the end, the issue was neither in the partitioning nor in the writing part. Instead, it was due to the algorithm (Bucketed Random Projection LSH) that built the DataFrame (similarity_join): that algorithm is non-deterministic, so the number of results differed depending on the randomly chosen parameters.
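
If similarity_join was produced with Spark ML's BucketedRandomProjectionLSH (which this seems to refer to), fixing the seed makes runs reproducible. A minimal sketch, assuming a DataFrame docs_df with columns id and features, and with bucketLength, numHashTables, and the join threshold chosen purely for illustration:

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.sql.functions import col

# Hypothetical parameters; only the fixed seed matters for reproducibility.
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
                                  bucketLength=2.0, numHashTables=3, seed=42)
model = brp.fit(docs_df)

# approxSimilarityJoin returns struct columns datasetA / datasetB plus the distance column.
similarity_join = (model.approxSimilarityJoin(docs_df, docs_df, 10.0,
                                              distCol="EuclideanDistance")
                   .select(col("datasetA.id").alias("id_source"),
                           col("datasetB.id").alias("id_destination"),
                           "EuclideanDistance"))

Note that even with a fixed seed the join is approximate: pairs that never land in a common hash bucket are not returned at all, so the set of result rows (and hence the top-10 lists) can vary with the chosen parameters; raising numHashTables reduces those misses.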

Queezzy