0

I tested a little around with the proposal in this thread: flux within executorservice

and i have simplified the example a little to understand it easier. So, heres the example:

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Flux.just("1", "2", "3").subscribeOn(Schedulers.fromExecutorService(executorService)).doOnNext(System.out::println).subscribe();

    try {
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } finally {
        executorService.shutdownNow();
    }

But, now if this is executed it always waits the 10 seconds before the main thread ends. What was i expecting? I expected the executor service waiting 10 seconds at max until going on and calling shutdown. Normally it should be done in a few milliseconds and returning immediately after printing 1, 2, 3. The javadoc says here:

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

I don't get it. Whats wrong here?

Another sample that runs and ends immediately(but wrong in my opinion) is this one:

        ExecutorService executorService = Executors.newSingleThreadExecutor();
    Flux.range(1, 1_000_000).subscribeOn(Schedulers.fromExecutorService(executorService)).doOnNext(System.out::println).subscribe();
    executorService.shutdownNow();

BUT, here i would have expected that the main thread doesn't wait until completion of flux (resp. the executor service). But it does. In my understanding the two examples are behaving totally upside down according to the javadoc description. javadoc says:

This method does not wait for actively executing tasks to terminate. Use awaitTermination to do that.

Any ideas?

Regards Bernado

Brian Clozel
  • 56,583
  • 15
  • 167
  • 176
Bernado
  • 548
  • 1
  • 6
  • 18
  • 1
    Before `awaitTermination` you need to call `shutdown`. You aren't calling `shutdown` in your code. So not `shutdownNow` but `shutdown`! If you don't call this it will wait for the timeout, after 10 seconds, to occur. – M. Deinum Nov 11 '19 at 11:43
  • Oh yes. Thx, thats the solution to code part1. But, what with example 2? Why does it wait until completion although i dont added an executorService.awaitTermination(...); – Bernado Nov 11 '19 at 11:52
  • It doesn't wait, it just terminates and gives no guarantees. However apparently you are doing that little work that it has time to finish before being shutdown or during shutdown, – M. Deinum Nov 11 '19 at 12:01
  • Oh well....i think i got it. There's not gurantee that the main thread will wait until completion, right? Even the order shutdown -> await doesnt guarantee that it will act like the thread.join? – Bernado Nov 11 '19 at 12:43
  • It doesn't mean it is shutdown, the `awaitTermination` returns a `boolean` which in case of `false` means that it hasn't shutdown yet. Which is why you see the `awaitTermination` often in a while loop. `while(!awaitTermination(1, SECONDS) {}`. Which will loop until it really finished, to then cleanup the resources. – M. Deinum Nov 11 '19 at 13:45
  • Sorry...but one, again...the code flux.subscribe -> shutdown -> awaittermination is not semantically equal to thread.join, right? what i originally wanted to do is: create flux, publish it on a new thread and wait for completion in main thread. For example, the following code terminates immediately and evaluates awaittermination to true: `Flux.range(1, 3).publishOn(Schedulers.fromExecutorService(executorService)).delayElements(Duration.ofSeconds(3)) .subscribe(); executorService.shutdown(); System.out.println(executorService.awaitTermination(20, TimeUnit.SECONDS));` – Bernado Nov 11 '19 at 13:58
  • It isn't equal to `thread.join`. The shutdown will lead to a shutdown if the tasks are finished. The `awaitTermination` will see that and finish as soon as everything is done. Unless you have a long running task taking up to 30 seconds it will return false. I strongly suggest you to read the javadocs of the `ExecutorService`. – M. Deinum Nov 11 '19 at 14:11
  • Ok i will read the javadoc... ;-) Sorry again but even if i HAVE a long running task, shutdown terminates it even if there is a awaittermination call... – Bernado Nov 11 '19 at 14:53
  • If that isn't in a while loop which doesn't end until it really terminated it will just wait for x time and then destroy the program. – M. Deinum Nov 12 '19 at 06:40
  • Sorry, but that seems not to be the case. Please try my example four comments above. It terminates immediately although there is a wait... – Bernado Nov 12 '19 at 08:14
  • Which. The `shutdownNow` will not wait. The `shutdown` will wait if you use an `awaitTermination` if not it will just stop the program. – M. Deinum Nov 12 '19 at 08:15
  • There is a wait. So what is the idea of calling shutdown/await termination if it is useless or await has no effect? The example in comment starting with “sorry “....18hours ago... – Bernado Nov 12 '19 at 08:19
  • It will wait 10 seconds, or if the task finished before that finish immediately. If it takes longer then 10 seconds, your sample won;t stop becauyse there is no while loop. – M. Deinum Nov 12 '19 at 08:21
  • I think this discussion leads to no result. ;) there is an await of 20 secs, and the task should do it in 9 (3*3), but it does stop immediately. Please give it a try... – Bernado Nov 12 '19 at 08:24
  • There is no sample that does it in 9 seconds, there is no limit in your stream of processing. Just printing some numbers. – M. Deinum Nov 12 '19 at 08:26
  • Flux.range(1, 3).publishOn(Schedulers.fromExecutorService(executorService)).delayElements(Duration.ofSeconds(3)) .subscribe(); executorService.shutdown(); System.out.println(executorService.awaitTermination(20, TimeUnit.SECONDS)); – Bernado Nov 12 '19 at 08:29
  • 1
    The `delay` will move it to a different scheduler and from the point of the single threaded executor it has finished, because things moved to a different thread. – M. Deinum Nov 12 '19 at 08:41
  • ahh, now i/we got it....thx! ;-) if i do a thread.sleep e.g. in a map() so it is the same thread it works as expected – Bernado Nov 12 '19 at 09:13

0 Answers0