I've noticed that there is sometimes a major performance difference in pyspark between expr('array(...many values here...)') and array(...many values here...). I'm including some profiling code and results below. It looks like the performance bottleneck is simply the invocation of pyspark.sql.functions.array on a large number of values. In particular, expr('array(...)') is often much faster than array(...), and the difference appears whether or not the constructed pyspark Column is actually applied to data.
Am I interpreting these performance results correctly? If so, why does expr('array(...)') vs. array(...) make such a big difference, and are there better workarounds than falling back to expr?
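To make the comparison concrete, here is a sketch of the two constructions side by side (the variable names are mine, just for illustration):

import pyspark.sql.functions as fx

n = 10000
# One Column object per value, all passed to pyspark.sql.functions.array.
col_from_array = fx.array(*(fx.lit(i) for i in range(n)))
# A single SQL expression string describing the same literal array.
col_from_expr = fx.expr('array(' + ', '.join(str(i) for i in range(n)) + ')')

Both produce a Column for the same 10,000-element literal array, but constructing col_from_array is dramatically slower in my measurements.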
Testing code
This test case uses an artificial setup, but my actual use case is to perform a "melt" operation using the technique suggested in https://stackoverflow.com/a/55384727/20102838; a sketch of that pattern follows.
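Roughly, that melt builds one struct per melted column, collects the structs with array, and explodes the result (this is my sketch, not the linked answer's exact code; melt and its parameter names are illustrative):

import pyspark.sql.functions as fx

def melt(df, id_vars, value_vars, var_name='variable', value_name='value'):
    # One (column name, column value) struct per melted column.
    pairs = fx.array(*(
        fx.struct(fx.lit(c).alias(var_name), fx.col(c).alias(value_name))
        for c in value_vars
    ))
    # Explode so each input row yields one output row per melted column.
    melted = df.withColumn('_pair', fx.explode(pairs))
    return melted.select(*id_vars, f'_pair.{var_name}', f'_pair.{value_name}')

With thousands of value_vars, that single array(...) call is exactly where I hit the bottleneck. The profiling code below strips this down to just the column construction: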
import pandas as pd
import pyspark.sql.functions as fx
import timeit

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def code_to_profile(num_values_a, num_values_b, *, use_expr, apply_to_data):
    # Build the same literal array column two ways: via a SQL expression
    # string, or via pyspark.sql.functions.array on many lit() columns.
    if use_expr:
        new_col = fx.expr('array(' + ', '.join(str(i) for i in range(num_values_b)) + ')')
    else:
        new_col = fx.array(*(fx.lit(i) for i in range(num_values_b)))
    if apply_to_data:
        df_test = spark.createDataFrame(pd.DataFrame({"a": list(range(num_values_a))}))
        df_test = df_test.select("a", fx.explode(new_col).alias("b"))
        # Perform an arbitrary operation that requires processing the whole dataset
        df_test.filter('a + b % 2 = 0').count()

def profile(*args, **kwargs):
    num_iters, total_time = timeit.Timer(lambda: code_to_profile(*args, **kwargs)).autorange()
    result = {
        **{f'arg{arg_index}': args[arg_index] for arg_index in range(len(args))},
        **kwargs,
        "time_per_iteration": total_time / num_iters,
    }
    print(result)
    return result

results = []
for apply_to_data in [True, False]:
    for use_expr in [True, False]:
        for num_values_a in [10]:
            for num_values_b in [100, 1000, 10000]:
                results.append(profile(
                    num_values_a=num_values_a,
                    num_values_b=num_values_b,
                    use_expr=use_expr,
                    apply_to_data=apply_to_data,
                ))
results = pd.DataFrame(results)
print(results)
Timing results
For context, these timings are from Spark 3.2.1:
    num_values_a  num_values_b  use_expr  apply_to_data  time_per_iteration
0             10           100      True           True            0.141966
1             10          1000      True           True            0.177832
2             10         10000      True           True            0.431849
3             10           100     False           True            0.483189
4             10          1000     False           True            3.489090
5             10         10000     False           True           33.828352
6             10           100      True          False            0.003791
7             10          1000      True          False            0.007336
8             10         10000      True          False            0.044786
9             10           100     False          False            0.358343
10            10          1000     False          False            3.395299
11            10         10000     False          False           33.437838