Questions tagged [reactive-streams]

a standard for asynchronous stream processing with non-blocking back pressure on the JVM

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM.

http://www.reactive-streams.org/

358 questions
77
votes
5 answers

Mono vs Flux in Reactive Stream

As per the documentation: Flux is a stream which can emit 0..N elements: Flux fl = Flux.just("a", "b", "c"); Mono is a stream of 0..1 elements: Mono mn = Mono.just("hello"); And as both are the implementations of the Publisher…
KayV
  • 12,987
  • 11
  • 98
  • 148
50
votes
6 answers

How to correctly read Flux and convert it to a single inputStream

I'm using WebClient and custom BodyExtractorclass for my spring-boot application WebClient webLCient = WebClient.create(); webClient.get() .uri(url, params) .accept(MediaType.APPLICATION.XML) .exchange() .flatMap(response -> { …
Bk Santiago
  • 1,523
  • 3
  • 13
  • 24
32
votes
1 answer

In project reactor or akka streams, what is the conceptual difference between sink and subscriber?

The concepts of sink and subscriber seem similar to me. Also, I don't see the concept of sink being explicitly defined in the reactive streams spec.
Jatin
  • 667
  • 8
  • 16
31
votes
3 answers

publishOn vs subscribeOn in Project Reactor 3

I am using publishOn vs subscribeOn both on the same flux as follows: System.out.println("*********Calling Concurrency************"); List elements = new ArrayList<>(); Flux.just(1, 2, 3, 4) .map(i -> i * 2) .log() …
26
votes
1 answer

ParallelFlux vs flatMap() for a Blocking I/O task

I have a Project Reactor chain which includes a blocking task (a network call, we need to wait for response). I'd like to run multiple blocking tasks concurrently. It seems like either ParallelFlux or flatMap() could be used, bare-bone…
Corin Fletcher
  • 1,611
  • 1
  • 17
  • 25
25
votes
4 answers

Flutter BLoC pattern - How can I navigate to another screen after a stream event?

My question is about navigation used with the BLoC pattern. In my LoginScreen widget I have a button that adds an event into the EventSink of the bloc. The bloc calls the API and authenticates the user. Where in the LoginScreen Widget do I have to…
Sebastian
  • 3,666
  • 2
  • 19
  • 32
24
votes
1 answer

Using reactor's Flux.buffer to batch work only works for single item

I'm trying to use Flux.buffer() to batch up loads from a database. The use case is that loading records from a DB may be 'bursty', and I'd like to introduce a small buffer to group together loads where possible. My conceptual approach has been to…
Marty Pitt
  • 28,822
  • 36
  • 122
  • 195
22
votes
1 answer

Akka Streams: What does Mat represents in Source[out, Mat]

In Akka streams what does Mat in Source[Out, Mat] or Sink[In, Mat] represent. When will it actually be used?
Somasundaram Sekar
  • 5,244
  • 6
  • 43
  • 85
21
votes
6 answers

How to handle errors in Spring reactor Mono or Flux?

I have below code retuning Mono: try { return userRepository.findById(id) // step 1 .flatMap(user -> barRepository.findByUserId( user.getId()) // step 2 .map(bar-> Foo.builder().msg("Already exists").build()) // step 3 …
ace
  • 11,526
  • 39
  • 113
  • 193
21
votes
1 answer

RxJava 2.0 - How to convert Observable to Publisher

How to convert Observable to Publisher in RxJava version 2? In the first version we have the https://github.com/ReactiveX/RxJavaReactiveStreams project that do exactly what I need. But How can I do it in RxJava 2?
20
votes
2 answers

compose() vs. transform() vs. as() vs. map() in Flux and Mono

Recently, I decided to try spring 5 with projectreactor.io (io.projectreactor:3.1.1). Does anyone know what the best case of using this functions? What cons and pros of using each of them and where they should be used? Good examples will be…
Andrew Sasha
  • 1,254
  • 1
  • 11
  • 21
16
votes
1 answer

Why is Sinks.many().multicast().onBackpressureBuffer() completing after one of the subscribers cancels the subscription and how to avoid it

I have come across a behaviour I don't understand when using Sinks.Many to notify some events to multiple subscribers: fun main() { val sink : Sinks.Many = Sinks.many().multicast().onBackpressureBuffer() val flux =…
codependent
  • 23,193
  • 31
  • 166
  • 308
16
votes
1 answer

Back Pressure on fetch() not working in Google Chrome

I am having trouble consuming the response from my WebFlux server via JavaScript's new Streams API. I can see via Curl (with the help of --limit-rate) that the server is slowing down as expected, but when I try to consume the body in Google Chrome…
16
votes
1 answer

What's the difference between Reactive and Reactive Streams?

I'm trying to understand the difference between Reactive and ReactiveStreams, specifically in the context of RxJava ? The most I could figure out was that Reactive Streams has some notion of backpressure in the specification but that already exists…
Setheron
  • 3,520
  • 3
  • 34
  • 52
15
votes
5 answers

How to send email reactive in spring web-flux

I'd like to stay complete reactive within my new spring application. Therefor I use web-flux/ reactor and ReactiveRepository with MongoDB. Do you know how to integrate java-mail reactively into the tech-stack? Any alternatives?
kalamar
  • 933
  • 1
  • 11
  • 27
1
2 3
23 24