
I am trying to partition a customer sample set by region and marketplace using Spark SQL, but within each partition I would like to select a random 100,000 of the users that were returned.

Is there a way to use random() in the PARTITION BY ... ORDER BY clause in Spark SQL? The code below, which uses random(), always returns an error, but without random() it works perfectly. Thank you in advance for your answers!

customer_panel_s3_location = f"s3://my-bucket/region_id={region_id}/marketplace_id={marketplace_id}/"
customer_panel_table = spark.read.parquet(customer_panel_s3_location)
customer_panel_table.createOrReplaceTempView("customer_panel")
dataset_date = '2023-03-16'
customer_sample_size = 100000
partition_customers_by = 'region_id, marketplace_id'

# The code below returns an error 
customer_panel_df = spark.sql(f"""
    SELECT *
    FROM (
        SELECT *
        , row_number() over (partition by {partition_customers_by} order by random()) AS rn
        FROM
          customer_panel AS c
        WHERE
          c.target_date < CAST('{dataset_date}' AS DATE)
          AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
        ) t
    WHERE t.rn <= bigint({customer_sample_size})
""")

# But after removing 'random()', it works
customer_panel_df = spark.sql(f"""
        SELECT *
        , row_number() over (partition by {partition_customers_by} order by {partition_customers_by}) AS rn
        FROM
          customer_panel AS c
        WHERE
          c.target_date < CAST('{dataset_date}' AS DATE)
          AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
""")

print(f"Row count of {table_name}: {customer_panel_df.count():,}")
user1330974
  • Unfortunately, it is not possible to use `random()` function within the ORDER BY clause of a window function `row_number()` in Spark SQL. This is because `random()` generates a non-deterministic value, meaning that it can produce different results for the same input parameters. One potential solution to achieve the desired outcome is to use the `rand()` function instead of `random()`. The `rand()` function generates a deterministic value based on the seed value, which can be set using the `spark.sql.functions.rand(seed)` function. – Dipanjan Mallick Mar 17 '23 at 16:29
  • Thank you, @DipanjanMallick! Using `row_number() over (partition by {partition_customers_by} order by rand(420)) AS rn` works like a charm! I wish you wrote your response as an answer so that I can mark it as an answer. Thanks again! – user1330974 Mar 17 '23 at 17:12
  • I'm glad it worked for you. Adding the comment to answer as well ;) – Dipanjan Mallick Mar 17 '23 at 17:31

1 Answer


Unfortunately, it is not possible to use the random() function within the ORDER BY clause of the row_number() window function in Spark SQL. random() is non-deterministic, meaning it can produce different results for the same input.

One potential solution to achieve the desired outcome is to use the rand() function instead of random(). When given an integer seed, rand(seed) produces a repeatable pseudo-random value, so the ordering (and therefore the sample) is the same on every run; in SQL the seed is passed directly as rand(seed), and pyspark.sql.functions.rand(seed) is the DataFrame API equivalent.

Below is an implementation that selects a random subset of customers from each partition:

# Any fixed integer works as the seed; the comments above used rand(420)
seed = 420

customer_panel_df = spark.sql(f"""
    SELECT *
    FROM (
        SELECT *,
            row_number() over (partition by {partition_customers_by} order by rand({seed})) AS rn
        FROM customer_panel AS c
        WHERE c.target_date < CAST('{dataset_date}' AS DATE)
          AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
        ) t
    WHERE t.rn <= {customer_sample_size}
""")

Please note that rand() takes an optional seed argument; with a fixed seed the random ordering is reproducible across runs, so the same subset of customers is selected each time. You may choose any integer value per your use case as the seed.
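For reference, here is a minimal sketch of the same sampling done with the DataFrame API instead of SQL. It assumes the customer_panel temp view, the dataset_date, customer_sample_size, and seed variables from above, and the same two partition columns (region_id, marketplace_id):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number rows randomly (but reproducibly, thanks to the seed) within each
# (region_id, marketplace_id) partition, then keep the first customer_sample_size rows.
w = Window.partitionBy("region_id", "marketplace_id").orderBy(F.rand(seed))

customer_panel_df = (
    spark.table("customer_panel")
    .where(
        (F.col("target_date") < F.lit(dataset_date).cast("date"))
        & (F.col("target_date") >= F.date_sub(F.lit(dataset_date).cast("date"), 7))
    )
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") <= customer_sample_size)
)

Both versions produce the same plan; whether you prefer the SQL string or the Window/rand() builder style is mostly a matter of taste.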

Dipanjan Mallick
  • Thank you, @DipanjanMallick! Accepted the answer because it works. :) I also have posted another [pyspark related question](https://stackoverflow.com/q/75770629/1330974) in case you have some spare time to take a look. Thank you again and I'm learning something new about pyspark everyday. :) – user1330974 Mar 17 '23 at 17:44