I have a Spark batch job that reads data from a Kafka source. The Kafka offsets are managed by the batch job itself. The job reads the begin offsets from a persisted location (HDFS) and gets the end offsets using the consumer API. The data is then pulled from begin offset to end offset using the Spark Kafka source. There is a single Kafka topic with 400 partitions. Once the job finishes, the end offsets are persisted to HDFS so the job knows the begin offsets for the next run.
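To make the setup concrete, the offset bookkeeping and the batch read look roughly like the sketch below. This is not the actual job: the topic name, broker list, and offset values are placeholders, and the HDFS read/write of offsets is only described in comments.

    import java.util.Properties

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-batch-to-json").getOrCreate()

    val topic   = "events"          // placeholder topic name
    val brokers = "broker1:9092"    // placeholder broker list

    // Begin offsets: in the real job these come from the offsets persisted in HDFS
    // at the end of the previous run; shown here as a literal stand-in.
    val beginOffsets: Map[Int, Long] = Map(0 -> 0L /* ... one entry per partition ... */)

    // End offsets: fetched through the Kafka consumer API.
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    val consumer   = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val partitions = (0 until 400).map(p => new TopicPartition(topic, p)).asJava
    val endOffsets: Map[Int, Long] =
      consumer.endOffsets(partitions).asScala.map { case (tp, o) => tp.partition -> o.longValue }.toMap
    consumer.close()

    // The batch Kafka source takes per-partition offsets as JSON, e.g. {"events":{"0":123,"1":456}}.
    def toOffsetJson(offsets: Map[Int, Long]): String = {
      val perPartition = offsets.map { case (p, o) => "\"" + p + "\":" + o }.mkString(",")
      "{\"" + topic + "\":{" + perPartition + "}}"
    }

    val kafkaDf = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .option("startingOffsets", toOffsetJson(beginOffsets))
      .option("endingOffsets", toOffsetJson(endOffsets))
      .load()

    // ... transform and write (see the core-functionality sketch further down) ...
    // After a successful run, endOffsets is persisted to HDFS as the next run's begin offsets.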
The job runs fine for very small amounts of data. When the begin and end offsets are far apart (often 12 hours apart), it runs into performance issues and fails with an out-of-memory error. Below are the data size and the spark-submit configuration.
Data size - 25 GB per Kafka partition; 25 GB * 400 partitions = 10 TB
Executor Memory - 60G
No of executors - 100
Executor memory overhead - 8G
No of cores - 2
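For reference, those resources expressed as the equivalent spark.* configuration keys (a sketch; in the real job they are passed as spark-submit flags, and the overhead key name assumes Spark 2.3+):

    import org.apache.spark.SparkConf

    // The resources listed above as Spark configuration keys (normally supplied as
    // spark-submit flags; spark.executor.memoryOverhead is the Spark 2.3+ key name).
    val conf = new SparkConf()
      .set("spark.executor.memory", "60g")           // Executor memory
      .set("spark.executor.instances", "100")        // Number of executors
      .set("spark.executor.memoryOverhead", "8g")    // Executor memory overhead
      .set("spark.executor.cores", "2")              // Cores per executor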
Core functionality of the Spark job
- Read from the Kafka source
- From the Kafka records, take the Kafka value as a JSON DataFrame, ignoring the Kafka key
- Save as JSON data, partitioned by a column (view_name) in the JSON DataFrame (see the sketch after this list).
- Note - each view_name can have its own JSON schema, which is why I partition that way for easy downstream access.
- Since there is no shuffle write (repartition, join, aggregation, etc.), I was hoping the current configuration would be enough to finish the workload.
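A sketch of that read-and-write path, continuing from the kafkaDf in the read sketch above. This is one plausible reading: view_name is assumed to be a top-level field, and the raw JSON text is written verbatim with the text writer; the real job may instead parse the value and use the json writer.

    import org.apache.spark.sql.functions.{col, get_json_object}

    // Keep only the Kafka value, as a JSON string, and derive the partition column
    // (assumes view_name is a top-level field in every record).
    val jsonDf = kafkaDf
      .select(col("value").cast("string").as("json"))
      .withColumn("view_name", get_json_object(col("json"), "$.view_name"))

    // Each view_name keeps its own schema, so the JSON text is written out as-is,
    // partitioned by view_name for downstream access.
    jsonDf.write
      .mode("append")
      .partitionBy("view_name")
      .text("/data/output/json")   // placeholder output path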
Observations
- As expected, there are 400 tasks for reading the Kafka data
- 400 tasks for saving the JSON data
- The messages below are seen on failed tasks:
20/07/27 21:23:13 INFO clients.FetchSessionHandler: [Consumer clientId=consumer-3, groupId=spark-kafka-relation-b5dad8f2-cdab-4c03-b5c0-72e06bc43177-executor] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3233: org.apache.kafka.common.errors.DisconnectException.
20/07/27 21:23:34 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (136 times so far)
20/07/27 21:23:51 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (132 times so far)
20/07/27 21:24:22 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (137 times so far)
20/07/27 21:24:36 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (133 times so far)
20/07/27 21:25:09 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (138 times so far)
20/07/27 21:25:20 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (134 times so far)
20/07/27 21:25:53 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (139 times so far)
20/07/27 21:26:07 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (135 times so far)
20/07/27 21:26:39 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (140 times so far)
20/07/27 21:26:51 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (136 times so far)
20/07/27 21:27:27 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (141 times so far)
20/07/27 21:27:35 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (137 times so far)
20/07/27 21:28:15 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (142 times so far)
20/07/27 21:28:19 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (138 times so far)
20/07/27 21:28:58 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (143 times so far)
20/07/27 21:29:02 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (139 times so far)
20/07/27 21:29:42 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (144 times so far)
20/07/27 21:29:47 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/07/27 21:29:47 INFO storage.DiskBlockManager: Shutdown hook called
20/07/27 21:29:47 INFO util.ShutdownHookManager: Shutdown hook called
ExecutorLostFailure (executor 293 exited unrelated to the running tasks) Reason: Container marked as failed: container_e35_1595528864766_0024_01_000359 on host: <hostname> Exit status: -100. Diagnostics: Container released on a *lost* node.
I tried repartitioning the Kafka data (to 2000 and to 4000 partitions) after the first read, but that only exacerbated the problem rather than helping. Any suggestions on how to tune the job?
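For completeness, the repartition attempt was essentially this, applied to the kafkaDf from the read sketch above:

    // The attempted mitigation: an explicit repartition right after the batch Kafka read,
    // before the value-to-JSON transform and partitioned write (2000 shown; 4000 was also tried).
    val repartitioned = kafkaDf.repartition(2000)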