I am running a Spark app on a Kubernetes cluster (at the moment, I do not have permission to resize or rescale this cluster). I think there are three main issues hurting my performance, and I would like to ask this community whether there is anything I can do to improve it. I have ordered them by my own priority.

1- I do not have permissions on the K8s cluster, and I think the configuration I set for my Spark app is not taking effect. This is only a guess, because I do not have much experience with K8s. I have configured my Spark app like this:

 return SparkSession.builder \
         .config("spark.executor.instances", "5") \
         .config("spark.executor.cores", "4") \
         .config("spark.driver.cores", "2") \
         .config("spark.kubernetes.executor.request.cores", 3.5) \
         .config("spark.executor.memory", "10g") \
         .config("spark.driver.memory", "6g") \
         .config("spark.sql.broadcastTimeout", "600") \
         .config("spark.memory.fraction", 0.2) \
         .config("spark.kubernetes.memoryOverheadFactor", 0.2) \
         .getOrCreate()  # assumed: the original excerpt stops before this call

But when the pod is created, the log shows this:

 'containers': [{'args': ['/bin/bash',
                                   '-c',
                                   'spark-submit --master '
                                   '"k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_PORT_443_TCP_PORT}" '
                                   '--deploy-mode client --name "${POD_NAME}" '
                                   '--conf "spark.driver.host=${POD_IP}" '
                                   '--conf spark.driver.memory=40g --conf '
                                   'spark.driver.memoryOverhead=0.4 --conf '
                                   'spark.eventLog.dir=s3a://*****-spark/history/ireland '
                                   '--conf spark.eventLog.enabled=true --conf '

The driver's memory is 40g instead of 6g, and the driver's memoryOverhead is 0.4 instead of 0.2. Seeing this confuses me, and I am not sure whether the number of cores per executor and the executor memory are applied as I configured them.
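One way to check this from inside the job is to print the values the running application actually sees. The following is only a minimal sketch: `effective_settings` and `KEYS_TO_CHECK` are hypothetical names, and it assumes `spark` is the `SparkSession` returned by the builder above. Note that in client mode the driver JVM is already running when the builder code executes, so the value reported for `spark.driver.memory` may not match the real heap size; the executor settings are the more meaningful ones to inspect.

```python
def effective_settings(spark, keys):
    """Return {key: value} as seen by the running application.

    `spark` is expected to be a pyspark.sql.SparkSession; only
    spark.sparkContext.getConf().get(key, default) is used.
    """
    conf = spark.sparkContext.getConf()
    return {key: conf.get(key, "<not set>") for key in keys}


# Hypothetical list of settings to inspect (taken from the builder above).
KEYS_TO_CHECK = [
    "spark.executor.instances",
    "spark.executor.cores",
    "spark.executor.memory",
    "spark.kubernetes.executor.request.cores",
    "spark.driver.memory",
]

# Usage inside the job, after the session has been created:
#   for key, value in effective_settings(spark, KEYS_TO_CHECK).items():
#       print(f"{key} = {value}")
```

The Environment tab of the Spark UI (or the history server fed by the event log) should show the same values, if it is reachable.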

2- The last task of this Spark app writes a DataFrame into a Hive table (Parquet format) on an S3 bucket.

 results.withColumn("make", F.col("name_make_dict")).filter(F.col("name_mod1_dict").isNotNull()) \
        .select(*json.loads(parser.get("config", "formatMatched"))) \
        .withColumnRenamed("name_mod1_dict", "name_model_dict") \
        .repartition(30, "inserted_at_date", "brand", "make") \
        .write.insertInto(f"{schema}.cars_features")

And I have checked the LOG:

[2022-04-16 10:32:23,040] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO CodeGenerator: Code generated in 35.851449 ms\n'
[2022-04-16 10:32:23,097] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO SparkContext: Starting job: insertInto at NativeMethodAccessorImpl.java:0\n'
[2022-04-16 10:32:23,100] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO DAGScheduler: Registering RDD 146 (insertInto at NativeMethodAccessorImpl.java:0) as input to shuffle 9\n'
[2022-04-16 10:32:23,100] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO DAGScheduler: Registering RDD 149 (insertInto at NativeMethodAccessorImpl.java:0) as input to shuffle 10\n'
[2022-04-16 10:32:23,100] {pod_launcher.py:156} INFO - b'22/04/16 10:32:23 INFO DAGScheduler: Got job 18 (insertInto at NativeMethodAccessorImpl.java:0) with 30 output partitions\n'

and it finishes very fast:

[2022-04-16 10:33:02,044] {pod_launcher.py:156} INFO - b'22/04/16 10:33:02 INFO TaskSchedulerImpl: Removed TaskSet 34.0, whose tasks have all completed, from pool \n'
[2022-04-16 10:33:02,048] {pod_launcher.py:156} INFO - b'22/04/16 10:33:02 INFO DAGScheduler: ResultStage 34 (insertInto at NativeMethodAccessorImpl.java:0) finished in 26.634 s\n'
[2022-04-16 10:33:02,048] {pod_launcher.py:156} INFO - b'22/04/16 10:33:02 INFO DAGScheduler: Job 18 finished: insertInto at NativeMethodAccessorImpl.java:0, took 38.948620 s\n'

but then the log continues like this, and it takes about 10 minutes to write the files into S3:

[2022-04-16 10:33:05,305] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:10,321] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:15,337] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:20,354] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:25,370] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:30,383] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:35,399] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:40,416] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:45,432] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:50,450] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:33:55,466] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:00,482] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:05,498] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:10,513] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:15,527] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:20,541] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:25,559] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:30,578] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:35,593] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:40,614] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:45,628] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:50,645] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:34:55,662] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:00,679] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:05,690] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:10,705] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:15,722] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:20,740] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:25,757] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:30,775] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:35,788] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:40,799] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:45,817] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:50,834] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:35:55,852] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:00,867] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:05,889] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:10,904] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:15,913] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:20,926] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:25,941] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:30,957] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:33,949] {pod_launcher.py:156} INFO - b'22/04/16 10:36:33 INFO FileFormatWriter: Write Job eefcf8a0-9ba9-4752-a51e-f4bc27e5ffcc committed.\n'
[2022-04-16 10:36:33,954] {pod_launcher.py:156} INFO - b'22/04/16 10:36:33 INFO FileFormatWriter: Finished processing stats for write job eefcf8a0-9ba9-4752-a51e-f4bc27e5ffcc.\n'
[2022-04-16 10:36:35,972] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:40,991] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:46,006] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:51,022] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:36:56,038] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:37:01,057] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:37:06,072] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:37:10,856] {pod_launcher.py:156} INFO - b"22/04/16 10:37:10 INFO FileUtils: Creating directory if it doesn't exist: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=autovit/make=JAGUAR\n"
[2022-04-16 10:37:11,089] {base_job.py:197} DEBUG - [heartbeat]
[2022-04-16 10:37:12,166] {pod_launcher.py:156} INFO - b'22/04/16 10:37:12 INFO SessionState: Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.\n'
[2022-04-16 10:37:12,655] {pod_launcher.py:156} INFO - b'22/04/16 10:37:12 INFO Hive: Renaming src: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/.hive-staging_hive_2022-04-16_10-26-50_235_365683970089316725-1/-ext-10000/inserted_at_date=2022-04-16/brand=autovit/make=JAGUAR/part-00013-87f53444-2a15-4a4c-b7bc-0b3304ba9bb7.c000, dest: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=autovit/make=JAGUAR/part-00013-87f53444-2a15-4a4c-b7bc-0b3304ba9bb7.c000, Status:true\n'
[2022-04-16 10:37:12,967] {pod_launcher.py:156} INFO - b'22/04/16 10:37:12 INFO Hive: New loading path = s3a://*********/local/odyn/cars_pricing_cvt/cars_features/.hive-staging_hive_2022-04-16_10-26-50_235_365683970089316725-1/-ext-10000/inserted_at_date=2022-04-16/brand=autovit/make=JAGUAR with partSpec {inserted_at_date=2022-04-16, brand=autovit, make=JAGUAR}\n'
[2022-04-16 10:37:13,045] {pod_launcher.py:156} INFO - b"22/04/16 10:37:13 INFO FileUtils: Creating directory if it doesn't exist: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=gratka/make=LANCIA\n"
[2022-04-16 10:37:14,065] {pod_launcher.py:156} INFO - b'22/04/16 10:37:14 INFO SessionState: Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.\n'
[2022-04-16 10:37:14,576] {pod_launcher.py:156} INFO - b'22/04/16 10:37:14 INFO Hive: Renaming src: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/.hive-staging_hive_2022-04-16_10-26-50_235_365683970089316725-1/-ext-10000/inserted_at_date=2022-04-16/brand=gratka/make=LANCIA/part-00009-87f53444-2a15-4a4c-b7bc-0b3304ba9bb7.c000, dest: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=gratka/make=LANCIA/part-00009-87f53444-2a15-4a4c-b7bc-0b3304ba9bb7.c000, Status:true\n'
[2022-04-16 10:37:14,803] {pod_launcher.py:156} INFO - b'22/04/16 10:37:14 INFO Hive: New loading path = s3a://*********/local/odyn/cars_pricing_cvt/cars_features/.hive-staging_hive_2022-04-16_10-26-50_235_365683970089316725-1/-ext-10000/inserted_at_date=2022-04-16/brand=gratka/make=LANCIA with partSpec {inserted_at_date=2022-04-16, brand=gratka, make=LANCIA}\n'
[2022-04-16 10:37:14,870] {pod_launcher.py:156} INFO - b"22/04/16 10:37:14 INFO FileUtils: Creating directory if it doesn't exist: s3a://*********/local/odyn/cars_pricing_cvt/cars_features/inserted_at_date=2022-04-16/brand=otomoto/make=MG\n"
[2022-04-16 10:37:15,808] {pod_launcher.py:156} INFO - b'22/04/16 10:37:15 INFO SessionState: Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.\n'
[2022-04-16 10:37:16,106] {base_job.py:197} DEBUG - [heartbeat]

I have tried repartition(30, ...), as you can see above, and also coalesce(1), but the write speed is the same. I do not know why this happens or whether it is expected behaviour. I have already checked these questions: "Extremely slow S3 write times from EMR/ Spark"; "Spark s3 write (s3 vs s3a connectors)"; "Spark Write to S3 Storage Option".
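In the log above, the time after "Job 18 finished" is spent in the Hive load step, which creates each partition directory and renames the staged file into it one partition at a time. To quantify that, the gaps between consecutive "Hive: Renaming src" lines can be measured. This is a rough sketch only: `rename_gaps` is a hypothetical helper, and the two sample lines are the ones from the log above with the long S3 paths shortened to "...".

```python
import re
from datetime import datetime

# Matches the launcher prefix, e.g. "[2022-04-16 10:37:12,655]".
STAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d{3})\]")


def rename_gaps(log_lines):
    """Seconds elapsed between consecutive 'Hive: Renaming src' log lines."""
    times = []
    for line in log_lines:
        match = STAMP.match(line.strip())
        if match and "Hive: Renaming src" in line:
            stamp = f"{match.group(1)}.{match.group(2)}"
            times.append(datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f"))
    return [round((b - a).total_seconds(), 3) for a, b in zip(times, times[1:])]


sample = [
    "[2022-04-16 10:37:12,655] {pod_launcher.py:156} INFO - b'22/04/16 10:37:12 INFO Hive: Renaming src: ...'",
    "[2022-04-16 10:37:14,065] {pod_launcher.py:156} INFO - b'22/04/16 10:37:14 INFO SessionState: ...'",
    "[2022-04-16 10:37:14,576] {pod_launcher.py:156} INFO - b'22/04/16 10:37:14 INFO Hive: Renaming src: ...'",
]
print(rename_gaps(sample))  # [1.921]
```

If each partition takes roughly two seconds like this, a few hundred partitions would add up to several minutes, independent of how many Spark partitions the repartition or coalesce call produces.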

3- When I read some tables from Hive on S3 like this (one day of data is 40.4 MB in 2467 objects):

sparkSession \
        .sql(
        f"""select * from {schema}.f_car_info where inserted_at_date = to_date('{date}', "yyyy-MM-dd")""") \
        .select(F.col("id_ad"), F.col("name_make_ad"), F.col("name_make_dict"), F.col("name_model_ad"),
                F.col("name_version_ad"), F.col("name_generation_ad"), F.col("prod_year_ad"), F.col("brand"))

I get this in the log:

    [2022-04-16 10:25:38,271] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO InMemoryFileIndex: Listing leaf files and directories in parallel under 352 paths. The first several paths are: s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=ABARTH, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=ACURA, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=ALFA ROMEO, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=ASTON MARTIN, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=AUDI, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=AUSTIN, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=BENTLEY, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=BMW, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=BUICK, s3a://*********/local/odyn/cars_pricing_cvt/f_car_info/inserted_at_date=2022-04-16/brand=autovit/name_make_dict=CADILLAC.\n'
    [2022-04-16 10:25:38,564] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO SparkContext: Starting job: persist at NativeMethodAccessorImpl.java:0\n'
    [2022-04-16 10:25:38,584] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Got job 0 (persist at NativeMethodAccessorImpl.java:0) with 352 output partitions\n'
    [2022-04-16 10:25:38,587] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Final stage: ResultStage 0 (persist at NativeMethodAccessorImpl.java:0)\n'
    [2022-04-16 10:25:38,587] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Parents of final stage: List()\n'
    [2022-04-16 10:25:38,587] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Missing parents: List()\n'
    [2022-04-16 10:25:38,593] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at persist at NativeMethodAccessorImpl.java:0), which has no missing parents\n'
    [2022-04-16 10:25:38,689] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 88.3 KB, free 7.7 GB)\n'
    [2022-04-16 10:25:38,721] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 31.9 KB, free 7.7 GB)\n'
    [2022-04-16 10:25:38,724] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 100.126.80.1:34339 (size: 31.9 KB, free: 7.7 GB)\n'
    [2022-04-16 10:25:38,727] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1184\n'
    [2022-04-16 10:25:38,749] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO DAGScheduler: Submitting 352 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at persist at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))\n'
    [2022-04-16 10:25:38,753] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO TaskSchedulerImpl: Adding task set 0.0 with 352 tasks\n'
    [2022-04-16 10:25:38,794] {pod_launcher.py:156} INFO - b'22/04/16 10:25:38 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 100.126.159.37, executor 1, partition 0, PROCESS_LOCAL, 7494 bytes)\n'
[2022-04-16 10:25:45,006] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO TaskSetManager: Finished task 350.0 in stage 0.0 (TID 350) in 107 ms on 100.126.111.179 (executor 5) (352/352)\n'
    [2022-04-16 10:25:45,009] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool \n'
    [2022-04-16 10:25:45,009] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO DAGScheduler: ResultStage 0 (persist at NativeMethodAccessorImpl.java:0) finished in 6.371 s\n'
    [2022-04-16 10:25:45,015] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO DAGScheduler: Job 0 finished: persist at NativeMethodAccessorImpl.java:0, took 6.451078 s\n'
    [2022-04-16 10:25:45,121] {pod_launcher.py:156} INFO - b'22/04/16 10:25:45 INFO PrunedInMemoryFileIndex: It took 6916 ms to list leaf files for 352 paths.\n'

Yes, that is 352 tasks! I imagine that when the table has many more files in the future, this will hurt the read time. I have checked this question: "Why so many tasks in my spark job? Getting 200 Tasks By Default", and I tried sqlContext.setConf("spark.sql.shuffle.partitions", "15"), but nothing changed.
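The 352 tasks in this log come from the "Listing leaf files and directories in parallel under 352 paths" job, not from a shuffle, which would explain why spark.sql.shuffle.partitions changes nothing here. As far as I understand it, Spark lists partition directories with a distributed job once their number exceeds spark.sql.sources.parallelPartitionDiscovery.threshold, and the task count is the number of paths capped by spark.sql.sources.parallelPartitionDiscovery.parallelism. The sketch below only models that rule; `listing_tasks` is a hypothetical helper, and the defaults of 32 and 10000 are my assumption about those two settings and should be checked against the Spark version in use.

```python
def listing_tasks(num_paths, threshold=32, parallelism=10000):
    """Estimate how many Spark tasks are used to list partition directories.

    Returns 0 when the listing is expected to run serially on the driver.
    The default values are assumptions, see the note above.
    """
    if num_paths <= threshold:
        return 0
    return min(num_paths, parallelism)


print(listing_tasks(352))                  # 352, the number seen in the log
print(listing_tasks(352, parallelism=16))  # 16
print(listing_tasks(20))                   # 0
```

Under this model, the task count grows with the number of partition directories (brand and name_make_dict values per day) rather than with the data volume, so fewer, larger partitions would reduce it.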

Could you please give me some ideas on these 3 issues?

Many thanks!!!

1 Answer

Hey, looks like you are working on some cool stuff! I can't comment on #2 and #3, but for #1 I can probably shed some light, although I haven't really used Spark.

My guess is that, for Spark, fields specified at runtime override whatever you set with SparkSession.builder in your code.

Those overriding runtime args can be set either at the container image level or at the Kubernetes pod configuration level. Since you did not share that information, it is hard for me to tell which one applies to you, but my guess is the pod definition level, because settings there can override the container image settings.
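To see which settings are defined in both places, the --conf flags from the logged spark-submit command can be compared with the values passed to the builder. This is just a sketch under a few assumptions: `parse_submit_conf` and `diff_conf` are hypothetical helpers, the command string is copied from the log excerpt in the question (which is cut off after the last --conf), and the builder values are a subset of the ones in the question.

```python
import shlex


def parse_submit_conf(command):
    """Extract {key: value} from the `--conf key=value` flags of a spark-submit command."""
    tokens = shlex.split(command)
    conf = {}
    for flag, arg in zip(tokens, tokens[1:]):
        if flag == "--conf" and "=" in arg:
            key, value = arg.split("=", 1)
            conf[key] = value
    return conf


def diff_conf(submit_conf, builder_conf):
    """Return {key: (submit_value, builder_value)} for keys set differently in both."""
    return {
        key: (submit_conf[key], str(value))
        for key, value in builder_conf.items()
        if key in submit_conf and submit_conf[key] != str(value)
    }


command = (
    'spark-submit --master "k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_PORT_443_TCP_PORT}" '
    '--deploy-mode client --name "${POD_NAME}" '
    '--conf "spark.driver.host=${POD_IP}" '
    '--conf spark.driver.memory=40g '
    '--conf spark.driver.memoryOverhead=0.4 '
    '--conf spark.eventLog.enabled=true'
)
builder = {"spark.driver.memory": "6g", "spark.executor.memory": "10g"}

print(diff_conf(parse_submit_conf(command), builder))
# {'spark.driver.memory': ('40g', '6g')}
```

Only keys that appear on both sides are reported, so settings such as spark.executor.memory that are absent from the command line do not show up as conflicts.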

Kubernetes Pod Definition Level

For example in a pod definition (or in a deployment) check this out: https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/

Example Kubernetes Pod Definition

apiVersion: v1
kind: Pod
metadata:
  name: example   # pod names must be lowercase
spec:
  containers:
  - name: <container name>
    image: <your image>
    command:
    - '/bin/bash'
    - '-c'
    # bash -c expects the whole command as ONE string, so the pieces from
    # the log are folded into a single argument here. The log excerpt is
    # cut off after the last --conf, so any further flags are not shown.
    args:
    - >-
      spark-submit --master
      "k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_PORT_443_TCP_PORT}"
      --deploy-mode client --name "${POD_NAME}"
      --conf "spark.driver.host=${POD_IP}"
      --conf spark.driver.memory=40g
      --conf spark.driver.memoryOverhead=0.4
      --conf spark.eventLog.dir=s3a://*****-spark/history/ireland
      --conf spark.eventLog.enabled=true

You could then change the args section of this YAML pod definition to what you want. I hope this helps you out, or at least points you in the right direction.

jlonganecker