As discussed in the comments, one possibility is to switch to discarding fired panes, which can be set via accumulation_mode=trigger.AccumulationMode.DISCARDING. If you still want to keep the ACCUMULATING mode, you might want to modify TopCombineFn so that repeated panes from the same user overwrite the previous value and avoid duplicated keys. TopDistinctFn takes as a base the code here for Beam SDK 2.13.0. In the add_input method we first run a check as follows:
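    # Drop any entry already in the heap that has the same key as the new element.
    for current_top_element in enumerate(heap):
        if element[0] == current_top_element[1].value[0]:
            # Overwrite the stale entry with the last item, drop the last item,
            # then restore the heap invariant.
            heap[current_top_element[0]] = heap[-1]
            heap.pop()
            heapq.heapify(heap)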
Basically, we compare the key of the element being evaluated (element[0]) against the key of each element in the heap. Heap elements are of type ComparableValue, so we can use value to get back the tuple (and value[0] to get the key). If the keys match, we remove the old entry from the heap (since we are accumulating, the new sum will be greater). Beam SDK uses the heapq library, so I based my approach on this answer to remove the i-th element (we use enumerate to keep index information).
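To see the removal trick in isolation, here is a minimal standalone sketch using only heapq. It is not the Beam code: the upsert helper and the plain (score, user) tuples are hypothetical stand-ins for ComparableValue entries, and the break assumes at most one entry per user.

```python
import heapq


def upsert(heap, user, score):
    """Replace any existing (score, user) entry for `user`, then push the new one."""
    for i, (_, existing_user) in enumerate(heap):
        if existing_user == user:
            heap[i] = heap[-1]   # overwrite the stale entry with the last item
            heap.pop()           # drop the now-duplicated last item
            heapq.heapify(heap)  # restore the heap invariant in O(n)
            break                # assumption: at most one entry per user
    heapq.heappush(heap, (score, user))


heap = [(9, "Bob"), (8, "Connor"), (6, "Marta")]
heapq.heapify(heap)
upsert(heap, "Bob", 11)
print(sorted(heap, reverse=True))  # [(11, 'Bob'), (8, 'Connor'), (6, 'Marta')]
```

Swapping with the last item and calling heapify costs O(n) per removal, which should be acceptable for a heap that only holds a handful of top entries.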
I added some logging to help detect duplicates:
    logging.info("Duplicate: %s,%s --- %s,%s", element[0], element[1], current_top_element[1].value[0], current_top_element[1].value[1])
The code is located in a top.py file inside a combiners folder (with __init__.py) and I import it with:
    from combiners.top import TopDistinctFn
Then, I can use TopDistinctFn within the pipeline like this:
    (inputs
         | 'Add User as key' >> beam.Map(lambda x: (x, 1))  # ('key', 1)
         | 'Apply Window of time' >> beam.WindowInto(
                beam.window.FixedWindows(size=10*60),
                trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(2)),
                accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
         | 'Sum Score' >> beam.CombinePerKey(sum)
         | 'Top 10 scores' >> beam.CombineGlobally(
                TopDistinctFn(n=10, compare=lambda a, b: a[1] < b[1])).without_defaults()
         | 'Print results' >> beam.ParDo(PrintTop10Fn()))
The full code can be found here. generate_messages.py is the Pub/Sub message generator, top.py contains the custom version of TopCombineFn renamed TopDistinctFn (it might look overwhelming, but I only added a few lines of code starting at line 425), and test_combine.py is the main pipeline code. To run this, put the files in the correct folders, install Beam SDK 2.13.0 if needed, and modify the project ID and Pub/Sub topic in generate_messages.py and test_combine.py. Then, start publishing messages with python generate_messages.py and, in a different shell, run the pipeline with the DirectRunner: python test_combine.py --streaming. With the DataflowRunner you'll probably need to add the extra files with a setup.py file.
As an example, Bob was leading with 9 points and, when the next update comes, his score is up to 11 points. He appears in the next recap with only the updated score and no duplicate (as detected in our logging). The entry with 9 points does not appear, and the top still has 10 users as desired. Likewise for Marta. I noticed that older scores still appear in the heap even if they are not in the top 10, but I am not sure how garbage collection works with heapq.
INFO:root:>>> Current top 10: [('Bob', 9), ('Connor', 8), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Kevin', 6), ('Laura', 6), ('Marta', 6), ('Diane', 4), ('Bacon', 4)]
...
INFO:root:Duplicate: Marta,8 --- Marta,6
INFO:root:Duplicate: Bob,11 --- Bob,9
INFO:root:>>> Current top 10: [('Bob', 11), ('Connor', 8), ('Marta', 8), ('Bacon', 7), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Laura', 6), ('Diane', 6), ('Kevin', 6)]
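On the heapq remark: as far as the standard library goes, heapq is only a set of functions over a plain list, so nothing is evicted unless the calling code pops it explicitly. The sketch below is plain Python, not taken from the Beam source, and push_bounded is a hypothetical helper; it shows one common way to cap a min-heap at the n largest items.

```python
import heapq


def push_bounded(heap, item, n):
    """Keep only the n largest items seen so far in a min-heap."""
    if len(heap) < n:
        heapq.heappush(heap, item)
    else:
        # Pushes `item`, then pops and discards the smallest entry.
        heapq.heappushpop(heap, item)


scores = []
for score in [4, 9, 6, 8, 7]:
    push_bounded(scores, score, n=3)
print(sorted(scores, reverse=True))  # [9, 8, 7]
```

Whether TopCombineFn trims its buffer this eagerly is something to check in top.py itself; I have not verified it here.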
Let me know if that works well for your use case too.