Questions tagged [rx-scala]

RxScala – Reactive Extensions for Scala

RxScala is a library for composing asynchronous and event-based programs using observable sequences.

RxScala is open source and available under the Apache License, Version 2.0.

91 questions
9
votes
3 answers

scala observable unify observable with a sequence without intermediate datastructure update

I have code that calls Couchbase to get some rows, as follows: val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id => couchbaseBucket.async().get(id)) If I have 1,2,3,4,5,6 as input row-keys and only rows…
Jas
5
votes
1 answer

Observable from Futures - onNext from multiple threads

I want to generate an Observable in real time from the results of a list of Futures. In the simplest case, suppose I have a list of futures I'm running with Future.sequence, and I'm simply monitoring their progress with an Observable that tells me…
thund
5
votes
1 answer

How to make AndroidScheduler.mainThread work with a Scala Observable?

I have an Observable in my onCreate and I want to get the results on my main thread. But since AndroidScheduler.mainThread works only with the Java Observable, I am unable to compile it. Here's the code: val L=List(1,2,3) val…
user2730066
4
votes
1 answer

How to implement a single use ordered ReplaySubject?

How can I have a single-subscriber ReplaySubject that: buffers all events received with onNext() until someone subscribes to it; once someone subscribes to it, all buffered events are forwarded to the subscriber, erasing them from the ReplaySubject…
Eduardo Bezerra
4
votes
1 answer

Converting RxJava to RxScala

Is there a way to use both RxJava and RxScala in one project? import rx.lang.scala.{Observable => ScalaObservable} import rx.{Observable => JavaObservable} We have a module written in Java that is using the JavaObservable (RxJava). And then we…
mana
4
votes
3 answers

How to execute map, filter, flatMap using multiple threads in RxScala/Java?

How to run filter, map and flatMap on Observable using multiple threads: def withDelay[T](delay: Duration)(t: => T) = { Thread.sleep(delay.toMillis) t } Observable .interval(500 millisecond) .filter(x => { withDelay(1…
4
votes
0 answers

Observable vs Future Performance

I am working with Vert.x 2.x (http://vertx.io) which makes extensive use of asynchronous callbacks. These quickly become unwieldy with typical nesting/callback hell issues. I have considered both Scala Futures/Promises (which I think would be the…
mixja
3
votes
1 answer

How to preserve the order of items emitted by two observables after they are merged?

I have run into a behavior of Scala Observables that has surprised me. Consider my example below: object ObservablesDemo extends App { val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}") val oSlow =…
user24139
3
votes
1 answer

RxScala doOnCompleted not firing after call to .take()

I am new to RxScala Observables and am experiencing strange behaviour when using a combination of take(n) and doOnCompleted(). Below I have an example of a test where I believe the first subscribe is correct (with take(2) at the start) and outputs…
user499882
3
votes
1 answer

Pausable BehaviorSubject?

Is it possible to have a kind of BehaviorSubject with pause and resume switches? Something like PausableBehaviorSubject.pause() and PausableBehaviorSubject.resume()? How could that be done? The idea is that, when paused, the subject would not…
Eduardo Bezerra
3
votes
2 answers

Tracing [rx] Observable exceptions in large graphs to source code

When you have a large Observable graph (i.e. observable composed many times using merge, groupBy, join etc.), and an exception is thrown, it is sometimes difficult to figure out where the exception originated from. I would like to know if it is…
Luciano
3
votes
1 answer

In Rx (or RxJava/RxScala), how to make an auto-resetting stateful latch map/filter for measuring in-stream elapsed time to touch a barrier?

Apologies if the question is poorly phrased; I'll do my best. If I have a sequence of values with times as an Observable[(U,T)], where U is a value and T is a time-like type (or anything that supports differences, I suppose), how could I write an operator…
experquisite
3
votes
1 answer

Reactive Programming using RxScala

I have an Observable that connects to a service via a socket protocol. The connection to the socket happens through a client library. The client library that I use has a java.util.Observer with which I can register for events being pushed into it final…
joesan
3
votes
1 answer

How do I map an Observable with a function from Future to Future?

Suppose I have an event stream of elements with type In: val observableIn: Observable[In] = ??? And a function for transforming objects of type In into objects of type Out, but "in the future": val futureInToFutureOut: (Future[In]) => Future[Out] =…
john sullivan
2
votes
1 answer

How to change rxjava/rxscala operator (.debounce) parameters after initialisation

I'm using the .debounce operator in rxjava/rxscala to capture events that take place within a given time period of each other, but I would like to make that time period controllable. The time period .debounce uses is given as arguments.…