Questions tagged [rx-py]

Reactive Extensions for Python

RxPY is a library for composing asynchronous and event-based programs using observable collections and LINQ-style query operators in Python.

RxPY is open sourced and available under the Apache License, Version 2.0

96 questions
14
votes
5 answers

How to timeout a long running program using rxpython?

Say I have a long running python function that looks something like this? import random import time from rx import Observable def intns(x): y = random.randint(5,10) print(y) print('begin') time.sleep(y) print('end') return…
syllogismos
  • 600
  • 2
  • 15
  • 39
10
votes
1 answer

Python web service subscribed to reactive source produces strange behavior in object

I have implemented a web service using Falcon. This service stores a state machine (pytransitions) that is passed to service's resources in the constructor. The service is runs with gunicorn. The web service launches a process on start using RxPy.…
Jav_Rock
  • 22,059
  • 20
  • 123
  • 164
9
votes
1 answer

Array destructuring in Python

I am hoping to make vals in the last line be more clear. import rx from rx import operators as op light_stream = rx.range(1, 10).pipe( op.with_latest_from(irradiance_stream), op.map(lambda vals: print(vals[0], vals[1]))) #…
Hongbo Miao
  • 45,290
  • 60
  • 174
  • 267
7
votes
1 answer

RxPy: Sort hot observable between (slow) scan executions

TL;DR I'm looking for help to implement the marble diagram below. The intention is to sort the non-sorted values to the extent possible without waiting time between scan executions. I'm not asking for a full implementation. Any guidance will be…
raul.vila
  • 1,984
  • 1
  • 11
  • 24
7
votes
3 answers

How to wait for RxPy parallel threads to complete

Based on this excellent SO answer I can get multiple tasks working in parallel in RxPy, my problem is how do you wait for them to all complete? I know using threading I can do .join() but there doesn't seem to be any such option with Rx Schedulers.…
phdesign
  • 2,019
  • 21
  • 18
6
votes
1 answer

How to call an async coroutine periodically using an RxPY interval observable?

I need to create an Observable stream which emits the result of a async coroutine at regular intervals. intervalRead is a function which returns an Observable, and takes as parameters the interval rate and an async coroutine function fun, which…
tomasdussa
  • 63
  • 1
  • 3
6
votes
1 answer

How can I notify RxPY observers on separate threads using asyncio?

(Note: The background for this problem is pretty verbose, but there's an SSCCE at the bottom that can be skipped to) Background I'm trying to develop a Python-based CLI to interact with a web service. In my codebase I have a CommunicationService…
Tagc
  • 8,736
  • 7
  • 61
  • 114
6
votes
1 answer

How to prevent combine_latest firing multiple times when multiple inputs tick simultaneously?

I'm using Reactive Extensions' combine_latest to perform an action whenever any inputs tick. The problem is if multiple inputs tick at the same time then combine_latest fires multiple times after each individual input ticks. This causes a headache…
mchen
  • 9,808
  • 17
  • 72
  • 125
5
votes
1 answer

rxpy composing observables efficiently

Intro: Hello. I am exploring the python rxpy library for my use case - where I am building an execution pipeline using the reactive programming concepts. This way I expect I would not have to manipulate too many states. Though my solution seems to…
Shriram V
  • 1,500
  • 2
  • 11
  • 20
5
votes
2 answers

in ReactiveX, how do I pass other parameters to Observer.create?

Using RxPY for illustration purposes. I want to create an observable from a function, but that function must take parameters. This particular example must return, at random intervals, one of many pre-defined tickers which I want to send to it. My…
Thomas Browne
  • 23,824
  • 32
  • 78
  • 121
5
votes
1 answer

subscribe_on with from_iterable/range in RxPY

I'm trying to get my head around scheduling in reactive extensions for python. I would like to use subscribe_on to process multiple observables in parallel. This works fine if the observable is created with just, but not if for example range or…
Philipp Melab
  • 409
  • 1
  • 4
  • 14
5
votes
1 answer

is not a Python function

I'm trying to build a function which I can use as a handler for an RxPy stream that I'm mapping over. The function I have needs access to a variable outside the scope where that variable is defined which, to me, means that I need to use a closure of…
Justin Valentini
  • 426
  • 3
  • 14
5
votes
2 answers

How can I make an rx.py Observable from a stream such as stdin?

I'm trying to get my head around the rxpy library for functional reactive programming (FRP) and I've already hit a roadblock. I'm writing a small program that expects data to be streamed in via standard input (sys.stdin). My question is therefore…
Louis Thibault
  • 20,240
  • 25
  • 83
  • 152
4
votes
2 answers

Rx: a zip-like operator that continues after one of the streams ended?

I'm looking to combine streams (observables) that start and end asynchronously: -1----1----1----1---|-> -2----2--|-> [ optional_zip(sum) ] -1----3----3----1---|-> What I need it for: Adding audio streams together. They're streams of audio…
uryga
  • 402
  • 4
  • 14
4
votes
0 answers

Why does RxPY's TwistedScheduler raise AlreadyCalled error?

Minimal working example -- a client that sends 'Hello world' to echo server word-by-word using an RxPY (v.1.2.4) Observable. Client: from rx.concurrency.mainloopscheduler import TwistedScheduler from rx.observable import Observable from…
mchen
  • 9,808
  • 17
  • 72
  • 125
1
2 3 4 5 6 7