
Suppose I have two dataframes: df, and grp_df, which is df grouped by "region" with the items collected into a list (roughly df.groupby(["region"]) with a collect-list aggregation):

df

 user     item    region
 james    I1      Canada
 amy      I5      Germany
 chris    I33     U.S.

grp_df

  region     Item_lst
  Canada     [I1, I2, ..., In]
  Germany    [I3, I5, ..., In]
  U.S.       [I33, I22, I11]
  ...        ...

For each user I want to select a new item that they have not bought before within the same region, and add it to a new PySpark dataframe.

new_df

 user     item    region
 james    I2      Canada
 amy      I3      Germany
 chris    I22     U.S.

What is the most efficient way to do this in PySpark?

My Approach:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = df.join(grp_df, ["region"], "left")

def get_neg_sample(item, item_lst):
    # list.remove() mutates in place and returns None, so build a filtered copy instead
    candidates = [i for i in item_lst if i != item]
    return str(np.random.choice(candidates))

get_neg_sample_udf = udf(get_neg_sample, StringType())  # items like "I2" are strings

df = df.withColumn("neg_item", get_neg_sample_udf("item", "Item_lst"))

2 Answers


The function you are looking for is array_contains; you can use it in a join condition to get your desired result:

val newDf = df.join(grp_df, df.col("region") === grp_df.col("region") && !array_contains(grp_df.col("Item_lst"), df.col("item")))
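
The snippet above is Scala; since the question asks for PySpark, a rough Python equivalent of the same idea (a sketch, assuming the question's column names region, item and Item_lst) is an equi-join on region followed by a filter on the non-containment condition:

from pyspark.sql import functions as F

# Equi-join on region, then drop rows whose item already appears
# in that region's Item_lst.
new_df = df.join(grp_df, "region").where(F.expr("NOT array_contains(Item_lst, item)"))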

For Spark 2.4+, you can use shuffle and array_remove:

from pyspark.sql import functions as F

new_df = df.join(grp_df, 'region').select('region', 'user', F.expr('shuffle(array_remove(Item_lst, item))[0]').alias('item'))
new_df.show(truncate=False)
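
Because shuffle and array_remove are native Spark SQL functions, this avoids the Python UDF (and its serialization overhead) from the question's approach, so it should generally be faster.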