
I'm new to stream processing (kafka streams / flink / storm / spark / etc.) and trying to figure out the best way to go about handling a real world problem, represented here by a toy example. We are tied to Kafka for our pubsub/data ingestion, but have no particular attachment in terms of stream processor framework/approach.

Theoretically, suppose I have a source emitting floating point values sporadically. Also at any given point there is a multiplier M that should be applied to this source's values; but M can change, and critically, I may only find out about the change much later - possibly not even "in change order."

I am thinking of representing this in Kafka as

"Values": (timestamp, floating point value) - the values from the source, tagged with their emission time.

"Multipliers": (timestamp, floating point multiplier) - indicates M changed to this floating point multiplier at this timestamp.

Using a standard stream processing framework, I would then be tempted to create an output topic, say "Results", that joins the two streams and simply multiplies each value in Values by the current multiplier determined by Multipliers.

However, as I understand it, this is not going to work, because new events posted to Multipliers can affect results already written to the Results stream, arbitrarily far back. Conceptually, I would like something like a Results stream that is current with respect to the latest events posted to Multipliers, applied against all values in Values, but which can be "recalculated" as further Values or Multipliers events come in.

What are some techniques for achieving/architecting this with kafka and major stream processors?

Example:

Initially,

Values = [(1, 2.4), (2, 3.6), (3, 1.0), (5, 2.2)]
Multipliers = [(1, 1.0)]
Results = [(1, 2.4), (2, 3.6), (3, 1.0), (5, 2.2)]

Later,

Values = [(1, 2.4), (2, 3.6), (3, 1.0), (5, 2.2)]
Multipliers = [(1, 1.0), (4, 2.0)]
Results = [(1, 2.4), (2, 3.6), (3, 1.0), (5, 4.4)]

Finally, after yet another event is posted to Multipliers (and a new Value is emitted as well):

Values = [(1, 2.4), (2, 3.6), (3, 1.0), (5, 2.2), (7, 5.0)]
Multipliers = [(1, 1.0), (4, 2.0), (2, 3.0)]
Results = [(1, 2.4), (2, 10.8), (3, 3.0), (5, 4.4), (7, 10.0)]
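
To make the rule in this example concrete: each result pairs a value with the multiplier whose timestamp is the latest one at or before the value's own timestamp. A plain Python sketch of a full recomputation (the 1.0 fallback for values with no earlier multiplier is my assumption):

def compute_results(values, multipliers):
    # Recompute the full Results set from scratch: each value is multiplied
    # by the multiplier with the largest timestamp <= the value's timestamp.
    ordered = sorted(multipliers)  # late-arriving multipliers slot in by timestamp
    results = []
    for ts, v in values:
        m = 1.0  # assumed default when no multiplier precedes the value
        for m_ts, m_val in ordered:
            if m_ts <= ts:
                m = m_val
            else:
                break
        results.append((ts, v * m))
    return results

values = [(1, 2.4), (2, 3.6), (3, 1.0), (5, 2.2), (7, 5.0)]
multipliers = [(1, 1.0), (4, 2.0), (2, 3.0)]
print(compute_results(values, multipliers))
# [(1, 2.4), (2, 10.8), (3, 3.0), (5, 4.4), (7, 10.0)]
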
jdowdell
  • IMHO, this is too broad to give you a concrete answer. The actual solution will depend on the requirements: "What do we need to do with the data?" For the example provided, I would store both streams and do the operation on read, i.e. when the results are needed. But that might not be sufficient depending on the application requirements in an actual scenario. – maasg Jul 31 '17 at 10:33
  • In this program, the multiplier multiplies the value by key, so your result will be affected. – R Palanivel-Tamilnadu India Jul 31 '17 at 07:05
  • Good point, maasg. In our case there is too much data streaming in to support postponing the calculation. In addition, we need to support a sort of query, like "give me all result values and their timestamps where the value is between X and Y, as far as you know according to current info about multipliers"; we can't index the results for that query without having calculated them. – jdowdell Jul 31 '17 at 18:24

1 Answer


I am only familiar with Spark. For this to work as you describe, you want to selectively "update" previous results as new multiplier values are received, while applying the latest applicable multiplier to new values that have not yet had one applied. AFAIK, Spark by itself won't let you do this using streaming (you need to cache and update old results, and you also need to know which multiplier to use for new values). But you could code the logic so that you write your "results" topic to a regular DB table: when you receive a new multiplier, all subsequent events in the Values dataframe just use that value, and you do a one-time check to find any rows in the results table that now need to be updated with the new multiplier, then simply update those rows in the DB table.
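
Framework aside, here is a minimal in-memory sketch of that insert/update logic (plain Python standing in for the DB table; the 1.0 default multiplier and all names are my own, not from any Spark API):

import bisect

class ResultsStore:
    # In-memory stand-in for the mutable results table described above.
    def __init__(self):
        self.values_by_ts = {}     # value timestamp -> raw value
        self.results = {}          # value timestamp -> multiplied result
        self.multiplier_ts = []    # sorted multiplier timestamps
        self.multiplier_vals = []  # multiplier values, aligned with multiplier_ts

    def _effective_multiplier(self, ts):
        # Multiplier with the largest timestamp <= ts; 1.0 is an assumed default.
        i = bisect.bisect_right(self.multiplier_ts, ts)
        return self.multiplier_vals[i - 1] if i else 1.0

    def on_value(self, ts, value):
        # New values use whatever multiplier is currently in effect for their timestamp.
        self.values_by_ts[ts] = value
        self.results[ts] = value * self._effective_multiplier(ts)

    def on_multiplier(self, ts, mult):
        # Insert the (possibly out-of-order) multiplier, then update only the
        # results whose effective multiplier changed: value timestamps in
        # [ts, next multiplier timestamp).
        i = bisect.bisect_right(self.multiplier_ts, ts)
        self.multiplier_ts.insert(i, ts)
        self.multiplier_vals.insert(i, mult)
        upper = self.multiplier_ts[i + 1] if i + 1 < len(self.multiplier_ts) else float("inf")
        for v_ts, v in self.values_by_ts.items():
            if ts <= v_ts < upper:
                self.results[v_ts] = v * mult

store = ResultsStore()
store.on_multiplier(1, 1.0)
for ts, v in [(1, 2.4), (2, 3.6), (3, 1.0), (5, 2.2)]:
    store.on_value(ts, v)
store.on_multiplier(4, 2.0)   # only the (5, 2.2) result is rewritten
store.on_multiplier(2, 3.0)   # only results with timestamps in [2, 4) are rewritten
store.on_value(7, 5.0)
print(sorted(store.results.items()))
# [(1, 2.4), (2, 10.8), (3, 3.0), (5, 4.4), (7, 10.0)]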

Your results consumer has to be able to deal with inserts and updates. You can use Spark with any DB that has a connector to achieve this.

Alternatively, you can use SnappyData, which turns Apache Spark into a mutable compute + data platform. Using Snappy, you would have Values and Multipliers as regular streaming dataframes, and Results as a dataframe set up as a replicated table in SnappyData. When you process a new entry in the multiplier stream, you would update the results stored in the results table. This is perhaps the easiest way to accomplish what you are trying to do.