I am creating a pipeline that reads from Kafka using beam_nuggets.io and writes to BigQuery using Apache Beam's WriteToBigQuery.
I am currently running this locally with the DirectRunner to test some of the functionality and concepts. It reads from Kafka with no issue; however, when writing to BigQuery it logs the message "Refreshing access_token" and then nothing happens.
What is really odd is that if I remove the Kafka read and replace it with a simple beam.Create(...), it successfully refreshes the token and writes to BigQuery as expected.
An extract of the code is shown below:
import apache_beam as beam
from beam_nuggets.io import kafkaio

# p, parse_json, table_schema and pipeline_options are defined elsewhere in the file.
messages = (p
            | "KafkaConsumer" >> kafkaio.KafkaConsume({"topic": "test",
                                                       "bootstrap_servers": "localhost:9092",
                                                       "auto_offset_reset": "earliest"})
            # | "ManualCreate" >> beam.Create([{"name": "ManualTest", "desc": "a test"}])
            | 'Get message' >> beam.Map(lambda x: x[1])  # the consumer emits 2-tuples; the payload is the second element
            | 'Parse' >> beam.Map(parse_json)
            )

messages | "Write to BigQuery" >> beam.io.WriteToBigQuery(
    pipeline_options.table_spec.get(),
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    batch_size=1)

messages | 'Writing to stdout' >> beam.Map(print)
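For comparison, the variant that does write successfully is essentially the same pipeline with the Kafka stage swapped out for beam.Create; a trimmed sketch (same assumptions about table_schema and pipeline_options as in the extract above):

messages = (p
            | "ManualCreate" >> beam.Create([{"name": "ManualTest", "desc": "a test"}])
            )

messages | "Write to BigQuery" >> beam.io.WriteToBigQuery(
    pipeline_options.table_spec.get(),
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    batch_size=1)

With this version the "Refreshing access_token" message is followed by the rows actually being written to BigQuery.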
As an additional point, when running this locally I have the environment variable GOOGLE_APPLICATION_CREDENTIALS set to the location of my service account key file.
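In case it helps narrow things down, this is the kind of check I would use to confirm those credentials are loadable at all (a minimal sketch, assuming the google-auth package that Beam's GCP extras install):

import google.auth

# Loads the key pointed to by GOOGLE_APPLICATION_CREDENTIALS and reports the
# associated project; raises if the key cannot be found or parsed.
credentials, project = google.auth.default()
print(type(credentials).__name__, project)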
Any help in working out what might be causing this issue would be greatly appreciated.