
I have a Spark batch job that reads data from a Kafka source. The Kafka offsets are managed by the batch job itself: the job reads the begin offsets from a persisted location (HDFS) and obtains the end offsets via the consumer API. The data between the begin and end offsets is pulled using the Spark Kafka source. There is a single Kafka topic with 400 partitions. Once the job finishes, the end offsets are persisted to HDFS so the job knows the begin offsets for the next run.
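For reference, a minimal sketch of this kind of offset-bounded batch read (the topic name, broker address, and offset values below are placeholders; in the real job the offsets come from HDFS and the consumer API):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-batch-read").getOrCreate()

    // startingOffsets/endingOffsets are JSON maps of topic -> partition -> offset.
    // Hard-coded here for illustration; the real job builds them from the offsets
    // persisted in HDFS and from the consumer API.
    val startingOffsets = """{"my_topic":{"0":1000,"1":1000}}"""
    val endingOffsets   = """{"my_topic":{"0":5000,"1":5000}}"""

    val kafkaDf = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my_topic")
      .option("startingOffsets", startingOffsets)
      .option("endingOffsets", endingOffsets)
      .load()  // columns: key, value, topic, partition, offset, timestamp, ...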

This job runs fine for very small amounts of data. When the begin and end offsets are far apart (often 12 hours apart), the job runs into performance issues and fails with an out-of-memory error. Below are the data size and spark-submit configuration.

Data size - 25 GB per Kafka partition; 25 GB * 400 partitions = 10 TB
Executor Memory - 60G
No of executors - 100
Executor memory overhead - 8G
No of cores - 2
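
For reference, these settings map to the following Spark configuration keys (a sketch; the app name is a placeholder, and on older Spark/YARN the overhead key is spark.yarn.executor.memoryOverhead):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("kafka-to-json-batch")            // placeholder name
      .config("spark.executor.instances", "100")
      .config("spark.executor.memory", "60g")
      .config("spark.executor.memoryOverhead", "8g")
      .config("spark.executor.cores", "2")
      .getOrCreate()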

Core functionality of the Spark job

  1. Read from the Kafka source
  2. From the Kafka source, parse the Kafka value as a JSON DataFrame, ignoring the Kafka key
  3. Save as JSON data partitioned by a column (view_name) in the JSON DataFrame (see the sketch after this list)
  4. Note - each view_name can have its own JSON schema, which is why I am partitioning that way for easy downstream access.
  5. Since there is no shuffle write (repartition, join, aggregation, etc.), I was hoping the current configuration would suffice to finish the workload.
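
A minimal sketch of steps 1-3, assuming the JSON values are parsed with schema inference (broker, topic, and output path are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-json-export").getOrCreate()
    import spark.implicits._

    // 1. Read the bounded Kafka batch (offset options omitted for brevity).
    val kafkaDf = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my_topic")
      .load()

    // 2. Ignore the key; parse each value as JSON. Schema inference merges the
    //    schemas of all view_names into one (potentially very wide) schema.
    val jsonDf = spark.read.json(kafkaDf.select($"value".cast("string")).as[String])

    // 3. Write partitioned by view_name so each view lands in its own directory.
    jsonDf.write.partitionBy("view_name").json("hdfs:///output/json_by_view")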

Observations

  1. As expected, there are 400 tasks for reading the Kafka data
  2. 400 tasks for saving the JSON data
  3. The messages below are seen on the failed tasks:
20/07/27 21:23:13 INFO clients.FetchSessionHandler: [Consumer clientId=consumer-3, groupId=spark-kafka-relation-b5dad8f2-cdab-4c03-b5c0-72e06bc43177-executor] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3233: org.apache.kafka.common.errors.DisconnectException.
20/07/27 21:23:34 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (136  times so far)
20/07/27 21:23:51 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (132  times so far)
20/07/27 21:24:22 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (137  times so far)
20/07/27 21:24:36 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (133  times so far)
20/07/27 21:25:09 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (138  times so far)
20/07/27 21:25:20 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (134  times so far)
20/07/27 21:25:53 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (139  times so far)
20/07/27 21:26:07 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (135  times so far)
20/07/27 21:26:39 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (140  times so far)
20/07/27 21:26:51 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (136  times so far)
20/07/27 21:27:27 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (141  times so far)
20/07/27 21:27:35 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (137  times so far)
20/07/27 21:28:15 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (142  times so far)
20/07/27 21:28:19 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (138  times so far)
20/07/27 21:28:58 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (143  times so far)
20/07/27 21:29:02 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (139  times so far)
20/07/27 21:29:42 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (144  times so far)
20/07/27 21:29:47 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/07/27 21:29:47 INFO storage.DiskBlockManager: Shutdown hook called
20/07/27 21:29:47 INFO util.ShutdownHookManager: Shutdown hook called


ExecutorLostFailure (executor 293 exited unrelated to the running tasks) Reason: Container marked as failed: container_e35_1595528864766_0024_01_000359 on host: <hostname> Exit status: -100. Diagnostics: Container released on a *lost* node.

I tried repartitioning the Kafka data after the first read (to 2000 and 4000 partitions), but that only exacerbated the problem rather than helping. Any suggestions on how to tune the job?

  • Check that all nodes and executors of your Spark cluster are up. This error commonly occurs when a core or task node is terminated because of high disk space utilization, or when a node becomes unresponsive due to prolonged high CPU utilization or low available memory. Maybe this will be helpful: https://stackoverflow.com/questions/38155421/spark-on-yarn-mode-end-with-exit-status-100-diagnostics-container-released – Majid Hajibaba Jul 28 '20 at 05:24
  • Thanks @majidhajibaba. That link was helpful even though I didn't nail the right configuration. – user3874982 Jul 29 '20 at 02:58
  • In case anyone else runs into this issue, I want to share the route I took to avoid the shuffle spill. The Kafka data can have thousands of view_names, and each view can have its own schema, so the final Kafka DataFrame after extracting the JSON becomes a very wide row with tens of thousands of fields. Instead, I just extracted the view_name field from the Kafka value and saved view_name plus the Kafka value (binary); a sketch follows these comments. This brought the run time down from hours to minutes. I extract the JSON from the Kafka value downstream in view-specific jobs. – user3874982 Aug 01 '20 at 05:03
  • does this work for spark dataframes? – thentangler Apr 14 '21 at 14:05
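
A minimal sketch of the workaround described in the comment above (broker, topic, paths, and the Parquet output format are assumptions; the key point is that only view_name is extracted while the value stays as raw bytes):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.get_json_object

    val spark = SparkSession.builder().appName("kafka-raw-by-view").getOrCreate()
    import spark.implicits._

    val kafkaDf = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my_topic")
      .load()

    // Extract only view_name from the JSON payload; keep value as raw binary so
    // the wide, per-view schemas are never materialized in this job.
    val slimDf = kafkaDf.select(
      get_json_object($"value".cast("string"), "$.view_name").as("view_name"),
      $"value"
    )

    // Parquet is an assumption here; it preserves the binary value column.
    // Downstream, view-specific jobs parse the JSON with their own schemas.
    slimDf.write.partitionBy("view_name").parquet("hdfs:///output/raw_by_view")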
