I am working with Spark for the first time, and I have read the docs here, but I am still struggling to understand what the maxOffsetsPerTrigger setting does. Specifically, what does a "trigger interval" mean here? I suspect this is a Spark setting rather than something coming from Kafka. To debug this, I push multiple messages into my topic, set maxOffsetsPerTrigger to 1, and submit my job. In the logs I can see the offset incrementing by 1 across all of the partitions in the topic. But when I submit the job again in a subsequent run, it starts from the very beginning of the topic. What am I missing here?
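From my reading of the structured streaming guide, the trigger interval seems to be something you configure on the write side of the query, and maxOffsetsPerTrigger would then cap how many Kafka offsets each micro-batch reads. This is just my interpretation, sketched against the df defined in test.py below (the 10-second interval is an arbitrary value I picked):

# my (possibly wrong) understanding: the trigger interval is set on the writer,
# while maxOffsetsPerTrigger limits how many offsets each micro-batch consumes;
# df is the Kafka streaming DataFrame defined in test.py further down
query = df.writeStream \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()
query.awaitTermination()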
For some context, I have a Kafka topic (retention set to -1, i.e. unlimited) that I would like to ingest with PySpark, but for some reason the topic offsets are not being committed and my application keeps reading from the beginning every time I submit the job with spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 test.py. Here is my test.py:
from pyspark.sql import DataFrame, SparkSession

# create spark session
spark_session = SparkSession \
    .builder.master("local") \
    .appName("local-test") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "1") \
    .config("spark.sql.shuffle.partitions", "1") \
    .config("spark.executor.memory", "5g") \
    .config("spark.driver.memory", "5g") \
    .getOrCreate()

# start reading from Kafka, from the earliest offset, at most one offset per trigger
df: DataFrame = spark_session \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers",
            "broker1:6667,broker2:6667,broker3:6667,broker4:6667") \
    .option("subscribe", "my_topic") \
    .option("startingOffsets", "earliest") \
    .option("includeHeaders", "true") \
    .option("maxOffsetsPerTrigger", 1) \
    .load()

# expose the stream as a temporary view
df.createTempView("temp_table")

# write temp table contents to console
spark_session.table("temp_table").writeStream \
    .format("console") \
    .start() \
    .awaitTermination()
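One thing I noticed while re-reading the docs is that structured streaming appears to track query progress (including Kafka offsets) in a checkpoint location on the sink, and my writer never sets one. Is that what I am missing? Something like the untested sketch below is what I imagine it would look like (the /tmp path is just a placeholder I made up):

# untested guess: persist query progress so a later spark-submit resumes from the
# last processed offsets instead of falling back to startingOffsets = earliest
spark_session.table("temp_table").writeStream \
    .format("console") \
    .option("checkpointLocation", "/tmp/my_topic_checkpoint") \
    .start() \
    .awaitTermination()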