0

I have 2 kafka consumers that share the same consumer group id but subscribe to different topics. Each of them only has access to read from its corresponding topic.

When the 1st consumer is run it gets assigned the partitions from the topic it subscribes to. And when the 2nd consumer is run as well, the consumer group rebalances (causing the partitions assigned to the 1st consumer to get revoked). So far, so good. This is consistent with the discussion in Kafka Consumer Group Id and consumer rebalance issue.

But then, i start seeing TOPIC_AUTHORIZATION_FAILED in consumer 1 - apparently it tries to access the other topic that it doesn't subscribe to and has no access over. From this point forward the consumer doesn't move forward and keeps erroring out.

I was expecting that post rebalance, consumer 1 would get reassigned the partitions from the topic it subscribes to and continue on its way. Why is consumer 1 trying to access the other topic / How do i fix this?

Logs

common-request-test : Consumer 1's topic

common-request-dev: Consumer 2's topic

Below are the Consumer 1 logs (Notice the last couple of lines where it tries to access common-request-dev) -

{"@timestamp":"2021-10-22T07:39:17.550Z","message":"onPartitionsAssigned: [common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"c.m.l.e.s.a.config.KafkaReceiverConfig","ex":""}

{"@timestamp":"2021-10-22T07:39:17.853Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Found no committed offset for partition common-request-test-2","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.857Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-3 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1-devtest1:9093 (id: 0 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.858Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1-devtest1:9093 (id: 0 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.858Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-1 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka2-devtest1:9093 (id: 1 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:19.382Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Resetting offset for partition common-request-test-2 to offset 0.","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.SubscriptionState","ex":""}

{"@timestamp":"2021-10-22T07:43:21.598Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Attempt to heartbeat failed since group is rebalancing","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:43:21.599Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Revoke previously assigned partitions common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}

{"@timestamp":"2021-10-22T07:43:21.599Z","message":"onPartitionsRevoked: [common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"c.m.l.e.s.a.config.KafkaReceiverConfig","ex":""}

{"@timestamp":"2021-10-22T07:43:21.599Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] (Re-)joining group","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:43:22.174Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Error while fetching metadata with correlation id 920 : {common-request-dev=TOPIC_AUTHORIZATION_FAILED}","severity":"WARN","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"org.apache.kafka.clients.NetworkClient","ex":""}

{"@timestamp":"2021-10-22T07:43:22.174Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Topic authorization failed for topics [common-request-dev]","severity":"ERROR","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"org.apache.kafka.clients.Metadata","ex":""}

{"@timestamp":"2021-10-22T07:43:22.175Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Join group failed with org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [common-request-dev]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}
ankush
  • 167
  • 1
  • 11
  • Why do you want to share same consumer group id between consumers that seem independent as per your explanation ? – Rishabh Sharma Oct 22 '21 at 11:06
  • @RishabhSharma - I have 2 different environments for the same application (dev & test). Each environment has a different topic name (common-request-dev & common-request-test). Consumer group id is same for the app in dev & test. – ankush Oct 22 '21 at 11:14
  • but that still does not justify keeping the consumer group id same. As the name suggests, consumer group id should be grouping of related consumers so that they can share the consumer partitions assigned. For your use case, simply have different consumer group ids. You can even append the env to the base string to have ids like my-consumergroup-test and my-consumergroup-dev. Don't have independent consumers in the same consumer group. – Rishabh Sharma Oct 22 '21 at 11:36
  • @RishabhSharma - I cannot use arbitrary consumer group id's. I have only 1 consumer group id provisioned to consume from the topics at the moment. It is an infra level constraint. Each consumer is subscribed to a different topic, so they should not be assigned partitions from topics that they are not subscribed to. Looking for suggestions to see if this can work with a single consumer group id. – ankush Oct 22 '21 at 11:48
  • Yes, one group can read multipler topics. No, you cannot have multiple applications in the same group processing data of the same partitions. Why not write dev and test data into the same topic, but maybe more partitions, if it needs to be processed together? What other consumer is running that needs the topics to be separate? – OneCricketeer Oct 22 '21 at 13:32

1 Answers1

1

Sounds like you try either using the StickyAssignor for your consumer config, or use assign, not subscribe (or talk to the person maintaining the ACL policies, and tell them what access you need)

Otherwise, yes, rebalancing happens for the whole group's topic set. You're subscribing to a topic for the group, not a single consumer.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • Thanks for your suggestions. I will try the StickyAssignor. I am fine with the rebalance happening for the whole group. Any idea why post rebalance the consumers are trying to access topics that they are not subscribed to? – ankush Oct 22 '21 at 14:10
  • Read the rest of the answer? Like I said, you're subscribing the topic to all instances in the group, not a particular instance of the consumer to a topic. If that's what you want, use assign – OneCricketeer Oct 22 '21 at 14:12
  • Thanks! So, just so that i understand - If consumer-1 subscribes to Topic-A & consumer-2 subscribes to Topic-B and both consumers have the same consumer group id, then post rebalance consumer-1 can be assigned partitions from Topic-B & consumer-2 can be assigned partitions from Topic-A? – ankush Oct 22 '21 at 14:24
  • That's correct. – OneCricketeer Oct 22 '21 at 15:17