
Can anyone please explain the difference between map and mapAsync in Akka Streams? The documentation says:

Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered

Why can't we simply use map here? I assume that Flow, Source, and Sink are all monadic in nature, so map should work fine with respect to the delay inherent in them?

Ramón J Romero y Vigil
Anand

1 Answer


Signature

The difference is best highlighted in the signatures: Flow.map takes in a function that returns a type T while Flow.mapAsync takes in a function that returns a type Future[T].
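As a minimal sketch of that difference, assuming Akka Streams 2.6 is on the classpath (the names `render` and `renderAsync` are made up for illustration), both flows below end up with the same element type, because mapAsync unwraps each Future before emitting its value downstream:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

// map: the function returns a plain value T (here String)
val render: Flow[Int, String, NotUsed] =
  Flow[Int].map(i => i.toString)

// mapAsync: the function returns Future[T]; the first argument caps
// how many futures may be in flight at once
val renderAsync: Flow[Int, String, NotUsed] =
  Flow[Int].mapAsync(parallelism = 4)(i => Future.successful(i.toString))
```

Downstream stages see String elements in both cases; only the shape of the function passed in differs.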

Practical Example

As an example, suppose that we have a function which queries a database for a user's full name based on a user id:

import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

type UserID   = String
type FullName = String

val databaseLookup : UserID => FullName = ???  //implementation unimportant

Given an akka stream Source of UserID values we could use Flow.map within a stream to query the database and print the full names to the console:

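// run() below needs an implicit Materializer in scope (an implicit ActorSystem suffices in Akka 2.6+)
val userIDSource : Source[UserID, _] = ???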

val stream = 
  userIDSource.via(Flow[UserID].map(databaseLookup))
              .to(Sink.foreach[FullName](println))
              .run()

One limitation of this approach is that the stream makes only one database query at a time. This serial querying is a bottleneck and will likely prevent the stream from reaching maximum throughput.

We could try to improve performance through concurrent queries using a Future:

// Future { ... } needs an implicit ExecutionContext in scope, e.g. the ActorSystem's dispatcher
def concurrentDBLookup(userID : UserID) : Future[FullName] = 
  Future { databaseLookup(userID) }

val concurrentStream = 
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
              .to(Sink.foreach[Future[FullName]](_ foreach println))
              .run()

The problem with this simplistic addendum is that we have effectively eliminated backpressure.

The Sink just pulls in each Future and attaches a foreach println, which is fast relative to a database query. The stream therefore keeps propagating demand to the Source and spawning more Futures inside Flow.map, so nothing limits the number of databaseLookup calls in flight at once. Unfettered parallel querying could eventually overload the database.

Flow.mapAsync to the rescue; we can have concurrent db access while at the same time capping the number of simultaneous lookups:

val maxLookupCount = 10

val maxLookupConcurrentStream = 
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach[FullName](println))
              .run()

Also notice that the Sink.foreach got simpler: it no longer takes in a Future[FullName], just a FullName.
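For an end-to-end version that can actually be run, here is a rough sketch assuming Akka 2.6 or later. The in-memory `fakeDatabase`, the `Thread.sleep` delay, and the `lookupAll` helper are all stand-ins invented for this illustration; they are not part of the snippets above.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncDemo {
  type UserID   = String
  type FullName = String

  // Hypothetical in-memory stand-in for the database
  val fakeDatabase: Map[UserID, FullName] =
    Map("u1" -> "Ada Lovelace", "u2" -> "Alan Turing", "u3" -> "Grace Hopper")

  def databaseLookup(userID: UserID): FullName = {
    Thread.sleep(50) // simulate a slow, blocking query
    fakeDatabase.getOrElse(userID, "<unknown>")
  }

  // Runs the lookups through mapAsync, with at most maxLookupCount
  // futures outstanding, and collects the names into a Seq
  def lookupAll(ids: List[UserID], maxLookupCount: Int)(
      implicit system: ActorSystem): Seq[FullName] = {
    implicit val ec: ExecutionContext = system.dispatcher
    val done: Future[Seq[FullName]] =
      Source(ids)
        .mapAsync(maxLookupCount)(id => Future(databaseLookup(id)))
        .runWith(Sink.seq)
    Await.result(done, 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    try lookupAll(List("u1", "u2", "u3"), maxLookupCount = 2).foreach(println)
    finally system.terminate()
  }
}
```

Because mapAsync emits results in the order of the upstream elements, the names come out in the same order as the IDs went in, regardless of which Future completes first. In real code, blocking calls like the simulated query would normally run on a dedicated ExecutionContext rather than the ActorSystem's default dispatcher.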

Unordered Async Map

If maintaining a sequential ordering of the UserIDs to FullNames is unnecessary, then you can use Flow.mapAsyncUnordered, which emits each result as soon as its Future completes. For example, you might just need to print all of the names to the console and not care about the order in which they are printed.
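The following is a small sketch of mapAsyncUnordered, again assuming Akka 2.6 or later. The `slowEcho` task, in which element n takes roughly n * 100 ms, and the `run` helper are hypothetical and exist only to make completion order differ from input order.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncUnorderedDemo {
  // Hypothetical task: element n takes about n * 100 ms to complete
  def slowEcho(n: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future { Thread.sleep(n * 100L); n }

  // Collects results in whatever order the futures complete
  def run(inputs: List[Int], parallelism: Int)(
      implicit system: ActorSystem): Seq[Int] = {
    implicit val ec: ExecutionContext = system.dispatcher
    val done = Source(inputs)
      .mapAsyncUnordered(parallelism)(n => slowEcho(n))
      .runWith(Sink.seq)
    Await.result(done, 10.seconds)
  }
}
```

Calling `MapAsyncUnorderedDemo.run(List(3, 1, 2), parallelism = 3)` with an implicit ActorSystem in scope returns the same three elements, but the faster ones will typically come first; the exact order depends on scheduling and is not guaranteed. Swapping in mapAsync would always give back 3, 1, 2.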

Ramón J Romero y Vigil
  • Is `mapAsync` similar to applying an async boundary to that specific stage? According to the documentation, marking an async boundary will run each stage in an actor; just wondering if it's the same. – druuu Aug 14 '17 at 13:44
  • Using `"com.typesafe.akka" %% "akka-stream" % "2.6.3"`, tried this example, got compilation error `type mismatch; found : akka.stream.scaladsl.Flow[Boolean,Boolean,akka.NotUsed] required: akka.stream.Graph[akka.stream.FlowShape[String,?],?]`. I would appreciate it if someone could update this example for the latest version of Akka Streams – radumanolescu Mar 20 '20 at 22:39
  • @RamonJ would you please explain on which thread the mapAsync will be executed? Will it use the threads from the same dispatcher as used by the graph (for which Akka Streams creates an actor, unless you create an async boundary)? – beinghuman Mar 20 '20 at 23:03
  • @beinghuman Given that `mapAsync` does not accept an `ExecutionContext`, I would assume that yes it is using the same dispatcher as the graph itself. However, I have not read the code to confirm this to be the case. – Ramón J Romero y Vigil Mar 21 '20 at 11:59
  • @radumanolescu No part of the above answer utilizes `Boolean`, therefore your error message doesn't seem to utilize "this example". Feel free to ask a stackoverflow question, and I'll try to take a look... – Ramón J Romero y Vigil Mar 22 '20 at 12:37
  • Hmm, correct me if I am wrong, but in Scala a Future cannot be created without an execution context, so I don't really understand how the mapAsync parallelism works. I mean, the number of threads will depend on the Future's execution context anyway – Skeeve Aug 22 '23 at 13:58
  • @Skeeve You are correct: `mapAsync` requires an implicit `ExecutionContext`. I left that detail out for brevity since the thrust of the question was regarding the two different stream methods and not Future execution mechanisms. – Ramón J Romero y Vigil Aug 22 '23 at 14:07
  • mapAsync doesn't require an ExecutionContext as of version 2.6.19: `def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]`; that's the most interesting part. But it accepts a function that should return a Future, and a Future will have an execution context; that's what's weird – Skeeve Aug 23 '23 at 09:54