2

I am trying to read data from Kafka via spark structured streaming. However, in Spark 2.4.0., you cannot set group id for the stream (see How to set group.id for consumer group in kafka data source in Structured Streaming?).

However, as this is not set, spark simply generates the group Id and I am stuck at GroupAuthorizationException:

19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2

Any ideas how to bypass this please? Funny thing is, I am able to read this data via kafka-console-consumer.sh, where I am able to pass the group id in a .properties file.

Code throwing the exception:

val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id", "idThatShouldBeUsed")
  .option("kafka.bootstrap.servers", "server")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/location)
  .option("kafka.ssl.truststore.password", "pass")
  .option("kafka.sasl.jaas.config", """jaasToUse""")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("startingOffsets", "earliest")
  .start().awaitTermination()
mazaneicha
  • 8,794
  • 4
  • 33
  • 52
Tomáš Sedloň
  • 153
  • 1
  • 9
  • 1
    Group id shouldn't determine authentication. The JKS files and JAAS properties should – OneCricketeer Dec 11 '19 at 01:09
  • 1
    well, it seems to - the same problem can be solved by using wildcards when granting rights to group (https://stackoverflow.com/questions/48545215/spark-structured-streaming-with-secured-kafka-throwing-not-authorized-to-acces). However, I am not allowed to change these Kafka settings. – Tomáš Sedloň Dec 11 '19 at 07:58
  • I was under the impression those are "user groups", not the consumer's "group id". The Authorizer is pluggable, by the way, but you must work with your Kafka admins to adjust these settings – OneCricketeer Dec 11 '19 at 18:39

1 Answers1

5

Seems that this is not solvable from the consumer's side. We ended up having to use bin/kafka-acls.sh and wildcards to allow all group ids generated by structured streaming.

kafka acl example:

bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zk:2181 --add --allow-principal User:'Bon' --operation READ --topic topicName --group='spark-kafka-source-' --resource-pattern-type prefixed
Tomáš Sedloň
  • 153
  • 1
  • 9
  • 1
    Can you include the command(s) that you used? That'd be super helpful. Thanks. Please also accept your answer to make it resolved. Thanks again! – Jacek Laskowski Dec 12 '19 at 12:54