I am still getting to grips with Akka Streams concepts and trying to understand how to map them to scenarios where a collection of items must be processed as an atomic unit. Say we have a purchase order that consists of multiple items, and we need to apply some processing to each item and then merge the results back into a single value. Should such a workflow become its own separate stream (or substream) that is closed once the purchase order is fully processed, i.e. does each purchase order start a new stream? Or do I have a single stream of purchase orders that never ends? But if so, won't I risk mixing items from different orders?

In other words, what I am trying to achieve is processing isolation between different workflows, and I wonder whether Akka Streams is a good match for it.

Vagif Abilov

1 Answer

Answering your question directly: it is possible to create a stream that will "apply some processing to each item and then merge them back in a single value".

Developing your example with some sample code:

case class Item(itemId : String)

case class PurchaseOrder(orderId : String, items : Seq[Item])

val purchaseOrder : PurchaseOrder = ???

We could process the items with a stream. The question does not say how the per-item results should be merged, so I leave the fold's initial value and combining function undefined:

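type ProcessOutput = ???   // placeholder: the result of processing one item
type CombinedResult = ???  // placeholder: the result of merging all items

def processItem(item : Item) : ProcessOutput = ???

// runWith keeps the sink's materialized Future; to(...).run() would discard it
val combinedResult : Future[CombinedResult] = 
  Source.fromIterator( () => purchaseOrder.items.iterator )
        .via(Flow[Item] map processItem)
        .runWith(Sink.fold[CombinedResult, ProcessOutput](???)(???))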
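To make the placeholders concrete, here is a minimal sketch of one finite stream per purchase order. It assumes Akka 2.6 or later (where an implicit ActorSystem also supplies the materializer), and both the `Int` result types and the body of `processItem` are hypothetical stand-ins, since the question does not define the real processing or reduction.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object StreamSketch {
  case class Item(itemId: String)
  case class PurchaseOrder(orderId: String, items: Seq[Item])

  // Hypothetical stand-in for the real per-item processing.
  def processItem(item: Item): Int = item.itemId.length

  // One finite stream per order: it completes once the last item is folded in.
  def processOrder(order: PurchaseOrder)(implicit system: ActorSystem): Future[Int] =
    Source.fromIterator(() => order.items.iterator)
      .via(Flow[Item].map(processItem))
      .runWith(Sink.fold[Int, Int](0)(_ + _))

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so no explicit ActorMaterializer is needed.
    implicit val system: ActorSystem = ActorSystem("orders")
    val order = PurchaseOrder("order-1", Seq(Item("a"), Item("bb"), Item("ccc")))
    try println(Await.result(processOrder(order), 5.seconds))
    finally system.terminate()
  }
}
```

Because every call to `processOrder` materializes its own stream with its own fold accumulator, items from different orders cannot end up in the same result.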

Answering your question indirectly,

Consider Futures First

Akka streams are very useful when back pressure is necessary. Back pressure commonly matters when you connect to an external source of data: because your application is responsible for continuously signaling demand for more data, it determines how fast the data is streamed to it.

In the case you presented in the question there is no need to signal demand and incur the overhead that such communication requires. You already have the whole collection of items, so there is no one to send demand to.

Instead, I think Futures are the best way to go for the case you described:

import scala.concurrent.{ExecutionContext, Future}

def futProcess(item : Item)(implicit ec : ExecutionContext) : Future[ProcessOutput] = 
  Future { processItem(item) } 

// same output type as the stream run; needs an implicit ExecutionContext in scope
val combinedResults : Future[CombinedResult] = 
  Future.sequence{ purchaseOrder.items map (futProcess(_)) }
        .map{ _.foldLeft[CombinedResult](???)(???) }

You will get better performance, less complexity because you do not need a full ActorSystem around, and exactly the same result as the stream.
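As a runnable illustration of the Futures approach, the sketch below uses only the Scala standard library. The `Int` result type and the body of `processItem` are hypothetical stand-ins for the real processing, which the question leaves open.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FuturesSketch {
  case class Item(itemId: String)
  case class PurchaseOrder(orderId: String, items: Seq[Item])

  // Hypothetical stand-in for the real per-item processing.
  def processItem(item: Item): Int = item.itemId.length

  // Each item runs in its own Future; the results are summed once all complete.
  def processOrder(order: PurchaseOrder)(implicit ec: ExecutionContext): Future[Int] =
    Future.sequence(order.items.map(item => Future(processItem(item))))
      .map(_.foldLeft(0)(_ + _))

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val order = PurchaseOrder("order-1", Seq(Item("a"), Item("bb"), Item("ccc")))
    println(Await.result(processOrder(order), 5.seconds))
  }
}
```

Each call to `processOrder` builds its own `Future`, so two orders processed concurrently never share intermediate state, which gives the isolation asked about in the question.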

Ramón J Romero y Vigil
  • Thank you for the detailed answer. I also understand that in some scenarios streams can be suboptimal, as you pointed out. But I see great potential in streams in general and would like to check them out. – Vagif Abilov Nov 13 '16 at 11:04