
I am trying to understand the difference between asyncBoundary and mapAsync. At a glance, I would expect them to behave the same. However, when I run the code, the asyncBoundary version appears to be faster than the mapAsync one.

Here is the code:

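import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext required by Future { ... }

// Variant 1: two mapAsync stages; variant 2: two map stages split by an async boundary
Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()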

The output: the asyncBoundary version always finishes before the mapAsync version.
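One caveat about this comparison: both streams are started with `.to(Sink.ignore).run()`, which discards the completion signal, so which one "finishes first" can only be inferred from interleaved console output, and the two streams also run concurrently with each other. A minimal sketch, assuming Akka 2.5.x with `ActorMaterializer` as in the question, that times each pipeline on its own by materializing the `Future[Done]` from `Sink.ignore` and awaiting it. `CompareTimings` and `timed` are hypothetical names, and the `println` calls are left out so console output doesn't dominate the measurement.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical harness; assumes Akka 2.5.x (classic ActorMaterializer).
object CompareTimings {
  implicit val system: ActorSystem = ActorSystem("sourceDemo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // ExecutionContext for the Futures in mapAsync

  // Hypothetical helper: runs one stream to completion and returns elapsed milliseconds.
  def timed(run: => Future[Done]): Long = {
    val start = System.nanoTime()
    Await.result(run, 30.seconds)
    (System.nanoTime() - start) / 1000000
  }

  def mapAsyncRun(): Future[Done] =
    Source(1 to 100)
      .mapAsync(100)(t => Future(t + 1))
      .mapAsync(100)(t => Future(t * 2))
      .runWith(Sink.ignore) // materializes Future[Done], completed when the stream ends

  def asyncBoundaryRun(): Future[Done] =
    Source(1 to 100)
      .map(_ + 1)
      .withAttributes(Attributes.asyncBoundary)
      .map(_ * 2)
      .runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    try {
      println(s"mapAsync:      ${timed(mapAsyncRun())} ms")
      println(s"asyncBoundary: ${timed(asyncBoundaryRun())} ms")
    } finally system.terminate()
  }
}
```

A single 100-element run is dominated by JVM warm-up and materialization cost, so repeat each measurement several times (or use a harness such as JMH) before drawing conclusions.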

From the documentation on asyncBoundary (https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html), I understand that it lets stages run on different CPUs. But mapAsync is also multi-threaded because it uses Future, and a Future is asynchronous too.

Could someone clarify the difference between these two APIs?

Xiaohe Dong

1 Answer


Async

As you correctly point out, this forces the insertion of an asynchronous boundary between two stages. In your example

Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

this practically means that the + 1 operation and the * 2 operation will be run by separate actors. This enables pipelining: while one element moves on to the * 2 stage, another element can be brought in for the + 1 stage at the same time. If you don't force an async boundary there, a single actor runs both operations sequentially, finishing them on one element before requesting a new one from upstream.
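The pipelining effect can be made visible with a rough timing sketch. It assumes Akka 2.5.x and simulates 10 ms of work per stage with `Thread.sleep` (acceptable only in a demo, since it blocks a dispatcher thread); `PipeliningDemo` and `slow` are hypothetical names. Without the boundary one actor performs both sleeps for each element, so 50 elements should take roughly 50 × 20 ms; with `.async` the two stages overlap, so it should take roughly 50 × 10 ms.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo; assumes Akka 2.5.x (classic ActorMaterializer).
object PipeliningDemo {
  implicit val system: ActorSystem = ActorSystem("pipelining")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Simulated work: blocks the calling thread for 10 ms (demo only).
  def slow[A](a: A): A = { Thread.sleep(10); a }

  // Runs 50 elements through two 10 ms stages and returns elapsed milliseconds.
  def elapsedMillis(withBoundary: Boolean): Long = {
    val first = Source(1 to 50).map(t => slow(t + 1))
    // .async puts everything before it (source + first map) into its own actor
    val upstream = if (withBoundary) first.async else first
    val start = System.nanoTime()
    Await.result(upstream.map(t => slow(t * 2)).runWith(Sink.ignore), 30.seconds)
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit = {
    try {
      println(s"fused, one actor:            ${elapsedMillis(withBoundary = false)} ms") // roughly 50 * 20 ms
      println(s"async boundary, two actors:  ${elapsedMillis(withBoundary = true)} ms")  // roughly 50 * 10 ms
    } finally system.terminate()
  }
}
```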

By the way, your example can be rewritten in a shorter form using the async combinator:

Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

mapAsync

This is a stage to parallelise the execution of asynchronous operations. The parallelism factor specifies the maximum number of Futures that may be in flight at the same time; mapAsync does not spin up actors of its own, and the Futures run on whatever ExecutionContext they were created with. The results of the parallel computations are tracked and emitted in order by the mapAsync stage.
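To check that `parallelism` caps the number of in-flight Futures, a sketch can count how many Futures are running at once. It assumes Akka 2.5.x; `InFlightDemo` and `maxInFlight` are hypothetical names, `Thread.sleep` stands in for a slow call, and the Futures run on the system dispatcher imported as the implicit `ExecutionContext`.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo; assumes Akka 2.5.x (classic ActorMaterializer).
object InFlightDemo {
  implicit val system: ActorSystem = ActorSystem("inFlight")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // the Futures below run on this ExecutionContext

  // Returns the highest number of Futures observed running at the same time.
  def maxInFlight(parallelism: Int): Int = {
    val lock = new Object
    var running = 0
    var peak = 0
    val done = Source(1 to 40)
      .mapAsync(parallelism) { t =>
        Future {
          lock.synchronized { running += 1; peak = math.max(peak, running) }
          Thread.sleep(20) // simulated slow call; blocking is acceptable only in a demo
          lock.synchronized { running -= 1 }
          t
        }
      }
      .runWith(Sink.ignore)
    Await.result(done, 30.seconds)
    lock.synchronized(peak)
  }

  def main(args: Array[String]): Unit = {
    try println(s"peak concurrent Futures with parallelism 4: ${maxInFlight(4)}")
    finally system.terminate()
  }
}
```

The peak never exceeds the parallelism argument, because mapAsync only invokes the function for a new element while fewer than `parallelism` elements are pending.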

In your example

Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()

potentially up to 100 of the + 1 operations (i.e. all of them) could be run in parallel, with the results collected in order. Each result is then passed on to the second stage, where up to 100 of the * 2 operations can in turn run in parallel, again with the results collected in order and emitted downstream.
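The "collected in order" behaviour can be checked by giving later elements shorter delays, and contrasting it with `mapAsyncUnordered`. A sketch assuming Akka 2.5.x, using `akka.pattern.after` to complete each Future after a delay without blocking a thread; `OrderingDemo` and `delayed` are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo; assumes Akka 2.5.x (classic ActorMaterializer).
object OrderingDemo {
  implicit val system: ActorSystem = ActorSystem("ordering")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // ExecutionContext required by akka.pattern.after

  // Completes after (6 - t) * 100 ms, so later elements finish first.
  def delayed(t: Int): Future[Int] =
    after(((6 - t) * 100).millis, system.scheduler)(Future.successful(t))

  // mapAsync re-orders results back into input order.
  def ordered(): Seq[Int] =
    Await.result(Source(1 to 5).mapAsync(5)(delayed).runWith(Sink.seq), 10.seconds)

  // mapAsyncUnordered emits each result as soon as its Future completes.
  def unordered(): Seq[Int] =
    Await.result(Source(1 to 5).mapAsyncUnordered(5)(delayed).runWith(Sink.seq), 10.seconds)

  def main(args: Array[String]): Unit = {
    try {
      println(s"mapAsync:          ${ordered()}")   // always 1, 2, 3, 4, 5
      println(s"mapAsyncUnordered: ${unordered()}") // typically 5, 4, 3, 2, 1, but not guaranteed
    } finally system.terminate()
  }
}
```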

In your example you are running quick, CPU-bound operations that don't justify using mapAsync: most likely the overhead this stage introduces (creating a Future per element, scheduling it on the ExecutionContext and tracking its completion) is much more expensive than the benefit of running 100 of these operations in parallel. mapAsync is particularly useful for slow, IO-bound operations, where parallelisation pays off.
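A sketch of the IO-bound case, where mapAsync does pay off. It assumes Akka 2.5.x; `fakeLookup` is a hypothetical stand-in for a remote call that completes after 50 ms via `akka.pattern.after`, so no thread is blocked while waiting. With parallelism 1 the 20 lookups run one after another (roughly 20 × 50 ms), while with parallelism 10 they overlap in about two rounds.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo; assumes Akka 2.5.x (classic ActorMaterializer).
object IoBoundDemo {
  implicit val system: ActorSystem = ActorSystem("ioBound")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // ExecutionContext required by akka.pattern.after

  // Hypothetical remote call: completes after 50 ms without blocking a thread.
  def fakeLookup(id: Int): Future[String] =
    after(50.millis, system.scheduler)(Future.successful(s"result-$id"))

  // Runs 20 lookups with the given parallelism and returns elapsed milliseconds.
  def elapsedMillis(parallelism: Int): Long = {
    val start = System.nanoTime()
    Await.result(Source(1 to 20).mapAsync(parallelism)(fakeLookup).runWith(Sink.ignore), 30.seconds)
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit = {
    try {
      println(s"parallelism 1:  ${elapsedMillis(1)} ms")  // roughly 20 * 50 ms
      println(s"parallelism 10: ${elapsedMillis(10)} ms") // roughly 2 * 50 ms plus overhead
    } finally system.terminate()
  }
}
```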

For a comprehensive read on this topic, check out this blog post.

Stefano Bonetti
  • I'd also point out that using `.mapAsyncUnordered` when we don't care about order can improve performance. – Yuval Itzchakov Nov 15 '17 at 11:27
  • Hey Stefano, thanks for the reply. I really like the way you described the async boundary: `This enables pipelining, as whilst an element moves on to the * 2 stage, at the same time another element can be brought in for the + 1 stage.` However, with mapAsync, I guess all elements in each stage run in parallel, but do the two stages (+ 1 and * 2) still run in parallel or sequentially? In other words, does stage 2 (* 2) need to wait for stage 1 (+ 1) to finish? – Xiaohe Dong Nov 15 '17 at 11:38
  • Also, as you know, it is very hard to show the difference in code. I can't use `Thread.sleep`, which may be inaccurate. Can you give me some insight into the difference? – Xiaohe Dong Nov 15 '17 at 11:38
  • "Also note that mapAsync also by definition introduces an asynchronous boundary": this is not true. mapAsync doesn't do that. – maks Aug 14 '20 at 13:55
  • My mistake from a long time ago! Thanks for pointing that out @maks – Stefano Bonetti Aug 14 '20 at 15:12