
I'm ingesting data (JSON files) via Pub/Sub, so by default the event time of each element is the time it was published to the topic. I want to override the event time, so I added a datetime field to my data.
I want to do aggregations and combinations based on this new timestamp field in my JSON.

PS: the field is named "timestamp" and it is a string, which is why I convert it to a datetime and then to a timestamp in Dataflow.

from datetime import datetime
import apache_beam as beam

def get_timestamp(data):
    my_date = data['timestamp']  # date: 2010-09-18...... (string)
    times = datetime.fromisoformat(my_date)  # type: datetime.datetime
    return beam.window.TimestampedValue(data, times.timestamp())

Later I call this function in my pipeline, before windowing.

I receive my data from Pub/Sub:

# Parentheses are needed so the pipeline can span several lines.
lines = (
    p
    | 'receive_data' >> beam.io.ReadFromPubSub(
        subscription=known_args.in_topic).with_output_types(bytes)  # payloads arrive as bytes
    | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'jsonload' >> beam.Map(lambda x: json.loads(x))
)

and then do my processing:

(lines
 | 'timestamp' >> beam.Map(get_timestamp)
 | 'print timestamp' >> beam.ParDo(PrintFn2())
 | 'window' >> beam.WindowInto(
     window.FixedWindows(10),
     trigger=trigger.AfterWatermark(),
     accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
 | 'CountGlobally' >> beam.CombineGlobally(
     beam.combiners.CountCombineFn()).without_defaults()
)
Rim
  • Code looks valid at first glance. Do you have a specific issue with it? – Mikhail Gryzykhin Sep 19 '19 at 21:17
  • Every time I print (just after the 'CountGlobally' step), I get more than one result! Normally, if I'm using an AfterWatermark trigger with accumulating mode, I should get just one result once the watermark reaches the end of the window, but I get more than one. – Rim Sep 20 '19 at 08:45

1 Answer


When reading from Pub/Sub, the best way to set the event time for an element is to use the timestamp attribute option of the Pub/Sub source:

Java: withTimestampAttribute

Python: timestamp_attribute

This sets the element's timestamp at the source and ensures the watermark is based on good data.
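As a rough illustration of the Python option, here is a minimal sketch. It assumes the publisher attaches the event time to each message as a Pub/Sub attribute; the attribute name "ts" and the helper names to_epoch_millis and read_with_event_time are hypothetical, not part of the original question. The attribute value is expected to be either milliseconds since the Unix epoch or an RFC 3339 string, so the first helper shows one way the publisher could derive it from the existing "timestamp" field.

```python
from datetime import datetime, timezone


def to_epoch_millis(iso_string):
    """Convert an ISO 8601 string to milliseconds since the Unix epoch.

    Assumption: strings without a UTC offset are treated as UTC.
    The publisher would send str(to_epoch_millis(...)) as the attribute value.
    """
    parsed = datetime.fromisoformat(iso_string)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


def read_with_event_time(pipeline, subscription, attribute_name="ts"):
    """Read from Pub/Sub, taking each element's event time from an attribute.

    "ts" is a hypothetical attribute name; use whatever the publisher sets.
    """
    # Imported lazily so the helper above can be used without Beam installed.
    import apache_beam as beam

    return pipeline | "receive_data" >> beam.io.ReadFromPubSub(
        subscription=subscription,
        timestamp_attribute=attribute_name,
    )
```

With this in place, the separate 'timestamp' step in the pipeline should no longer be needed, since elements already carry their event time when they leave the source.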

If this is not an option, you can change the element's timestamp in a DoFn, as described in "Adding Timestamps to a PCollection". However, this method does not allow setting timestamps earlier than the current watermark, which is why the timestamp attribute approach is the best way to handle this pattern.
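For the in-pipeline alternative, a small sketch might look like the following. The helper names event_time_seconds and add_event_timestamps are hypothetical, and treating offset-less strings as UTC is an assumption. The normalization matters because calling timestamp() on a naive datetime interprets it in the local time zone of whichever machine runs the code. beam.Map is used here as shorthand for a simple DoFn.

```python
from datetime import datetime, timezone


def event_time_seconds(record, field="timestamp"):
    """Return the record's event time as seconds since the Unix epoch.

    Assumption: strings without a UTC offset are treated as UTC.
    """
    parsed = datetime.fromisoformat(record[field])
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


def add_event_timestamps(records):
    """Attach an event-time timestamp to every element of a PCollection."""
    # Imported lazily so event_time_seconds can be used without Beam installed.
    import apache_beam as beam

    return records | "timestamp" >> beam.Map(
        lambda record: beam.window.TimestampedValue(
            record, event_time_seconds(record)))
```

The watermark limitation described above still applies to this approach, since the timestamps are assigned after the source has already emitted the elements.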

Reza Rokni