
There is a Broadcaster that accepts strings and appends them to a StringBuilder.

I want to test it.

I have to use Thread#sleep to wait until the broadcaster finishes processing the strings. I want to remove the sleep.

I tried to use Control#debug(), without success.

public class BroadcasterUnitTest {

@Test
public void test() {
    //prepare
    Environment.initialize();
    Broadcaster<String> sink = Broadcaster.create(Environment.newDispatcher()); //run broadcaster in separate thread (dispatcher)
    StringBuilder sb = new StringBuilder();
    sink
            .observe(s -> sleep(100)) // long-running operation
            .consume(sb::append);

    //do
    sink.onNext("a");
    sink.onNext("b");

    //assert
    sleep(500); // wait until the broadcaster has finished (if this line is commented out, the test fails)
    assertEquals("ab", sb.toString());
}

private void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
}
Aleks Ya

2 Answers

I'm not familiar with Broadcaster (and it's probably deprecated since the question is old), but these 3 ways could be helpful in general:

  1. When testing Project Reactor's Flux and Mono types, you're probably better off using its testing library (reactor-test), which is made specifically for this. The reference guide and the Javadoc for that part are pretty good, so I'll just copy an example here that speaks for itself:

    @Test
    public void testAppendBoomError() {
      Flux<String> source = Flux.just("foo", "bar"); 
      StepVerifier.create( 
        appendBoomError(source)) 
        .expectNext("foo") 
        .expectNext("bar")
        .expectErrorMessage("boom") 
        .verify(); 
    }
    
  2. You could just block yourself (block() on a Mono; blockFirst(), blockLast() or collectList().block() on a Flux) and then run your checks. Note that if an error is emitted, the blocking call throws an exception. But I have a feeling you'll find yourself writing more code for some cases (e.g., checking that the Flux emitted 2 items X & Y and then terminated with an error), at which point you'd be re-implementing StepVerifier.

    @Test
    public void testFluxOrMono() {
      Flux<Integer> source = Flux.just(2, 3);
      List<Integer> result = source
            .flatMap(i -> multiplyBy2Async(i))
            .collectList()
            .block();
      // run your asserts on the list. Reminder: the order may not be what you expect because of the `flatMap`
      // Or with a Mono:
      Integer resultOfMono = Mono.just(5)
            .flatMap(i -> multiplyBy2Async(i))
            .map(i -> i * 4)
            .block();
      // run your asserts on the integer
    }
    
  3. You could use general-purpose solutions for async testing, such as CountDownLatch, but, again, I wouldn't recommend them, as they will give you trouble in some cases. For example, if you don't know the number of receivers in advance, you'll need to use something else.
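
For completeness, here is a minimal sketch of the CountDownLatch approach from point 3. It uses only the JDK: a single-threaded ExecutorService stands in for the asynchronous pipeline, and the class name LatchExample and the method collect are hypothetical names made up for this illustration, not part of Reactor. It assumes the number of expected items is known up front, which is exactly the limitation mentioned above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchExample {

    // Hypothetical stand-in for an async pipeline: every item is processed on
    // one worker thread and appended to a StringBuilder.
    static String collect(String... items) throws InterruptedException {
        StringBuilder sb = new StringBuilder();
        // One count per expected item; this only works if the count is known in advance.
        CountDownLatch done = new CountDownLatch(items.length);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            for (String item : items) {
                worker.submit(() -> {
                    try {
                        Thread.sleep(100); // simulated long-running operation
                        sb.append(item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown(); // signal that this item has been handled
                    }
                });
            }
            // Wait for all items instead of sleeping for a fixed time;
            // the timeout keeps a broken pipeline from hanging the test forever.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for items");
            }
            return sb.toString();
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect("a", "b"));
    }
}
```

In a test you would replace the fixed sleep with the await call and assert on the result afterwards. The single worker thread keeps the items in submission order; with several worker threads the order of appends would no longer be guaranteed.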

Hossam El-Deen

Per the answer above, I found that blockLast() helped.

@Test
public void MyTest() throws InterruptedException // Thread.sleep throws a checked exception
{
    Logs.Info("Start test");    
 
    /* 1 */
    // Make a request
    WebRequest wr1 = new WebRequest("1", "2", "3", "4");
    String json1 = wr1.toJson(wr1);
    
    Logs.Info("Flux");
    Flux<String> responses = controller.getResponses(json1);

    /* 2 */
    Logs.Info("Responses in");
    responses.subscribe(s -> mySub.myMethod(s)); // Test for strings is in myMethod
    
    Logs.Info("Test thread sleeping");
    Thread.sleep(2000);
    
    /* 3 */
    Logs.Info("Test thread blocking");
    // Note: blockLast() subscribes to the Flux again and blocks until it completes
    responses.blockLast();
    
    Logs.Info("Finish test");
}
rupweb