
Does anybody know how to perform a window function in Apache Beam (Dataflow)?

Example:

ID  Sector     Country  Income
1   Liam       US        16133
2   Noah       BR        10184
3   Oliver     ITA       11119
4   Elijah     FRA       13256
5   William    GER        7722
6   James      AUS        9786
7   Benjamin   ARG        1451
8   Lucas      FRA        4541
9   Henry      US         9111
10  Alexander  ITA       13002
11  Olivia     ENG        5143
12  Emma       US        18076
13  Ava        MEX       15930
14  Charlotte  ENG       18247
15  Sophia     BR         9578
16  Amelia     FRA       10813
17  Isabella   FRA        7575
18  Mia        GER       14875
19  Evelyn     AUS       19749
20  Harper     ITA       19642

Questions:

  1. How do I create another column with the running sum of Income, ordered by ID?
  2. How do I create another column with the rank of the people who earn the most?

Thank you, Bruno


2 Answers


Consider the approach below. I have tried my best to make sure that the ParDo fns are associative and commutative, which means this should not break when run in parallel on multiple workers. Let me know if you find it breaking on the DataflowRunner.

import apache_beam as beam
from apache_beam.transforms.core import DoFn


class cum_sum(DoFn):
    """Adds a 'cumulative_sum' field: the running sum of Income over all
    rows whose ID is <= this row's ID. The full (ID, Income) list is
    provided as a side input."""

    def process(self, element, lkp_data, accum_sum):
        for lkp_id_income in lkp_data:
            if element['ID'] >= lkp_id_income[0]:
                accum_sum += lkp_id_income[1]
        element.update({'cumulative_sum': accum_sum})
        yield element


class rank_it(DoFn):
    """Adds a 'rank' field by counting how many rows have a smaller
    cumulative_sum. The enriched PCollection itself is the side input."""

    def process(self, element, lkp_data, counter):
        for lkp_id_cumsum in lkp_data:
            if lkp_id_cumsum['cumulative_sum'] < element['cumulative_sum']:
                counter += 1
        element.update({'rank': counter})
        yield element


with beam.Pipeline() as p:
    data = (
        p
        | 'create' >> beam.Create(
            [
                {'ID': 4, 'Sector': 'Liam', 'Country': 'US', 'Income': 1400},
                {'ID': 2, 'Sector': 'piam', 'Country': 'IS', 'Income': 1200},
                {'ID': 1, 'Sector': 'Oiam', 'Country': 'PS', 'Income': 1300},
                {'ID': 3, 'Sector': 'Uiam', 'Country': 'OS', 'Income': 1800},
            ]
        )
    )

    # (ID, Income) pairs, used as a side input for the running sum.
    ids_income = (
        data
        | 'get_ids_income' >> beam.Map(lambda element: (element['ID'], element['Income']))
    )

    with_cumulative_sum = (
        data
        | 'cumulative_sum' >> beam.ParDo(
            cum_sum(),
            lkp_data=beam.pvalue.AsIter(ids_income),
            accum_sum=0)
    )

    with_ranking = (
        with_cumulative_sum
        | 'ranking' >> beam.ParDo(
            rank_it(),
            lkp_data=beam.pvalue.AsIter(with_cumulative_sum),
            counter=1)
        | 'print' >> beam.Map(print)
    )

Output

{'ID': 4, 'Sector': 'Liam', 'Country': 'US', 'Income': 1400, 'cumulative_sum': 5700, 'rank': 4}
{'ID': 2, 'Sector': 'piam', 'Country': 'IS', 'Income': 1200, 'cumulative_sum': 2500, 'rank': 2}
{'ID': 1, 'Sector': 'Oiam', 'Country': 'PS', 'Income': 1300, 'cumulative_sum': 1300, 'rank': 1}
{'ID': 3, 'Sector': 'Uiam', 'Country': 'OS', 'Income': 1800, 'cumulative_sum': 4300, 'rank': 3}
– Mr.Batra

Windowing in Apache Beam subdivides your unbounded PCollection into smaller bounded chunks so that you can apply computations to them (group by, sum, avg, ...).

Unbounded PCollections come from streaming processing, and windows are based on timestamps (you can create a sliding window of 5 minutes, for instance). In your example there are no timestamps, and it sounds like a bounded PCollection (a batch).

Technically, you can simulate timestamps by preprocessing the elements and adding a dummy time indicator (see the sketch below). But in your case, a simple group-by or a sort is enough to achieve what you want.

– guillaume blaquiere
  • You could also look at the Top function (available in both Java and Python) which might be helpful here. – robertwb Nov 11 '21 at 18:51
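
For reference, a minimal sketch of the Top combiner mentioned in that comment (n=2 and the key function are illustrative assumptions, not from the comment). Top.Of emits a single list containing the n largest elements, which addresses the "who earns the most" part directly:

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            {'ID': 1, 'Income': 1300},
            {'ID': 2, 'Income': 1200},
            {'ID': 3, 'Income': 1800},
        ])
        # Emit one list holding the 2 highest-income elements.
        | beam.combiners.Top.Of(2, key=lambda e: e['Income'])
        | beam.Map(print)
    )

This should print a single list with the element for ID 3 (1800) followed by ID 1 (1300), highest earner first.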