I want to take records from the database and transform them to JSON. This runs on Spring Cloud Data Flow.

I suspect I am missing some call on the IntegrationFlow.

The error output is:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:440)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:319)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:140)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)


@Bean
public MessageSource<Object> jdbcMessageSource() {
    String query = "select cd_int_controle, de_tabela from int_controle rowlock readpast " +
            "where id_status = 0 order by cd_int_controle";
    JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource, query);

    adapter.setMaxRows(properties.getPollSize());
    adapter.setUpdatePerRow(true);
    adapter.setRowMapper((RowMapper<IntControle>) (rs, i) -> new IntControle(rs.getLong(1), rs.getString(2)));
    adapter.setUpdateSql("update int_controle set id_status = 1 where cd_int_controle = :cdIntControle");

    return adapter;
}

@Bean
public IntegrationFlow jsonSupplier() {
    return IntegrationFlows.from(jdbcMessageSource(),
            c -> c.poller(Pollers.fixedRate(properties.getPollRateMs(), TimeUnit.MILLISECONDS).transactional()))
            .transform((GenericTransformer<List<IntControle>, String>) ints -> {
                //transform to Json
            })
            .get();
}
Thiago Sayão

1 Answer

You are missing several points:

  1. The transform() in Spring Integration indeed requires an output channel or a replyChannel header. There is just no way in Spring Integration to bypass channels between endpoints. Even if you don't declare one in your flow between the JDBC source and the transform, the framework puts one there anyway. Since you call get() at the end of the flow and don't give any hint about which channel to send the transform result to, that DestinationResolutionException is thrown.

  2. The Spring Cloud Stream functional model deals with the basic Java interfaces Supplier, Function & Consumer. Naming a bean jsonSupplier doesn't make it a Supplier. You really need to tell the framework which bean to use for binding (a sketch of the relevant properties follows this list). See the docs for more info: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.6.RELEASE/reference/html/spring-cloud-stream.html#spring_cloud_function
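
For reference, the binding configuration could look something like the sketch below. This is only an illustration; whether you also need to remap the binding to the conventional output name depends on how the source application is registered with Data Flow:

spring.cloud.function.definition=jsonSupplier
# optional: remap the generated binding name to the conventional "output" name
spring.cloud.stream.function.bindings.jsonSupplier-out-0=output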

So, you are missing a connection point between the IntegrationFlow and the Supplier declaration. Probably something like this could work for you:

@Bean
PollableChannel jsonChannel() {
   return new QueueChannel();
}

...
          .transform((GenericTransformer<List<IntControle>, String>) ints -> {
                //transform to Json
          })
          .channel(jsonChannel())
          .get();
...

@Bean
public Supplier<Message<?>> jsonSupplier() {
    return jsonChannel()::receive;
}

So, the idea is to dump the result of the flow into some channel and then bridge that data from a Supplier, which is visible to the Spring Cloud Stream binding logic.
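
For illustration, here is how the pieces might fit together end to end. This is only a sketch: the Jackson ObjectMapper used for the JSON step is my assumption, not something from your question, and note that the IntegrationFlow bean can no longer be called jsonSupplier, since that name now belongs to the Supplier bean that is bound:

@Bean
PollableChannel jsonChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow jdbcFlow() {
    return IntegrationFlows.from(jdbcMessageSource(),
            c -> c.poller(Pollers.fixedRate(properties.getPollRateMs(), TimeUnit.MILLISECONDS).transactional()))
            // placeholder JSON mapping; any serialization strategy works here
            .transform((GenericTransformer<List<IntControle>, String>) ints -> {
                try {
                    return new ObjectMapper().writeValueAsString(ints);
                }
                catch (JsonProcessingException e) {
                    throw new IllegalStateException("Could not serialize records to JSON", e);
                }
            })
            .channel(jsonChannel())
            .get();
}

@Bean
public Supplier<Message<?>> jsonSupplier() {
    return jsonChannel()::receive;
}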

See also here: https://sagan-production.cfapps.io/blog/2019/10/25/spring-cloud-stream-and-spring-integration

Artem Bilan
  • with your guidance I have managed to make the IntegrationFlow work, but now the supplier function is never called. I have `spring.cloud.stream.function.bindings.jsonSupplier-out-0=output` and `spring.cloud.function.definition=jsonSupplier` in my configuration. – Thiago Sayão Jul 30 '20 at 17:03
  • Do you have a binder dependency? Probably Spring Cloud Stream is just not fired up because there is no binder dependency in your project... – Artem Bilan Jul 30 '20 at 17:06
  • `org.springframework.cloud:spring-cloud-stream-binder-kafka` – Thiago Sayão Jul 30 '20 at 18:39
  • OK. Did you debug it to be sure that your supplier is not called? Any chance you could share a simple `source` application project with us so we can reproduce the issue? – Artem Bilan Jul 30 '20 at 18:42
  • Does jsonSupplier need to be annotated with @PollableBean? – Thiago Sayão Jul 31 '20 at 14:16
  • No, because it doesn’t produce a `Flux` – Artem Bilan Jul 31 '20 at 14:28
  • I have put a logger on the queue channel and it's being called (through receive), but the following stream definition `stream create --name sybase_to_pgsql --definition "jdbc-sybase-supplier | log "` does not output any log. – Thiago Sayão Aug 01 '20 at 20:06
  • Have no idea. Sounds like a separate, more DataFlow-specific SO question. – Artem Bilan Aug 01 '20 at 20:47
  • https://stackoverflow.com/questions/63211414/spring-cloud-dataflow-does-not-flow – Thiago Sayão Aug 02 '20 at 21:54