I'm ingesting data (a JSON file) via Pub/Sub, so by default my event time is the time the message was published to the topic.
I want to override the event time with a value of my own.
I added a datetime field to my data.
I want to do aggregation and combination according to this new timestamp field in my JSON records.
PS: the field is named "timestamp" and it is a string, which is why I convert it into a datetime and then into a Unix timestamp in Dataflow:
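from datetime import datetime
import apache_beam as beam

def get_timestamp(data):
    my_date = data['timestamp']  # ISO 8601 string, e.g. 2010-09-18...
    times = datetime.fromisoformat(my_date)  # type: datetime.datetime
    # Attach the parsed time (Unix seconds) as the element's event time
    return beam.window.TimestampedValue(data, times.timestamp())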
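One detail of the conversion above that may matter: if the string carries no UTC offset, `datetime.fromisoformat` returns a naive datetime, and `.timestamp()` then interprets it in the worker's local time zone. Below is a minimal sketch of a conversion that does not depend on the local zone. The helper name `to_epoch_seconds` is hypothetical, and treating offset-less strings as UTC is an assumption about my data, not something Beam requires.

```python
from datetime import datetime, timezone

def to_epoch_seconds(iso_string):
    """Hypothetical helper: parse an ISO 8601 string into Unix seconds."""
    parsed = datetime.fromisoformat(iso_string)
    if parsed.tzinfo is None:
        # Assumption: strings without an explicit offset are in UTC
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()
```

The returned float could be passed as the second argument of `beam.window.TimestampedValue` in place of `times.timestamp()`.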
Later I call get_timestamp in my pipeline, before windowing.
I receive my data from Pub/Sub:
# Parentheses let the chained transforms span several lines
lines = (p
         | 'receive_data' >> beam.io.ReadFromPubSub(
             subscription=known_args.in_topic).with_input_types(str)
         | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
         | 'jsonload' >> beam.Map(lambda x: json.loads(x)))
and then do my processing:
(lines
 | 'timestamp' >> beam.Map(get_timestamp)
 | 'print timestamp' >> beam.ParDo(PrintFn2())
 | 'window' >> beam.WindowInto(
     window.FixedWindows(10),
     trigger=trigger.AfterWatermark(),
     accumulation_mode=trigger.AccumulationMode.ACCUMULATING
 )
 | 'CountGlobally' >> beam.CombineGlobally(
     beam.combiners.CountCombineFn()
 ).without_defaults()
)
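To make explicit what I expect from the 10-second fixed windows: each element should land in the window that contains its own "timestamp" value, not its publish time. The sketch below mimics that assignment in plain Python, with no Beam involved. The function name `fixed_window_bounds` is hypothetical, and the arithmetic assumes windows aligned to the Unix epoch with no offset.

```python
def fixed_window_bounds(event_time, size=10):
    """Hypothetical helper: [start, end) of the fixed window holding event_time.

    Assumes windows of `size` seconds aligned to the Unix epoch (offset 0).
    """
    start = event_time - (event_time % size)
    return (start, start + size)
```

For example, two records whose timestamps are 1284768003 and 1284768009 would fall into the same window and be counted together, while a record at 1284768010 would start the next one.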