
I want to implement a diff transformation stream with the following logic:

I have a Source[JsValue] and a database holding id -> JsValue data. For each JsValue coming from the Source, I want to compute a diff against the version stored in the db, pass the generated diff downstream, and store the JsValue from the Source in the db.

I am considering Akka Persistence as the store implementation, but I only need the current state, so any key-value db would do.

As I am new to akka-stream, I can't work out the best way to implement this idea.

andrey.ladniy

1 Answer


Assuming your JsValue objects have an "id" field, you could write a querying Flow that would take in the original JsValue and produce a tuple of the original and database version:

def dbQuery(id : String) : JsValue = ???

val queryFlow : Flow[JsValue, (JsValue,JsValue), _] = 
  Flow[JsValue] map { originalJs =>
    originalJs -> dbQuery((originalJs \ "id").as[String])
  }

These tuples could be passed into a diffing Flow:

def diffJs(original : JsValue, dbVersion : JsValue) : JsValue = ???

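// diffJs is a method, so it is called through a pattern-matching function
// rather than through .tupled, which needs an explicit function value.
val diffFlow : Flow[(JsValue, JsValue), JsValue, _] = 
  Flow[(JsValue, JsValue)] map { case (original, dbVersion) => diffJs(original, dbVersion) }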

The final part you mentioned was a db that would persist the differences; this can be represented with a Sink:

val dbSink : Sink[JsValue, _] = ???

All of these components can then be combined to form the stream based on your source of values:

val jsSource : Source[JsValue, _] = ???

jsSource via queryFlow via diffFlow runWith dbSink

For asynchronous db querying, see this example demonstrating mapAsync.
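
As a rough sketch of how mapAsync could replace map when the db calls return a Future, something like the following might work. Everything named here (AsyncDbStages, store, dbQueryAsync, dbStoreAsync, asyncQueryFlow, asyncDbSink) is hypothetical and not part of the code above, and the TrieMap is only an in-memory stand-in for a real key-value db:

```scala
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Sink}
import play.api.libs.json.{JsNull, JsValue}
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

object AsyncDbStages {
  // Hypothetical in-memory stand-in for the real key-value store.
  val store: TrieMap[String, JsValue] = TrieMap.empty

  // Hypothetical async lookup; yields JsNull when the id is unknown (an assumption).
  def dbQueryAsync(id: String): Future[JsValue] =
    Future.successful(store.getOrElse(id, JsNull))

  // Hypothetical async write of the latest value, keyed by its "id" field.
  def dbStoreAsync(js: JsValue): Future[Done] = {
    store.put((js \ "id").as[String], js)
    Future.successful(Done)
  }

  // mapAsync runs up to `parallelism` Futures at once and still emits
  // the results in the same order as the upstream elements.
  def asyncQueryFlow(parallelism: Int)(
      implicit ec: ExecutionContext): Flow[JsValue, (JsValue, JsValue), NotUsed] =
    Flow[JsValue].mapAsync(parallelism) { originalJs =>
      dbQueryAsync((originalJs \ "id").as[String])
        .map(dbVersion => originalJs -> dbVersion)
    }

  // The async write happens in a mapAsync stage; Sink.ignore discards the
  // results and materializes a Future[Done] that completes with the stream.
  val asyncDbSink: Sink[JsValue, Future[Done]] =
    Flow[JsValue].mapAsync(1)(dbStoreAsync).toMat(Sink.ignore)(Keep.right)
}
```

With these in scope, `jsSource via asyncQueryFlow(4) via diffFlow runWith asyncDbSink` would have the same shape as the synchronous pipeline. If the same id can appear more than once in the source and the stream both reads and writes the db, a parallelism of 1 is the cautious choice, since concurrent lookups and writes for one id could otherwise interleave.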

Ramón J Romero y Vigil
  • Thanks, I was thinking along the same lines, but what about an async `dbQuery: Future[JsValue]` in the Flow and then in the Sink? I can't use a simple `map`. – andrey.ladniy Feb 10 '17 at 19:58
  • @andrey.ladniy You are welcome. I updated my answer with a link to a Stack Overflow answer that demonstrates exactly what you're looking for. If anything is unaccounted for there, I can update this answer... – Ramón J Romero y Vigil Feb 10 '17 at 22:30