
I'm currently thinking about building a pipeline that has a LAG operator like the one in SQL, but I'm not sure if it's possible.

To be clearer, let's say I have a stream of data like this:

# sensor_name, temperature
("station 1", 30.0)
("station 1", 31.0)
("station 1", 32.0)
("station 1", 33.0)
("station 2", 30.0)
("station 2", 31.0)
("station 2", 32.0)

and I apply a PTransform so that the output becomes:

("station 1", {"now":30.0, "before":None})
("station 1", {"now":31.0, "before":30.0})
("station 1", {"now":32.0, "before":31.0})
("station 1", {"now":33.0, "before":32.0})
("station 2", {"now":30.0, "before":None})
("station 2", {"now":31.0, "before":30.0})
("station 2", {"now":32.0, "before":31.0})

Is it possible to do so? Thanks!
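For reference, the desired output matches SQL's `LAG(temperature) OVER (PARTITION BY sensor_name ...)`. Below is a minimal plain-Python sketch of those semantics, with no Beam involved. It assumes elements already arrive in the right order and simply remembers the last value seen per key. The function name `lag_by_key` is hypothetical.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple


def lag_by_key(
    rows: Iterable[Tuple[str, float]],
) -> Iterator[Tuple[str, Dict[str, Optional[float]]]]:
    """Pair each value with the previous value seen for the same key.

    Assumes rows arrive in the order that LAG should follow.
    """
    last: Dict[str, float] = {}  # hypothetical per-key "state": last value seen
    for key, value in rows:
        # .get() returns None for the first value of a key, like SQL LAG
        yield key, {"now": value, "before": last.get(key)}
        last[key] = value


rows = [
    ("station 1", 30.0), ("station 1", 31.0), ("station 1", 32.0),
    ("station 1", 33.0), ("station 2", 30.0), ("station 2", 31.0),
    ("station 2", 32.0),
]
for out in lag_by_key(rows):
    print(out)
```

Because the remembered value is stored per key, interleaved keys (station 1, station 2, station 1, ...) would be handled the same way.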

fahmiduldul
  • This is possible with a stateful DoFn, but you may receive elements in the wrong order if they arrive close together (for example, you may receive 32 before 31). In that case, what would you do with 31? I guess you are using Python, right? – Iñigo Jan 18 '22 at 16:15
  • I think your best bet for working with this is to have two arrays: the original one and another that lags one step behind it. Then you can merge the results. You can find more details on this approach in this [stack overflow answer](https://stackoverflow.com/questions/55279946/dataflow-look-up-a-previous-event-in-an-event-stream). Is that what you are looking for? – Betjens Jan 18 '22 at 16:46
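The "original array plus a lagged copy" idea in the comment above can be sketched in plain Python for a bounded batch. The sketch groups values per key, builds a copy shifted right by one position and padded with `None`, and zips the two arrays back together. It only illustrates the shift-and-merge idea. It assumes each key's values are already in order and is not the code from the linked answer. `shift_and_merge` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def shift_and_merge(
    rows: List[Tuple[str, float]],
) -> List[Tuple[str, Dict[str, Optional[float]]]]:
    # Group values per key, keeping arrival order within each key
    per_key: Dict[str, List[float]] = defaultdict(list)
    for key, value in rows:
        per_key[key].append(value)

    merged: List[Tuple[str, Dict[str, Optional[float]]]] = []
    for key, original in per_key.items():
        # The lagged array is the original shifted right by one, padded with None
        lagged: List[Optional[float]] = [None, *original[:-1]]
        # Merge: element i of the original is paired with element i of the lag
        for now, before in zip(original, lagged):
            merged.append((key, {"now": now, "before": before}))
    return merged


print(shift_and_merge([("station 1", 30.0), ("station 1", 31.0), ("station 2", 30.0)]))
```

This needs all values for a key up front, so it fits batch data. In an unbounded stream, the per-key state approach from the answer is the more natural fit.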


Here is a working sample that uses the public Pub/Sub topic for taxi rides.

This is the stateful DoFn:

import logging

import apache_beam as beam
from apache_beam.coders import FloatCoder, TupleCoder
from apache_beam.transforms.userstate import BagStateSpec


class UpdateLast(beam.DoFn):
    # Per-key state: a bag that holds at most one (meter_reading, timestamp)
    # tuple, i.e. the most recent reading seen for that key.
    RIDE_TRACK = BagStateSpec('rides', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self,
                element,
                timestamp_param=beam.DoFn.TimestampParam,
                ride_state=beam.DoFn.StateParam(RIDE_TRACK)):

        key = element[0]
        meter_reading = element[1]
        timestamp = float(timestamp_param)  # element's event time, in seconds

        bag_content = [x for x in ride_state.read()]
        if not bag_content:
            # First element for this key: store it; there is no previous value
            logging.info("Generating entry %s for key %s", (meter_reading, timestamp), key)
            ride_state.add((meter_reading, timestamp))
            output = {"now": meter_reading, "before": None}
            yield (key, output)
        else:
            # There should only be one element in the bag
            bag_ride = bag_content[0]

            old_meter = bag_ride[0]
            old_timestamp = bag_ride[1]
            # We only need to check if the element is more recent
            if timestamp > old_timestamp:
                # Newer element: replace the stored reading and emit (now, before)
                ride_state.clear()
                ride_state.add((meter_reading, timestamp))
                output = {"now": meter_reading, "before": old_meter}
                logging.info("KEY %s: updating from %s to %s", key, old_meter, meter_reading)
                yield (key, output)
            else:
                # Late (older) element: emit it as "before" the stored reading
                # and leave the state unchanged
                output = {"now": old_meter, "before": meter_reading}
                yield (key, output)
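The first comment raised a concern about elements arriving out of order. To see how this DoFn behaves in that case, its per-key logic can be replayed in plain Python. A dict stands in for the per-key `BagState`, and explicit timestamps stand in for `TimestampParam`. This is a hypothetical local simulation (`simulate_update_last` is not part of Beam) that mirrors the three branches of `process` above.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple


def simulate_update_last(
    events: Iterable[Tuple[str, float, float]],
) -> Iterator[Tuple[str, Dict[str, Optional[float]]]]:
    # key -> (reading, timestamp); stands in for the per-key BagState
    state: Dict[str, Tuple[float, float]] = {}
    for key, reading, ts in events:
        if key not in state:
            # First element for this key: store it, no previous value
            state[key] = (reading, ts)
            yield key, {"now": reading, "before": None}
        else:
            old_reading, old_ts = state[key]
            if ts > old_ts:
                # Newer element: replace the stored value and emit the pair
                state[key] = (reading, ts)
                yield key, {"now": reading, "before": old_reading}
            else:
                # Late element: emit it as "before"; state is left untouched
                yield key, {"now": old_reading, "before": reading}


# 32.0 (timestamp 3) arrives before 31.0 (timestamp 2)
events = [("station 1", 30.0, 1.0), ("station 1", 32.0, 3.0), ("station 1", 31.0, 2.0)]
for out in simulate_update_last(events):
    print(out)
```

With this ordering, 32.0 is emitted twice: once paired with 30.0 and once with 31.0. No 31.0/30.0 pair is produced. Whether that is acceptable depends on the use case.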

And here is a pipeline for you to test it:

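import json
import logging

import apache_beam as beam
from apache_beam import Map, ParDo
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner

# Placeholders: set these to your own GCS bucket, GCP project and region
bucket = "gs://<your-bucket>"
project = "<your-project>"
region = "<your-region>"

options = PipelineOptions(
    temp_location=f"{bucket}/tmp/",
    project=project,
    region=region,
    streaming=True,  # Pub/Sub is an unbounded source
    job_name="statedofn",
    num_workers=4,
    max_num_workers=20,
)

p = beam.Pipeline(DataflowRunner(), options=options)

# Public Pub/Sub topic with taxi ride events
topic = "projects/pubsub-public-data/topics/taxirides-realtime"

pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
            | "Json Loads" >> Map(json.loads)
            # Keep only rides that are in progress
            | beam.Filter(lambda x: x["ride_status"] == "enroute")
            # Key each meter reading by ride ID so that state is kept per ride
            | "KV" >> Map(lambda x: (x["ride_id"], x["meter_reading"]))
         )

state_df = (pubsub | "Stateful Do Fn" >> ParDo(UpdateLast())
                   | Map(logging.info)
           )

p.run()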

Output:

('052b8a40-1c57-4a3c-a012-73ffeddb1f02', {'now': 9.875244, 'before': 9.857124})
('835a9a99-c2fc-4f3d-9284-59098827fe05', {'now': 26.973698, 'before': 26.940273})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.828278, 'before': 17.808857})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.847698, 'before': 17.828278})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3575556, 'before': 2.3346667})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3804445, 'before': 2.3575556})
Iñigo
  • Thank you so much, bro! If you remember, you also answered my last question about Dataflow, but I failed to give you the bounty. I promise I will give you the bounty this time if it works (which is very likely to happen)!! – fahmiduldul Jan 19 '22 at 06:28
  • No worries, happy it worked for you! – Iñigo Jan 26 '22 at 17:05