import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka


def word_count(element):
    # Each element is a (key, value) tuple; the message payload is the value.
    message = element[1]
    words = message.split()
    return words


def print_message(message):
    print(message)
    return message


with beam.Pipeline() as pipeline:
    (pipeline
     | 'ReadFromKafka' >> ReadFromKafka(
         consumer_config={
             'bootstrap.servers': 'host.docker.internal:9092',  # Change to your Kafka server address.
             "group.id": "c-group",
             "auto.offset.reset": "earliest",
         },
         topics=['my-topic']
     )
     | 'Print messages' >> beam.Map(print_message)
     | 'FlatMap' >> beam.FlatMap(word_count))
    # No explicit pipeline.run() is needed: leaving the `with` block runs the pipeline.
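
As a quick sanity check that is independent of Kafka, the two helper functions can be exercised on a hand-built record. This is only a sketch: `sample_record` is a made-up stand-in, and it assumes that `ReadFromKafka` with its default deserializers emits `(key, value)` tuples of raw bytes.

```python
# Hypothetical stand-in for one record as ReadFromKafka might emit it:
# a (key, value) tuple of raw bytes (assumption: default byte-array deserializers).
sample_record = (b"some-key", b"hello apache beam")


def word_count(element):
    # The message payload is the second item of the (key, value) tuple.
    message = element[1]
    return message.split()


def print_message(message):
    print(message)
    return message


# Mirror the pipeline order: print the record first, then split its value into words.
passed_through = print_message(sample_record)
words = word_count(passed_through)
print(words)
```

If this behaves as expected, the helpers themselves are unlikely to be the reason nothing is printed, which points back at how the records are read.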
I am exploring Apache Beam and trying to read data from Kafka and, for now, just log each message. No messages are being consumed. I enabled DEBUG logging and can see that the pipeline is able to connect to the cluster through the bootstrap server. The topic has a single partition, and I am publishing messages to it. The console consumer is able to consume the messages, but the pipeline does not print any of them. The DEBUG output also shows the consumer seeking to the latest offset: for example, the offset is reset to 5 when I have published 4 messages. I even tried connecting to a Confluent cluster, and the connection had no issues there either.
message: "[Consumer clientId=consumer-tracker-single_topic-0_offset_consumer_1139498349_consumer-group-29, groupId=tracker-single_topic-0_offset_consumer_1139498349_consumer-group] Seeking to LATEST offset of partition single_topic-0"
message: "[Consumer clientId=consumer-tracker-single_topic-0_offset_consumer_1139498349_consumer-group-29, groupId=tracker-single_topic-0_offset_consumer_1139498349_consumer-group] Resetting offset for partition single_topic-0 to offset 5."
message: "[Consumer clientId=consumer-tracker-single_topic-0_offset_consumer_1321183567_consumer-group-31, groupId=tracker-single_topic-0_offset_consumer_1321183567_consumer-group] Subscribed to partition(s): single_topic-0"
What could be wrong? Here are some extra logs that might help:
INFO:root:severity: INFO
timestamp {
seconds: 1683957338
nanos: 902000000
}
message: "Kafka version: 2.4.1"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/KafkaIO.ReadSourceDescriptors/ParDo(Unbounded)/ParMultiDo(Unbounded)/PairWithRestriction"
log_location: "org.apache.kafka.common.utils.AppInfoParser"
thread: "25"
INFO:root:severity: INFO
timestamp {
seconds: 1683957338
nanos: 903000000
}
message: "Kafka commitId: c57222ae8cd7866b"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/KafkaIO.ReadSourceDescriptors/ParDo(Unbounded)/ParMultiDo(Unbounded)/PairWithRestriction"
log_location: "org.apache.kafka.common.utils.AppInfoParser"
thread: "25"