
I am facing a supposedly simple problem that nevertheless has me banging my head against the wall.

I've set up a Kafka cluster (MSK in AWS) with one topic and 200 partitions; right now the topic holds about 100M events and roughly 1 TB of data.

MSK is configured with 6 kafka.m5.4xlarge brokers, and this is the basic config:

log.retention.ms = 300000
message.max.bytes = 10485760
replica.fetch.max.bytes = 10485760
replica.fetch.response.max.bytes = 10485760
socket.receive.buffer.bytes = 10485760
socket.request.max.bytes = 10485760
socket.send.buffer.bytes = 10485760
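
To double-check that these broker-side settings are actually in effect, something like the following can be used (a minimal sketch with the kafka-python AdminClient; the bootstrap server and credentials are placeholders, not my real values):

from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

# Placeholders: replace with the real MSK bootstrap servers and SCRAM credentials.
admin = KafkaAdminClient(
    bootstrap_servers='b-1.example.kafka.eu-west-1.amazonaws.com:9096',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='user',
    sasl_plain_password='password',
)

# Describe broker id 1; the response echoes settings such as message.max.bytes
# and log.retention.ms so they can be compared with the list above.
configs = admin.describe_configs(config_resources=[ConfigResource(ConfigResourceType.BROKER, '1')])
print(configs)

admin.close()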

I want to process these events one by one using a Spark cluster, so I have created a simple Spark job with this code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_json, split

from src import util, event_schema

def print_row(row):
    print(row)

if __name__ == "__main__":
    config = util.get_config()

    spark_session = SparkSession \
        .builder \
        .appName('test') \
        .getOrCreate()

    # Read from kafka
    events_df = spark_session.readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', config['kafka']['bootstrap_servers']) \
        .option('kafka.sasl.jaas.config', f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{config["kafka"]["username"]}" password="{config["kafka"]["password"]}";') \
        .option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
        .option('kafka.security.protocol', 'SASL_SSL') \
        .option('subscribe', config['kafka']['topic']) \
        .option('groupIdPrefix', 'test') \
        .option('failOnDataLoss', 'false') \
        .load()

    # Each Kafka record value may contain several newline-separated JSON events:
    # split them, parse each one with the event schema and flatten the fields.
    events_df = events_df.selectExpr('CAST(value AS STRING) as data')
    events_df = events_df.select(explode(split(events_df.data, '\n')))
    events_df = events_df.select(from_json(col('col'), event_schema).alias('value'))
    events_df = events_df.selectExpr('value.*')
    
    # Print every event; awaitTermination keeps the driver alive while the query runs.
    query = events_df.writeStream \
        .foreach(print_row) \
        .start()

    query.awaitTermination()

This simple Spark job should consume every single event and print it.

When the topic is empty, it correctly starts consuming. However, if I attach this consumer group to the existing topic with this amount of data, it simply doesn't start consuming at all, as if it were stuck. The same doesn't happen if I write a simple Kafka consumer (not using PySpark): it correctly starts consuming, even though it takes a few minutes to start.
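
For comparison, that plain consumer is roughly equivalent to this (a minimal sketch using kafka-python; the topic name, group id and connection details are placeholders):

from kafka import KafkaConsumer

# Placeholders: replace with the real MSK bootstrap servers, credentials and topic.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='b-1.example.kafka.eu-west-1.amazonaws.com:9096',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='user',
    sasl_plain_password='password',
    group_id='test-plain-consumer',
    auto_offset_reset='earliest',  # start from the beginning of the existing backlog
)

# Messages are printed as soon as partitions are assigned to this group.
for message in consumer:
    print(message.value)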

What is wrong with my code, and how can I start consuming events from the Kafka topic straight away?

Thanks

int 2Eh
  • Could you check if the topic is balanced, i.e. if there are records in every partition? This thread includes info on how to do that: https://stackoverflow.com/questions/35432326/how-to-get-latest-offset-for-a-partition-for-a-kafka-topic What _could_ happen is that some partitions have no or very few records and Spark is waiting for enough input from all partitions before making progress – Svend Jul 13 '22 at 08:26
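
A minimal sketch of that per-partition offset check, assuming kafka-python (the topic name and connection settings are placeholders):

from kafka import KafkaConsumer, TopicPartition

# Placeholders: replace with the real MSK bootstrap servers, credentials and topic.
consumer = KafkaConsumer(
    bootstrap_servers='b-1.example.kafka.eu-west-1.amazonaws.com:9096',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='user',
    sasl_plain_password='password',
)

topic = 'my-topic'
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
earliest = consumer.beginning_offsets(partitions)
latest = consumer.end_offsets(partitions)

# A partition whose latest offset equals its earliest offset currently has no records.
for tp in sorted(partitions, key=lambda tp: tp.partition):
    print(tp.partition, latest[tp] - earliest[tp])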

1 Answer


Please follow the Spark Structured Streaming + Kafka integration guide (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) when reading a stream from Kafka into Spark. Try exploring the following options described on that page:

  1. startingOffsets: For a streaming read this defaults to latest, which means the application will only see new events that arrive in Kafka after the Spark application is deployed. If you want to read the historic, not-yet-processed events, set this option to earliest. Look into checkpointing as well.
  2. maxOffsetsPerTrigger: This is useful if you want each trigger to read only a limited number of events (both options are shown in the sketch below).
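
For example, applying both options to the readStream from the question would look like this (a sketch; the value 100000 for maxOffsetsPerTrigger is just an illustration, not a recommendation):

events_df = spark_session.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', config['kafka']['bootstrap_servers']) \
    .option('kafka.sasl.jaas.config', f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{config["kafka"]["username"]}" password="{config["kafka"]["password"]}";') \
    .option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
    .option('kafka.security.protocol', 'SASL_SSL') \
    .option('subscribe', config['kafka']['topic']) \
    .option('startingOffsets', 'earliest') \
    .option('maxOffsetsPerTrigger', 100000) \
    .option('failOnDataLoss', 'false') \
    .load()

Note that once a checkpointLocation is set on the writeStream, startingOffsets only applies to the very first run; afterwards the query resumes from its checkpointed offsets.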
yadavlpsir