13

I am trying to understand how Subscriber and Publisher works in java 9.

Here I have created one subscriber here and using SubmissionPublisher for publishing item .

I am trying to publish 100 strings to subscriber. If I do not make the Client program to sleep (see commented code in MyReactiveApp), I do not see all the items are published.

why is it not waiting for all the strings processed here:

strs.stream().forEach(i -> publisher.submit(i)); // what happens here? 

If I replace the above code with, I see all the strings are printed in console

strs.stream().forEach(System.out::println);

Client program that publishes using SubmissionPublisher.

import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MyReactiveApp {

    public static void main(String args[]) throws InterruptedException {

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);


        List<String> strs = getStrs();

        System.out.println("Publishing Items to Subscriber");
        strs.stream().forEach(i -> publisher.submit(i));

        /*while (strs.size() != subs.getCounter()) {
            Thread.sleep(10);
        }*/

        //publisher.close();

        System.out.println("Exiting the app");

    }

    private static List<String> getStrs(){

        return Stream.generate(new Supplier<String>() {
            int i =1;
            @Override
            public String get() {
                return "name "+ (i++);
            }
        }).limit(100).collect(Collectors.toList());
    }

}

Subscriber

import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements java.util.concurrent.Flow.Subscriber<String>{

    private Subscription subscription;

    private int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(100);

    }

    @Override
    public void onNext(String item) {
        System.out.println(this.getClass().getSimpleName()+" item "+item);
        //subscription.request(1);
        counter++;

    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(this.getClass().getName()+ " an error occured "+throwable);

    }

    @Override
    public void onComplete() {
        System.out.println("activity completed");

    }
    public int getCounter() {
        return counter;
    }

}

output:

Publishing Items to Subscriber
MySubscriber item name 1
MySubscriber item name 2
MySubscriber item name 3
MySubscriber item name 4
MySubscriber item name 5
Exiting the app
MySubscriber item name 6
MySubscriber item name 7
MySubscriber item name 8
MySubscriber item name 9
MySubscriber item name 10
MySubscriber item name 11
MySubscriber item name 12
Shiva
  • 1,962
  • 2
  • 13
  • 31
  • 1
    Looks related to [Back-Pressure](https://www.reactivemanifesto.org/glossary#Back-Pressure) instead. – Naman Jan 02 '19 at 18:21

1 Answers1

6
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers

see: https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/SubmissionPublisher.html#SubmissionPublisher--

So actually

    strs.stream().forEach(i -> publisher.submit(i));

enqueues all submissions and delivers them asynchronously on another thread. But then the application is terminated. This is independent of the progress of the worker thread. This means that the application is terminated regardless of how many elements the worker thread has already delivered.

This can be different for each run. In the worst case, the application could be terminated before the first item is delivered.

Threads

If you want to verify that the main method of MyReactiveApp and the delivery in MySubscriber's onNext happen on different threads you can print out the names of the corresponding threads, e.g. in MyReactiveApp's main:

System.out.println(Thread.currentThread().getName()) 

will output main as thread name.

Whereas MySubscriber's onNext method will e.g. output something like ForkJoinPool.commonPool-worker-1.

User and Deamon Threads

Why does the application terminate although we still have a running thread?

There are two kind of threads in Java:

  • user threads
  • daemon threads

A Java program terminates when no longer any user threads are running, even when deamon threads are still running.

The main thread is a user thread. The SubmissionPublisher uses here worker threads from ForkJoinPool.commonPool(). These are daemon threads.

All worker threads are initialized with Thread.isDaemon() set true.

https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html

Stephan Schlecht
  • 26,556
  • 1
  • 33
  • 47
  • Thanks for answer, but can you please through some more light on **enqueues all submissions and delivers them asynchronously on another thread** .. are you referring to `publisher.submit`? As I was aware the created threads can be running after the main thread exits. – Shiva Jan 02 '19 at 04:48
  • I've added infos about user and worker threads to the answer. – Stephan Schlecht Jan 02 '19 at 16:40
  • Thanks, You have provided details needed to me. accepting as answer. – Shiva Jan 02 '19 at 17:19
  • After your answer, I have tried with fork join and found the similar behaviour. But I was wondering threads submitted to fork-join pool were not completed if the mail thread exits. see if you add details of it if you find them relevant here.. – Shiva Jan 02 '19 at 17:22
  • I'm not sure I understood the question. Are you saying that you are creating your own thread factory by implementing the `newThread` method of ForkJoinWorkerThreadFactory and delivering user threads? And why aren't these threads finished? Or is the question different? – Stephan Schlecht Jan 02 '19 at 21:11
  • Sorry - I missed about `ForkJoinPool` creates `daemon` threads while writing the comment.. Now it is clear to me. thanks – Shiva Jan 03 '19 at 07:10