I don't expect the following code to benefit from the Dataframe Catalyst query optimizer, but I do expect the native/Scala implementation of string split to outperform the same split done in a Python lambda. However, my results are disappointing: the native Dataframe API appears to be the slower of the two.
My test is as follows:
import pyspark.sql.functions

def get_df(spark):
    # read the toy CSV dataset from S3, inferring the schema
    return spark.read.load('s3://BUCKET/test-data.csv',
                           format='com.databricks.spark.csv',
                           inferSchema='true', header='true')

def upsize_df(df, exponent=10):
    # double the row count `exponent` times via unionAll
    for i in range(exponent):
        df = df.unionAll(df)
    return df

def rdd_ver(df):
    # split order_id with a Python lambda over the underlying RDD
    df = df.rdd.map(lambda row: row + tuple(
        row.order_id.split('-'))).toDF(
        df.columns + ['psrid', 'eoid'])
    df.show()

def df_ver(df):
    # split order_id with the native Dataframe split() function
    split_col = pyspark.sql.functions.split(df['order_id'], '-')
    df = df.withColumn('psrid', split_col.getItem(0))
    df = df.withColumn('eoid', split_col.getItem(1))
    df.show()
Cluster/YARN details:
- Spark 2.0 on AWS
- 6 executors
- 2 cores per executor
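For context, that allocation corresponds roughly to the following session setup (just a sketch using the standard spark.executor.* config keys; it is not the exact launch command I used on the cluster):

from pyspark.sql import SparkSession

# assumed equivalent of the cluster settings above: 6 YARN executors
# with 2 cores each (the real shell was started with its own flags)
spark = (SparkSession.builder
         .master('yarn')
         .config('spark.executor.instances', '6')
         .config('spark.executor.cores', '2')
         .getOrCreate())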
Test procedure:
- Create new PySpark shell in IPython
- Get dataframe of toy-sized dataset (1000 rows)
- repartition Dataframe to 12 partitions
- upsize_df with unionAll, to get to 1 million rows
- run df.count() to force execution of repartition and upsize_df
- finally, run %time rdd_ver(df) or %time df_ver(df) (the full sequence is sketched below)
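Put together, the procedure looks roughly like this in the shell (a sketch; spark is the session provided by the PySpark shell, and the numbers match the steps above):

df = get_df(spark)               # toy dataset, ~1000 rows
df = df.repartition(12)          # 12 partitions for 6 executors x 2 cores
df = upsize_df(df, exponent=10)  # doubles the data 10 times -> ~1 million rows
df.count()                       # force execution of repartition and upsize_df

%time rdd_ver(df)                # then, timed in separate runs
%time df_ver(df)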
My results so far have been surprising and disappointing. Here is a sampling of the results I've received, in seconds:
rdd_ver: 14.5, 22.4, 13.1, 24.7, 17.8 --- mean: 18.5
df_ver:  30.5, 26.9, 32.0, 29.7, 39.8 --- mean: 31.8
I'd appreciate any thoughts, either on the test procedure itself (the split operation is derived from some production code) or on the poor performance of the Dataframe version.
EDIT:
The Spark Web UI indicates that my jobs are not actually being scheduled/submitted very quickly. I'm not sure how reliable the Web UI's information is, but the 'Submitted' time displayed on the active job in this screenshot is over a minute after I initially hit 'enter' in the active PySpark session to kick off %time df_ver(df).
Furthermore, it seems that none of the 6 executors are doing anything. They've all apparently been killed by Spark since I wasn't actively doing anything in the Spark session for more than a few seconds. It seems like the entire job is being run by the driver node, but I can't confirm that since I don't know the Spark Web UI well enough.
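For anyone trying to reproduce this, here is how I intend to check whether dynamic allocation is what's killing the idle executors (a sketch using the standard Spark config keys plus a py4j workaround; I haven't verified these values on my cluster):

# dynamic allocation settings; the fallbacks are Spark's documented defaults
print(spark.conf.get('spark.dynamicAllocation.enabled', 'false'))
print(spark.conf.get('spark.dynamicAllocation.executorIdleTimeout', '60s'))

# rough count of currently registered executors (includes the driver),
# read from the underlying Java SparkContext
print(spark.sparkContext._jsc.sc().getExecutorMemoryStatus().size())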