
This is a Spark Streaming application running in YARN cluster mode which produces messages to three Kafka brokers.

As soon as it reaches 150K open files it fails:

There is insufficient memory for the Java Runtime Environment to continue
Native memory allocation (mmap) failed to map 12288 bytes for committing reserved memory.

Job aborted due to stage failure ... : 
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
.....
Caused by: java.lang.OutOfMemoryError: unable to create new native thread

When doing lsof -p <PID> for the java process that runs that executor, I can see tons (up to 90K) of TCP connections from the host server to the Kafka brokers:

host:portXXX->kafkabroker1:XmlIpcRegSvc (ESTABLISHED)

host:portYYY->kafkabroker2:XmlIpcRegSvc (ESTABLISHED)

host:portZZZ->kafkabroker3:XmlIpcRegSvc (ESTABLISHED)

I tried reducing the number of executor cores from 8 to 6, but it made no difference in the number of open files (they still reached 150K) and the job kept failing.

The libraries to connect to Kafka from Spark Streaming are:

org.apache.spark.streaming.kafka010.KafkaUtils
org.apache.spark.streaming.dstream.InputDStream
org.apache.kafka.clients.producer.KafkaProducer
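
In the Scala source these correspond to the following imports:

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.kafka.clients.producer.KafkaProducer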

The code:

stream.foreachRDD { rdd =>
  val kafkaProducer = getKafkaProducer()   // get kafkaProducer
  // do some work on each RDD...
  rdd.foreach { record =>
    kafkaProducer.send(new ProducerRecord(topic, record._1, record._2))
  }
  kafkaProducer.close()
}

1 Answer


It was a schoolboy error. This very well explained article helped fix the issue. The Kafka producer was not closing its connections, so we used the broadcast and lazy-evaluation technique, which solved the issue.
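
Roughly, the pattern looks like this (a minimal sketch; names such as KafkaSink, producerConfig, outputTopic, ssc and stream are illustrative, not from our actual code):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Serializable wrapper that holds only the (cheap) producer config and
// creates the real KafkaProducer lazily, once per executor JVM
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = new KafkaSink(() => {
    val producer = new KafkaProducer[String, String](config)
    // close the single producer when the executor shuts down, not per batch
    sys.addShutdownHook { producer.close() }
    producer
  })
}

// Driver side: broadcast the wrapper once
val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(producerConfig))

stream.foreachRDD { rdd =>
  // do some work on each RDD...
  rdd.foreach { record =>
    kafkaSink.value.send(outputTopic, record._1, record._2)
  }
}

This way each executor keeps exactly one producer (and one set of broker connections) for the lifetime of the job instead of opening new ones on every batch.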
