I'm going to be using RxAndroid in an Android app. For now I'm modeling the behavior in RxPY because it was the easiest for me to set up and play with. In the example below, source3 emits the correct data: a concatenation of an initialization that takes some time and a permanent subscription, which I have just faked. I want a BehaviorSubject because I need the last value immediately for field initialization.

I cannot figure out how to chain the BehaviorSubject on top of source3 so that it emits whatever source3 emits while remembering the last value. I have searched the internet for two days and have not found clear direction on this use case. Here is my code; the question is why the observer doesn't receive any emissions.

from rx import Observable, Observer
from rx.subjects import BehaviorSubject
import time, random

def fake_initialization(observer):
    time.sleep(5)  # It takes some time
    observer.on_next("Alpha")
    observer.on_completed()

def fake_subscription(observer):
    iter = 0 # Subscription emits forever
    while True:
        observer.on_next("message %02d"%(iter))
        time.sleep(random.randrange(2,5))
        iter += 1

class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source1 = Observable.create(fake_initialization)
source2 = Observable.create(fake_subscription)
source3 = source1 + source2

bsubject = BehaviorSubject(False)
source4 = source3.multicast(bsubject)
source4.connect()
source4.subscribe(PrintObserver())
malibu

1 Answer

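This turned out to have a fairly simple answer, and I'm posting it in case anyone else ends up in this situation. Admittedly, I didn't read the RxPY page closely enough. RxPY does not add concurrency for you, presumably because Python offers so many concurrency options, so you have to add it yourself. Without a scheduler, the blocking functions passed to Observable.create run on the calling thread, so in the question's code source4.connect() never returns and the subscribe call after it is never reached. Here is the final working code: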

import random
import time

import multiprocessing
from rx import Observable,Observer
from rx.concurrency import ThreadPoolScheduler
from rx.subjects import Subject

class PrintObserver1(Observer):

    def on_next(self, value):
        print("Received 1 {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done 1!")

    def on_error(self, error):
        print("Error Occurred: 1 {0}".format(error))

class PrintObserver2(Observer):

    def on_next(self, value):
        print("Received 2 {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done 2!")

    def on_error(self, error):
        print("Error Occurred: 2 {0}".format(error))

def fake_initialization(observer):
    time.sleep(5)  # It takes some time
    observer.on_next("Alpha")
    observer.on_completed()

def fake_subscription(observer):
    iter = 0 # Subscription emits forever
    while True:
        observer.on_next("message %02d"%(iter))
        time.sleep(random.randrange(2,5))
        iter += 1

optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

source1 = Observable.create(fake_initialization).subscribe_on(pool_scheduler)
source2 = Observable.create(fake_subscription).subscribe_on(pool_scheduler)
catted_source = source1 + source2

native_source = Observable.interval(1000)
print(native_source, catted_source)
#source = source3
subject = Subject()
# native_source = works
# catted_source = not works
subSource = catted_source.subscribe(subject)
#####

subSubject1 = subject.subscribe(PrintObserver1())
subSubject2 = subject.subscribe(PrintObserver2())
time.sleep(30)
subject.on_completed()
subSubject1.dispose()
subSubject2.dispose()
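
The two ideas in play here (a subject that remembers its last value, and a blocking producer that has to run on its own thread) can be sketched without RxPY at all. The block below is only an illustration using the standard library: LastValueSubject and blocking_source are hypothetical names made up for this sketch, not RxPY classes, and the real BehaviorSubject does considerably more (completion, errors, disposal).

```python
import threading
import time


class LastValueSubject(object):
    """Hypothetical stand-in for a BehaviorSubject (not the RxPY class):
    keeps the latest value and hands it to every new subscriber at once."""

    def __init__(self, initial):
        self._lock = threading.Lock()
        self._value = initial
        self._callbacks = []

    def on_next(self, value):
        # Remember the value, then forward it to the current subscribers.
        with self._lock:
            self._value = value
            callbacks = list(self._callbacks)
        for callback in callbacks:
            callback(value)

    def subscribe(self, callback):
        # Register the callback and immediately replay the latest value.
        with self._lock:
            self._callbacks.append(callback)
            current = self._value
        callback(current)


def blocking_source(subject, count):
    """Stands in for fake_initialization followed by fake_subscription,
    shortened so that it terminates."""
    subject.on_next("Alpha")
    for i in range(count):
        time.sleep(0.01)
        subject.on_next("message %02d" % i)


subject = LastValueSubject(False)

early = []
subject.subscribe(early.append)  # gets the initial value right away

# Run the blocking producer on its own thread so this thread stays free.
worker = threading.Thread(target=blocking_source, args=(subject, 3))
worker.start()
worker.join()

late = []
subject.subscribe(late.append)  # a late subscriber still gets the last value

print(early)
print(late)
```

The early subscriber sees the initial value followed by everything the producer emits, while the late one sees only the most recent message, which is the behavior wanted for field initialization. If blocking_source were called directly instead of through a thread, nothing after that call would run until it returned.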

Also note that you have to install the 'futures' package for concurrency to work on Python 2.7.

If you get this error:

from concurrent.futures import ThreadPoolExecutor 
ImportError: No module named concurrent.futures

See this related question (it covers a slightly different error, but the same solution works):

ImportError: No module named concurrent.futures.process
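
To check up front whether the module is available, something like the following should work. It is a minimal sketch; has_module is a hypothetical helper name introduced here, not part of RxPY or of the futures package.

```python
import importlib


def has_module(name):
    """Return True if the named module can be imported."""
    try:
        importlib.import_module(name)
        return True
    except ImportError:
        return False


if has_module("concurrent.futures"):
    print("concurrent.futures is available")
else:
    print("concurrent.futures is missing; on Python 2.7, "
          "install the 'futures' package")
```

On Python 3 the module is part of the standard library, so the check only matters on 2.7.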

malibu