I am running a Spark application on a Kubernetes cluster (at the moment I do not have permission to resize or rescale this cluster). I think there are three main issues hurting my performance, and I would like to ask this community whether there is anything I can do about them. I will list them in the order of priority I set for myself.
1- I do not have permissions on the K8s cluster, and I suspect the configuration I set for my Spark application is not taking effect. This is just a guess, because I do not have much experience with K8s. I configure my Spark application like this:
return SparkSession.builder \
    .config("spark.executor.instances", "5") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.cores", "2") \
    .config("spark.kubernetes.executor.request.cores", "3.5") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "6g") \
    .config("spark.sql.broadcastTimeout", "600") \
    .config("spark.memory.fraction", "0.2") \
    .config("spark.kubernetes.memoryOverheadFactor", "0.2") \
    .getOrCreate()
But when the pod is created, I see this in the log:
'containers': [{'args': ['/bin/bash',
'-c',
'spark-submit --master '
'"k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_PORT_443_TCP_PORT}" '
'--deploy-mode client --name "${POD_NAME}" '
'--conf "spark.driver.host=${POD_IP}" '
'--conf spark.driver.memory=40g --conf '
'spark.driver.memoryOverhead=0.4 --conf '
'spark.eventLog.dir=s3a://*****-spark/history/ireland '
'--conf spark.eventLog.enabled=true --conf '
The driver's memory is 40g instead of 6g, and the driver's memoryOverhead is 0.4 instead of 0.2. When I see this, I get confused and I am not sure whether the number of cores per executor and the executor memory are being applied as I configured them either.
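To see what is actually in effect, the only check I know is to dump the runtime configuration from inside the application. A minimal sketch, assuming the session returned by the builder above is available as sparkSession (the same name I use in point 3):

# Minimal sketch: compare what I set on the builder with what the driver actually runs with.
# getConf().getAll() returns the merged configuration (spark-submit flags, defaults, builder values).
effective = dict(sparkSession.sparkContext.getConf().getAll())
for key in ("spark.driver.memory",
            "spark.driver.cores",
            "spark.executor.instances",
            "spark.executor.cores",
            "spark.executor.memory",
            "spark.kubernetes.executor.request.cores",
            "spark.kubernetes.memoryOverheadFactor"):
    print(key, "=", effective.get(key, "<not set>"))

At least that would tell me whether only the driver options from spark-submit are winning, or whether my executor settings are being ignored as well.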
2- The last step of this Spark application writes a DataFrame into a Hive table (Parquet format) on an S3 bucket.
results.withColumn("make", F.col("name_make_dict")).filter(F.col("name_mod1_dict").isNotNull()) \
.select(*json.loads(parser.get("config", "formatMatched"))) \
.withColumnRenamed("name_mod1_dict", "name_model_dict") \
.repartition(30, "inserted_at_date", "brand", "make") \
.write.insertInto(f"{schema}.cars_features")
and in the log I see:
[2022-04-16 10:32:23,040] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO CodeGenerator: Code generated in 35.851449 ms\n'
[2022-04-16 10:32:23,097] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO SparkContext: Starting job: insertInto at NativeMethodAccessorImpl.java:0\n'
[2022-04-16 10:32:23,100] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO DAGScheduler: Registering RDD 146 (insertInto at NativeMethodAccessorImpl.java:0) as input to shuffle 9\n'
[2022-04-16 10:32:23,100] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO DAGScheduler: Registering RDD 149 (insertInto at NativeMethodAccessorImpl.java:0) as input to shuffle 10\n'
[2022-04-16 10:32:23,100] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO DAGScheduler: Got job 18 (insertInto at NativeMethodAccessorImpl.java:0) with 30 output partitions\n'
and it finishes very fast:
[2022-04-16 10:33:02,044] {pod_launcher.py:156} INFO - b'22/04/16 10:33:02 INFO TaskSchedulerImpl: Removed TaskSet 34.0, whose tasks have all completed, from pool \n'
[2022-04-16 10:33:02,048] {pod_launcher.py:156} INFO - b'22/04/16 10:33:02 INFO DAGScheduler: ResultStage 34 (insertInto at NativeMethodAccessorImpl.java:0) finished in 26.634 s\n'
[2022-04-16 10:33:02,048] {pod_launcher.py:156} INFO - b'22/04/16 10:33:02 INFO DAGScheduler: Job 18 finished: insertInto at NativeMethodAccessorImpl.java:0, took 38.948620 s\n'
but then it continues like this and takes about 10 minutes to write the files into S3:
[2022-04-16 10:33:05,305] {base_job.py:197} DEBUG - [heartbeat]
[... heartbeat lines repeat every ~5 seconds from 10:33:10 to 10:36:25 ...]
[2022-04-16 10:36:30,957] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:33,949] {pod_launcher.py:156} INFO - b'22/04/16 10:36:33 INFO FileFormatWriter: Write Job eefcf8a0-9ba9-4752-a51e-f4bc27e5ffcc committed.\n'
[2022-04-16 10:36:33,954] {pod_launcher.py:156} INFO - b'22/04/16 10:36:33 INFO FileFormatWriter: Finished processing stats for write job eefcf8a0-9ba9-4752-a51e-f4bc27e5ffcc.\n'
[2022-04-16 10:36:35,972] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:40,991] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:46,006] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:51,022] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:56,038] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:37:01,057] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:37:06,072] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:37:10,856] {pod_launcher.py:156} INFO - b"22/04/16 10:37:10 INFO FileUtils: Creating directory if it doesn't exist: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=autovit/make=JAGUAR\n"
[2022-04-16 10:37:11,089] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:37:12,166] {pod_launcher.py:156} INFO - b'22/04/16 10:37:12 INFO SessionState: Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.\n'
[2022-04-16 10:37:12,655] {pod_launcher.py:156} INFO - b'22/04/16 10:37:12 INFO Hive: Renaming src: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/.hive-staging_hive_2022-04-16_10-26-50_235_365683970089316725-1/-ext-10000/inserted_at_date=2022-04-16/brand=autovit/make=JAGUAR/part-00013-87f53444-2a15-4a4c-b7bc-0b3304ba9bb7.c000, dest: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=autovit/make=JAGUAR/part-00013-87f53444-2a15-4a4c-b7bc-0b3304ba9bb7.c000, Status:true\n'
[2022-04-16 10:37:12,967] {pod_launcher.py:156} INFO - b'22/04/16 10:37:12 INFO Hive: New loading path = s3a://*********/local/odyn/cars_pricing_cvt/cars_features/.hive-staging_hive_2022-04-16_10-26-50_235_365683970089316725-1/-ext-10000/inserted_at_date=2022-04-16/brand=autovit/make=JAGUAR with partSpec {inserted_at_date=2022-04-16, brand=autovit, make=JAGUAR}\n'
[2022-04-16 10:37:13,045] {pod_launcher.py:156} INFO - b"22/04/16 10:37:13 INFO FileUtils: Creating directory if it doesn't exist: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=gratka/make=LANCIA\n"
[2022-04-16 10:37:14,065] {pod_launcher.py:156} INFO - b'22/04/16 10:37:14 INFO SessionState: Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.\n'
[2022-04-16 10:37:14,576] {pod_launcher.py:156} INFO - b'22/04/16 10:37:14 INFO Hive: Renaming src: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/.hive-staging_hive_2022-04-16_10-26-50_235_365683970089316725-1/-ext-10000/inserted_at_date=2022-04-16/brand=gratka/make=LANCIA/part-00009-87f53444-2a15-4a4c-b7bc-0b3304ba9bb7.c000, dest: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=gratka/make=LANCIA/part-00009-87f53444-2a15-4a4c-b7bc-0b3304ba9bb7.c000, Status:true\n'
[2022-04-16 10:37:14,803] {pod_launcher.py:156} INFO - b'22/04/16 10:37:14 INFO Hive: New loading path = s3a://*********/local/odyn/cars_pricing_cvt/cars_features/.hive-staging_hive_2022-04-16_10-26-50_235_365683970089316725-1/-ext-10000/inserted_at_date=2022-04-16/brand=gratka/make=LANCIA with partSpec {inserted_at_date=2022-04-16, brand=gratka, make=LANCIA}\n'
[2022-04-16 10:37:14,870] {pod_launcher.py:156} INFO - b"22/04/16 10:37:14 INFO FileUtils: Creating directory if it doesn't exist: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=otomoto/make=MG\n"
[2022-04-16 10:37:15,808] {pod_launcher.py:156} INFO - b'22/04/16 10:37:15 INFO SessionState: Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.\n'
[2022-04-16 10:37:16,106] {base_job.py:197} DEBUG - [heartbeat]
I have tried repartition(30, ...) as you can see above, and also coalesce(1), but the write speed is the same. I do not know why this happens or whether it is expected behaviour. I have already checked these questions:
Extremely slow S3 write times from EMR/Spark
Spark s3 write (s3 vs s3a connectors)
Spark Write to S3 Storage Option
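For completeness, the coalesce(1) variant I tried is essentially the same statement with the repartition call swapped out (a sketch reconstructed from the version above):

# Same write as above, only the partitioning call changes.
results.withColumn("make", F.col("name_make_dict")) \
    .filter(F.col("name_mod1_dict").isNotNull()) \
    .select(*json.loads(parser.get("config", "formatMatched"))) \
    .withColumnRenamed("name_mod1_dict", "name_model_dict") \
    .coalesce(1) \
    .write.insertInto(f"{schema}.cars_features")

The log pattern (a fast insertInto job, then minutes of renaming out of the .hive-staging directory) looks the same in both cases.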
3- When I read some tables from Hive on S3 like this (for one day it is 40.4 MB across 2,467 objects):
sparkSession \
    .sql(f"""select * from {schema}.f_car_info
             where inserted_at_date = to_date('{date}', 'yyyy-MM-dd')""") \
    .select(F.col("id_ad"), F.col("name_make_ad"), F.col("name_make_dict"),
            F.col("name_model_ad"), F.col("name_version_ad"),
            F.col("name_generation_ad"), F.col("prod_year_ad"), F.col("brand"))
With the sql() read above, I see this in the log:
[2022-04-16 10:25:38,271] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO InMemoryFileIndex: Listing leaf files and directories in parallel under 352 paths. The first several paths are: s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=ABARTH, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=ACURA, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=ALFA ROMEO, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=ASTON MARTIN, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=AUDI, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=AUSTIN, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=BENTLEY, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=BMW, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=BUICK, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=CADILLAC.\n'
[2022-04-16 10:25:38,564] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO SparkContext: Starting job: persist at NativeMethodAccessorImpl.java:0\n'
[2022-04-16 10:25:38,584] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Got job 0 (persist at NativeMethodAccessorImpl.java:0) with 352 output partitions\n'
[2022-04-16 10:25:38,587] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Final stage: ResultStage 0 (persist at NativeMethodAccessorImpl.java:0)\n'
[2022-04-16 10:25:38,587] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Parents of final stage: List()\n'
[2022-04-16 10:25:38,587] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Missing parents: List()\n'
[2022-04-16 10:25:38,593] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at persist at NativeMethodAccessorImpl.java:0), which has no missing parents\n'
[2022-04-16 10:25:38,689] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 88.3 KB, free 7.7 GB)\n'
[2022-04-16 10:25:38,721] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 31.9 KB, free 7.7 GB)\n'
[2022-04-16 10:25:38,724] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 100.126.80.1:34339 (size: 31.9 KB, free: 7.7 GB)\n'
[2022-04-16 10:25:38,727] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1184\n'
[2022-04-16 10:25:38,749] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Submitting 352 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at persist at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))\n'
[2022-04-16 10:25:38,753] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO TaskSchedulerImpl: Adding task set 0.0 with 352 tasks\n'
[2022-04-16 10:25:38,794] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 100.126.159.37, executor 1, partition 0, PROCESS_LOCAL, 7494 bytes)\n'
[2022-04-16 10:25:45,006] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO TaskSetManager: Finished task 350.0 in stage 0.0 (TID 350) in 107 ms on 100.126.111.179 (executor 5) (352/352)\n'
[2022-04-16 10:25:45,009] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool \n'
[2022-04-16 10:25:45,009] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO DAGScheduler: ResultStage 0 (persist at NativeMethodAccessorImpl.java:0) finished in 6.371 s\n'
[2022-04-16 10:25:45,015] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO DAGScheduler: Job 0 finished: persist at NativeMethodAccessorImpl.java:0, took 6.451078 s\n'
[2022-04-16 10:25:45,121] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO PrunedInMemoryFileIndex: It took 6916 ms to list leaf files for 352 paths.\n'
Yes, that is 352 tasks! I imagine that when a table contains many more files in the future, this will hurt the read time even more. I have checked this question:
Why so many tasks in my spark job? Getting 200 Tasks By Default
and used
sqlContext.setConf("spark.sql.shuffle.partitions", "15")
but nothing changed.
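This is how I double-checked it, using df_info as a hypothetical name for the DataFrame returned by the read in point 3:

# Set the conf, read it back, and look at the partition count of the scan.
sparkSession.conf.set("spark.sql.shuffle.partitions", "15")
print(sparkSession.conf.get("spark.sql.shuffle.partitions"))  # prints 15, so the conf itself is applied
print(df_info.rdd.getNumPartitions())                         # still 352 partitions for this read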
Could you please give me some ideas on these 3 issues?
Many thanks!!!