I am facing a supposedly simple problem that nonetheless has me banging my head against the wall.
I've set up a Kafka cluster (MSK on AWS) with a single topic of 200 partitions; right now the topic holds about 100M events, roughly 1 TB of data.
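Back of the envelope, that is roughly 1 TB / 200 ≈ 5 GB and 100M / 200 = 500K events per partition, i.e. an average event size of about 10 KB.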
MSK is configured with 6 kafka.m5.4xlarge brokers, and this is the basic config:
log.retention.ms = 300000
message.max.bytes = 10485760
replica.fetch.max.bytes = 10485760
replica.fetch.response.max.bytes = 10485760
socket.receive.buffer.bytes = 10485760
socket.request.max.bytes = 10485760
socket.send.buffer.bytes = 10485760
I want to process these events one by one on a Spark cluster, so I wrote a simple Spark job with this code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, from_json, col
from src import util, event_schema

def print_row(row):
    print(row)

if __name__ == "__main__":
    config = util.get_config()

    spark_session = SparkSession \
        .builder \
        .appName('test') \
        .getOrCreate()

    # Read from Kafka
    events_df = spark_session.readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', config['kafka']['bootstrap_servers']) \
        .option('kafka.sasl.jaas.config', f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{config["kafka"]["username"]}" password="{config["kafka"]["password"]}";') \
        .option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
        .option('kafka.security.protocol', 'SASL_SSL') \
        .option('subscribe', config['kafka']['topic']) \
        .option('groupIdPrefix', 'test') \
        .option('failOnDataLoss', 'false') \
        .load()

    # Each Kafka message value can hold several newline-separated JSON events:
    # split them, parse each against the schema and flatten the resulting struct
    events_df = events_df.selectExpr('CAST(value AS STRING) as data')
    events_df = events_df.select(explode(split(events_df.data, '\n')))
    events_df = events_df.select(from_json(col('col'), event_schema).alias('value'))
    events_df = events_df.selectExpr('value.*')

    # Print every row and keep the streaming query alive
    events_df.writeStream \
        .foreach(print_row) \
        .start() \
        .awaitTermination()
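For completeness, the Kafka source needs the Spark Kafka connector on the classpath (the job does run against an empty topic, so the dependency itself isn't the issue). A sketch of how it can be wired in; the exact coordinate is an assumption and has to match the cluster's Spark/Scala build:

# Sketch only: pulling in the Kafka connector via spark.jars.packages.
# The version in the coordinate below is illustrative.
spark_session = SparkSession \
    .builder \
    .appName('test') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
    .getOrCreate()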
This simple Spark job should consume every event and print it.
When the topic is empty, it correctly starts consuming. However, if I attach this consumer group to the existing topic with that amount of data, it doesn't start consuming at all, as if it were stuck. A plain Kafka consumer (not using PySpark) doesn't have this problem: it starts consuming correctly, even though it takes a few minutes to get going.
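For reference, the plain consumer in that comparison is along these lines (a minimal sketch using kafka-python; the topic, bootstrap string and credentials below are placeholders for the same config values used in the Spark job):

from kafka import KafkaConsumer

# Minimal sketch of the non-Spark consumer used for comparison.
# The SASL/SSL settings mirror the options passed to the Spark Kafka source above.
consumer = KafkaConsumer(
    'my-topic',                         # placeholder for config['kafka']['topic']
    bootstrap_servers='broker1:9096',   # placeholder for the MSK bootstrap string
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='user',
    sasl_plain_password='password',
    group_id='test-plain',
    auto_offset_reset='earliest',
)

for message in consumer:
    print(message.value)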
What is wrong with my code, and how can I start consuming events from the Kafka topic straight away?
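Is it perhaps related to the Kafka source options that control where the stream starts reading and how much it pulls per micro-batch? This is the kind of thing I mean (a sketch only, auth options omitted; I haven't confirmed these are the right knobs for my case):

# Sketch only: Kafka source options that, as I understand it, control the
# starting position and the size of each micro-batch.
#   startingOffsets      -> 'latest' skips the existing backlog, 'earliest' replays it
#   maxOffsetsPerTrigger -> caps the number of records read per micro-batch
events_df = (
    spark_session.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', config['kafka']['bootstrap_servers'])
    .option('subscribe', config['kafka']['topic'])
    .option('startingOffsets', 'latest')
    .option('maxOffsetsPerTrigger', 100000)
    .load()
)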
Thanks