I have been analyzing a memory leak in one of our applications, and while looking at a heap dump I found that more than 98% of the memory is occupied by Camel's SharedProducerServicePool. Essentially, it keeps a lot of RemoteFileProducer instances in its pool (about 41k in this particular sample). All of them have an SftpEndpoint.

The following is the Camel route that eventually uploads a file via SFTP:

from("activemq:queue:transform")
        .routeId("TransformJson2Avro")
        .process(new FileListCountProcessor())
    .split(body()).streaming()
        .setHeader("CURRENT_FILE", simple("${body}"))
        .log("File to process: ${header.CURRENT_FILE}; Flags: "
                + Json2AvroTransformationFlags.FILE_AVRO_BACKUP + "=${header." + Json2AvroTransformationFlags.FILE_AVRO_BACKUP + "}, "
                + Json2AvroTransformationFlags.FILE_AVRO_SFTP + "=${header." + Json2AvroTransformationFlags.FILE_AVRO_SFTP + "}, "
                + Json2AvroTransformationFlags.FILE_JSON_BACKUP + "=${header." + Json2AvroTransformationFlags.FILE_JSON_BACKUP + "}, "
                + Json2AvroTransformationFlags.FILE_JSON_DELETE + "=${header." + Json2AvroTransformationFlags.FILE_JSON_DELETE + "}, "
                + "EXPORT_METHOD=${header.EXPORT_METHOD}")
        .convertBodyTo(File.class)
        .process(new Json2AvroProcessor(filenamePattern, tempFilePath))
        .choice()
            .when(header(Json2AvroTransformationFlags.FILE_AVRO_BACKUP).isEqualTo("true"))
                .to("file:///?fileName=${header.ARCHIVE_FOLDER}/${property.FILENAME}")
                .log("Transformed file: '${property.FILENAME}' (archived to '${header.ARCHIVE_FOLDER}')")
        .end()
        .choice()
            .when(header(Json2AvroTransformationFlags.FILE_AVRO_SFTP).isEqualTo("true"))
                .log("Upload to SFTP '" + getSFTPExportStringForLogging() + "'")
                .recipientList(simple("sftp://${header.SFTP_USER}@${header.SFTP_HOST}:${header.SFTP_PORT}/${header.SFTP_DIR}?password=${header.SFTP_PASSWORD}&fileName=${property.FILENAME}&disconnect=true")).end()
        .end()
        .process(new JsonFileOperationProcessor(backupPath))
    .aggregate(constant(true), new DisabledAggregationStrategy())
        .completionSize(simple("${property.fileListCount}"))
        .log("Number of files: ${property.fileListCount}")
    .to("activemq:queue:transformCompleted")
        .id("insertIntoMessageQueue");

What could be the reason that the SFTP connections don't get closed, or that the producers get created but never stopped again? I can't find a hint in the official documentation.

Any help is greatly appreciated :)

Note: The Camel version used here is 2.17.0.

1 Answer

In your recipientList it's better to set the dynamic file name as a header, e.g. with Exchange.FILE_NAME as the key, to avoid creating too many unique endpoints. With the file name in the URI, every file yields a different endpoint URI; with the file name in a header, you reuse the same endpoint for the same host.
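To see why the file name in the URI matters, here is a small stand-alone model. It is not Camel code: the class `EndpointKeyDemo`, the method `distinctEndpoints`, and the sample URI are made up for illustration, and a plain `HashSet` stands in for a cache keyed by endpoint URI.

```java
import java.util.HashSet;
import java.util.Set;

public class EndpointKeyDemo {

    // Hypothetical stand-in for a cache keyed by endpoint URI.
    // Returns how many distinct keys are produced for fileCount uploads.
    public static int distinctEndpoints(int fileCount, boolean fileNameInUri) {
        Set<String> cacheKeys = new HashSet<>();
        for (int i = 0; i < fileCount; i++) {
            // Made-up URI; user, host, port and directory stay the same for every file.
            String uri = "sftp://user@host:22/upload?disconnect=true";
            if (fileNameInUri) {
                // The per-file option makes every URI string unique.
                uri += "&fileName=file-" + i + ".avro";
            }
            cacheKeys.add(uri);
        }
        return cacheKeys.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctEndpoints(1000, true));  // prints 1000
        System.out.println(distinctEndpoints(1000, false)); // prints 1
    }
}
```

With the file name embedded, the number of distinct keys grows with the number of files; without it, all uploads to the same host and directory share one key.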

And if you don't want any endpoint pooling, you can configure the cacheSize option on the recipient list to a lower value, or turn it off.
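Applied to the SFTP branch of the route in the question, the two suggestions might look roughly like the fragment below. This is an untested sketch, not a drop-in replacement: it only compiles inside the original RouteBuilder, the value 10 for cacheSize is an arbitrary example, and the value that turns the cache off should be taken from the Recipient List documentation for your Camel version.

```java
// Fragment of the route from the question (needs org.apache.camel.Exchange imported).
.choice()
    .when(header(Json2AvroTransformationFlags.FILE_AVRO_SFTP).isEqualTo("true"))
        .log("Upload to SFTP '" + getSFTPExportStringForLogging() + "'")
        // The file name travels as the CamelFileName header instead of a URI option,
        // so the URI is identical for every file sent to the same host and directory.
        .setHeader(Exchange.FILE_NAME, simple("${property.FILENAME}"))
        .recipientList(simple("sftp://${header.SFTP_USER}@${header.SFTP_HOST}:${header.SFTP_PORT}/${header.SFTP_DIR}?password=${header.SFTP_PASSWORD}&disconnect=true"))
            // Example value only: caps how many producers the recipient list caches.
            .cacheSize(10)
        .end()
.end()
```

Note that the header stays on the exchange after the upload, so any later file-based endpoint in the same route would pick it up as well.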

Claus Ibsen