
I am creating a pipeline that reads from Kafka using beam_nuggets.io and writes to BigQuery using Apache Beam's WriteToBigQuery.

I am currently running this locally using the DirectRunner to test some of the functionality and concepts. It is able to read from Kafka with no issue; however, when writing to BigQuery it logs the message "Refreshing access_token" and then nothing happens.

What is really odd is that if I remove the part reading from Kafka and replace it with a simple beam.Create(...), it will successfully refresh the token and write to BigQuery as expected.

An extract of the code can be seen below:

    messages = (p
            | "KafkaConsumer" >> kafkaio.KafkaConsume({"topic": "test",
                                                       "bootstrap_servers": "localhost:9092",
                                                       "auto_offset_reset": "earliest"})
            # | "ManualCreate" >> beam.Create([{"name": "ManualTest", "desc": "a test"}])
            | 'Get message' >> beam.Map(lambda x: x[1])
            | 'Parse' >> beam.Map(parse_json)
            )

    messages | "Write to BigQuery" >> beam.io.WriteToBigQuery(
        pipeline_options.table_spec.get(),
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        batch_size=1)

    messages | 'Writing to stdout' >> beam.Map(print)

As an additional point, when running this locally I have the environment variable "GOOGLE_APPLICATION_CREDENTIALS" set to the location of my service account key file.

Any help in working out what might be causing this issue would be greatly appreciated.

GradviusMars

1 Answer


Assuming this is a streaming pipeline, can you try setting an appropriate Window or Trigger according to the instructions here?

Even though you are not directly using GroupByKey operations in your pipeline, beam.io.WriteToBigQuery uses GroupByKey transforms in its implementation, so you have to do the above to make sure that the data in your pipeline gets propagated appropriately.
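
For example, something like the following (a rough sketch only, not the exact fix; the window size and trigger delay are arbitrary placeholders, and it reuses the messages, pipeline_options and table_schema names from your snippet) applies a window and trigger to the Kafka messages before the BigQuery write:

    import apache_beam as beam
    from apache_beam.transforms import trigger, window

    # Window the unbounded Kafka stream so the GroupByKey inside
    # WriteToBigQuery can emit results instead of waiting forever for
    # the end of the unbounded global window.
    windowed_messages = (
        messages
        | "Window" >> beam.WindowInto(
            window.FixedWindows(10),  # 10-second fixed windows (placeholder size)
            trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
    )

    windowed_messages | "Write to BigQuery" >> beam.io.WriteToBigQuery(
        pipeline_options.table_spec.get(),
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

The write then consumes the windowed collection instead of the raw messages.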

When you used the Create transform, the watermark would go from zero to infinity, which allowed your pipeline to complete.

Also, note that the Kafka source in beam_nuggets.io is a very simplistic implementation that uses a Beam DoFn to read. Many Beam streaming runners require sources to implement other streaming features (for example, checkpointing) to operate correctly. I suggest trying out kafka.py available in the Beam repo, which provides a more complete Kafka unbounded read implementation.
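
Switching to that source might look roughly like the following (a hedged sketch only: ReadFromKafka is a cross-language transform, so it needs a Java runtime available for its expansion service, and the topic and bootstrap server are the placeholders from your snippet):

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    messages = (
        p
        | "ReadFromKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": "localhost:9092",
                             "auto.offset.reset": "earliest"},
            topics=["test"])
        # ReadFromKafka emits (key, value) pairs, with bytes values by
        # default, so keep the value before parsing.
        | "Get message" >> beam.Map(lambda x: x[1])
        | "Parse" >> beam.Map(parse_json)
    )

Note that parse_json may then receive bytes rather than a decoded string, depending on the deserializers configured.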

chamikara
  • Thank you for the response. I am now using ReadFromKafka from the Beam io module to consume data, with a window and trigger as suggested. I am still having some issues with the BigQuery write being triggered, and I am worried it is due to the issue mentioned here: https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data. I was just curious whether you have any experience of seeing this actually work when using a DirectRunner, as I am worried the bug mentioned in that post is going to prove a blocker! – GradviusMars Jul 29 '22 at 12:49