
I'm computing statistics (min, avg, etc.) on fixed windows of data. The data is streamed in as individual points and is continuous (like temperature).

My current pipeline (simplified for this question) looks like:

read -> window -> compute stats (CombineFn) -> write
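
For illustration, here is a minimal plain-Python model of this pipeline (not Beam code). It assumes a hypothetical 60-second window size and uses made-up sample readings as (timestamp, value) pairs. Each point is assigned to the fixed window [start, start + size) by rounding its timestamp down, and min/avg are computed per window with no carry-over. As a result, the window starting at 60 is summarized from its single reading alone:

```python
from collections import defaultdict
from statistics import mean

WINDOW_SIZE = 60  # hypothetical fixed-window size, in seconds


def window_start(ts, size=WINDOW_SIZE):
    """Start of the fixed window [start, start + size) that contains ts."""
    return ts - ts % size


def window_stats(points, size=WINDOW_SIZE):
    """Group (timestamp, value) points into fixed windows; min/avg per window.

    Models read -> window -> compute stats, with no baseline carried over.
    """
    windows = defaultdict(list)
    for ts, value in points:
        windows[window_start(ts, size)].append(value)
    return {start: {"min": min(vals), "avg": mean(vals)}
            for start, vals in sorted(windows.items())}


if __name__ == "__main__":
    readings = [(5, 20.0), (50, 22.0), (70, 25.0), (130, 24.0)]
    for start, stats in window_stats(readings).items():
        print(start, stats)
```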

The problem is that each window's stats are inaccurate because they have no baseline. By that, I mean that I want each window's statistics to also include a single data point (the latest one) from the previous window.

One way to think about this is that each window's input PCollection should contain the elements that would normally fall in the window based on their timestamps, plus one extra point from the previous window's PCollection.

I'm unsure of how I should go about doing this. Here are some things I've thought of doing:

  • Duplicating the latest data point in every window with a modified timestamp so that it lands in the next window's time range
  • Similarly, creating a PCollectionView singleton per window that holds a modified copy of its latest data point, to be consumed as a side input and merged into the next window's input PCollection
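
The first idea can be modeled in plain Python as a rough sketch. The 60-second window size, the hypothetical `forward_latest_once` helper, and the sample data are assumptions. Each window's latest point is copied and re-stamped to the start of the next window, and then everything is re-windowed. Copies are made only from a window's original points, so a copy that lands in an otherwise empty window is never forwarded again:

```python
WINDOW_SIZE = 60  # hypothetical fixed-window size, in seconds


def window_start(ts, size=WINDOW_SIZE):
    """Start of the fixed window [start, start + size) that contains ts."""
    return ts - ts % size


def forward_latest_once(points, size=WINDOW_SIZE):
    """Copy each window's latest point into the next window (one hop only).

    Each copy is re-stamped to the start of the next window so that
    re-windowing places it there. Returns the original points plus the copies.
    """
    latest = {}  # window start -> latest (timestamp, value) in that window
    for ts, value in points:
        start = window_start(ts, size)
        if start not in latest or ts > latest[start][0]:
            latest[start] = (ts, value)
    copies = [(start + size, value) for start, (_, value) in latest.items()]
    return sorted(points + copies)


if __name__ == "__main__":
    out = forward_latest_once([(5, 20.0), (50, 22.0), (130, 24.0)])
    # Window 120 holds only its own point: window 60 had no original
    # points, so nothing was forwarded out of it.
    print([p for p in out if window_start(p[0]) == 120])
```

With readings at 5, 50 and 130, the window starting at 120 gets no baseline. The window starting at 60 contained only the forwarded copy, and that copy is not duplicated again. Repeating the step to cover empty windows would keep producing copies ever further into the future.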

One constraint is that if a window receives no new data points other than the one forwarded to it, it should re-forward that value to the next window.
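
Here is a sketch of the intended semantics, including this constraint, as a bounded plain-Python reference. The hypothetical `carried_windows` function, the window size, and the sample data are assumptions. The range of windows has to be passed in explicitly, because on an unbounded stream the carried value would otherwise be forwarded indefinitely:

```python
WINDOW_SIZE = 60  # hypothetical fixed-window size, in seconds


def carried_windows(points, first, last, size=WINDOW_SIZE):
    """Return {window_start: values} for every window from first to last.

    Each window holds its own points plus the latest value from any earlier
    window. An empty window still holds the carried value and passes it on.
    Assumes first and last are window starts (multiples of size).
    """
    points = sorted(points)  # order by timestamp
    result = {}
    carry = None  # latest value seen before the current window
    i = 0
    for start in range(first, last + 1, size):
        own = []
        while i < len(points) and points[i][0] < start + size:
            if points[i][0] >= start:
                own.append(points[i][1])
            else:
                carry = points[i][1]  # point before `first`: only updates the carry
            i += 1
        result[start] = ([carry] if carry is not None else []) + own
        if own:
            carry = own[-1]  # this window's latest point becomes the next baseline
        # otherwise carry is unchanged, i.e. re-forwarded to the next window
    return result


if __name__ == "__main__":
    print(carried_windows([(5, 20.0), (50, 22.0), (130, 24.0)], first=0, last=180))
```

Here the empty window starting at 60 holds only the carried 22.0 and passes it on, so the window starting at 120 sees both 22.0 and its own 24.0.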

oalashqar
  • Please post some code showing what you have tried. – Juan Carlos Ramirez Jun 11 '19 at 15:52
  • A similar implementation to what you describe in your first point can be found [here](https://stackoverflow.com/a/55289404/6121516) – Guillem Xercavins Jun 11 '19 at 16:33
  • @GuillemXercavins I'm already doing that for a different part of the pipeline. The difference here is that I only want one (the latest) value to be duplicated. It's a bit of a mind-twister when you factor in late firing. – oalashqar Jun 11 '19 at 16:51

1 Answer


It sounds like you may need to copy a value from one window into arbitrarily many future windows. The only way I know of to do this is with state and timers.

You could write a stateful DoFn that operates on globally windowed data, stores in its state the latest (by timestamp) element per window, and sets a timer at each window boundary that emits this element into the subsequent window. (You could possibly leverage the Latest combine operation to get the latest element per window rather than tracking it manually.) Flattening this output with your original data and then windowing should give you the values you want.
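
Here is a rough plain-Python model of this approach. It is not Beam API code; `carry_forward`, `windowed_stats`, the window size, and the sample data are all hypothetical. The model tracks the latest element manually instead of using Latest and assumes elements arrive in timestamp order with no late data. It also uses a single key; in Beam, state and timers are scoped per key and window, so a real DoFn would run on keyed input. A timer fires at each window boundary, emits the latest value stamped with that boundary, and re-arms itself one window later. This also re-forwards the value through empty windows:

```python
from collections import defaultdict
from statistics import mean

WINDOW_SIZE = 60  # hypothetical fixed-window size, in seconds


def window_start(ts, size=WINDOW_SIZE):
    return ts - ts % size


def carry_forward(points, watermark_end, size=WINDOW_SIZE):
    """Model of a stateful DoFn in the global window.

    State: the latest (timestamp, value) seen so far.
    Timer: fires at each window boundary, emits the latest value stamped with
    the boundary time (start of the next window), then re-arms one window later.
    Assumes points arrive in timestamp order with no late data.
    """
    latest = None  # the "state"
    timer = None   # next boundary at which the "timer" fires
    emitted = []

    def fire_timers_up_to(watermark):
        nonlocal timer
        while timer is not None and timer <= watermark:
            emitted.append((timer, latest[1]))  # copy lands in window starting at `timer`
            timer += size                        # re-arm: forward again if nothing new arrives

    for ts, value in sorted(points):
        fire_timers_up_to(ts)  # the watermark has advanced to ts
        if latest is None or ts >= latest[0]:
            latest = (ts, value)
        if timer is None:
            timer = window_start(ts, size) + size
    fire_timers_up_to(watermark_end)
    return emitted


def windowed_stats(points, carried, size=WINDOW_SIZE):
    """'Flatten' originals with carried copies, re-window, then (min, avg)."""
    windows = defaultdict(list)
    for ts, value in points + carried:
        windows[window_start(ts, size)].append(value)
    return {s: (min(v), mean(v)) for s, v in sorted(windows.items())}


if __name__ == "__main__":
    pts = [(5, 20.0), (50, 22.0), (130, 24.0)]
    print(windowed_stats(pts, carry_forward(pts, watermark_end=180)))
```

For readings at 5, 50 and 130 with the watermark ending at 180, the copies land at 60, 120 and 180. The window starting at 120 is therefore computed over both 24.0 and the carried 22.0.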

robertwb