
I'm new to reactive programming with RxJava, and after going through the simpler examples I'm now trying to figure out how to work with continuous streams. The problem with the example below is that the program doesn't terminate after I've taken the 3 elements. My assumption is that I somehow need to unsubscribe from my observable, but I don't fully grasp how to terminate the while loop and make the program exit.

I've come across the post "RxJava -- Terminating Infinite Streams", but I still can't figure out what I'm missing.

class MyTwitterDataProvider {
/*
This example is written in Groovy

Instance variables and constructor omitted
*/

public Observable<String> getTweets() {
    BufferedReader reader = new BufferedReader(new InputStreamReader(getTwitterStream()))

    Observable.create({ observer ->
        executor.execute(new Runnable() {
            def void run() {
                String newLine
                while ((newLine = reader.readLine()) != null) {
                    System.out.println("printing tweet: $newLine")
                    observer.onNext(newLine)
                }

                observer.onCompleted()
            }
        })
    })
}

def InputStream getTwitterStream() {
// code omitted
}

public static void main (String [] args) {
    MyTwitterDataProvider provider = new MyTwitterDataProvider()
    Observable<String> myTweetsObservable = provider.getTweets().take(3)

    Subscription myTweetSubscription = myTweetsObservable.subscribe({tweet-> println("client prints: $tweet")})
   // myTweetSubscription.unsubscribe()
}
}
MrKY

1 Answer


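You must add a check in your loop to see whether the observer is still subscribed. take(3) unsubscribes once it has received its third item, but your loop never looks at that and keeps reading from the stream: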

            while ((newLine = reader.readLine()) != null && !observer.isUnsubscribed()) {
                System.out.println("printing tweet: $newLine")
                observer.onNext(newLine)
            }
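To make the mechanism easier to see without the library, here is a minimal plain-Java sketch of the same idea. It is not RxJava code: the AtomicBoolean stands in for observer.isUnsubscribed(), and the names CancellableReaderDemo, pump and takeFirst are made up for illustration. One difference from the loop above is that the flag is checked before readLine(), so the loop does not pull one extra line off the stream after the consumer has stopped listening.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, library-free illustration of a producer loop that honours cancellation.
class CancellableReaderDemo {

    /** Pushes lines to onNext until the source ends or the cancelled flag is set. */
    static int pump(BufferedReader reader, AtomicBoolean cancelled, Consumer<String> onNext)
            throws IOException {
        int linesRead = 0;
        String line;
        // The flag plays the role of observer.isUnsubscribed(); it is checked first,
        // so no further line is read once the consumer has cancelled.
        while (!cancelled.get() && (line = reader.readLine()) != null) {
            linesRead++;
            onNext.accept(line);
        }
        return linesRead;
    }

    /** Collects the first `limit` lines, then sets the flag, roughly what take(limit) triggers. */
    static List<String> takeFirst(BufferedReader reader, int limit) throws IOException {
        List<String> taken = new ArrayList<>();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        pump(reader, cancelled, line -> {
            taken.add(line);
            if (taken.size() >= limit) {
                cancelled.set(true); // consumer has enough: ask the producer to stop
            }
        });
        return taken;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader reader = new BufferedReader(new StringReader("t1\nt2\nt3\nt4\nt5\n"));
        System.out.println(takeFirst(reader, 3)); // prints [t1, t2, t3]
        System.out.println(reader.readLine());    // prints t4, which the loop left unread
    }
}
```

Two caveats for the real program. readLine() blocks, so on a quiet stream the loop only notices the unsubscription when the next line arrives. And if the executor (whose setup is omitted in the question) uses non-daemon threads, the JVM may still stay alive after the loop ends until the executor is shut down.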
alexwen