I have a Spark DataFrame covering, let's say, 10K IDs. Each row of the DataFrame holds a pair of IDs and their Euclidean distance (each ID represents a document). The DataFrame looks like this:
ID_source | ID_destination | Euclidean Distance
1         | 1              | 0.0
1         | 2              | 1.3777
1         | 3              | 1.38
...       | ...            | ...
2         | 1              | 0.5555
2         | 2              | 0.0
...       | ...            | ...
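For reference, a minimal toy version of this DataFrame can be built like this (just a sketch; the column names are assumed to match the table above and the code below, and the values are the sample rows shown):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy similarity matrix using the sample rows above
similarity_join = spark.createDataFrame(
    [(1, 1, 0.0), (1, 2, 1.3777), (1, 3, 1.38),
     (2, 1, 0.5555), (2, 2, 0.0)],
    ["id_source", "id_destination", "EuclideanDistance"]
)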
For each ID_source I want the top 10 ID_destination values by Euclidean distance. In Spark I managed to do this with the following lines of code (the matrix described above is named similarity_join):
from pyspark.sql import Window
from pyspark.sql.functions import col, rank

# Rank destinations within each id_source by ascending distance
window = Window.partitionBy(col("id_source")).orderBy(col("EuclideanDistance").asc())

df_filtered = (similarity_join
    .select('*', rank().over(window).alias('rank'))
    .orderBy(col("id_source").asc())
    .filter(col('rank') <= 10))
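As a side note, I know that rank() gives tied distances the same rank, so the rank <= 10 filter can keep more than 10 rows per ID_source. If I wanted exactly 10, a row_number() variant would look like this (a sketch with the same assumed column names):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

window = Window.partitionBy(col("id_source")).orderBy(col("EuclideanDistance").asc())

# row_number() breaks ties arbitrarily, so each id_source keeps exactly 10 rows
df_top10 = (similarity_join
    .select('*', row_number().over(window).alias('rn'))
    .filter(col('rn') <= 10))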
The issue appears when I want to write the result to CSV.
import datetime

# Timestamped output directory for the top-10 result
date_now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
model_filename = "description_dataframe_" + date_now
df_filtered.write.csv(DESCRIPTION_MODEL_PATH + model_filename)
I have missing IDs in the final CSV (obtained after compacting the output files in Hadoop). With a small sample (10-500 IDs) all IDs are present, but with a 5000-ID sample a lot of IDs are missing from the CSV. It looks like some partitions are not written to disk. Even when I use coalesce(1), I have the same issue. Any help please. I am using 5 machines (1 master, 4 workers), and I intend to go up to 10 million IDs, so I will have 10 million window partitions.
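One sanity check I can think of, to see whether the IDs disappear at write time or earlier, is to count distinct ID_source values just before the write and again after reading the written files back (a sketch, assuming a SparkSession named spark and the same path variables as above):

# Distinct source IDs in memory, before writing
expected = df_filtered.select("id_source").distinct().count()

# Read back the CSV that was just written and count again
written = (spark.read.csv(DESCRIPTION_MODEL_PATH + model_filename)
           .select("_c0")        # first CSV column = id_source (no header was written)
           .distinct()
           .count())

print(expected, written)   # if these differ, rows are lost at write/read time;
                           # if they already match, the IDs were missing upstream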