I have been analyzing a memory leak in one of our applications and while analyzing a heapdump I found that >98% of the memory is occupied by Camels SharedProducerServicePool
. Essentially it keeps a lot of RemoteFileProducer
in its pool (in this particular sample about 41k). All of them have an SftpEndpoint
.
The following is the Camel route that eventually uploads a file via SFTP:
from("activemq:queue:transform")
.routeId("TransformJson2Avro")
.process(new FileListCountProcessor())
.split(body()).streaming()
.setHeader("CURRENT_FILE", simple("${body}"))
.log("File to process: ${header.CURRENT_FILE}; Flags: "
+ Json2AvroTransformationFlags.FILE_AVRO_BACKUP + "=${header." + Json2AvroTransformationFlags.FILE_AVRO_BACKUP + "}, "
+ Json2AvroTransformationFlags.FILE_AVRO_SFTP + "=${header." + Json2AvroTransformationFlags.FILE_AVRO_SFTP + "}, "
+ Json2AvroTransformationFlags.FILE_JSON_BACKUP + "=${header." + Json2AvroTransformationFlags.FILE_JSON_BACKUP + "}, "
+ Json2AvroTransformationFlags.FILE_JSON_DELETE + "=${header." + Json2AvroTransformationFlags.FILE_JSON_DELETE + "}, "
+ "EXPORT_METHOD=${header.EXPORT_METHOD}")
.convertBodyTo(File.class)
.process(new Json2AvroProcessor(filenamePattern, tempFilePath))
.choice()
.when(header(Json2AvroTransformationFlags.FILE_AVRO_BACKUP).isEqualTo("true"))
.to("file:///?fileName=${header.ARCHIVE_FOLDER}/${property.FILENAME}")
.log("Transformed file: '${property.FILENAME}' (archived to '${header.ARCHIVE_FOLDER}')")
.end()
.choice()
.when(header(Json2AvroTransformationFlags.FILE_AVRO_SFTP).isEqualTo("true"))
.log("Upload to SFTP '" + getSFTPExportStringForLogging() + "'")
.recipientList(simple("sftp://${header.SFTP_USER}@${header.SFTP_HOST}:${header.SFTP_PORT}/${header.SFTP_DIR}?password=${header.SFTP_PASSWORD}&fileName=${property.FILENAME}&disconnect=true")).end()
.end()
.process(new JsonFileOperationProcessor(backupPath))
.aggregate(constant(true), new DisabledAggregationStrategy())
.completionSize(simple("${property.fileListCount}"))
.log("Number of files: ${property.fileListCount}")
.to("activemq:queue:transformCompleted")
.id("insertIntoMessageQueue");
What could be the reason that the SFTP connections don't get closed or the Producers get created but not stopped again? I can't find a hint in the official documentation.
Any help is greatly appreaciated :)
Note: The Caml version used here is Camel 2.17.0