My setup (simplified for clarity) is the following:

<int:inbound-channel-adapter channel="in" expression="0">
    <int:poller cron="0 0 * * * *"/>
    <int:header name="snapshot_date" expression="new java.util.Date()"/>
    <int:header name="correlationId" expression="T(java.util.UUID).randomUUID()"/>
    <!-- more here -->
</int:inbound-channel-adapter>

<int:recipient-list-router input-channel="in" apply-sequence="true">
    <int:recipient channel="data.source.1"/>
    <int:recipient channel="data.source.2"/>
    <!-- more here -->
</int:recipient-list-router>

<int:chain input-channel="data.source.1" output-channel="save">
    <int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
        <int-jdbc:query>
            select * from large_dataset
        </int-jdbc:query>
    </int-jdbc:outbound-gateway>
    <int:header-enricher>
        <int:header name="source" value="data.source.1"/>
    </int:header-enricher>
</int:chain>

<int:chain input-channel="data.source.2" output-channel="save">
    <int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
        <int-jdbc:query>
            select * from another_large_dataset
        </int-jdbc:query>
    </int-jdbc:outbound-gateway>
    <int:header-enricher>
        <int:header name="source" value="data.source.2"/>
    </int:header-enricher>
</int:chain>

<int:chain input-channel="save" output-channel="process">
    <int:splitter expression="T(com.google.common.collect.Lists).partition(payload, 1000)"/>
    <int:transformer>
        <int-groovy:script location="transform.groovy"/>
    </int:transformer>
    <int:service-activator expression="@db2.insertData(payload, headers)"/>
    <int:aggregator/>
</int:chain>

<int:chain input-channel="process" output-channel="nullChannel">
    <int:aggregator/>
    <int:service-activator expression="@finalProcessing.doSomething()"/>
</int:chain>

Let me explain the steps a little bit:

  1. The poller is triggered by cron. The message is enriched with some information about this run.
  2. The message is sent to multiple data-source chains.
  3. Each chain extracts data from a large dataset (100k+ rows). The result-set message is marked with a source header.
  4. The result set is split into smaller chunks, transformed and inserted into db2.
  5. After all data sources have been polled, some complex processing is initiated, using the information about the run.

This configuration does the job so far, but it is not scalable. The main problem is that I have to load the full dataset into memory first and pass it along the pipeline, which might cause memory issues.

My question is: what is the simplest way to have the result set extracted from db1, pushed through the pipeline, and inserted into db2 in small batches?

– ilj

1 Answer

First of all, since version 4.0.4, Spring Integration's <splitter> supports Iterator as the payload, to avoid memory overhead.

We have a test case for JDBC which shows that behaviour. But as you can see, it is based on the Spring Integration Java DSL and Java 8 lambdas. (Yes, it can be done for older Java versions, even without lambdas.) Even if this approach suits you, your <aggregator> should not be in-memory, because it collects all messages in its MessageStore.
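
For example, a rough sketch of that option in the XML style used above (the largeDatasetReader bean and its iterate() method are placeholders here for some bean of yours that returns a java.util.Iterator over the rows, e.g. backed by a streaming JdbcTemplate query):

<int:chain input-channel="data.source.1" output-channel="save">
    <!-- hypothetical bean: returns an Iterator instead of a fully loaded List -->
    <int:service-activator expression="@largeDatasetReader.iterate()"/>
    <int:header-enricher>
        <int:header name="source" value="data.source.1"/>
    </int:header-enricher>
    <!-- since 4.0.4 the splitter iterates an Iterator payload lazily,
         emitting one message per row instead of holding the whole result set -->
    <int:splitter/>
</int:chain>

Note that the save chain then receives one message per row rather than one big List, so the Lists.partition splitter there no longer applies and the batching has to move to the aggregator side.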

That's the first case.

Another option is based on a paging algorithm, where your SELECT accepts a pair of WHERE params in your DB dialect. For Oracle it can be something like this: Paging with Oracle, where the pageNumber is some message header - :headers[pageNumber].

After that you do a little trick with the <recipient-list-router> to send the SELECT result to the save channel and also to some other channel which increments the pageNumber header value and sends the message back to the data.source.1 channel, and so on. When the pageNumber goes beyond the data, the <int-jdbc:outbound-gateway> stops producing results.
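
A rough, untested sketch of that loop for one data source (the 1000-row page size, the fromRow/toRow headers, the ordering column and the next.page channel are illustrative placeholders; the initial pageNumber header is assumed to be set to 0 on the inbound adapter):

<int:chain input-channel="data.source.1" output-channel="page.result">
    <int:header-enricher>
        <!-- derive the row window for this page (page size 1000 assumed) -->
        <int:header name="fromRow" overwrite="true" expression="headers['pageNumber'] * 1000"/>
        <int:header name="toRow" overwrite="true" expression="(headers['pageNumber'] + 1) * 1000"/>
    </int:header-enricher>
    <!-- classic Oracle ROWNUM paging; the :headers[...] parameter notation is the one mentioned above -->
    <int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
        <int-jdbc:query>
            select * from (
                select t.*, ROWNUM rnum
                from (select * from large_dataset order by id) t
                where ROWNUM &lt;= :headers[toRow]
            )
            where rnum &gt; :headers[fromRow]
        </int-jdbc:query>
    </int-jdbc:outbound-gateway>
</int:chain>

<!-- each page goes to the normal save flow and, in parallel, back around for the next page -->
<int:recipient-list-router input-channel="page.result">
    <int:recipient channel="save"/>
    <int:recipient channel="next.page"/>
</int:recipient-list-router>

<int:chain input-channel="next.page" output-channel="data.source.1">
    <int:header-enricher>
        <int:header name="pageNumber" overwrite="true" expression="headers['pageNumber'] + 1"/>
    </int:header-enricher>
</int:chain>

How the loop ends on the last (empty) page is left out here; as noted, the gateway stops producing results once pageNumber runs past the data.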

Something like that.

I'm not saying that it is so easy, but it should be a starting point for you, at least.

– Artem Bilan
  • The 2nd option looks overcomplicated to me. It would be easier to implement all this without gateways and routers at all, just by using custom beans and passing a ResultSet or Iterator around. But then why use SI anyway? The 1st option looks more promising, I will take a look. – ilj Jan 29 '15 at 17:52
  • Spring Integration is a framework for `EIP`: it deals with messages and provides the `loose-coupling` principle through those high-level components. Any custom logic can always be done in code and wired into the integration flow. Yes, flows should be as simple as possible, but with all those options on the components we may go astray into thinking that the framework should do everything for us, and someone may even decide that it is a programming language. No, real life isn't so predictable, and we really have to do some work even when we use such tools. – Artem Bilan Jan 29 '15 at 19:20
  • I played with Iterators and the approach works in general, but I have 2 questions, which I hope are simpler than the original one: 1. How do I correlate these messages? 2. How do I notify the bean that generates the iterators that it's OK to close the DB connection? – ilj Jan 29 '15 at 21:41
  • 1. Actually, the Splitter adds `SequenceDetails` by default (unless `apply-sequence="false"`). The `correlationKey` is the request message's `id`. 2. If you use `JdbcTemplate` and that `ResultSetIterator` of mine, then everything around connections is handled for you. – Artem Bilan Jan 30 '15 at 07:51
  • 1. It does add the sequence details object, but it sets the sequence length to 0, which makes it impossible to release the group. The only way I see out of this is to put a marker object and implement a custom release strategy that looks for that object. Can you think of anything better? – ilj Jan 30 '15 at 21:16
  • 2. You are wrong. If you debug the test in question, you will see that the messages are received **after** the connection is returned to the pool. While this may work with HSQLDB in your test suite, it won't work with a real DB such as Oracle. – ilj Jan 30 '15 at 21:18
  • 1. That's true, because we don't know the number of items. For this purpose it is better to rely on the `group-timeout` of the `aggregator` and use `release-strategy-expression="true"` to allow releasing small portions. The `expire-groups-upon-completion="true"` is good too, to allow releasing the same `correlationKey` several times. – Artem Bilan Feb 01 '15 at 16:18
  • 2. You are right. I just tested that with MySQL, and even `@Transactional` doesn't help. So I think we should finally come up with some `iterator` option for the JDBC adapters and hide this hard work; this works well with MySQL: https://gist.github.com/artembilan/180199bed9cc32902834 – Artem Bilan Feb 01 '15 at 16:37
  • 1. Agreed, the timeout is a better idea. As I understand it, I can add a custom bean to the expire-advice-chain and mess with the headers to indicate that the aggregated message is the result of a timeout rather than normal group completion. Am I right? Asking because I didn't find any clean example for expiration. – ilj Feb 03 '15 at 08:13
  • 2. Looks right, but you should change "close connection" to "release to the DataSource". Any plans to make this part of the framework? BTW, I'm working on a unified iterator for both DBs and large files. I can open a pull request when I'm done, if you think it could have any value. – ilj Feb 03 '15 at 08:17
  • 1. I'd say that you need to come up with an aggregator configuration that avoids any normal release and just relies on that `group-timeout`, `send-partial-result-on-expiry="true"`, `expire-groups-upon-completion="true"` and a `TimeoutCountSequenceSizeReleaseStrategy`. – Artem Bilan Feb 03 '15 at 08:27
  • 2. Yes, of course: `DataSourceUtils.releaseConnection`. Feel free to raise a JIRA issue on the matter! Re. files: we already have a `FileSplitter` since 4.1.3, and we are going to provide namespace support for it in 4.2: https://jira.spring.io/browse/INT-3600 – Artem Bilan Feb 03 '15 at 08:31
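
Putting the aggregator settings suggested in this comment thread together (release by timeout only, since the sequence size is unknown when splitting an Iterator), a minimal, untested sketch of what could go in place of the bare <int:aggregator/> elements above might be (the 60-second timeout is just a placeholder):

<!-- never release "normally"; push out whatever has accumulated on each timeout
     and let the same correlationKey start a new group afterwards -->
<int:aggregator group-timeout="60000"
        release-strategy-expression="false"
        send-partial-result-on-expiry="true"
        expire-groups-upon-completion="true"/>

Alternatively, a TimeoutCountSequenceSizeReleaseStrategy bean wired in as the release-strategy could be used instead of the expression, as mentioned in the comments.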