Code: https://github.com/giuliopulina/spring-integration-poller

I have a problem trying to create a JDBC poller with Spring Integration.

When I feed the table with new data, the processing is slower than expected: everything works fine, apart from the fact that the poll is triggered only every 60 seconds, and I can't understand why.

2015-05-27 10:50:40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - Resolved expression #root.![pk] to (list of pks)

2015-05-27 10:51:40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - Resolved expression #root.![pk] to (list of pks)

This is the relevant part of the Spring Integration XML configuration:

<task:executor id="pollerPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>

<!--<task:executor id="processingPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/> -->

<bean id="jdbcSource" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
    <constructor-arg ref="dataSource"/>
    <constructor-arg value="XXXXXXXXXXXXXX"/>
    <property name="updateSql" value="XXXXXXXXXXXXXXXX"/>
    <property name="maxRowsPerPoll" value="50"/>
</bean>

<int:inbound-channel-adapter send-timeout="10000" auto-startup="false" id="inboundAdapter" ref="jdbcSource" channel="jdbcOutputChannel">
    <int:poller receive-timeout="3000" time-unit="MILLISECONDS" fixed-rate="0" error-channel="errorChannel" task-executor="pollerPool">
        <int:advice-chain>
            <ref bean="threadPrepareInterceptor"/>
            <ref bean="txAdvice"/>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<int:service-activator id="serviceActivator" input-channel="jdbcOutputChannel" ref="someServiceActivatorBean"/>

<tx:advice id="txAdvice" transaction-manager="txManager">
    <tx:attributes>
        <tx:method name="get*" read-only="true"/>
        <tx:method name="*"/>
    </tx:attributes>
</tx:advice>

<int:channel id="jdbcOutputChannel"  >
    <!-- using direct channel -->
    <!--<int:dispatcher task-executor="processingPool"/>-->
</int:channel>

Could you please help me understand the issue?

UPDATE:

Regarding the suggestion about "jdbcOutputChannel" and transactions: I agree, and I have modified my configuration according to your hint because it's cleaner (in any case, the service activator was also running in a separate transaction, even if that wasn't shown in the XML sample).

Regarding the issue itself: I tried removing all other Spring Integration components, and the poller is then triggered continuously, as I expected (I know fixed-rate=0 is way too high :) ). However, when the project has other pollers configured like this, my poller also seems to inherit the same timeout:

<int:service-activator id="someOtherServiceActivator">
    <int:poller fixed-rate="0" error-channel="someOtherPollerErrorChannel" receive-timeout="60000" />
</int:service-activator>

After switching the timeout of the other pollers to 10000 ms, my poller is also triggered every 10 seconds (instead of 60). I cannot share the complete Spring Integration configuration, but I would like to ask: is it possible for completely separate pollers to modify each other's behaviour?

UPDATE 2: I created a separate project to try to reproduce the issue, but I couldn't manage to. So I tried removing the following configuration, which had been introduced in order to start the pollers only when the application is fully up and running:

<int:publish-subscribe-channel id="startupChannel" />
<int:control-bus input-channel="controlBusChannel" />

<int-event:inbound-channel-adapter channel="startupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:transformer input-channel="startupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="startupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" /> 

And the problem disappeared, even though I can't fully understand the reason. Anyway, creating a different startupChannel for my poller works perfectly:

<int:publish-subscribe-channel id="globalStartupChannel" />
<int:publish-subscribe-channel id="myStartupChannel" />
<int:control-bus input-channel="controlBusChannel" />

<int-event:inbound-channel-adapter channel="globalStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int-event:inbound-channel-adapter channel="myStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:transformer input-channel="globalStartupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="myStartupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />

UPDATE 3:

While preparing the project with the code for you I noticed the following log:

Info: No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.

So, I added the following configuration and now everything is working fine:

<task:scheduler id="taskScheduler" pool-size="20" />

I guess that the default pool-size is 10, so in some way the configuration is overwritten when totalNumberOfPollers > taskScheduler.size(). Am I right?

Thanks, Giulio

Giulio Pulina
  • Can you explain more clearly what you mean by "...creating a different startupChannel..." ? – Gary Russell Jun 01 '15 at 15:47
  • I provided an example in my question. – Giulio Pulina Jun 01 '15 at 17:23
  • Ah - yes; I assume you have a lot of "blocking" pollers. Generally, inbound channel adapter pollers don't block - the source returns null when there's no data and the thread is freed up for other uses. QueueChannel pollers, by default, will block for 1 second in the queue, waiting for a message; only then is the thread freed up. It is quite unusual to have so many async (QueueChannel) handoffs in a single app; it might indicate some misunderstanding of when to use them. OTOH, if you really need that many, you can reduce the receive-timeout on the poller, or use more scheduler threads. – Gary Russell Jun 06 '15 at 13:01
  • That said, although you will get thread starvation, it's not clear why the starvation would last 60 seconds, so I'd still like to see your code. – Gary Russell Jun 06 '15 at 13:02
  • I've added the GitHub repo link to the top of the original post :) Sorry for the confusion; I performed a lot of tests, and it would have been better to share the code earlier. For example, I discovered that, contrary to what I thought I had found, the behaviour is the same for Spring Integration version 4.1.4.RELEASE as well (and that makes sense to me!). – Giulio Pulina Jun 08 '15 at 00:33
  • Thanks, I added an explanation, and alternative solution, to the answer. – Gary Russell Jun 15 '15 at 15:05

1 Answer

I can't reproduce your situation; I suggest you take a thread dump between polls to see what the thread is doing.

That said, a fixed-rate of 0 is incredibly aggressive; your DBAs will probably throw a fit if you poll with no delay like that.

Also, jdbcOutputChannel, being an ExecutorChannel, means the transaction will commit immediately after the message is sent to that channel. If you want the flow to run in the transaction, you should not use a dispatcher here.

EDIT:

I still can't reproduce your situation with this...

<int:control-bus input-channel="input"/>

<int-event:inbound-channel-adapter channel="ps" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:publish-subscribe-channel id="ps" />

<int:transformer input-channel="ps" output-channel="input" expression="'@foo.start()'" />

<int:transformer input-channel="ps" output-channel="input" expression="'@sa.start()'" />

<int:inbound-channel-adapter id="foo" channel="bar" expression="'foo'" auto-startup="false">
    <int:poller fixed-rate="1000" />
</int:inbound-channel-adapter>

<int:channel id="bar">
    <int:queue />
</int:channel>

<int:service-activator id="sa" input-channel="bar" output-channel="baz" auto-startup="false"
        expression="payload.toUpperCase()">
    <int:poller fixed-rate="6000" receive-timeout="0" />
</int:service-activator>

<int:logging-channel-adapter id="baz" level="ERROR"/>

... as expected I see 6 FOOs every 6 seconds (the i-c-a is polled once a second and the sa runs once every 6 seconds).

EDIT2:

I looked at your project and the root cause of your issue is, as you say, many polled endpoints, but really, it's this:

fixed-rate="0" receive-timeout="60000"

With this configuration, the scheduler resources (threads) are blocked in the QueueChannels and, as you have found, you have exhausted all the resources.

One solution is to increase the number of threads in the scheduler pool.
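
As a minimal sketch of that option (the pool size of 20 is only an illustration, taken from the value that worked in the question), an explicitly defined scheduler bean named taskScheduler replaces the default one:

```xml
<!-- Replaces the default scheduler that is created when no 'taskScheduler' bean is defined.
     pool-size="20" is an illustrative value, not a recommendation. -->
<task:scheduler id="taskScheduler" pool-size="20" />
```

Each poller blocked in receive() holds one scheduler thread for up to its receive-timeout, so the pool would need to be larger than the number of pollers that can block at the same time.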

With fixed-rate="0" and receive-timeout="60000", it seems you are trying to get on-demand, zero latency messaging with a poller by having the poller constantly waiting in the queue receive() method.

If you can't afford any latency, consider using DirectChannels instead. If you don't want the downstream endpoint to run on the caller's thread, use ExecutorChannels...

<task:executor id="exec" pool-size="100"/>

<int:channel id="otherMessageChannel1">
    <int:dispatcher task-executor="exec" />
</int:channel>

This is generally preferred over your current setup.
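
If the QueueChannels have to stay, the other option mentioned in the comments is to reduce the receive-timeout, so that a poll that finds no message returns its scheduler thread quickly. A rough sketch, assuming a poll interval of 500 ms is acceptable; the input-channel and ref names are hypothetical placeholders:

```xml
<!-- 'someQueueChannel' and 'someBean' are hypothetical names used only for illustration -->
<int:service-activator id="someOtherServiceActivator" input-channel="someQueueChannel" ref="someBean">
    <!-- receive-timeout="0": do not block in the queue; fixed-delay (an assumed value) paces the polls -->
    <int:poller fixed-delay="500" receive-timeout="0" error-channel="someOtherPollerErrorChannel" />
</int:service-activator>
```

The trade-off is latency: a message can wait up to one poll interval before it is picked up.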

Gary Russell
  • Thank you a lot for your quick answer, I edited my original questions with further analysis. – Giulio Pulina May 31 '15 at 14:54
  • Interesting; another poller should not impact the jdbc poller - if it does, it's a bug; right now, I can't see how it can, looking at the code; I'll have to run some tests (but it might be tomorrow). Can you confirm which version of Spring Integration you are using? – Gary Russell May 31 '15 at 15:08
  • Thank you again :) The version I'm using is 2.1.0.RELEASE – Giulio Pulina Jun 01 '15 at 08:25
  • That is a __VERY__ old (ancient in software terms) version from January 2012 (although I still don't remember us having to fix a problem where the pollers interacted). The [current version is 4.1.4](http://projects.spring.io/spring-integration/). – Gary Russell Jun 01 '15 at 12:55
  • Very strange; I still can't reproduce your problem, even after dropping back to 2.1.0.RELEASE. Using separate event adapters and channels is logically identical to using a pub/sub channel for the same event. It would be interesting to compare the two startup logs (at DEBUG) showing the event and control bus messages. I'll update the answer with my test config. – Gary Russell Jun 01 '15 at 20:35
  • I think I reproduced the issue outside of the application I'm using: can you try to create about 10 service activators (the ones with the poller inside)? I'm going to perform some other tests tomorrow. – Giulio Pulina Jun 04 '15 at 17:18
  • Yes, I can confirm I reproduced the issue. I created two test cases, one with 5 service activators and one with 10 service activators, and in the second one the poll frequency is 'overwritten' by the receive-timeout of one of the 10 pollers. I can share the code if you want. – Giulio Pulina Jun 05 '15 at 08:52
  • I also tested with Spring Integration 3.0.7, and it does not work there either. It works fine with 4.1.4, but unfortunately I cannot upgrade. – Giulio Pulina Jun 05 '15 at 10:36
  • Interesting; I can't think of any changes we have made that would change this behavior between 3.x and 4.x but, yes please, do share the code to reproduce and we'll take a look. – Gary Russell Jun 05 '15 at 12:34