import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

def word_count(element):
    # Each Kafka record arrives as a (key, value) tuple.
    message = element[1]
    words = message.split()
    return words

def print_message(message):
    print(message)
    return message

with beam.Pipeline() as pipeline:
    (pipeline
        | 'ReadFromKafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'host.docker.internal:9092',  # Change to your Kafka server address.
                "group.id": "c-group",
                "auto.offset.reset": "earliest",
            },
            topics=['my-topic']
        )
        | 'Print messages' >> beam.Map(print_message)
        | 'FlatMap' >> beam.FlatMap(word_count))
    # No explicit pipeline.run() needed: the `with` block runs the
    # pipeline on exit, so calling run() here would execute it twice.

I am exploring Apache Beam and trying to read data from Kafka and, for now, just log each message. I am not seeing any messages being consumed. I added DEBUG logging, and I can see that it is able to connect to the cluster via the bootstrap server. The topic has a single partition, and I am publishing messages to it. The console consumer is able to consume the messages, but the pipeline is not printing any. Also, in the DEBUG output the offset is seeked to the latest offset: for example, the offset is reset to 5 when I have published 4 messages. I even tried connecting to a Confluent cluster, and the connection had no issues.
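To rule out the per-element functions themselves, they can be exercised locally without Kafka. This is a quick sanity harness, not part of the actual pipeline; the fake records below are made-up stand-ins for real Kafka (key, value) tuples:

```python
# Feed fake (key, value) tuples through the same functions used in the
# pipeline, mimicking what beam.Map / beam.FlatMap do per element.
def print_message(message):
    print(message)
    return message

def word_count(element):
    # Same logic as in the pipeline: split the value into words.
    return element[1].split()

records = [(None, "hello beam"), (None, "kafka test")]
printed = [print_message(r) for r in records]        # the beam.Map step
flat = [w for r in records for w in word_count(r)]   # the beam.FlatMap step
print(flat)  # ['hello', 'beam', 'kafka', 'test']
```

If this prints the expected words, the transforms are fine and the problem is upstream in the Kafka read itself.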

message: "[Consumer clientId=consumer-tracker-single_topic-0_offset_consumer_1139498349_consumer-group-29, groupId=tracker-single_topic-0_offset_consumer_1139498349_consumer-group] Seeking to LATEST offset of partition single_topic-0"

message: "[Consumer clientId=consumer-tracker-single_topic-0_offset_consumer_1139498349_consumer-group-29, groupId=tracker-single_topic-0_offset_consumer_1139498349_consumer-group] Resetting offset for partition single_topic-0 to offset 5."

message: "[Consumer clientId=consumer-tracker-single_topic-0_offset_consumer_1321183567_consumer-group-31, groupId=tracker-single_topic-0_offset_consumer_1321183567_consumer-group] Subscribed to partition(s): single_topic-0"

What could be wrong? Here are extra logs that might help:

INFO:root:severity: INFO
timestamp {
  seconds: 1683957338
  nanos: 902000000
}
message: "Kafka version: 2.4.1"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/KafkaIO.ReadSourceDescriptors/ParDo(Unbounded)/ParMultiDo(Unbounded)/PairWithRestriction"
log_location: "org.apache.kafka.common.utils.AppInfoParser"
thread: "25"

INFO:root:severity: INFO
timestamp {
  seconds: 1683957338
  nanos: 903000000
}
message: "Kafka commitId: c57222ae8cd7866b"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/KafkaIO.ReadSourceDescriptors/ParDo(Unbounded)/ParMultiDo(Unbounded)/PairWithRestriction"
log_location: "org.apache.kafka.common.utils.AppInfoParser"
thread: "25"
Comment: Some reference: https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data – Amar Kumar May 13 '23 at 11:35
