0

Recently we introduced task executor to spring integrations pollers to fasten our file reading process . However introduction of task executor led to unexpected problems where in our service stopped processing messages in spring integration channels

  <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/file
        http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/integration/jdbc 
        http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
        http://www.springframework.org/schema/integration/amqp 
        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd">


    
    <integration:channel id="filesIn" />
    <integration:channel id="toArchive" />
    
    
    <integration:channel id="toRabbitForRO" />
    <integration:channel id="outputFilesROIn" />
    
    <integration:channel id="toRabbitForSA" />
    <integration:channel id="outputFilesSAIn" />
    
    <int-file:inbound-channel-adapter directory="${rnr.file.directory}" auto-startup="true"
                            filter="filterFiles" channel="filesIn">
                <integration:poller 
                    cron="0 0,5,10,15,20,25,30,35,40,45,50,55 0-9,18-23 * * ?"
                    task-executor="largeFileTaskExecutor" 
                    max-messages-per-poll="${max-messages}"/>
    </int-file:inbound-channel-adapter>
    
    
    <integration:service-activator input-channel="filesIn" output-channel="toArchive"
                     ref="processSingleLargeFile" method="process"></integration:service-activator>
    
    
    <int-file:outbound-channel-adapter channel="toArchive" delete-source-files="true"
                                         directory="file:${rnr.file.directory}/archive">
    </int-file:outbound-channel-adapter>
    
    
     <int-file:inbound-channel-adapter directory="${roOutputDir}"
                                    auto-startup="true" filename-pattern="*.xml" channel="outputFilesROIn">
                    <integration:poller fixed-delay="200"  
                       task-executor="smallFileTaskExecutor" 
                       max-messages-per-poll="${max-messages}" ></integration:poller>
     </int-file:inbound-channel-adapter>
    
     
     <integration:service-activator input-channel="outputFilesROIn" 
                                output-channel="toRabbitForRO" ref="processMultipleFiles" method="processROFile"></integration:service-activator>



    <int-amqp:outbound-channel-adapter
        channel="toRabbitForRO" amqp-template="rabbitTemplate" exchange-name="sample-excahnge"
        routing-key="sample-key" />
        
        

</beans>

Te first task executor introduced in poller works perfectly. However the second poller doesn't seem to work. It does not read file from the directory mentioned


     <int-file:inbound-channel-adapter directory="${roOutputDir}"
                                    auto-startup="true" filename-pattern="*.xml" channel="outputFilesROIn">
                    <integration:poller fixed-delay="200"  
                       task-executor="smallFileTaskExecutor" 
                       max-messages-per-poll="${max-messages}" ></integration:poller>
     </int-file:inbound-channel-adapter>
// this file channel adapter is not working . No message appear in output channel **outputFilesROIn**

smallFileTaskExecutor and largeFileTaskexecutor have 2 threads as core pool size . max-messages-per-poll for each poller is defined as 2

Thread dump when our service is not processing messages : https://fastthread.io/my-thread-report.jsp?p=c2hhcmVkLzIwMjAvMDgvMTkvLS1hcGktZWQzZTJmYzMtMWFkYy00Mzk5LWJkZjgtNzk0NGQwMzdjNjIwMjg2Njk5ZDMtYTFmNC00YzIzLThmZTQtYzQ4Nzg4NmNhNGM1LnR4dC0t&

PS : followed this How to read and process multiple files concurrently in spring? while implementing concurrency in reading files

vipulk10
  • 117
  • 3
  • 15

1 Answers1

0

According to your thread dump, it looks like a default task scheduler with pool of 10 threads is fully busy. Probably your cron is very aggressive and with that largeFileTaskExecutor it has a opportunity to fire scheduled tasks like a crazy. This way your second poller just don't have resources in the task scheduler to do its job.

Consider to adjust your cron or reconfigure that task scheduler to bigger thread pool. Or just don't use executors since it confirms to us that it doesn't increase your performance.

See docs for scheduler thread pool: https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#namespace-taskscheduler

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118