This is a Spark Streaming application running in YARN cluster mode that produces messages to three Kafka brokers.
As soon as it reaches 150K open files, it fails:
There is insufficient memory for the Java Runtime Environment to continue
Native memory allocation (mmap) failed to map 12288 bytes for committing reserved memory.
Job aborted due to stage failure ... :
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
.....
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
When running lsof -p <PID>
for the Java process that runs that executor, I can see a huge number (up to 90K) of TCP connections from the host server to the Kafka brokers:
host:portXXX->kafkabroker1:XmlIpcRegSvc (ESTABLISHED)
host:portYYY->kafkabroker2:XmlIpcRegSvc (ESTABLISHED)
host:portZZZ->kafkabroker3:XmlIpcRegSvc (ESTABLISHED)
I tried reducing the number of executor cores from 8 to 6, but it made no difference in the number of open files (it still reached 150K) and the job kept failing.
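For context, the tuning change amounts to lowering spark.executor.cores. A minimal sketch of how that setting is applied, assuming it is set through SparkConf (in my job it may equally be passed via spark-submit flags; the app name is a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-kafka-producer") // placeholder name
  .set("spark.executor.cores", "6")       // was 8 before the change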
The libraries used to connect to Kafka from Spark Streaming are:
org.apache.spark.streaming.kafka010.KafkaUtils
org.apache.spark.streaming.dstream.InputDStream
org.apache.kafka.clients.producer.KafkaProducer
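For reference, the corresponding imports look roughly like this (a sketch; ProducerRecord is assumed for the send calls, and the exact packages come from the spark-streaming-kafka-0-10 artifact and the Kafka clients library):

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}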
The code:
foreachRDD { rdd =>
  get kafkaProducer
  do some work on each RDD...
  rdd.foreach { record =>
    kafkaProducer.send(record._1, record._2)
  }
  kafkaProducer.close()
}
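To make the pattern concrete, here is a sketch of what the loop roughly looks like. The topic name, key/value types, broker ports, producer settings, and the per-partition placement of the producer are my assumptions for illustration; the real code follows the same get-producer / send / close sequence on every batch:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

def writeToKafka(stream: DStream[(String, String)]): Unit = {
  stream.foreachRDD { rdd =>
    // ... some transformation work on the RDD happens before this point ...
    rdd.foreachPartition { records =>
      // A producer is created for every partition of every batch
      // and closed once the partition has been written.
      val props = new Properties()
      props.put("bootstrap.servers", "kafkabroker1:9092,kafkabroker2:9092,kafkabroker3:9092") // host:port assumed
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val kafkaProducer = new KafkaProducer[String, String](props)

      records.foreach { record =>
        kafkaProducer.send(new ProducerRecord[String, String]("mytopic", record._1, record._2))
      }
      kafkaProducer.close()
    }
  }
}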