Here is a working sample that uses the public Pub/Sub topic for taxi rides.
This is the stateful DoFn:
class UpdateLast(beam.DoFn):
    """Stateful DoFn emitting each reading alongside the previously stored one.

    Input elements are indexed as ``(key, meter_reading)``; output elements are
    ``(key, {"now": ..., "before": ...})``. The bag state holds a
    ``(meter_reading, timestamp)`` tuple describing the newest element seen.
    """

    # Bag state spec named 'rides' whose entries are (float, float) tuples.
    RIDE_TRACK = BagStateSpec('rides', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self,
                element,
                timestamp_param=beam.DoFn.TimestampParam,
                ride_state=beam.DoFn.StateParam(RIDE_TRACK)):
        """Compare the incoming reading with the stored one and emit both.

        Args:
            element: Indexable pair; ``element[0]`` is the key and
                ``element[1]`` is the meter reading.
            timestamp_param: Element timestamp supplied through
                ``beam.DoFn.TimestampParam``; converted to ``float`` here.
            ride_state: Bag state declared by ``RIDE_TRACK``.

        Yields:
            ``(key, {"now": ..., "before": ...})``. ``"before"`` is ``None``
            when the bag was empty. When the incoming element is not newer
            than the stored one, the stored reading is reported as ``"now"``
            and the incoming reading as ``"before"``.
        """
        key, meter_reading = element[0], element[1]
        timestamp = float(timestamp_param)
        stored = list(ride_state.read())

        if not stored:
            # Nothing recorded yet: remember this reading, no predecessor.
            logging.info("Generating entry %s for key %s", (meter_reading, timestamp), key)
            ride_state.add((meter_reading, timestamp))
            result = {"now": meter_reading, "before": None}
        else:
            # Only the first bag entry is consulted; the bag is expected to
            # hold a single tuple because it is cleared before every update.
            previous = stored[0]
            previous_meter, previous_timestamp = previous[0], previous[1]
            if timestamp > previous_timestamp:
                # Newer element: replace the stored tuple with this one.
                ride_state.clear()
                ride_state.add((meter_reading, timestamp))
                result = {"now": meter_reading, "before": previous_meter}
                logging.info("KEY %s: updating from %s to %s", key, previous_meter, meter_reading)
            else:
                # Element is not newer: state is untouched and roles are swapped.
                result = {"now": previous_meter, "before": meter_reading}

        yield (key, result)
And here is a pipeline you can use to test it:
# Streaming Dataflow job settings. `bucket`, `project` and `region` are
# expected to be defined before this point.
options = PipelineOptions(
    temp_location=f"{bucket}/tmp/",
    project=project,
    region=region,
    streaming=True,
    job_name="statedofn",
    num_workers=4,
    max_num_workers=20,
)

p = beam.Pipeline(DataflowRunner(), options=options)

# Public taxi-rides topic used as the input source.
topic = "projects/pubsub-public-data/topics/taxirides-realtime"

# Read messages, parse them as JSON, keep only "enroute" rides and key each
# one as (ride_id, meter_reading).
raw_messages = p | "Read Topic" >> ReadFromPubSub(topic=topic)
rides = raw_messages | "Json Loads" >> Map(json.loads)
enroute_rides = rides | beam.Filter(lambda x: x["ride_status"] == "enroute")
pubsub = enroute_rides | "KV" >> Map(lambda x: (x["ride_id"], x["meter_reading"]))

# Run the stateful DoFn over the keyed readings and log every emitted element.
tracked = pubsub | "Stateful Do Fn" >> ParDo(UpdateLast())
state_df = tracked | Map(logging.info)

p.run()
Sample output:
('052b8a40-1c57-4a3c-a012-73ffeddb1f02', {'now': 9.875244, 'before': 9.857124})
('835a9a99-c2fc-4f3d-9284-59098827fe05', {'now': 26.973698, 'before': 26.940273})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.828278, 'before': 17.808857})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.847698, 'before': 17.828278})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3575556, 'before': 2.3346667})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3804445, 'before': 2.3575556})