I'm trying to get the max score between two string matches using Apache Beam.

with beam.Pipeline(options = options) as pipeline:
    result = (pipeline
                   | 'read_data' >> beam.io.ReadFromBigQuery(query = query)
                   | 'printing' >> beam.ParDo(print)
                   )

The output has this structure:

(('address 3642', ['270-42']), ('av. lonely street 7460', [13541]), -3020.3391812865493)
(('address 3642', ['270-42']), ('bacar 8059', [2486]), -3653.532507739938)
(('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616)
(('address 3642', ['270-42']), ('holy mary 6998', [239]), -2557.4241486068113)
(('new address 3328', ['266-25']), ('false street 7451', [16274]), -3332.230769230769)
(('new address 3328', ['266-25']), ('principal  7532', [1726]), -3344.12474012474)
(('new address 3328', ['266-25']), ('principal  6931', [97]), -2780.3804573804573)
(('new address 3328', ['266-25']), ('john dewey 7577', [2976]), -3458.230769230769)
(('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462)
(('address 3642', ['270-42']), ('false street 7451', [1463]), -3012.646370829033)
(('address 3642', ['270-42']), ('sft avenue 7532', [36148]), -3050.6295149638804)
(('address 3642', ['270-42']), ('principal 6931', [3841]), -2489.1169590643276)
(('address 3642', ['270-42']), ('principal 7577', [36841]), -3171.532507739938)
(('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182)

So the last item of each tuple (e.g. -3020.339) is the matching score between the first string in the first tuple (address 3642) and the first string in the second tuple (av. lonely street 7460).

I would like to get the max matching score per key, where the key is the first string of the first tuple (e.g. address 3642 and new address 3328), without losing the other data in the tuple. For example, my desired output in this case would be:

(('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616)
(('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462)

I tried beam.CombinePerKey(max) and beam.CombinePerKey(max).with_input_types(Tuple[SomeClass,float]), but neither gave the desired result. How can I achieve this?

xerac
  • Could you copy the *exact* format you have right before calculating the max? Is it the one shown there? Since the previous step is ReadFromBQ, I don't think so. Anyhow, you will need your own combiner so that you can keep the rest of the element. If you copy the exact input, I will code it – Iñigo Jan 19 '22 at 14:44
  • Also, how do you get `-3020.3391812865493` ? `3642 - 7460 = -3818` – Iñigo Jan 19 '22 at 14:46
  • Thanks for your reply, @Iñigo. I think I didn't understand your question well. The exact format right before calculating the max is already there (e.g. `(('new address 3328', ['266-25']), ('principal 6931', [97]), -2780.3804573804573)` --> `tuple(tuple(str,list),tuple(str,list),float)`). Regarding the number, the `-3020.339` value is obtained by a formula, which I don't have access to, that measures the similarity between the two mentioned strings. – xerac Jan 19 '22 at 16:30

3 Answers

You can try GroupByKey - https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/

import apache_beam as beam


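def convertToKV(x):
    # Key by the first string; keep the rest of the element as the value.
    return x[0][0], [x[0][1], x[1], x[2]]

def findMax(x):
    # x is (key, iterable of [first_list, second_tuple, score]).
    key, values = x
    # max() accepts any iterable, so the grouped values need not be indexable.
    max_value = max(values, key=lambda v: v[2])
    # Rebuild the original element shape.
    return ((key, max_value[0]), max_value[1], max_value[2])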


with beam.Pipeline() as pipeline:
    findMaxPerKey = (
            pipeline
    | 'Create pcollection' >> beam.Create([
        (('address 3642', ['270-42']), ('av. lonely street 7460', [13541]), -3020.3391812865493),
        (('address 3642', ['270-42']), ('bacar 8059', [2486]), -3653.532507739938),
        (('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616),
        (('address 3642', ['270-42']), ('holy mary 6998', [239]), -2557.4241486068113),
        (('new address 3328', ['266-25']), ('false street 7451', [16274]), -3332.230769230769),
        (('new address 3328', ['266-25']), ('principal  7532', [1726]), -3344.12474012474),
        (('new address 3328', ['266-25']), ('principal  6931', [97]), -2780.3804573804573),
        (('new address 3328', ['266-25']), ('john dewey 7577', [2976]), -3458.230769230769),
        (('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462),
        (('address 3642', ['270-42']), ('false street 7451', [1463]), -3012.646370829033),
        (('address 3642', ['270-42']), ('sft avenue 7532', [36148]), -3050.6295149638804),
        (('address 3642', ['270-42']), ('principal 6931', [3841]), -2489.1169590643276),
        (('address 3642', ['270-42']), ('principal 7577', [36841]), -3171.532507739938),
        (('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182),
    ])
    | beam.Map(convertToKV)
    | beam.GroupByKey()
    | beam.Map(findMax)
    | beam.Map(print))
swar patel

I can see three ways to approach this: a custom combine callable, a comparable class, or a custom Beam CombineFn.

I think the easiest way would be to write a comparator function for your tuples and pass it to CombinePerKey. You write a callable that maps an iterable of input tuples to the one with the maximum score. The full documentation for how to write a callable that you can pass to CombinePerKey is at https://beam.apache.org/documentation/programming-guide/#simple-combines.
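A minimal sketch of such a callable, assuming each element has the shape shown in the question, with the score at index 2. The name `max_by_score` is hypothetical, and the snippet is plain Python so it can be checked without a pipeline.

```python
def max_by_score(elements):
    """Return the element with the highest score, or None if there are none.

    Hypothetical helper: assumes each element looks like
    ((str, list), (str, list), float), with the score at index 2.
    A combine callable may be applied to partial results as well as raw
    inputs, so its output has the same shape as its inputs and a None
    (meaning "nothing seen yet") is skipped.
    """
    best = None
    for element in elements:
        if element is None:
            continue  # skip an empty partial result
        if best is None or element[2] > best[2]:
            best = element
    return best


rows = [
    (('address 3642', ['270-42']), ('bacar 8059', [2486]), -3653.532507739938),
    (('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616),
    (('address 3642', ['270-42']), ('holy mary 6998', [239]), -2557.4241486068113),
]
print(max_by_score(rows))
```

In a pipeline, the elements would first be keyed, for example with `beam.Map(lambda x: (x[0][0], x))`, then combined with `beam.CombinePerKey(max_by_score)`, and finally mapped back to the value to drop the key.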

Another option that could make your code more readable in the future would be to wrap the data in a named class and make it comparable, as described in "Comparable classes in Python 3" (and in many other places), and then use CombinePerKey(max).
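One way this could look, as a sketch rather than a definitive design: a dataclass whose ordering depends only on the score. The class name `Match`, its field names, and the two conversion helpers are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Tuple


@dataclass(order=True)
class Match:
    """Hypothetical wrapper: only `score` takes part in comparisons."""
    score: float
    source: Tuple[str, list] = field(compare=False)
    candidate: Tuple[str, list] = field(compare=False)

    @classmethod
    def from_tuple(cls, row):
        # row is ((str, list), (str, list), float), as in the question.
        source, candidate, score = row
        return cls(score=score, source=source, candidate=candidate)

    def to_tuple(self):
        # Convert back to the original element shape.
        return (self.source, self.candidate, self.score)


rows = [
    (('new address 3328', ['266-25']), ('false street 7451', [16274]), -3332.230769230769),
    (('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462),
    (('new address 3328', ['266-25']), ('john dewey 7577', [2976]), -3458.230769230769),
]
# The builtin max now works directly, because Match instances are orderable.
best = max(Match.from_tuple(r) for r in rows)
print(best.to_tuple())
```

Note that with `compare=False` on the other fields, two matches with the same score also compare as equal, which is acceptable here because only the maximum is needed. In a pipeline, each element would be mapped to `(match.source[0], match)` before `beam.CombinePerKey(max)` and converted back with `to_tuple()` afterwards.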

And finally, if you want to learn more about Beam, you could implement the CombineFn interface, described at https://beam.apache.org/documentation/programming-guide/#advanced-combines and with many examples in https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/combiners.py.

Kenn Knowles

Both what Kenn and Swar Patel suggested would work. I hadn't thought about comparable classes, and it's a good idea. GroupByKey (GBK) may be a bit inefficient if you have many elements with the same key, since they all have to be grouped on the same worker before being processed; that's why I suggested the custom combiner. Combiners are "lifted", so the work can be parallelized and each worker only keeps track of an accumulator.
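To illustrate why lifting is safe for this problem, the sketch below simulates it in plain Python, without Beam. The elements for one key are split into two made-up bundles, each bundle is reduced to a partial maximum, and the partial maxima are merged. The bundle split and the helper name `best_of` are assumptions for illustration; a real runner decides how elements are distributed.

```python
def best_of(elements):
    # Keep the whole element, comparing only by the score at index 2.
    return max(elements, key=lambda e: e[2])


# Elements for the key 'address 3642', split into two hypothetical bundles.
bundle_a = [
    (('address 3642', ['270-42']), ('av. lonely street 7460', [13541]), -3020.3391812865493),
    (('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616),
]
bundle_b = [
    (('address 3642', ['270-42']), ('principal 6931', [3841]), -2489.1169590643276),
    (('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182),
]

# "Lifted" path: one partial result per bundle, then a merge of the partials.
partials = [best_of(bundle_a), best_of(bundle_b)]
lifted = best_of(partials)

# Grouped path: every element for the key gathered in one place first.
grouped = best_of(bundle_a + bundle_b)

print(lifted == grouped)
```

Both paths pick the same element because taking a maximum is associative and commutative, which is the property that lets partial results be merged in any order.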

This is the code for what you want using custom combiners:

CombineFn

from apache_beam import CombineFn


class CustomMax(CombineFn):
  def create_accumulator(self):
    # Create and initialise the accumulator: (best element so far, its score)
    tuple_track = None
    max_value = float('-inf')
    return tuple_track, max_value

  def add_input(self, accumulator, element):
    # Keep the element if its score beats the best score seen so far
    if element[2] > accumulator[1]:
      return element, element[2]
    return accumulator

  def merge_accumulators(self, accumulators):
    # Multiple accumulators could be processed in parallel,
    # this function merges them
    return max(accumulators, key=lambda x: x[1])

  def extract_output(self, accumulator):
    # Only output the tracker
    return accumulator[0]

A pipeline for you to test, using your data

elements = [
    (('address 3642', ['270-42']), ('av. lonely street 7460', [13541]), -3020.3391812865493),
    (('address 3642', ['270-42']), ('bacar 8059', [2486]), -3653.532507739938),
    (('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616),
    (('address 3642', ['270-42']), ('holy mary 6998', [239]), -2557.4241486068113),
    (('new address 3328', ['266-25']), ('false street 7451', [16274]), -3332.230769230769),
    (('new address 3328', ['266-25']), ('principal  7532', [1726]), -3344.12474012474),
    (('new address 3328', ['266-25']), ('principal  6931', [97]), -2780.3804573804573),
    (('new address 3328', ['266-25']), ('john dewey 7577', [2976]), -3458.230769230769),
    (('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462),
    (('address 3642', ['270-42']), ('false street 7451', [1463]), -3012.646370829033),
    (('address 3642', ['270-42']), ('sft avenue 7532', [36148]), -3050.6295149638804),
    (('address 3642', ['270-42']), ('principal 6931', [3841]), -2489.1169590643276),
    (('address 3642', ['270-42']), ('principal 7577', [36841]), -3171.532507739938),
    (('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182),
]

from apache_beam import CombinePerKey, Create, Map, Pipeline

with Pipeline() as p:
  (p | Create(elements)
     | "Add key" >> Map(lambda x: (x[0][0], x))
     | CombinePerKey(CustomMax())
     | "Remove key" >> Map(lambda x: x[1])
     | "log" >> Map(print))

Output

(('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182)
(('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462)
Iñigo
  • Actually, now that I think of it, you don't need the interface and can do it with a "normal" combine; I will add it to my answer tomorrow – Iñigo Jan 20 '22 at 19:36