Having two input streams, both producing objects instances defined as
case class ReplayData(timestamp:Long, payload:Any)
Stream 1
1, payload1
1000, payload3
Stream 2
400, payload2
1500, payload4
I would like to implement replay mechanism which would push elements downstream ordered by timestamp I have on every element
It will simulate live scenarios from production.
This mechanism would need to obey delays between messages e.g. first message send is payload1 (its starting point), payload2 from Stream2 should be send after 400 ms (difference between next message timestamp and initial message timestamp) and so on.
I may do that quite easily using DelayedQueue which usage is explained in this SO thread
An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.
The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null.
Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.
For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements. does not permit null elements.
I'm trying to figure it out how to do that in Akka streams, but have troubles to find something which would solve this issue for me.
I was looking at mergeSorted as a way to merge two streams and order them based on some function.
And it seems more or less it would fit this purpose of ordering based on some custom function.
I'm not sure how to handle delays between elements based on timestamp property.
Using plain old AKKA, I may use scheduler to read data, order them and schedule every element to be send when time passed.