0

I would like to extract the top 10 highest score like this :

Paul - 38
Michel - 27
Hugo - 27
Bob - 24
Kevin - 19
...
(10 elements)

I'm using a fixed window and a data-driven trigger that outputs early results after a pane has collected X elements. Also, I'm using a combiner to get the top 10 highest scores.

(inputs
         | 'Apply Window of time' >> beam.WindowInto(
                        beam.window.FixedWindows(size=5 * 60))
                        trigger=trigger.Repeatedly(trigger.AfterCount(2)),
                  accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
         | beam.ParDo(PairWithOne()) # ('key', 1)
         | beam.CombinePerKey(sum)
         | 'Top 10 scores' >> beam.CombineGlobally(
                        beam.combiners.TopCombineFn(n=10,
                                                    compare=lambda a, b: a[1] < b[
                                                        1])).without_defaults())

The problem here is that the first output seems to be correct but the following outputs contains duplicated keys like that :

Paul - 38
Paul - 37
Michel - 27
Paul - 36
Michel - 26
Kevin - 20
...
(10 elements)

As you can see, I'm not getting 10 distinct K/V pairs but duplicated keys.

When not using a trigger/accumulation strategy, this works well.. but if I want to have a window of 2 hours, I'd like to get frequents updates...

MichelDelpech
  • 853
  • 8
  • 36
  • 1
    Have you tried with discarding fired panes? (i.e. `accumulation_mode=trigger.AccumulationMode.DISCARDING`) – Guillem Xercavins Jun 16 '19 at 11:01
  • It's working with DISCARDING. But I'm saving this output to Cloud Firestore and with that strategy I'm not sure to be able to update my Array properly every new updates. Because with the accumulation strategy, I just had to replace the existing array with a new one. – MichelDelpech Jun 16 '19 at 11:18
  • But I get it... I think I should change the way I'm saving my data then. – MichelDelpech Jun 16 '19 at 11:25
  • Otherwise, maybe add another combineFn in between to get only Top 1 per key so that you get the latest score per user. Edit: I'm actually testing it that way and I still get duplicate keys... – Guillem Xercavins Jun 16 '19 at 11:32
  • Yeah, i don't know why TopCombineFN doesnt auto-merge the value for a same key... Well actually i'm using DISCARDING and i'm running a transaction to increment each user score on my DB. Working well but I have to do some extra queries. Still investigating for that issue too ;) – MichelDelpech Jun 16 '19 at 11:38
  • 1
    So, that gave me the idea to modify `TopCombineFn` to pop items from the heap if they share key with the incoming element from a new pane. It seems to be working for me, writing a (long) answer – Guillem Xercavins Jun 16 '19 at 14:02

1 Answers1

1

As discussed in the comments, one possibility is to transition to Discarding fired panes, which can be set via accumulation_mode=trigger.AccumulationMode.DISCARDING. If you still want to keep the ACCUMULATING mode you might want to modify TopCombineFn so that repeated panes from the same user overwrite the previous value and avoid duplicated keys. TopDistinctFn will take as a base the code here for Beam SDK 2.13.0. In the add_input method we'll do a previous check as follows:

for current_top_element in enumerate(heap):
  if element[0] == current_top_element[1].value[0]:
    heap[current_top_element[0]] = heap[-1]
    heap.pop()
    heapq.heapify(heap)

Basically, we'll compare the key for the element that we are evaluating (element[0]) versus each element in the heap. Heap elements are of type ComparableValue so we can use value to get back the tuple (and value[0] to get the key). If they match we'll want to pop it out from the heap (as we are accumulating the sum will be greater). Beam SDK uses the heapq library so I based my approach on this answer to remove the i-th element (we use enumerate to keep index information).

I added some logging to be able to help detect duplicates:

logging.info("Duplicate: " + element[0] + "," + str(element[1]) + ' --- ' + current_top_element[1].value[0] + ',' + str(current_top_element[1].value[1]))

The code is located in a top.py file inside a combiners folder (with __init__.py) and I import it with:

from combiners.top import TopDistinctFn

Then, I can use TopDistinctFn from within the pipeline as this:

(inputs
     | 'Add User as key' >> beam.Map(lambda x: (x, 1)) # ('key', 1)
     | 'Apply Window of time' >> beam.WindowInto(
                    beam.window.FixedWindows(size=10*60),
                    trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(2)),
                    accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
     | 'Sum Score' >> beam.CombinePerKey(sum)   
     | 'Top 10 scores' >> beam.CombineGlobally(
                    TopDistinctFn(n=10, compare=lambda a, b: a[1] < b[1])).without_defaults()
     | 'Print results' >> beam.ParDo(PrintTop10Fn()))

The full code can be found here. generate_messages.py is the Pub/Sub message generator, top.py contains the custom version of TopCombineFn renamed TopDistinctFn (might look overwhelming but I only added a few lines of code starting at line 425) and test_combine.py the main pipeline code. To run this you can put the files in the correct folder, install Beam SDK 2.13.0 if needed, modify project ID and Pub/Sub topic in generate_messages.py and test_combine-py. Then, start publishing messages with python generate_messages.py and, in a different shell, run the pipeline with the DirectRunner: python test_combine.py --streaming. With DataflowRunner you'll probably need to add the extra files with a setup.py file.

As an example, Bob was leading with 9 points and, when the next update comes, his score is up to 11 points. He'll appear in the next recap with only the updated score and no duplicate (as detected in our logging). The entry with 9 points will not appear and still the top will have 10 users as desired. Likewise for Marta. I noted that older scores still appear in the heap even if not in the top 10 but I am not sure how garbage collection works with heapq.

INFO:root:>>> Current top 10: [('Bob', 9), ('Connor', 8), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Kevin', 6), ('Laura', 6), ('Marta', 6), ('Diane', 4), ('Bacon', 4)]
...
INFO:root:Duplicate: Marta,8 --- Marta,6
INFO:root:Duplicate: Bob,11 --- Bob,9
INFO:root:>>> Current top 10: [('Bob', 11), ('Connor', 8), ('Marta', 8), ('Bacon', 7), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Laura', 6), ('Diane', 6), ('Kevin', 6)]

Let me know if that works well for your use case too.

Guillem Xercavins
  • 6,938
  • 1
  • 16
  • 35
  • Thank you for a such amazing and detailed answer! This seems to work very well. I hope this get added on a futur update of apache-beam – MichelDelpech Jun 16 '19 at 15:26