1

I am implementing Spring Batch-Integration RemoteChunking. https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking

I've come across the deprecation of @Input and the documentation says we have to use the functional style.

How can I use the Consumer (which is used on the spring cloud stream) in my Spring Batch Integration Flows?

package pt.bayonne.sensei.RemoteChunking.manager;

import org.aspectj.bridge.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.function.Consumer;
import java.util.function.Supplier;

@Profile("!worker")
@Configuration
public class FunctionalBinders {


    @Bean
    public Sinks.Many<Object> sink() {
        return Sinks.many()
                .replay()
                .latest();
    }


    @Bean
    public Supplier<Flux<Object>> clientRequests() {
        return () -> sink()
                .asFlux()
                .cache();
    }
    
    @Bean
    public Consumer<Message<?>> onClientReplies(){
        return message -> {
            //do your stuff
        };
    }
}

My JobConfiguration

public TaskletStep dispatchStep(){
       return this.remoteChunkingManagerStepBuilderFactory.get("dispatch-step")
                .chunk(10)
                .reader(reader())
                .outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess())
                .inputChannel(replies()) //how to use the functional style here?
                .build();

}

I know that it needs a PollableChannel but my question is how to use the functional style on my RemoteChunkingManagerStepBuilderFactory?

Any example would be very appreciated. Thanks a lot.

  • How did you use it before with @Input? I am not sure what relation this has to Spring Cloud Stream as you are building a batch app. – Oleg Zhurakousky May 08 '22 at 11:18
  • @OlegZhurakousky thanks for your reply. Definitely, it has nothing to do with Spring cloud Steam but the reason I asked is the documentation of "@Input" itself says it is "@deprecated as of 3.1 in favor of functional programming model", so i just want to see how i can use the functional style on Spring Batch Integration on RemoteChunkingManagerStepBuilderFactory. I am using Spring Cloud Stream Kafka as message middleware binding. – Pascoal Eddy Bayonne May 08 '22 at 14:45
  • I'm not sure I understand what you mean by "functional style". Are you referring to functional bean registration in Spring (https://github.com/spring-projects/spring-framework/issues/19398)? Using Spring Cloud Function to implement Remote Chunking is a different thing (and I'm not sure if it is practical or even feasible by design, ie using a streaming solution to implement a batch application). – Mahmoud Ben Hassine May 09 '22 at 08:12
  • Hi @MahmoudBenHassine As you can see on the above code snippet I used lambda on outputChannel like .outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess()). My question was if it is possible to use labmda too in the .inputChannel( ). I had to ask here because of the deprecation of "@Input" which i use on SubscribableChannel. Thanks for your answer – Pascoal Eddy Bayonne May 09 '22 at 08:20

1 Answers1

2

.outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess()) .inputChannel(replies()) //how to use the functional style here?

The method org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel) accepts a org.springframework.messaging.MessageChannel which is a functional interface (See how it is annotated with @FunctionalInterface). Hence you can use a lambda to define the output channel in the builder.

However, org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#inputChannel(PollableChannel) accepts a org.springframework.messaging.PollableChannel which is not a functional interface. That's why it is not possible to use a lambda in here.

Mahmoud Ben Hassine
  • 28,519
  • 3
  • 32
  • 50
  • thanks for your answer. Now we need to do something else like workaround because the company doesn't allow us to use deprecated things, in our case is "@Output" from Spring Integration Messaging. Our Sonar compains more than any thing :) – Pascoal Eddy Bayonne May 12 '22 at 12:56