I have a very simple Apache Beam Python script that works like a charm on the DataflowRunner. It reads data from a Pub/Sub subscription and prints it, and that is all. But when I start it on the DirectRunner, I get this error:

ERROR:apache_beam.runners.direct.executor:Exception at bundle , due to an exception.
 Traceback (most recent call last)
[...]
File ".../.local/lib/python3.9/site-packages/apache_beam/utils/timestamp.py", line 106, in from_utc_datetime
    if dt.tzinfo != pytz.utc and dt.tzinfo != datetime.timezone.utc:
AttributeError: tzinfo

If I replace the Pub/Sub read with a beam.Create([...]) step, it works.


    # Read from PubSub
    rows = (pipeline
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
            #| 'Simple PCollection' >> beam.Create([0,2,5])
            | 'print' >> beam.Map(print)
           )
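
To illustrate the failure mode, here is a minimal sketch using only the standard library. It assumes (this is not confirmed by the traceback) that the DirectRunner hands `from_utc_datetime` an object that is not a `datetime.datetime` and therefore has no `tzinfo` attribute. `FakeProtoTimestamp`, `is_utc`, and `to_utc_datetime` are hypothetical names made up for this example; they are not part of Beam or Pub/Sub.

```python
import datetime


class FakeProtoTimestamp:
    """Hypothetical stand-in for a non-datetime publish time (seconds + nanos)."""

    def __init__(self, seconds, nanos=0):
        self.seconds = seconds
        self.nanos = nanos


def is_utc(dt):
    # Same shape as the check in the traceback, without the pytz comparison.
    # Raises AttributeError if `dt` has no `tzinfo` attribute.
    return dt.tzinfo == datetime.timezone.utc


def to_utc_datetime(value):
    """Return a timezone-aware UTC datetime for either kind of input."""
    if isinstance(value, datetime.datetime):
        if value.tzinfo is None:
            # Assumption: naive datetimes are already expressed in UTC.
            return value.replace(tzinfo=datetime.timezone.utc)
        return value.astimezone(datetime.timezone.utc)
    # Assumption: anything else exposes integer `seconds` and `nanos` fields.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return epoch + datetime.timedelta(
        seconds=value.seconds, microseconds=value.nanos // 1000
    )
```

Calling `is_utc(FakeProtoTimestamp(60))` raises an `AttributeError`, while `is_utc(to_utc_datetime(FakeProtoTimestamp(60)))` passes the check. Whether the real `publish_time` object looks like this stand-in would need to be verified by inspecting it in a debugger.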

Sebastien
  • Can you use Python to calculate the date from seconds and format it as a string that includes timezone info, i.e. `yyyy-mm-ddThh:mm:ssZ`? – Bohemian Mar 11 '23 at 11:43
  • My suggestion would be to first check why the `if` condition is being run. Once you know that, check how the `dt` value gets populated and whether it depends on the runner in use. Finding these details should probably answer the question. – Aishwary Shukla Mar 11 '23 at 15:01
  • Thanks! I followed both of your suggestions. There is no problem with the libraries or the Python environment. I looked at the publish_time generated by Pub/Sub: its format does not seem to suit the DirectRunner. So I think this is a bug; I reported it to the Beam dev team and am waiting for their answer. In the meantime I made a small workaround: I bypass the publish_time in Beam's timestamp.py (line 106) by substituting a well-formed datetime: dt = datetime.datetime.now(datetime.timezone.utc) – Sebastien Mar 13 '23 at 15:44
  • I'm experiencing the same issue. Where exactly in your code do you add this line: `dt = datetime.datetime.now(datetime.timezone.utc)`, @Sebastien? – Steeve Jun 07 '23 at 22:37
  • I got it working by specifying `timestamp_attribute="logging.googleapis.com/timestamp"` in ReadFromPubSub. This parameter defaults to None (see https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub ). Make sure you have the latest Apache Beam version. I am using the DirectRunner. – maverick Aug 01 '23 at 15:47
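
The last comment's suggestion could be sketched roughly as follows. `timestamp_attribute` is a documented parameter of `beam.io.ReadFromPubSub`; the helper `build_read_kwargs` and the `run` function are hypothetical names introduced only for this example, and the attribute you pass must actually be present on your messages (the value in the comment comes from that commenter's setup).

```python
def build_read_kwargs(subscription, timestamp_attribute=None):
    """Collect keyword arguments for beam.io.ReadFromPubSub (hypothetical helper)."""
    kwargs = {"subscription": subscription}
    if timestamp_attribute is not None:
        # When set, element timestamps are taken from this message attribute
        # instead of the Pub/Sub publish time.
        kwargs["timestamp_attribute"] = timestamp_attribute
    return kwargs


def run(subscription, timestamp_attribute=None):
    # Imported here so the helper above can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(streaming=True, runner="DirectRunner")
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read from PubSub" >> beam.io.ReadFromPubSub(
                **build_read_kwargs(subscription, timestamp_attribute)
            )
            | "print" >> beam.Map(print)
        )
```

You would call `run(known_args.input_subscription, "logging.googleapis.com/timestamp")` from the script's entry point. This is a sketch of the commenter's workaround, not a confirmed fix for the underlying `tzinfo` error.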

0 Answers