I've read:
- Spark task runs on only one executor (and ones it points to)
- Spark Dataset cache is using only one executor
- spark streaming only executors on one machine is working
I'm trying to generate a test dataset with 10 billion rows by crossJoining a seed dataframe with some other dataframes. My problem is that the last step of the process, final_df.write.parquet(), only uses one worker/executor no matter how many there are.
This obviously doesn't scale to billions of rows, which is the problem.
E.g. in a 3-node cluster with 4 cores each, final_df has 64 partitions, but only one executor writes a single parquet file with all the records from that dataframe. I've also tried with 12 nodes, which produces a dataframe with 1936 partitions, but the problem is the same.
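To show what I mean by "only one executor writes one parquet file", this is the kind of diagnostic I have in mind (a rough sketch against the final_df built in the script further down, not output from my actual runs): count rows per partition id and see how they are spread.

import pyspark.sql.functions as F
per_partition = (final_df
    .withColumn("pid", F.spark_partition_id())  # id of the partition each row currently lives in
    .groupBy("pid")
    .count())
per_partition.show(100, truncate=False)         # one row per partition with its record count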
A few observations:
- I think the 64 partitions come from the 2 crossJoin() calls. One node is the master, leaving 2 executors, each with 4 cores, so it comes out to 2 * 2 * 4 * 4 = 64.
- If I uncomment the lines to coalesce(), it reduces the number of partitions to 8, but still only one executor writes one parquet file with all the records from that dataframe.
- If I repartition() (no coalesce()), then I get 8 files, all executors are used and the writing is distributed perfectly. BUT now the problem moves to the repartitioning step, which is done by just one executor. Same problem in the end. (My understanding of why the two differ is sketched right after this list.)
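The way I read the difference (a sketch against the same final_df; the exact plan nodes are my assumption, not something I've verified here): repartition() inserts a shuffle, which shows up as an Exchange in the physical plan, so any executor can pick up the shuffled partitions for the write, while coalesce() only merges existing partitions without a shuffle and so inherits whatever locality the upstream stage had.

final_df.coalesce(8).explain()     # physical plan should show a Coalesce node, no shuffle
final_df.repartition(8).explain()  # physical plan should show Exchange RoundRobinPartitioning

The full script: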
import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt
def log(*args):
    print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))
log('spark.version', str(spark.version))
log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://fs1@myaccount.dfs.core.windows.net/seed.csv", header=True)
# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))
log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))
df1 = spark.createDataFrame(data=[[ [1] * array_len ]])            # one row holding an array of array_len ones
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')  # explode it into array_len rows
df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column
# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))
log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://fs1@myaccount.dfs.core.windows.net/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())
Output:
2020-12-05T00:27:51.933995 spark.version 3.0.1
2020-12-05T00:27:51.934079 reading seed file
2020-12-05T00:27:52.713461 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-05T00:27:52.852547 final_df.rdd.getNumPartitions(): 64
2020-12-05T00:27:52.852749 writing parquet
2020-12-05T00:28:00.823663 wrote parquet.
2020-12-05T00:28:08.757957 final_df.rdd.count(): 1989806
With coalesce:
... same as above ...
2020-12-05T00:12:22.620791 coalesceing
2020-12-05T00:12:22.860093 final_df.rdd.getNumPartitions(): 32
2020-12-05T00:12:22.860249 writing parquet
2020-12-05T00:12:31.280416 wrote parquet.
2020-12-05T00:12:39.204093 final_df.rdd.count(): 1989806
With repartition:
... same as above ...
2020-12-05T00:23:40.155481 repartitioning
2020-12-05T00:23:44.702251 final_df.rdd.getNumPartitions(): 8
2020-12-05T00:23:44.702421 writing parquet
2020-12-05T00:23:50.478841 wrote parquet.
2020-12-05T00:23:52.174997 final_df.rdd.count(): 1989806
DAG visualization of the stage that takes a long time:
PS: Ignore the slight mismatch between the NUM_RECORDS_TO_GENERATE value and the actual number of records generated. It's probably a math problem in the sqrt, and I don't care if it's off by a few million.
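For what it's worth, I believe the gap is just the int() truncation of the square root; redoing the arithmetic with the logged values reproduces the row count exactly:

import math
seed_rows = 14
target = 2_000_002                               # NUM_RECORDS_TO_GENERATE after the % adjustment
array_len = int(math.sqrt(target / seed_rows))   # int() truncates 377.96... down to 377
print(array_len ** 2 * seed_rows)                # 377 * 377 * 14 = 1,989,806 rows, as logged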