I am working with Spark for the first time. I have read the docs, but I am still struggling to understand what the maxOffsetsPerTrigger setting does. Specifically, what does a "trigger interval" mean here? I suspect that this is a Spark setting and not part of Kafka. To debug this, I pushed multiple messages into my topic, set {maxOffsetsPerTrigger: 1}, and submitted my job. In the logs, I can see that the offset increments by 1 for all of the partitions in the topic. But when I submit my job again in a subsequent run, it starts from the very beginning of the topic. What am I missing here?

For some context, I have a Kafka topic (with a -1 retention setting) that I would like to ingest using PySpark. For some reason the topic offsets are not being committed, and my PySpark application reads from the beginning every time I submit my job with spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 test.py. Here is my test.py:

from pyspark.sql import DataFrame, SparkSession

# Create a local Spark session
spark_session = (
    SparkSession.builder.master("local")
    .appName("local-test")
    .config("spark.executor.cores", "1")
    .config("spark.executor.instances", "1")
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.executor.memory", "5g")
    .config("spark.driver.memory", "5g")
    .getOrCreate()
)

# Start reading from Kafka as a streaming DataFrame
df: DataFrame = (
    spark_session.readStream.format("kafka")
    .option(
        "kafka.bootstrap.servers",
        "broker1:6667,broker2:6667,broker3:6667,broker4:6667",
    )
    .option("subscribe", "my_topic")
    .option("startingOffsets", "earliest")
    .option("includeHeaders", "true")
    .option("maxOffsetsPerTrigger", 1)
    .load()
)

# Register the stream under a name so it can be looked up as a table
df.createTempView("temp_table")

# Write the temp view's contents to the console and block until the query stops
(
    spark_session.table("temp_table")
    .writeStream.format("console")
    .start()
    .awaitTermination()
)
OneCricketeer
lollerskates

1 Answer

when I submit my job in a subsequent run, it is starting from the very beginning of the topic again

You are missing kafka.group.id (or groupIdPrefix), and you are using "startingOffsets", "earliest".

A Kafka consumer commits offsets (stores them in a Kafka topic) only if it has a consumer group ID set.

Otherwise, you will need to write your own offset management in Spark and not use the "earliest" option, as discussed in the Spark docs.
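
One way to picture the "own offset management" route: besides "earliest" and "latest", the Kafka source's startingOffsets option accepts a JSON string that names a starting offset per topic partition, where -2 stands for earliest and -1 for latest. The following is only a minimal sketch, assuming the application has saved the next offset for each partition somewhere itself; the helper name build_starting_offsets and the sample offsets are hypothetical.

```python
import json
from typing import Dict


def build_starting_offsets(topic: str, next_offsets: Dict[int, int]) -> str:
    """Return a JSON string suitable for the Kafka source's startingOffsets option.

    next_offsets maps a partition number to the first offset to read from it.
    """
    # JSON object keys must be strings, so partition numbers are converted.
    per_partition = {str(p): o for p, o in sorted(next_offsets.items())}
    return json.dumps({topic: per_partition})


# Hypothetical saved state: resume partition 0 at offset 23,
# and read partition 1 from its earliest offset (-2).
starting_offsets = build_starting_offsets("my_topic", {0: 23, 1: -2})
```

The resulting string would be passed as .option("startingOffsets", starting_offsets) in place of "earliest". How and where the offsets get saved between runs is left open here.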

OneCricketeer
  • Thanks for the tip, @OneCricketeer. I have made the changes to specify `kafka.group.id`, but I am still seeing the same behavior: the offsets are not committed, and restarting the Spark application starts consuming from the beginning of the topic. After reading [this](https://stackoverflow.com/questions/50844449/how-to-manually-set-group-id-and-commit-kafka-offsets-in-spark-structured-stream) and [this](https://stackoverflow.com/questions/62799984/spark-structured-streaming-checkpoint-usage-in-production) post, I think I actually need to use checkpoints to manage Kafka offsets instead. – lollerskates Jul 07 '23 at 23:27
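
The checkpoint approach mentioned in the comment above could look roughly like the sketch below. It is not a definitive implementation: the function names, the output and checkpoint paths, and the 10-second trigger are all hypothetical choices. It writes to a Parquet file sink instead of the console, because the Structured Streaming guide lists the console sink as not fault-tolerant. As far as the Spark docs describe it, startingOffsets only applies when a new query is started, and a resumed query picks up from where it left off.

```python
from typing import Dict


def kafka_source_options(bootstrap_servers: str, topic: str) -> Dict[str, str]:
    """Kafka source options, mirroring the values used in the question's test.py."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Only consulted when the query starts without an existing checkpoint.
        "startingOffsets": "earliest",
        # Rate limit on the number of offsets processed per trigger interval.
        "maxOffsetsPerTrigger": "1",
    }


def start_checkpointed_query(spark, options, output_dir, checkpoint_dir):
    """Start a streaming query whose progress is recorded under checkpoint_dir.

    `spark` is an existing SparkSession; both directories are assumptions
    chosen by the caller and must stay the same across restarts.
    """
    stream = spark.readStream.format("kafka").options(**options).load()
    return (
        stream.writeStream.format("parquet")
        .option("path", output_dir)
        # Spark stores the offsets it has processed here, not in Kafka.
        .option("checkpointLocation", checkpoint_dir)
        # Start a micro-batch at this interval (hypothetical value).
        .trigger(processingTime="10 seconds")
        .start()
    )
```

With the spark_session from test.py, this might be called as start_checkpointed_query(spark_session, kafka_source_options("broker1:6667", "my_topic"), "/tmp/my_topic_out", "/tmp/my_topic_ckpt").awaitTermination(). Restarting the job with the same checkpoint directory should then continue from the recorded offsets. The trigger is the Spark-side schedule for micro-batches, and maxOffsetsPerTrigger caps how much each of those batches reads.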