
I have a huge number of messages coming from CSV files that then get sent to a rate-limited API. I'm using a queue channel backed by a database channel message store to make the messages durable while they are processed. I want to get as close to the rate limit as possible, so I need to be sending messages to the API across multiple threads.

What I had in my head of how it should work is that something reads the DB, sees which messages are available, and then delegates each message to one of the threads to be processed in a transaction.

But I haven't been able to do that. What I've had to do instead is have a transactional poller with a thread pool of N threads, a fixed rate of, say, 5 seconds, and a max messages per poll of 10 (something more than could be processed in 5 seconds). This works OK, but it has problems when there are not many messages waiting (i.e. if there were only 10 messages they would all be processed by a single thread). That isn't going to be a problem in practice because we will have thousands of messages, but it seems conceptually more complex than how I thought it should work.
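To make the setup concrete, here is a simplified sketch of roughly what I've ended up with, using the Java DSL. The channel, bean and service names are made up for illustration, and the message store and transaction manager are assumed to be configured elsewhere:

import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class ApiFlowConfig {

    // Simplified sketch of my current setup; names are illustrative.
    @Bean
    public IntegrationFlow apiFlow(JdbcChannelMessageStore messageStore,
                                   PlatformTransactionManager transactionManager) {
        return IntegrationFlows
                .from("csvInputChannel")
                // durable queue channel backed by the JDBC message store
                .channel(c -> c.queue(messageStore, "apiQueue"))
                // "rateLimitedApiGateway.send" stands in for the real API call
                .handle("rateLimitedApiGateway", "send", e -> e
                        .poller(Pollers.fixedRate(5000)               // poll every 5 seconds
                                .maxMessagesPerPoll(10)               // more than fits in 5 seconds
                                .transactional(transactionManager)    // each poll runs in a transaction
                                .taskExecutor(Executors.newFixedThreadPool(10)))) // thread pool of N = 10
                .get();
    }
}

The `taskExecutor` on the poller is the thread pool of N threads mentioned above.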

I might not have explained this very well, but it seems like this would be a common problem when messages come in fast but go out more slowly?

sMoZely
  • What you are looking at is a "classic" load-balancing problem. You want to create work packages that are big enough to keep your threads actually working on something rather than constantly fetching new work packages, but you also want to make sure that you have a package to give to every thread. I don't think you can find a universal solution for this problem; you can only find something that works for your use case. Since your specification is that you are getting 1000s of messages, I think your current solution is probably as good as it gets :) – Ben Jun 20 '18 at 10:16

1 Answer


Your solution is really correct, but you need to be careful not to shift messages into an Executor, since that way you jump out of the transaction boundaries.

The fact that you have 10 messages processed in the same thread is exactly an implementation detail, and it looks like this:

AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
    int count = 0;
    while (AbstractPollingEndpoint.this.initialized
            && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
                || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
        try {
            if (!Poller.this.pollingTask.call()) {
                break;
            }
            count++;
        }
        // catch block and the rest of the loop omitted

So, messages are polled up to maxMessagesPerPoll in the same thread.

To make it really more parallel, and still keep transactions so you don't lose messages, you need to consider using fixedRate:

/**
 * Specify whether the periodic interval should be measured between the
 * scheduled start times rather than between actual completion times.
 * The latter, "fixed delay" behavior, is the default.
 */
public void setFixedRate(boolean fixedRate)

And increase the number of threads used by the TaskScheduler for the polling. You can do that by declaring a ThreadPoolTaskScheduler bean with the name IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME to override the default one, which has a pool size of 10. Or use Global Properties to just override the pool size of that default TaskScheduler: https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/configuration.html#global-properties
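For example, something along these lines, where the pool size of 20 is just illustrative:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class SchedulerConfig {

    // Overrides the default Spring Integration TaskScheduler so that
    // several polling tasks can be started concurrently.
    @Bean(name = IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(20); // illustrative: bigger than the default of 10
        scheduler.setThreadNamePrefix("int-scheduler-");
        return scheduler;
    }
}

The Global Properties alternative is a META-INF/spring.integration.properties file on the classpath containing spring.integration.taskScheduler.poolSize=20.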

Artem Bilan
  • Thanks @Artem! Good to know I'm doing it the correct way. But just for completeness of your answer: I tried what you suggested and it looks like there is still just one thread processing at a time; even with fixed rate, it seems like the ThreadPoolTaskScheduler is just running one poll at a time (i.e. the first thread will process 10, then the next thread will do the next 10 ...). But if I do `.taskExecutor(Executors.newFixedThreadPool(10))` on the Poller it behaves as you suggest (that's the point I had it at before asking the question). – sMoZely Jun 20 '18 at 21:15
  • A `fixed rate of say 5 seconds` is too long to start a new task. I'm pretty sure your first 10 messages are processed much faster than that. To make it a bit parallel you need to give `fixedRate` a really reasonable time. Try with just 50 milliseconds. – Artem Bilan Jun 21 '18 at 00:05
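
For reference, a poller with a much shorter interval, along the lines of that last comment, might look like the sketch below; the transaction manager and the maxMessagesPerPoll value are just illustrative:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class ShortRatePollerConfig {

    // A short fixedRate lets the multi-threaded TaskScheduler start
    // overlapping polls instead of waiting 5 seconds between them.
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller(PlatformTransactionManager transactionManager) {
        return Pollers.fixedRate(50)              // 50 ms, as suggested in the comment
                .maxMessagesPerPoll(1)            // illustrative: one message per poll/transaction
                .transactional(transactionManager)
                .get();
    }
}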