0

I'm in the process of upgrading a service from spring cloud stream 2.0 to 3.1

Previously for publishing to a Kinesis stream I had

@Component
public class KinesisStreamService {
    private final Log logger = LogFactory.getLog(this.getClass());
    private StreamProcessor orderOut;

    @Autowired
    public KinesisStreamService(StreamProcessor orderOut) {
        this.orderOut = orderOut;
    }

    public void send(String event) {
        if (event != null) {
            this.orderOut.ordersOut().send(new GenericMessage(event));
            this.logger.debug("Event sent KinesisInStreamServiceImpl : " + event);
        } else {
            throw new RuntimeException("Event can not be null");
        }
    }
}
@Service
public interface StreamProcessor {
    String INPUT = "ordersIn";

    @Output
    MessageChannel ordersOut();

    @Input
    SubscribableChannel ordersIn();
}

Now @Ouput has been depreciated in favor functional programming in 3.1

@EnableBinding @deprecated as of 3.1 in favor of functional programming model, mentions creating a Supplier bean like

@Service
class PubSubSendQueue {
    @Bean
    public Supplier<String> output(){
        return Supplier { "Adam" }
    }
}

But how would I define a Supplier bean, given I would need to pass in an argument to the bean definition function at run time Would lazy initialization of the bean be an appropriate solution. Something like

    @Bean
    @Lazy
    public Supplier<GenericMessage> ordersOut(String event) {
        return () -> new GenericMessage(event);
    }

dependencies are

dependencies {
    implementation 'org.apache.activemq:activemq-broker:5.17.2'
    implementation 'com.amazonaws:aws-java-sdk:1.12.310'
    implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:1.1.0'
    implementation 'org.springframework.integration:spring-integration-aws:2.5.4'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:3.0.0'
    implementation 'org.springframework:spring-web:6.0.0'
    implementation 'org.springframework.boot:spring-boot-starter:3.0.5'
    implementation 'javax.xml.bind:jaxb-api:2.3.0'
    implementation 'com.sun.xml.bind:jaxb-impl:2.3.6'
    implementation 'org.glassfish.jaxb:jaxb-runtime:2.3.6'
    implementation 'javax.activation:activation:1.1.1'
    implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
    testImplementation 'org.codehaus.groovy:groovy-all:2.5.18'
    testImplementation 'org.spockframework:spock-spring:1.3-groovy-2.5'
    testImplementation 'org.spockframework:spock-core:1.3-groovy-2.5'
    testImplementation 'org.springframework:spring-test:6.0.0'
    implementation 'cglib:cglib-nodep:3.2.7'
    implementation group: 'org.springframework', name: 'spring-jms', version: '6.0.0'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.0-rc1'
    implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: '2.13.4'
    implementation 'javax.jms:javax.jms-api:2.0.1'
    implementation 'javax.validation:validation-api:2.0.1.Final'
    implementation "jakarta.xml.bind:jakarta.xml.bind-api:2.3.2"
    implementation "org.glassfish.jaxb:jaxb-runtime:2.3.2"
shrek_23
  • 51
  • 1
  • 5

1 Answers1

0

Use StreamBridge to send arbitrary messages. Supplier<?> is for a polled message source.

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

Gary Russell
  • 166,535
  • 14
  • 146
  • 179