I have a fairly involved process of creating a pyspark dataframe, converting it to a pandas dataframe, and outputting the result to a flat file. I am not sure at which point the error is introduced, so I'll describe the whole process.

Starting out I have a pyspark dataframe that contains pairwise similarity for sets of ids. It looks like this:

  +------+-------+-------------------+
  |  ID_A|   ID_B|  EuclideanDistance|
  +------+-------+-------------------+
  |     1|      1|                0.0|
  |     1|      2|0.13103884200454394|
  |     1|      3| 0.2176246463836219|
  |     1|      4|  0.280568636550471|
 ...

I'd like to group it by ID_A, sort each group by EuclideanDistance, and keep only the top N pairs for each group. So first I do this:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number

window = Window.partitionBy(df['ID_A']).orderBy(df_sim['EuclideanDistance'])
result = (df.withColumn('row_num', row_number().over(window)))

I make sure ID_A = 1 is still in the "result" dataframe. Then I do this to limit each group to just 20 rows:

result1 = result.where(result.row_num<20)
result1.toPandas().to_csv("mytest.csv")

and ID_A = 1 is NOT in the resultant .csv file (although it's still there in result1). Is there a problem somewhere in this chain of conversions that could lead to a loss of data?

user3490622

2 Answers

You are referencing two dataframes (df and df_sim) in the window definition. I'm not sure this is causing your error, but it's worth cleaning up. You don't need to reference a particular dataframe in a window definition at all; column names are enough. Try:

window = Window.partitionBy('ID_A').orderBy('EuclideanDistance')
David

As David mentioned, you reference a second dataframe "df_sim" in your window function.

I tested the following and it works on my machine (famous last words):

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
import numpy as np
import pandas as pd

#`spark` already exists in the pyspark shell; otherwise this creates it
spark = SparkSession.builder.getOrCreate()

#simulate some data
df = pd.DataFrame({'ID_A': np.arange(100) % 5,
    'ID_B': np.repeat(np.arange(20), 5),
    'EuclideanDistance': np.random.rand(100) * 5}
    )
#artificially set distance between point and self to 0
df.loc[df['ID_A'] == df['ID_B'], 'EuclideanDistance'] = 0
df = spark.createDataFrame(df)
#end simulation
window = Window.partitionBy(df['ID_A']).orderBy(df['EuclideanDistance'])
output = df.select('*', row_number().over(window).alias('rank')).filter(col('rank') <= 10)
output.show(50)

The simulation code is there just to make this a self-contained example. You can of course use your actual dataframe and ignore the simulation when you test it. Hope that works!
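
If you want to cross-check the Spark output after it has been converted, the same top-N selection can be sketched in pandas alone. This is a minimal sketch under stated assumptions: the sample values are made up and `top_n` is a hypothetical name. Note that row numbers start at 1, so a filter like `row_num < 20` keeps 19 rows per group, while `<= 20` keeps 20.

```python
import pandas as pd

# Made-up sample in the same shape as the similarity dataframe
pdf = pd.DataFrame({
    "ID_A": [1, 1, 1, 2, 2, 2],
    "ID_B": [1, 2, 3, 1, 2, 3],
    "EuclideanDistance": [0.0, 0.13, 0.22, 0.13, 0.0, 0.31],
})

top_n = 2  # hypothetical N

# Sort within each ID_A by distance, then number the rows starting at 1,
# which mirrors row_number() over a window partitioned by ID_A
ranked = pdf.sort_values(["ID_A", "EuclideanDistance"]).copy()
ranked["row_num"] = ranked.groupby("ID_A").cumcount() + 1
top = ranked[ranked["row_num"] <= top_n]

print(top)
print(sorted(top["ID_A"].unique().tolist()))
```

If a group shows up here but not in the CSV produced from Spark, the loss is more likely happening before the conversion than during it.
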

Kevin Bishop