
I'm new to Kafka and am using @KafkaListener (Spring) to define a Kafka consumer. I would like to check whether it's possible to manually assign partitions to the consumer at runtime.

For example, when the application starts I don't want to consume any data. I'm currently using @KafkaListener(autoStartup = "false", ...) for that purpose.

At some point, I'm supposed to get a notification (from another part of the application) that contains a partitionId to work on. I would like to "skip" to the latest available offset of that partition, because I don't need to consume the data that already exists there, and then "associate" the KafkaConsumer with the partitionId from that notification.

Later on I might get a notification to "stop listening to this partition". The producer, which lives somewhere else, keeps writing to that topic and partition, but I should "unlink" the consumer from the partition and stop getting messages.

I saw there is an org.springframework.kafka.annotation.TopicPartition, but it only provides a way to specify a "static" association, so I'm looking for a "dynamic" way to do so.
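For reference, the "static" association I mean looks something like this (the listener id is just a placeholder):

```java
// Static assignment: the partitions are fixed in the annotation at compile time,
// which is exactly what I'm trying to avoid
@KafkaListener(id = "static-consumer",
        topicPartitions = @TopicPartition(topic = "cnp_multi_partition_test_topic",
                                          partitions = { "0", "2" }))
public void listen(String data) {
    // ...
}
```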

I guess I could resort to the low-level Kafka client API, but I would really prefer to use Spring here.
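For comparison, this is roughly what the low-level approach would look like with plain kafka-clients (no Spring); the broker address is an assumption, and it needs a running broker:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LowLevelAssignSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("cnp_multi_partition_test_topic", 0);
            consumer.assign(List.of(tp));     // manual assignment: no group, no rebalance
            consumer.seekToEnd(List.of(tp));  // skip records that already exist
            consumer.poll(Duration.ofSeconds(1))
                    .forEach(r -> System.out.println(r.value()));
            consumer.assign(List.of());       // "unlink": drop all assignments
        }
    }
}
```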

UPDATE

I use topic cnp_multi_partition_test_topic with 3 partitions.

My current code, which tries to manage partitions dynamically from the consumer side, looks like this:

@Slf4j
public class SampleKafkaConsumer {

    @KafkaListener(id = Constants.CONSUMER_ID, topics = Constants.TEST_TOPIC, autoStartup = "false")
    public void consumePartition(@Payload String data, @Headers MessageHeaders messageHeaders) {
        Object partitionId = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID);
        Object sessionId = messageHeaders.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
        log.info("Consuming from partition: [ {} ] message: Key = [ {} ], content = [ {} ]", partitionId, sessionId, data);
    }
}

@RequiredArgsConstructor
public class MultiPartitionKafkaConsumerManager {

    private final KafkaListenerEndpointRegistry registry;
    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
    private final UUIDProvider uuidProvider;
    private ConcurrentMessageListenerContainer<String, String> container;

    public void assignPartitions(List<Integer> partitions) {
        if (container != null) {
            container.stop();
            container = null;
        }
        if (partitions.isEmpty()) {
            return;
        }
        var newTopicPartitionOffsets = prepareTopicPartitionOffsets(partitions);
        container = factory.createContainer(newTopicPartitionOffsets);
        container.getContainerProperties().setMessageListener(
                registry.getListenerContainer(Constants.CONSUMER_ID).getContainerProperties().getMessageListener());
        // random group
        container.getContainerProperties().setGroupId("sampleGroup-" + uuidProvider.getUUID().toString());
        container.setConcurrency(1);
        container.start();
    }

    private TopicPartitionOffset[] prepareTopicPartitionOffsets(List<Integer> partitions) {
        return partitions.stream()
                .map(p -> new TopicPartitionOffset(TEST_TOPIC, p, 0L, TopicPartitionOffset.SeekPosition.END))
                .toArray(TopicPartitionOffset[]::new);
    }
}

Both are Spring beans (singletons) managed through Java configuration.

The producer generates 3 messages every second and sends them to the 3 partitions of the test topic. I've used a Kafka UI tool to make sure that all the messages indeed arrive as expected. I use @EventListener and @Async to make it happen concurrently.

Here is how I try to simulate the work:


@SpringBootTest // kafka is available, omitted for brevity
public class MyTest {
    @Autowired
    MultiPartitionKafkaConsumerManager manager;
    
    @Test
    public void test_create_kafka_consumer_with_manual_partition_management() throws InterruptedException {
        log.info("Starting the test");
        sleep(5_000);
        log.info("Start listening on partition 0");
        manager.assignPartitions(List.of(0));
        sleep(10_000);
        log.info("Start listening on partition 0,2");
        manager.assignPartitions(List.of(0,2));
        sleep(10_000);
        log.info("Do not listen on partition 0 anymore");
        manager.assignPartitions(List.of(2));
        sleep(10_000);
        log.info("Do not listen on partition 2 anymore - 0 partitions to listen");
        manager.assignPartitions(Collections.emptyList());
        sleep(10_000);
    }
}

Logs show the following:

06:34:20.164 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Starting the test
06:34:25.169 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0
06:34:25.360 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:25.360 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:25.361 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664065360
06:34:25.405 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Subscribed to partition(s): cnp_multi_partition_test_topic-0
06:34:25.422 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:25.429 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.438 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0,2
06:34:35.445 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Unsubscribed all topics or patterns and assigned partitions
06:34:35.445 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:35.453 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9: Consumer stopped
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664075467
06:34:35.486 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Subscribed to partition(s): cnp_multi_partition_test_topic-0, cnp_multi_partition_test_topic-2
06:34:35.487 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:35.489 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.489 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:45.502 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 0 anymore
06:34:45.503 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Unsubscribed all topics or patterns and assigned partitions
06:34:45.503 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:45.510 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb: Consumer stopped
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664085527
06:34:45.551 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Subscribed to partition(s): cnp_multi_partition_test_topic-2
06:34:45.551 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:45.554 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:55.560 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 2 anymore - 0 partitions to listen
06:34:55.561 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Unsubscribed all topics or patterns and assigned partitions
06:34:55.562 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:55.576 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698: Consumer stopped

So I do see that the consumer is started, and it even tries to poll the records internally, but I think I see a WakeupException thrown and "swallowed" by a proxy. I'm not sure I understand why that happens.

Mark Bramnik
  • I think you may not be able to assign partitions dynamically. It's already discussed here: https://stackoverflow.com/questions/32761598/is-it-possible-to-create-a-kafka-topic-with-dynamic-partition-count – Amit Mahajan Oct 06 '21 at 13:04

1 Answer


You can't change manual assignments at runtime. There are several ways to achieve your desired result.

You can declare the listener in a prototype bean; see Can I add topics to my @KafkaListener at runtime.
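A minimal sketch of that first technique (the `#{__listener}` SpEL reference is a Spring Kafka feature that lets the annotation read properties from the listener bean instance itself; class and topic names here are placeholders):

```java
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Prototype scope: each getBean() call yields a fresh listener instance,
// each bound to the topic passed to its constructor.
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class DynamicListener {

    private final String topic;

    public DynamicListener(String topic) {
        this.topic = topic;
    }

    public String getTopic() {
        return this.topic;
    }

    // "__listener" refers to this bean instance, so the topic is resolved
    // per instance instead of being fixed at compile time
    @KafkaListener(id = "#{__listener.topic}-listener", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }
}
```

You would then create instances at runtime with something like `applicationContext.getBean(DynamicListener.class, "some-topic")`.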

You can use the listener container factory to create a new container with the appropriate topic configuration and copy the listener from the statically declared container.

I can provide an example of the latter if needed.

...

EDIT

Here's an example for the second technique...

@SpringBootApplication
public class So69465733Application {

    public static void main(String[] args) {
        SpringApplication.run(So69465733Application.class, args);
    }

    @KafkaListener(id = "dummy", topics = "dummy", autoStartup = "false")
    void listen(String in) {
        System.out.println(in);
    }

    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        return args -> {
            System.out.println("Hit Enter to create a container for topic1, partition0");
            System.in.read();
            ConcurrentMessageListenerContainer<String, String> container1 =
                    factory.createContainer(new TopicPartitionOffset("topic1", 0, SeekPosition.END));
            container1.getContainerProperties().setMessageListener(
                    registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
            container1.getContainerProperties().setGroupId("topic1-0-group2");
            container1.start();

            System.out.println("Hit Enter to create a container for topic2, partition0");
            System.in.read();
            ConcurrentMessageListenerContainer<String, String> container2 =
                    factory.createContainer(new TopicPartitionOffset("topic2", 0, SeekPosition.END));
            container2.getContainerProperties().setMessageListener(
                    registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
            container2.getContainerProperties().setGroupId("topic2-0-group2");
            container2.start();

            System.in.read();
            container1.stop();
            container2.stop();
        };
    }

}

EDIT

Log after sending records to topic1, topic2 from the command-line producer.

Hit Enter to create a container for topic1, partition0

ConsumerConfig values: 
...

Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622966736
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Subscribed to partition(s): topic1-0

Hit Enter to create a container for topic2, partition0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Seeking to LATEST offset of partition topic1-0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Resetting offset for partition topic1-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.

ConsumerConfig values: 
...

Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622969071
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Subscribed to partition(s): topic2-0

Hit Enter to stop containers
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Seeking to LATEST offset of partition topic2-0
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Resetting offset for partition topic2-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
record from topic1
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
record from topic2
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
Application shutdown requested.
Gary Russell
  • I added an example for the second technique. – Gary Russell Oct 06 '21 at 13:35
  • Thanks a lot, Gary, I'll try that tomorrow, and will accept the answer. – Mark Bramnik Oct 06 '21 at 14:07
  • Unfortunately I couldn't make it work. The first method looks problematic because I don't think I can create a supplier that will return a list of `@TopicPartition` annotations dynamically (I can't create an instance of an annotation), unlike the topic in the example. The method you've provided in the answer to this question looks promising, but for some reason the messages are not consumed, as if it doesn't "subscribe" to the topic... – Mark Bramnik Oct 07 '21 at 16:05
  • It works fine for me - I added the logs - did you `start()` the container? – Gary Russell Oct 07 '21 at 16:13
  • Thanks a lot, Gary, yes I start the container. I've added logs and the relevant code that I have now. It might be something that I don't understand about Kafka itself as well; I'm pretty new to this technology. – Mark Bramnik Oct 08 '21 at 03:50
  • Can you run with DEBUG logging and post someplace like pastebin or a GitHub gist? I am off today so won’t be able to look until Monday but maybe the debug log will help you figure it out. You should only see a wake up during the stop. – Gary Russell Oct 08 '21 at 13:02
  • Or post the complete project someplace. – Gary Russell Oct 08 '21 at 13:03
  • Thank you again, I've managed to make it work with my example! It was some other part of the infrastructure that loaded the property `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = "earliest"` - with this property it didn't work; without it, it looks great. Upvoting and accepting the answer :) – Mark Bramnik Oct 08 '21 at 19:57