Questions tagged [spark-kafka-integration]

Use this tag for any question about Spark-Kafka integration. It covers both batch and stream processing, whether through Spark Streaming (DStreams) or Structured Streaming.

This tag is related to the spark-streaming-kafka and spark-sql-kafka libraries.

External sources:

Structured Streaming + Kafka Integration Guide: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

To make your question more precise, you can consider adding:

This tag serves as a synonym for the existing (low-traffic) tag, which focuses only on Spark Streaming (covering neither batch processing nor Structured Streaming).

96 questions
14
votes
4 answers

How to set group.id for consumer group in kafka data source in Structured Streaming?

I want to use Spark Structured Streaming to read from a secure Kafka cluster. This means that I will need to force a specific group.id. However, as stated in the documentation, this is not possible. Still, in the Databricks documentation…
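
For reference, a minimal sketch of the Spark 3.0+ workaround (broker address, topic, and group name are placeholders; assumes the spark-sql-kafka-0-10 connector is on the classpath):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("secure-kafka-read").getOrCreate()

    // Spark 3.0+ accepts "kafka.group.id" to override the auto-generated consumer group.
    // Note that Spark may still run several consumers under this one group id.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
      .option("subscribe", "secure-topic")               // placeholder
      .option("kafka.group.id", "authorized-group")      // the group your cluster ACLs permit
      .load()
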
11
votes
1 answer

How to manually set group.id and commit kafka offsets in spark structured streaming?

I was going through the Spark Structured Streaming - Kafka integration guide here. It says there, under enable.auto.commit, that the Kafka source doesn't commit any offset. So how do I manually commit offsets once my Spark application has…
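
Since the source never commits to Kafka, the supported way to persist progress is Spark's own checkpoint; a minimal sketch (path, broker, and topic are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("offset-tracking").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()

    // Offsets are recorded per micro-batch under checkpointLocation, not in Kafka.
    val query = df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/ckpt/offset-tracking")
      .start()

    query.awaitTermination()
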
7
votes
1 answer

How to use kafka.group.id and checkpoints in spark 3.0 structured streaming to continue to read from Kafka where it left off after restart?

Based on the introduction in Spark 3.0 (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html), it should be possible to set "kafka.group.id" to track the offset. For our use case, I want to avoid the potential data loss if…
5
votes
1 answer

Connecting Pyspark with Kafka

I'm having trouble understanding how to connect Kafka and PySpark. I have a Kafka installation on Windows 10 with a topic nicely streaming data. I've installed PySpark, which runs properly; I'm able to create a test DataFrame without problems. But when I try…
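
A common cause is that the Kafka connector is missing from the classpath; a sketch of pulling it in at session creation, shown here in Scala (the same options apply from PySpark; the connector coordinates are an assumption and must match your Spark and Scala versions):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("kafka-connect-test")
      .master("local[*]")
      // Version numbers below are only an example; match them to your build.
      .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-topic")
      .load()
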
5
votes
2 answers

Spark batch reading from Kafka & using Kafka to keep track of offsets

I understand that using Kafka's own offset tracking instead of other methods (like checkpointing) is problematic for streaming jobs. However, I just want to run a Spark batch job every day, reading all messages from the last offset to the most recent…
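
A sketch of the batch-read shape (broker, topic, and offset values are placeholders; persisting the end offsets between daily runs is left to your own code):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("daily-kafka-batch").getOrCreate()

    // Batch mode: spark.read rather than readStream, with explicit offset ranges.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .option("startingOffsets", """{"events":{"0":4711}}""")  // e.g. offsets saved by yesterday's run
      .option("endingOffsets", "latest")
      .load()

    // After processing, store the maximum offsets somewhere durable so the
    // next run can pass them as startingOffsets.
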
5
votes
1 answer

Spark Structured Streaming with Kafka - How to repartition the data and distribute the processing among worker nodes

If my Kafka topic receives records like

    CHANNEL | VIEWERS | .....
    ABC     | 100     | .....
    CBS     | 200     | .....

and I have Spark Structured Streaming code to read and process the Kafka records as follows: val spark = SparkSession.builder…
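
A sketch of one way to distribute that load: parse the value, then repartition on the key column so records for the same channel are colocated and the processing spreads across executors (the column layout is an assumption taken from the excerpt):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().appName("channel-repartition").getOrCreate()

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
      .option("subscribe", "viewers")                    // placeholder
      .load()

    // Split the pipe-delimited value and repartition by channel.
    val parsed = raw.selectExpr("CAST(value AS STRING) AS line")
      .select(
        split(col("line"), "\\|").getItem(0).as("channel"),
        split(col("line"), "\\|").getItem(1).cast("int").as("viewers"))
      .repartition(col("channel"))
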
4
votes
1 answer

testing kafka and spark with testcontainers

I am trying to test a streaming pipeline with testcontainers as an integration test, but I don't know how to get bootstrapServers, at least in the latest testcontainers version, or how to create a specific topic there. How can I use 'containerDef' to extract…
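
A sketch using the plain Java Testcontainers API rather than testcontainers-scala's containerDef (the image tag and topic name are assumptions):

    import org.testcontainers.containers.KafkaContainer
    import org.testcontainers.utility.DockerImageName
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
    import java.util.Collections

    // Start a throwaway broker; pin the image tag you actually test against.
    val kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
    kafka.start()

    // Address to hand to producers/consumers and to Spark's "kafka.bootstrap.servers".
    val bootstrapServers = kafka.getBootstrapServers

    // Create the specific topic the pipeline under test expects.
    val admin = AdminClient.create(
      Collections.singletonMap[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))
    admin.createTopics(Collections.singletonList(new NewTopic("test-topic", 1, 1.toShort))).all().get()
    admin.close()
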
4
votes
1 answer

How to configure backpressure in Spark 3 Structured Streaming Kafka/Files source with Trigger.Once option

In Spark 3, the behavior of the backpressure option on the Kafka and File sources for the Trigger.Once scenario was changed. But I have a question: how can I configure backpressure for my job when I want to use Trigger.Once? In Spark 2.4 I have a use case, to backfill…
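
Trigger.Once ignores maxOffsetsPerTrigger by design; a hedged sketch of the alternative added in Spark 3.3, Trigger.AvailableNow, which honors the rate limit while still draining only the available backlog (broker, topic, paths, and the cap are placeholders):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder().appName("rate-limited-backfill").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "backlog-topic")
      .option("maxOffsetsPerTrigger", "100000")  // per-micro-batch cap; Trigger.Once skips it
      .load()

    val query = df.writeStream
      .format("parquet")
      .option("path", "/data/out")
      .option("checkpointLocation", "/data/ckpt")
      .trigger(Trigger.AvailableNow())  // Spark 3.3+: capped batches until caught up, then stop
      .start()
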
4
votes
1 answer

Apache Spark integration with Kafka

I am following a course on Udemy about Kafka and Spark, and I'm learning Apache Spark integration with Kafka. Below is the Apache Spark code: SparkSession session = SparkSession.builder().appName("KafkaConsumer").master("local[*]").getOrCreate(); …
4
votes
1 answer

Spark 3.x Integration with Kafka in Python

Kafka with spark-streaming throws an error: from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka. I have already set up a Kafka broker and a working Spark environment with one master and one worker. import…
4
votes
2 answers

Spark Structured Streaming Kafka Offset Management

I'm looking into storing Kafka offsets inside Kafka for Spark Structured Streaming, the way it works for DStreams: stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges). I'm looking for the same, but for Structured Streaming. Is it…
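
There is no CanCommitOffsets equivalent in Structured Streaming; a sketch of the usual workaround, a StreamingQueryListener that surfaces each batch's end offsets so you can commit them back to Kafka yourself if external tooling needs them:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

    val spark = SparkSession.builder().appName("offset-listener").getOrCreate()

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // endOffset is a JSON string such as {"topic":{"0":5000}}; parse it and
        // commit via a plain Kafka consumer or AdminClient if monitoring requires it.
        event.progress.sources.foreach(s => println(s"batch finished at ${s.endOffset}"))
      }
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })
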
4
votes
2 answers

Spark Structured Streaming Kafka error -- offset was changed

My Spark Structured Streaming application runs for a few hours before it fails with this error: java.lang.IllegalStateException: Partition [partition-name] offset was changed from 361037 to 355053, some data may have been missed. Some data may have…
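
That error usually means Kafka retention deleted records the checkpoint still references; extending retention is the real fix, but the source can also be told to skip the gap, accepting the loss. A sketch (broker and topic are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("tolerate-gaps").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .option("failOnDataLoss", "false")  // log the missed offset range and keep going
      .load()
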
3
votes
0 answers

Structured Streaming - suddenly giving an error while writing to a (Strimzi) Kafka topic

I've got Structured Streaming code which reads data from a Kafka topic (on a VM) and writes to another Kafka topic on GKE (I should be using MirrorMaker for this, but have not implemented that yet). It suddenly stopped working (it had been working fine for…
3
votes
1 answer

Spark AQE post-shuffle partition coalescing doesn't work as expected, and even creates data skew in some partitions. Why?

I use a global sort on my Spark DataFrame, and when I enable AQE and post-shuffle coalescing, my partitions after the sort operation end up distributed even more unevenly than before. "spark.sql.adaptive.enabled" -> "true", …
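
For reference, the configuration knobs involved; a sketch (the advisory size is a placeholder to tune against your data volume):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("aqe-coalesce")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")  // target post-shuffle partition size
      .getOrCreate()
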
3
votes
1 answer

Spark Structured Streaming checkpoint usage in production

I have trouble understanding how checkpoints work with Spark Structured Streaming. I have a Spark process that generates some events, which I log to a Hive table. For those events, I receive a confirmation event on a Kafka stream. I…