
I've noticed that there is sometimes a major performance difference in pyspark between expr('array(...many values here...)') and array(...many values here...).

I'm including some profiling code and results below. It looks like the performance bottleneck is simply the invocation of pyspark.sql.functions.array on a large number of values.

In particular, expr('array(...)') is often much faster than array(...), and this difference appears whether or not the constructed pyspark Column is actually applied to any data.

Am I interpreting these performance results correctly? If so, why does expr('array(...)') vs. array(...) make such a big difference, and are there any better workarounds besides using expr?
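
To make the comparison concrete, these are the two ways of building the same Column that I'm comparing (the integer literals are just a stand-in for the many columns in my real use case):

import pyspark.sql.functions as fx

n = 10000

# One Column object per value, all passed through the Python API
col_via_array = fx.array(*(fx.lit(i) for i in range(n)))

# The same array, built from a single SQL expression string
col_via_expr = fx.expr('array(' + ', '.join(str(i) for i in range(n)) + ')')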

Testing code

This test case uses an artificial setup, but my actual use case is to perform a "melt" operation using the technique suggested here: https://stackoverflow.com/a/55384727/20102838.
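
Roughly, that melt pattern builds one array of structs covering every column being melted and then explodes it, so array(...) ends up being called with one entry per melted column. A simplified sketch of the idea (column names are just illustrative, not the exact code from that answer):

import pyspark.sql.functions as fx

def melt_sketch(df, id_vars, value_vars):
    # fx.array receives len(value_vars) struct Columns here, which is where the
    # construction cost shows up when melting a wide table
    kv = fx.explode(fx.array(*(
        fx.struct(fx.lit(c).alias("variable"), fx.col(c).alias("value"))
        for c in value_vars
    ))).alias("kv")
    return df.select(*id_vars, kv).select(*id_vars, "kv.variable", "kv.value")

The full profiling harness I used is below.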

import pandas as pd
import pyspark.sql.functions as fx
import timeit
from pyspark.sql import SparkSession

# When running outside a pyspark shell/notebook, create the session explicitly
spark = SparkSession.builder.getOrCreate()

def code_to_profile(num_values_a, num_values_b, *, use_array, apply_to_data):
    if use_array:
        # Build the column by passing one Column object per value to fx.array
        new_col = fx.array(*(fx.lit(i) for i in range(num_values_b)))
    else:
        # Build the same column from a single SQL expression string
        new_col = fx.expr('array(' + ', '.join(str(i) for i in range(num_values_b)) + ')')

    if apply_to_data:
        df_test = spark.createDataFrame(pd.DataFrame({"a": list(range(num_values_a))}))
        df_test = df_test.select("a", fx.explode(new_col).alias("b"))
        # Perform an arbitrary operation that requires processing the whole dataset
        df_test.filter('a + b % 2 = 0').count()
    
def profile(*args, **kwargs):
    num_iters, total_time = timeit.Timer(lambda: code_to_profile(*args, **kwargs)).autorange()
    result = {
        **{f'arg{arg_index}': args[arg_index] for arg_index in range(len(args))},
        **kwargs,
        "time_per_iteration": total_time / num_iters,
    }
    print(result)
    return result

results = []

for apply_to_data in [True, False]:
    for use_array in [True, False]:
        for num_values_a in [10]:
            for num_values_b in [100, 1000, 10000]:
                results.append(profile(
                    num_values_a=num_values_a,
                    num_values_b=num_values_b,
                    use_array=use_array,
                    apply_to_data=apply_to_data,
                ))
                
results = pd.DataFrame(results)

print(results)

Timing results

For context, I'm testing on Spark 3.2.1.

    num_values_a  num_values_b use_array  apply_to_data  time_per_iteration
0             10           100      True           True            0.483189
1             10          1000      True           True            3.489090
2             10         10000      True           True           33.828352
3             10           100     False           True            0.141966
4             10          1000     False           True            0.177832
5             10         10000     False           True            0.431849
6             10           100      True          False            0.358343
7             10          1000      True          False            3.395299
8             10         10000      True          False           33.437838
9             10           100     False          False            0.003791
10            10          1000     False          False            0.007336
11            10         10000     False          False            0.044786
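
Back-of-the-envelope, using the apply_to_data=False, num_values_b=10000 rows above, the construction cost works out to a few milliseconds per value for the array(...) version versus a few microseconds per value for the expr version:

# Rough per-value construction cost, taken from the timing table above
print(33.437838 / 10000)  # ~3.3 ms per value via fx.array(fx.lit(...), ...)
print(0.044786 / 10000)   # ~4.5 us per value via fx.expr('array(...)')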