0

I am trying to understand how Observables are executed but can't seem to get this simple code to work.

public class RxJavaExample {
    public static void main(String[] args) {
        Observable<String> hello = Observable.fromCallable(() -> 
            getHello()).subscribeOn(Schedulers.newThread());

        hello.subscribe();

        System.out.println("End of main!");
    }

    public static String getHello() {
        System.out.println("Hello called in " + 
            Thread.currentThread().getName());
        return "Hello";
    }
}

Shouldn't hello.subscribe() execute getHello()?

Krishnaraj
  • 2,360
  • 1
  • 32
  • 55

3 Answers3

2

It is because your main thread finishes, before the background thread gets to getHello. Try to add a Thread.sleep(5000) in your main method before exiting.

Alternatively, wait until the onCompleted of your subscription is called.

EDIT: The reason why the program terminates is because RxJava spawns daemon threads. On the search for a good source, I also found this question, which probably answers it as well.

sfiss
  • 2,119
  • 13
  • 19
  • I don't think a Java program will exit until all threads complete. Moreover I did put a breakpoint in the `getHello()` method, it was never called. – Krishnaraj Jul 02 '19 at 06:01
  • Why would you not try it out, instead of assuming that you are correct and know how it works? Please have a look at my edits. Maybe also have a look at https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java – sfiss Jul 02 '19 at 06:17
  • Java does happily exit without waiting for threads to finish, that's why we have ExecutorService.awaitTermination. – LeffeBrune Jul 02 '19 at 06:24
  • LeffeBrune, I didn't initially know that RxJava created daemon threads till @sfiss pointed it out. Thanks to both of you. – Krishnaraj Jul 02 '19 at 06:42
0

@sfiss is right, this works just as you would expect:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaExample {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Observable<String> hello = Observable.fromCallable(() -> getHello())
        .subscribeOn(Schedulers.from(exec));

    hello.subscribe();

    System.out.println("End of main!");

    exec.shutdown();
    exec.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static String getHello() {
    System.out.println("Hello called in " + Thread.currentThread().getName());
    return "Hello";
  }
}

With the following output:

End of main!
Hello called in pool-1-thread-1
LeffeBrune
  • 3,441
  • 1
  • 23
  • 36
-1

It may be you are getting confused between Threads and Observables,

The way I have used Observables in the past is for a timer on a Minecraft plugin, I have an event that is triggered every minute.

public class TimerHandler extends Observable implements Runnable{

    @Override
    public void run() {
        this.setChanged();
        this.notifyObservers();
    }
}

So this triggers every minute, and then to add events to the timer queue you just subscribe to the observable meaning that the subscribed calls are triggered every minute.

public class PlotTimer implements Observer {

    @Override
    public void update(Observable o, Object arg) {
        ......

to subscribe i call the following

getServer().getScheduler().scheduleAsyncRepeatingTask(this,timerHandler,1200,1200);
timerHandler.addObserver(new PayDayTimer());
timerHandler.addObserver(new ProfileTimer());
timerHandler.addObserver(new PlotTimer());
Theresa Forster
  • 1,914
  • 3
  • 19
  • 35
  • 1
    The question is not taking about `java.util.Observable` but RxJava. Both mine and the accepted answer detail how RxJava uses daemon threads and how to prevent the early shutdown. – sfiss Jul 02 '19 at 07:38
  • @Theresa AFAIK the recommended way is to use tags for specifying the tech instead of mentioning in the title. – Krishnaraj Jul 02 '19 at 13:06