Questions tagged [monix]

Monix is a library providing asynchronous programming facilities for Scala and Scala.js.

94 questions
10
votes
1 answer

What is lost after converting Monix `Task` to Cats `IO`?

This simplified case is where my question happen at... object Main extends IOApp{ def run(args:Seq[String]): IO[ExitCode]={ Task{...} .to[IO] .as(ExitCode.Success) } } Another option is Await.result(task), however which sounds not…
7
votes
0 answers

Which operations are easier to implement in pull vs push models in streaming libraries (and vice versa)?

Author of Monix says in comparison of Monix with FS2 Where FS2 is better: the model of communication between producers and consumers is pull-based, sometimes making it easier to implement new operators Where Monix is better: the model of…
visa
  • 300
  • 2
  • 10
6
votes
2 answers

What's the best way to wrap a monix Task with a start time and end time print the difference?

This is what I'm trying right now but it only prints "hey" and not metrics. I don't want to add metric related stuff in the main function. import java.util.Date import monix.eval.Task import monix.execution.Scheduler.Implicits.global import…
N A
  • 831
  • 2
  • 8
  • 28
5
votes
2 answers

Efficient way to create abstract collection with cats

I have some code that uses Monix Observable for stream processing of a file. To test this code, I'd like for the operations I do on the Observable to be type independent so I can also perform them on any other data structure like List. That why I…
Martijn
  • 2,268
  • 3
  • 25
  • 51
5
votes
1 answer

How can I consume paginated resource using Monix in Scala?

I have a paginated resource and I want to consume it recursively with Monix. I want to have an Observable that is going to emit downloaded elements and recursively consume pages. Here is a simple example. It doesn't work of course. It emits first…
Artem Malinko
  • 1,761
  • 1
  • 22
  • 39
4
votes
0 answers

Creating a Monix Observable from a NuProcess ByteBuffer

So I have a current project that uses Java Process and I am trying to replace it with NuProcess (i.e. https://github.com/brettwooldridge/NuProcess). For dealing with Java Process's STDOUT/STDERROR you have had an InputStream and since Monix provided…
mdedetrich
  • 1,899
  • 1
  • 18
  • 29
4
votes
2 answers

Splitting a Monix Observable

I would like to write a split function for monix.reactive.Observable. It should split a source Observable[A] into a new pair (Observable[A], Observable[A]), based on the value of a predicate, evaluated against each element in the source. I would…
4
votes
1 answer

how does Monix use back-pressure with flatMap operator?

Monix use Ack to sync emitted messages, but if I use groupBy and flatMap, the inner Observable not follow the back-pressure out of the source. See this test code please: import java.util.concurrent.TimeUnit import…
LoranceChen
  • 2,453
  • 2
  • 22
  • 48
4
votes
1 answer

Model multiple function calls with a stream (in a safe, FP way)

Given a function A => IO[B] (aka Kleisli[IO, A, B]) that is meant to be called multiple times, and has side effects, like updating a DB, how to delegate such multiple calls of it into a stream (I guess Pipe[IO, A, B]) (fs2, monix…
V-Lamp
  • 1,630
  • 10
  • 18
4
votes
1 answer

How to read response as Observable[String] with sttp

I'm using sttp client. I want to intepret response as strings divided by lines, eg Observable[String] Here sttp streaming api: import java.nio.ByteBuffer import com.softwaremill.sttp._ import…
zella
  • 4,645
  • 6
  • 35
  • 60
4
votes
1 answer

Monix Observable groupBy large number of keys without memory leaks

I try to perform splitting single Observable in Monix by key, then group up to last n events in every GrouppedObservable and send them for further processing. The problem is that number of keys to group on is possibly infinite and that causes memory…
nologin
  • 41
  • 1
4
votes
2 answers

Monix : InputStreamObservable does not support multiple subscribers

I'm trying to split an Observable of (String, Date) into two different Observables and zip them together as follows import monix.execution.Scheduler.Implicits.global val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b…
N A
  • 831
  • 2
  • 8
  • 28
3
votes
1 answer

Is there a simple way to convert Option[Task[T]] to Task[Option[T]]?

While using monix.eval.Task or zio.Task, is there a simple way to convert Option of Task to Task of Option?
3
votes
1 answer

How can I send HTTP Requests asynchronously while handling rate-limits?

Disclaimer: I am new to sttp and Monix, and that is my attempt to learn more about these libraries. My goal is to fetch data (client-side) from a given API via HTTP GET requests -> parse JSON responses -> write this information to a database. My…
alt-f4
  • 2,112
  • 17
  • 49
3
votes
1 answer

stop all async Task when they fails over threshold?

I'm using Monix Task for async control. scenario tasks are executed in parallel if failure occurs over X times stop all tasks that are not yet in complete status (as quick as better) my solution I come up the ideas that race between 1. result and…
WeiChing 林煒清
  • 4,452
  • 3
  • 30
  • 65
1
2 3 4 5 6 7