
My problem: I want to create a stream of strings that the controller sends from time to time, for example:

 Processing started!
 Step 1 completed.                     (This might be sent after 5 seconds or 10 minutes.)
 Process completed.                    (This might be sent after 15 minutes.)

Here is the code snippet in the controller:

    @GetMapping(value = "/stream1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamData() {
        return Flux.create(emitter -> {
            emitter.next("Processing started!");
            try {
                TimeUnit.SECONDS.sleep(5);
                emitter.next("Step 1 completed.");
                TimeUnit.SECONDS.sleep(5);
                emitter.next("Process completed.");
                emitter.complete();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, FluxSink.OverflowStrategy.LATEST);
        //create.publish().connect();
        //return create;
    }

But it emits data only once all processing has completed. That is, the whole stream arrives at once after 10 seconds.

How can I get a stream that starts sending each item as soon as it is ready?

  • What do you use to call your endpoint? – Yauhen Balykin Mar 06 '20 at 10:55
  • @YauhenBalykin I am using the Chrome browser and also curl to call this endpoint. Both behave the same in this case. – full_stuck_developer Mar 06 '20 at 11:01
  • As I remember SSE doesn’t work for curl, but for Chrome it works ok – Yauhen Balykin Mar 06 '20 at 11:04
  • I checked your code in Chrome on my pc, it works fine. – Yauhen Balykin Mar 06 '20 at 14:10
  • If you want to make requests with curl, you need to include the `-N` parameter: `curl -N http://localhost:8080/stream1`. The `-N` parameter disables buffering, so curl prints data as it receives it instead of buffering until the end of the request. – Toerktumlare Mar 06 '20 at 21:38
  • `Thread.sleep` is almost never a good idea in Reactive Programming. Here you are creating a `Flux` that gives the illusion that things are asynchronous, when in reality they are very much synchronous and blocking thanks to the `sleep`. Since you're using WebFlux, you're potentially hogging resources, which will prevent other requests from being processed by the system. – Simon Baslé Mar 09 '20 at 08:22
  • @SimonBaslé Thanks for pointing that out. You are right. I created this only to ask about and visualize the problem. This code snippet is just a demo of what the actual code might do in the meantime. In the actual cases I am using either the reactive WebClient for API calls or, in some places, a simple processing thread. – full_stuck_developer Mar 09 '20 at 10:00
  • Did you manage to make it work? – David Marciel Jun 15 '22 at 15:59
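
To rule out client-side buffering without curl, the endpoint can also be read line by line from plain Java. The following is only a minimal sketch, assuming Java 11+ and that the `/stream1` endpoint from the question is reachable at the URL passed as the first argument. The class name `SseClient` and the helper `payload` are hypothetical names introduced for illustration. Without an argument it only parses a canned sample, so it runs without a server.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

public class SseClient {

    // Extracts the payload of an SSE "data:" line. Other lines (blank event
    // separators, ":" comments, "event:"/"id:" fields) yield an empty Optional.
    public static Optional<String> payload(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String value = line.substring("data:".length());
        // The SSE format allows one optional space after the colon.
        return Optional.of(value.startsWith(" ") ? value.substring(1) : value);
    }

    // Prints every payload together with the time at which it was seen.
    static void printPayloads(Stream<String> lines) {
        lines.map(SseClient::payload)
             .flatMap(Optional::stream)
             .forEach(data -> System.out.println(System.currentTimeMillis() + " " + data));
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            // No URL given: parse a canned sample (made up for this sketch).
            printPayloads(List.of("data:Processing started!", "", "data:Step 1 completed.", "").stream());
            return;
        }

        // Assumption: args[0] is something like http://localhost:8080/stream1
        HttpRequest request = HttpRequest.newBuilder(URI.create(args[0]))
                .header("Accept", "text/event-stream")
                .build();

        // BodyHandlers.ofLines() exposes the body as a lazy Stream<String>, so
        // lines are handed over as they arrive, not after the response ends.
        HttpResponse<Stream<String>> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofLines());

        try (Stream<String> lines = response.body()) {
            printPayloads(lines);
        }
    }
}
```

If the printed timestamps are several seconds apart, the server is streaming and any "all at once" behaviour comes from the client. If they all arrive together, the delay is on the server side.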

1 Answer


You are using a less-than-ideal method for this task. You can use `Flux.generate(...)` instead. In contrast to `Flux.create(...)`, it emits at most one item per call of the generator callback, and the callback is invoked only when the subscriber requests an item, so there is no problem with backpressure.

Sample:

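    @GetMapping(value = "/feapi/automation/approach1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamData() {
        final AtomicInteger counter = new AtomicInteger();
        // The generator callback runs once per item requested by the subscriber.
        return Flux.generate(generator -> {
            try {
                // Simulated work; a blocking sleep is for demonstration only.
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 10));
                generator.next("Next step (" + counter.incrementAndGet() + ") done. Going further.");
                // NOTE: only a SINGLE item can be emitted per generator call. You can also call complete() or error().
            } catch (InterruptedException e) {
                // Every generator call must signal something, so report the failure instead of swallowing it.
                Thread.currentThread().interrupt();
                generator.error(e);
            }
        });
    }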
Lubo