Consider below approach. I have tried my best to make sure that the Pado fns are associative and commutative. Which means this should not break when run parallel on multiple workers. Let me know if you find this breaking over DataflowRunner
import apache_beam as beam
from apache_beam.transforms.core import DoFn
class cum_sum(DoFn):
def process(self, element,lkp_data,accum_sum):
for lkp_id_income in lkp_data:
if element['ID'] >= lkp_id_income[0]:
accum_sum += lkp_id_income[1]
element.update({'cumulative_sum':accum_sum})
yield element
class rank_it(DoFn):
def process(self, element, lkp_data,counter):
for lkp_id_cumsum in lkp_data:
if lkp_id_cumsum['cumulative_sum'] < element['cumulative_sum']:
counter += 1
element.update({'rank':counter})
yield element
with beam.Pipeline() as p:
data = (
p
| 'create'>>beam.Create(
[
{
'ID':4,
'Sector':'Liam',
'Country':'US',
'Income':1400
},
{
'ID':2,
'Sector':'piam',
'Country':'IS',
'Income':1200
},
{
'ID':1,
'Sector':'Oiam',
'Country':'PS',
'Income':1300
},
{
'ID':3,
'Sector':'Uiam',
'Country':'OS',
'Income':1800
}
]
)
)
ids_income = (
data
| 'get_ids_income'>>beam.Map(lambda element: (element['ID'], element['Income']))
)
with_cumulative_sum = (
data
| 'cumulative_sum'>>beam.ParDo(cum_sum(),lkp_data = beam.pvalue.AsIter(ids_income),accum_sum = 0)
)
with_ranking =(
with_cumulative_sum
| 'ranking'>>beam.ParDo(rank_it(),lkp_data = beam.pvalue.AsIter(with_cumulative_sum),counter = 1)
| 'print'>>beam.Map(print)
)
Output
{'ID': 4, 'Sector': 'Liam', 'Country': 'US', 'Income': 1400, 'cumulative_sum': 5700, 'rank': 4}
{'ID': 2, 'Sector': 'piam', 'Country': 'IS', 'Income': 1200, 'cumulative_sum': 2500, 'rank': 2}
{'ID': 1, 'Sector': 'Oiam', 'Country': 'PS', 'Income': 1300, 'cumulative_sum': 1300, 'rank': 1}
{'ID': 3, 'Sector': 'Uiam', 'Country': 'OS', 'Income': 1800, 'cumulative_sum': 4300, 'rank': 3}