
It seems that with the new Spark Structured Streaming, we can no longer pass a consumer group id as an option when reading from Kafka; the reader rejects it with:

Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.

Is there a way to force Structured Streaming to use a given Kafka group id?

Code:

val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id", "idThatShouldBeUsed")
  .option("kafka.bootstrap.servers", "server")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/location)
  .option("kafka.ssl.truststore.password", "pass")
  .option("kafka.sasl.jaas.config", """jaasToUse""")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("startingOffsets", "earliest")
  .start().awaitTermination()

produces:

java.lang.IllegalArgumentException: Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateGeneralOptions(KafkaSourceProvider.scala:347)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:402)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:70)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:208)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:172)
  ... 57 elided

If we set the Spark log level to INFO, we can see that the group id actually used is an auto-generated one, completely different from the one we specified:

INFO consumer.ConsumerConfig: ConsumerConfig values:
        group.id = spark-kafka-source-625f97d6-59ed-4a72-90f6-c4add9c3a2a7--849027099-driver-0
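
One way to raise the log level for this check is SparkContext.setLogLevel (log4j configuration works as well), for example:

spark.sparkContext.setLogLevel("INFO") // makes the ConsumerConfig dump above visible in the driver log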

Any idea how to make it use the right group?
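
For what it's worth, the Spark 3.x Kafka integration guide documents a "kafka.group.id" source option, as well as a "groupIdPrefix" option that only prefixes the auto-generated ids. A rough, untested sketch of what that would look like, assuming a Spark version that accepts those options (the val name and prefix are just placeholders):

// Sketch only: assumes Spark 3.0+, where the Kafka source accepts these options;
// on 2.x it fails with the same IllegalArgumentException as above.
val dfWithGroup = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id", "idThatShouldBeUsed") // exact consumer group id
  // .option("groupIdPrefix", "myAppPrefix")      // or: only prefix the generated group id
  .load()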
