3

I have a Observable that connects to a service via a Socket protocol. The connection to the socket happens through a client library. The client library that I use has java.util.Observer with with I can register for events being pushed into it

final class MyObservable extends Observable[MyEvent] {

  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}

I have two open questions that I do not understand.

How can I get the result of Step: 3 in my Subscriber?

Every time when I get a MyEvent, with a subscriber like below, I see that there is a new connection being created. Eventually Step 1, Step 2 and Step 3 are run for each incoming event.

val myObservable = new MyObservale()
myObservable.subscribe()
paulpdaniels
  • 18,395
  • 2
  • 51
  • 55
joesan
  • 13,963
  • 27
  • 95
  • 232
  • Where is your `Subscriber`? Could you use `myObservable.subscribe(mySubscriber)`? – zsxwing Jul 26 '15 at 14:44
  • But how to push the result of Step: 3 in the subscribe method of MyObservable class to the outside world? – joesan Jul 27 '15 at 13:42
  • Is there a RxScala library that is worthy to consider? How is the NetFlix's RxScala? Has it got back pressure enabling? – joesan Jul 27 '15 at 14:32
  • Aren't you already using RxScala? – paulpdaniels Jul 27 '15 at 15:47
  • I'm using the monifu library which is an implementation inspired by the Rx.NET. The monifu library has got a lot of goodies which at the moment for me is a bit hard to grasp! – joesan Jul 27 '15 at 15:54
  • RxScala supports back pressure. It's a wrapper of RxJava. – zsxwing Jul 27 '15 at 16:10
  • Is there an example of using RxScala that deals with my situation above? I mean all I want to do is to get the result of Step: 3 into MyObservable! and this should happen when somebody subscribes to MyObservable! – joesan Jul 27 '15 at 16:13

1 Answers1

2

Unless I'm misunderstanding your question, you just call onNext:

def subscribe(subscriber: Subscriber[MyEvent]) = {
  // connect to the Socket (Step: 1)
  // get the responses that are pushed (Step: 2)
  // transform them into MyEvent type (Step: 3)

  // finally notify the subscriber:
  subscriber.onNext(myEventFromStep3)
}

and code that subscribes would do something like:

myObservable.subscribe(onNext = println(_))
Brandon
  • 38,310
  • 8
  • 82
  • 87