I'm trying to get the max score between two string matches using Apache Beam. This is my pipeline so far:
import apache_beam as beam

# `options` and `query` are defined elsewhere.
with beam.Pipeline(options=options) as pipeline:
    result = (
        pipeline
        | 'read_data' >> beam.io.ReadFromBigQuery(query=query)
        | 'printing' >> beam.ParDo(print)
    )
The output has this structure:
(('address 3642', ['270-42']), ('av. lonely street 7460', [13541]), -3020.3391812865493)
(('address 3642', ['270-42']), ('bacar 8059', [2486]), -3653.532507739938)
(('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616)
(('address 3642', ['270-42']), ('holy mary 6998', [239]), -2557.4241486068113)
(('new address 3328', ['266-25']), ('false street 7451', [16274]), -3332.230769230769)
(('new address 3328', ['266-25']), ('principal 7532', [1726]), -3344.12474012474)
(('new address 3328', ['266-25']), ('principal 6931', [97]), -2780.3804573804573)
(('new address 3328', ['266-25']), ('john dewey 7577', [2976]), -3458.230769230769)
(('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462)
(('address 3642', ['270-42']), ('false street 7451', [1463]), -3012.646370829033)
(('address 3642', ['270-42']), ('sft avenue 7532', [36148]), -3050.6295149638804)
(('address 3642', ['270-42']), ('principal 6931', [3841]), -2489.1169590643276)
(('address 3642', ['270-42']), ('principal 7577', [36841]), -3171.532507739938)
(('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182)
The last item in each tuple (e.g. -3020.339) is the matching score between the first string of the first inner tuple (address 3642) and the first string of the second inner tuple (av. lonely street 7460).
I would like to keep, for each key, the row with the highest matching score, where the key is the first string of the first inner tuple (e.g. address 3642 and new address 3328), without losing the other data in the tuple. For example, my desired output in this case would be:
(('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182)
(('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462)
I tried beam.CombinePerKey(max) and beam.CombinePerKey(max).with_input_types(Tuple[SomeClass, float]), but neither gave the desired result. How can I achieve this?
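For reference, here is a minimal sketch of one way this might be done. It assumes every element has the shape shown above, ((name, ids), (candidate, ids), score). The helper names key_by_first_string, best_match and max_score_per_address are hypothetical and made up for illustration. The idea is to turn each row into a (key, row) pair first, since CombinePerKey expects 2-tuples and the rows here are 3-tuples (which may be why the attempts above did not work), then combine with max using the score as the comparison key, and finally drop the key again. apache_beam is imported inside the function only so that the two plain helpers can be tried without Beam installed.

```python
def key_by_first_string(row):
    """Turn a row into (key, row); the key is the first string of the first inner tuple."""
    return row[0][0], row


def best_match(rows):
    """Return the row with the highest score (the last item of each row).

    Beam may call a combiner on partial results as well as on raw inputs,
    so it has to be associative and commutative; max over whole rows is.
    """
    return max(rows, key=lambda row: row[2])


def max_score_per_address(pcoll):
    """Hypothetical helper: keep only the best-scoring row for each key."""
    import apache_beam as beam  # imported lazily, see the note above

    return (
        pcoll
        | 'key_by_address' >> beam.Map(key_by_first_string)
        | 'max_per_key' >> beam.CombinePerKey(best_match)
        | 'drop_key' >> beam.Values()
    )
```

Under these assumptions, max_score_per_address would be applied to the PCollection that currently feeds the 'printing' step. If two rows with the same key have exactly the same score, which of them is kept is not guaranteed.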