Monix is a library providing asynchronous programming facilities for Scala and Scala.js.
Questions tagged [monix]
94 questions
10
votes
1 answer
What is lost after converting Monix `Task` to Cats `IO`?
This simplified case is where my question arises...
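import cats.effect.{ExitCode, IO, IOApp}
import monix.eval.Task

object Main extends IOApp {
  // IOApp.run takes a List[String], not a Seq[String]
  def run(args: List[String]): IO[ExitCode] = {
    Task{...}
      .to[IO]
      .as(ExitCode.Success)
  }
}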
Another option is Await.result(task), which however sounds not…

WeiChing 林煒清
7
votes
0 answers
Which operations are easier to implement in pull vs push models in streaming libraries (and vice versa)?
The author of Monix says, in a comparison of Monix with FS2:
Where FS2 is better:
the model of communication between producers and consumers is pull-based, sometimes making it easier to implement new operators
Where Monix is better:
the model of…

visa
6
votes
2 answers
What's the best way to wrap a Monix Task with a start time and end time and print the difference?
This is what I'm trying right now, but it only prints "hey" and not the metrics.
I don't want to add metric-related code to the main function.
import java.util.Date
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import…
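
One way to keep the timing out of the main function is a small wrapper that records the clock before and after the task runs. This is only a minimal sketch: the helper name `timed` and its `label` parameter are hypothetical, and it assumes that printing the elapsed time on successful completion is enough (a failing task would skip the print).

```scala
import monix.eval.Task

// Hypothetical helper: wraps any Task and prints how long it took.
// The clock reads are suspended in Task.eval, so they happen when the
// task actually runs, not when the wrapper is built.
def timed[A](label: String)(task: Task[A]): Task[A] =
  for {
    start  <- Task.eval(System.nanoTime())
    result <- task
    end    <- Task.eval(System.nanoTime())
    _      <- Task.eval(println(s"$label took ${(end - start) / 1000000} ms"))
  } yield result
```

A call site would then look like `timed("main")(Task.eval(println("hey")))`, run with a `Scheduler` in scope as usual.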

N A
5
votes
2 answers
Efficient way to create abstract collection with cats
I have some code that uses Monix Observable for stream processing of a file. To test this code, I'd like the operations I do on the Observable to be type independent so I can also perform them on any other data structure, like List. That's why I…

Martijn
5
votes
1 answer
How can I consume paginated resource using Monix in Scala?
I have a paginated resource and I want to consume it recursively with Monix. I want to have an Observable that is going to emit downloaded elements and recursively consume pages. Here is a simple example. It doesn't work of course. It emits first…

Artem Malinko
4
votes
0 answers
Creating a Monix Observable from a NuProcess ByteBuffer
So I have a current project that uses Java Process and I am trying to replace it with NuProcess (i.e. https://github.com/brettwooldridge/NuProcess). For dealing with Java Process's STDOUT/STDERROR you had an InputStream, and since Monix provided…

mdedetrich
4
votes
2 answers
Splitting a Monix Observable
I would like to write a split function for monix.reactive.Observable. It should split a source Observable[A] into a new pair (Observable[A], Observable[A]), based on the value of a predicate, evaluated against each element in the source. I would…

Jeremy Townson
4
votes
1 answer
How does Monix use back-pressure with the flatMap operator?
Monix uses Ack to sync emitted messages, but if I use groupBy and flatMap, the inner Observable does not follow the back-pressure of the source.
Please see this test code:
import java.util.concurrent.TimeUnit
import…

LoranceChen
4
votes
1 answer
Model multiple function calls with a stream (in a safe, FP way)
Given a function A => IO[B] (aka Kleisli[IO, A, B]) that is meant to be called multiple times, and has side effects, like updating a DB, how to delegate such multiple calls of it into a stream (I guess Pipe[IO, A, B]) (fs2, monix…

V-Lamp
4
votes
1 answer
How to read response as Observable[String] with sttp
I'm using the sttp client. I want to interpret the response as strings divided by lines, e.g. Observable[String].
Here is the sttp streaming API:
import java.nio.ByteBuffer
import com.softwaremill.sttp._
import…

zella
4
votes
1 answer
Monix Observable groupBy large number of keys without memory leaks
I am trying to split a single Observable in Monix by key, then group up to the last n events in every GroupedObservable and send them for further processing. The problem is that the number of keys to group on is possibly infinite, and that causes memory…

nologin
4
votes
2 answers
Monix : InputStreamObservable does not support multiple subscribers
I'm trying to split an Observable of (String, Date) into two different Observables and zip them together as follows
import monix.execution.Scheduler.Implicits.global
val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b…

N A
3
votes
1 answer
Is there a simple way to convert Option[Task[T]] to Task[Option[T]]?
While using monix.eval.Task or zio.Task, is there a simple way to convert Option of Task to Task of Option?
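
For the Monix side of this question, a plain pattern match is enough to flip the two layers. The sketch below is illustrative only: the function name `flip` is hypothetical, and it covers `monix.eval.Task` rather than `zio.Task`. With the cats syntax import in scope, `opt.sequence` is generally expected to give the same result, but that relies on the cats instances Monix ships for `Task`.

```scala
import monix.eval.Task

// Hypothetical helper: turns Option[Task[T]] into Task[Option[T]].
// A present task is mapped into Some; an absent one becomes an
// already-completed Task holding None.
def flip[T](opt: Option[Task[T]]): Task[Option[T]] =
  opt match {
    case Some(task) => task.map(value => Some(value): Option[T])
    case None       => Task.now(Option.empty[T])
  }
```

Nothing is executed by `flip` itself; the inner task still runs only when the resulting `Task` is run.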

Karthik P
3
votes
1 answer
How can I send HTTP Requests asynchronously while handling rate-limits?
Disclaimer: I am new to sttp and Monix, and this is my attempt to learn more about these libraries. My goal is to fetch data (client-side) from a given API via HTTP GET requests -> parse JSON responses -> write this information to a database. My…

alt-f4
3
votes
1 answer
Stop all async Tasks when failures exceed a threshold?
I'm using Monix Task for async control.
Scenario:
tasks are executed in parallel
if failures occur more than X times
stop all tasks that have not yet completed (the quicker the better)
My solution:
I came up with the idea of a race between 1. result and…

WeiChing 林煒清