
I have a data dict whose keys represent items (0, 1, 2, … are item ids) and whose values ('712907', '742068', …) refer to users. I convert it into a pandas DataFrame:

import pandas as pd

data_dict = {0: ['712907', '742068', '326136', '667386'],
             1: ['667386', '742068', '742068'],
             2: ['326136', '663056', '742068', '742068'],
             3: ['326136', '663056', '742068'],
             4: ['326116', '742068', '663056', '742068'],
             5: ['326136', '326136', '663056', '742068']}
df = pd.DataFrame.from_dict(data_dict, orient='index')

I group the items in the DataFrame by user ('712907', '742068', '326136', …), counting how often each user occurs per item; see the table below.

from scipy import sparse

# One-hot encode the stacked user ids, then sum the counts per item
dframe = pd.get_dummies(df.stack()).groupby(level=0).sum()
sv = sparse.csr_matrix(dframe.to_numpy())

   326116  326136  663056  667386  712907  742068
0       0       1       0       1       1       1
1       0       0       0       1       0       2
2       0       1       1       0       0       2
3       0       1       1       0       0       1
4       1       0       1       0       0       2
5       0       2       1       0       0       1

Note that the DataFrame above (dframe) is just a small example; the actual size of dframe is 309235 x 81566. I therefore want to use Spark to compute the cosine similarities between the rows (items 0, 1, 2, …) of the sparse matrix sv. Here is what I have achieved so far:

import pyspark
from pyspark.sql import SQLContext, Row

sc = pyspark.SparkContext(appName="cosinesim")
sqlContext = SQLContext(sc)
sv_rdd = sc.parallelize(sv.toarray())
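
As an aside, sv.toarray() densifies the whole 309235 x 81566 matrix, which may well exhaust driver memory at that scale. A minimal sketch of a sparse-preserving alternative, assuming you build the IndexedRowMatrix directly from the CSR rows (the names to_indexed_row and pred_sparse are mine):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

n_cols = sv.shape[1]

def to_indexed_row(pair):
    # pair = (row_index, (column_indices, values)) describing one CSR row
    i, (indices, values) = pair
    return IndexedRow(i, Vectors.sparse(n_cols, indices.tolist(), values.tolist()))

# Ship each row as its (indices, values) arrays; nothing is densified
rows = [(i, (sv[i].indices, sv[i].data)) for i in range(sv.shape[0])]
pred_sparse = IndexedRowMatrix(sc.parallelize(rows).map(to_indexed_row))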

Using the example, I convert the RDD into a DataFrame:

def f(x):
    # Turn one dense row into a {column_name: value} dict for Row(**kwargs)
    d = {}
    for i in range(len(x)):
        d[str(i)] = int(x[i])
    return d

dfspark = sv_rdd.map(lambda x: Row(**f(x))).toDF()

Following this example, I add a new 'id' column:

from pyspark.sql.types import StructType, StructField, LongType

row_with_index = Row(*["id"] + dfspark.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(dfspark.columns)

dfidx = (dfspark.rdd
    .zipWithIndex()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + dfspark.schema.fields)))
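
A shorter route that skips the DataFrame round-trip entirely is sketched below: zip the vector RDD with an index and build the matrix from it directly (pred_alt is my name; mllib accepts the numpy rows as dense vectors):

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# zipWithIndex yields (row, index); IndexedRow expects (index, vector)
pred_alt = IndexedRowMatrix(sv_rdd.zipWithIndex()
                            .map(lambda x: IndexedRow(x[1], x[0])))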

Finally, compute the similarities between rows by transposing the matrix (so that the items become columns) and calling columnSimilarities:

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

pred = IndexedRowMatrix(dfidx.rdd.map(lambda row: IndexedRow(row.id, row[1:])))
pred1 = pred.toBlockMatrix().transpose().toIndexedRowMatrix()
pred_sims = pred1.columnSimilarities()
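
columnSimilarities() returns a CoordinateMatrix containing only the strict upper triangle (entries with i < j). At this scale the approximate DIMSUM variant may also be worth trying; a sketch, assuming you go through RowMatrix, whose columnSimilarities accepts a threshold below which similarities are estimated with reduced accuracy rather than computed exactly (pred_sims_approx is my name):

# Approximate similarities: pairs below 0.1 may be sampled away
pred_sims_approx = pred1.toRowMatrix().columnSimilarities(0.1)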

How can I obtain the top-k ids for each item (0, 1, 2, 3, 4, …) based on the cosine similarities (pred_sims)? I convert the CoordinateMatrix into a DataFrame, but I am unsure how to access the top-k items for each id:

columns = ['from', 'to', 'sim']
# Build the DataFrame directly from the entries RDD; collect()-ing every
# entry to the driver first would not scale to the real matrix
entries = pred_sims.entries.map(lambda e: (e.i, e.j, e.value))
dfsim = sqlContext.createDataFrame(entries, columns)
dfsim.show()

(dfsim.show() prints one row per item pair, with columns from, to and sim.)

from pyspark.sql.functions import col, desc

for i in range(m):
    target_id = int(dataset_u[i])
    dfFrom = dfsim.where(col("from") == target_id)
.....
kitchenprinzessin
  • If the issue you're trying to solve is actually just picking the top-k items per id, the first part of the question seems irrelevant - can you just start with dfsim and ask from there? Or have I missed something?... – etov Jan 10 '18 at 10:05
  • The first part just shows how I generate a large sparse matrix (csr_matrix) and then convert it into an RDD to calculate cosine similarities between rows. 'pred_sims' is the upper triangle of a coordinate matrix. I want to get the top-k items for each item based on their similarities. – kitchenprinzessin Jan 11 '18 at 02:22

1 Answer


You could use a window function to sort by similarity per item, then take the top rows with row_number():

from pyspark.sql import functions as func
from pyspark.sql.window import Window

window = Window.partitionBy(dfsim['from']).orderBy(dfsim['sim'].desc())

dfsim.select('*', func.row_number().over(window).alias('row_number')) \
  .filter(func.col('row_number') <= 3) \
  .show()
+----+---+------------------+----------+
|from| to|               sim|row_number|
+----+---+------------------+----------+
|   0|  1|0.6708203932499369|         1|
|   0|  5|0.6123724356957946|         2|
|   4|  5|0.5000000000000001|         1|
|   1|  4|0.7302967433402215|         1|
|   1|  2|0.7302967433402215|         2|
|   2|  3|0.9428090415820636|         1|
|   2|  4|0.8333333333333336|         2|
|   3|  5|0.9428090415820636|         1|
|   3|  4|0.7071067811865477|         2|
+----+---+------------------+----------+
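
Note that row_number() breaks ties arbitrarily: rows 1→4 and 1→2 above have identical similarities yet get row numbers 1 and 2. If tied items should share a position, rank() or dense_rank() from pyspark.sql.functions can be swapped in.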

Join with your original data if you need to relate row selection back to your original data frame.
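
One caveat: pred_sims holds only the upper triangle, so an item that mostly appears in the 'to' column (item 4 above, for example) loses most of its neighbours when you partition by 'from'. A sketch of mirroring the pairs before applying the window (dfsim_full is my name for the result):

from pyspark.sql import functions as func

# Mirror each (from, to, sim) entry so every item shows up in 'from'
dfsim_full = dfsim.union(
    dfsim.select(func.col('to').alias('from'),
                 func.col('from').alias('to'),
                 func.col('sim')))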

etov