1

We use microservices and event driven architecture(more particular choreography). We use kafka and many services use Spring Cloud Stream as an abstraction over the message brokers.

After upgrading our Spring Cloud Stream related source to the new functional style, we started to have problems in our integration tests. The problem is related to the replacement of the old MessageCollector to OutputDestination(test-binder).

The problem appears in our integration tests, where we would like to verify if proper events are being produced. Many of our services produce to a topic and consume from it in another module(same service). The OutputDestination works on topic level now and not on channel as the old MessageCollector. It causes the OutputDestination to not consume any messages if there is already a listener for this topic in the prod code.

I created a simple project to present our problem https://github.com/dgyordanov/scs-functional-test

We have a simple service like:

@Service
public class OrderService {

.........

public void changeOrder() {
    // Some order changes
    streamBridge.send("orderEvents-out-0", "Test Order Change Event");
}

In another module we have a listener for these events in the production code:

@Bean
public Consumer<String> orderEvents() {
    // React on order events
    return e -> System.out.println("### Order Event: " + e);
}

I want to test the changeOrder() but nothing is being consumed:

@Test
void orderChangedTest() {
   orderService.changeOrder();
   Message<byte[]> event = outputDestination.receive(100, "edu.events.orderEvents");
   assertNotNull(event);
}

When we run the test from above, we see the result from System.out.println("### Order Event: " + e);

The problem is that if we do not exclude the orderEvents() listener from the test context, the outputDestination will never receive messages, because the orderEvents() listener will consume them first. With the old MessageCollector, which worked on channel level, it was possible.

Could you help me how to make our big cucumber integration test suite work with the spring cloud stream test-binder?

We also tried to declare another channel for the same topic, but the outputDestination still consumed nothing.

1 Answers1

0

That is a correct behaviour, because you have a Consumer which is the end of the line since it produces nothing. In other words you can not test the outcome of the Consumer execution.

Now, if I understand correctly, you simply want to verify that the Consumer is indeed invoked (your bindings are correct). We can certainly introduce an enhancement to the test binder to allow you to do that in a more idiomatic way (I just raised an issue for that - https://github.com/spring-cloud/spring-cloud-stream/issues/2607). As a workaround you can still do it with a little bit of a reflection. Here is a sample code:

InputDestination inputDestination = context.getBean(InputDestination.class);
try {
    Field chField = ReflectionUtils.findField(InputDestination.class, "channels");
    chField.setAccessible(true);
    List<SubscribableChannel> channels = (List<SubscribableChannel>) chField.get(inputDestination);
    SubscribableChannel ch = channels.iterator().next(); // or more elaborate code if there are multiple bindings to find your channel
    ch.subscribe((x) -> {
        System.out.println("Second subscriber: " + x);
    });
} catch (Exception e) {
    // TODO: handle exception
}

FOLLOWUP

After looking at it further and experimenting some more I realised that adding anything additional would be redundant at best. As I mentioned before, Consumer by definition returns nothing so there is nothing to evaluate. And evaluating that it was invoked with the input you just sent is redundant, since you would be re-testing the core functionality of the framework itself which is not your responsibility since there would be nothing for you to do/fix is a bug is discovered.

And that is yet another reason why we decided to drop MessageCollector.

So, what I would recommend is to simply do the proper unit testing of your Consumer and leave the rest (i.e., testing) to the framework.

Oleg Zhurakousky
  • 5,820
  • 16
  • 17
  • Thanks for the fast response Oleg! We actually want to test the producer of the events and not the consumer. The problem is that there is a consumer in another module in the same application. We have already pretty huge integration tests suite and it verifies the events being emitted by various producers. It will be a lot of work now if we want to test each module in isolation in our integration suite, so no consumers from other modules will consume the events we would like to verify with the outputDestination. We will check this enhancements you suggested and will evaluate the situation. – Diyan Yordanov Jan 04 '23 at 16:33
  • Well, the producer (StreamBridge in your case) will definitely work, no matter what you put in its "destination" value since it will either use existing one or will create a new one, so once again I am not quite sure what exactly are you trying to test, but that could be just me. May be you can create a small example and push it to github somewhere so we can take a look. As I said, I am not against an enhancement, just want to understand what is missing. – Oleg Zhurakousky Jan 05 '23 at 07:49
  • Long story short, we used the old MessageCollector as an in-memory kafka in our integration tests. Our services are based on EDA and more particularly choreography. We have a pretty complex Aggregate in this service, that produces different type of business events. In our integration tests we emitted events to the service, expecting the service to change an aggregate state and to emit a particular business event. We used the MessageCollector to validate the output events. We also have another module in this service consuming the events mentioned above and we ended up in this situation. – Diyan Yordanov Jan 09 '23 at 09:46
  • Experimenting with the workaround you suggested, we created this Util https://github.com/dgyordanov/scs-functional-test/blob/master/src/test/java/edu/spring/cloud/stream/functionaltest/ScsChannelsTestUtil.java and it may serve our needs. We can use it as an intermediary step toward using kafka running in a test container in our integration tests. Thank you for your support! – Diyan Yordanov Jan 09 '23 at 09:48
  • "a Consumer is the end of the line since it produces nothing. In other words you can not test the outcome of the Consumer execution." "Consumer by definition returns nothing so there is nothing to evaluate." I disagree, there is definitely something to evaluate: did the consumer return or did it throw an exception. In terms of the broker, this can have big effects: is the message acked and deleted from the queue or not (and error handling strategies can be quite complex, so worth testing). – jonenst Jan 23 '23 at 12:25
  • Having this in mind, should it be possible to have code like: testNominal() { methodA(nominalData); // should send a message to queue "foo" and then consume it //assert queueA empty } or testError() { methodA(errorData); // should send a message to queue "foo" and consumer throws //assert queueA not empty } – jonenst Jan 23 '23 at 12:30