I'm new to Kafka and am using @KafkaListener (Spring) to define a Kafka consumer.
I would like to check whether it's possible to manually assign partitions to the consumer at runtime.
For example, when the application starts I don't want to "consume" any data. I'm currently using @KafkaListener(autoStartup = "false", ...) for that purpose.
At some point, I'm supposed to get a notification (from another part of the application) that contains a partitionId to work on. When that happens, I would like to "associate" the KafkaConsumer with the partitionId from that notification and "skip" to the latest available offset of that partition, because I don't need to consume the data that already exists there.
Later on I might get a notification to "Stop listening to this partition", even though the producer, which lives somewhere else, keeps writing to that topic and partition. At that point I should "unlink" the consumer from the partition and stop getting messages.
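To make the behavior I'm after concrete, here is a minimal in-memory model of a single partition (plain Java, no Kafka involved). The class and method names (PartitionModel, attachAtEnd, detach) are made up for illustration and don't correspond to any Kafka or Spring API; it only sketches the semantics I'd like to get from the real consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one partition and one consumer; not a Kafka API.
final class PartitionModel {
    private static final int DETACHED = -1;

    private final List<String> log = new ArrayList<>(); // records in offset order
    private int position = DETACHED;                    // next offset to read, or DETACHED

    // The producer keeps appending regardless of whether anyone is attached.
    void produce(String record) {
        log.add(record);
    }

    // "Start listening" notification: skip everything already in the log.
    void attachAtEnd() {
        position = log.size();
    }

    // "Stop listening" notification: later records must not be delivered.
    void detach() {
        position = DETACHED;
    }

    // Returns records appended since the last poll, or nothing while detached.
    List<String> poll() {
        if (position == DETACHED) {
            return List.of();
        }
        List<String> batch = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();
        return batch;
    }

    public static void main(String[] args) {
        PartitionModel p = new PartitionModel();
        p.produce("old-1");
        p.produce("old-2");
        p.attachAtEnd();
        p.produce("new-1");
        System.out.println(p.poll()); // [new-1]
        p.detach();
        p.produce("ignored");
        System.out.println(p.poll()); // []
    }
}
```

In other words: records written before the "start" notification are never delivered, and records written after the "stop" notification are never delivered either.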
I saw that there is an org.springframework.kafka.annotation.TopicPartition annotation, but it only provides a way to specify a "static" association, so I'm looking for a "dynamic" way to do so.
I guess I could resort to the low-level Kafka Client API but I would really prefer to use spring here.
UPDATE
I use the topic cnp_multi_partition_test_topic with 3 partitions.
My current code, which tries to manage partitions dynamically from the consumer side, looks like this:
@Slf4j
public class SampleKafkaConsumer {

    @KafkaListener(id = Constants.CONSUMER_ID, topics = Constants.TEST_TOPIC, autoStartup = "false")
    public void consumePartition(@Payload String data, @Headers MessageHeaders messageHeaders) {
        Object partitionId = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID);
        Object sessionId = messageHeaders.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
        log.info("Consuming from partition: [ {} ] message: Key = [ {} ], content = [ {} ]", partitionId, sessionId, data);
    }
}
@RequiredArgsConstructor
public class MultiPartitionKafkaConsumerManager {

    private final KafkaListenerEndpointRegistry registry;
    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
    private final UUIDProvider uuidProvider;

    private ConcurrentMessageListenerContainer<String, String> container;

    public void assignPartitions(List<Integer> partitions) {
        // Stop and discard the previous container, if any
        if (container != null) {
            container.stop();
            container = null;
        }
        if (partitions.isEmpty()) {
            return;
        }
        var newTopicPartitionOffsets = prepareTopicPartitionOffsets(partitions);
        container = factory.createContainer(newTopicPartitionOffsets);
        // Reuse the message listener of the (never started) @KafkaListener container
        container.getContainerProperties().setMessageListener(
                registry.getListenerContainer(Constants.CONSUMER_ID).getContainerProperties().getMessageListener());
        // random group
        container.getContainerProperties().setGroupId("sampleGroup-" + uuidProvider.getUUID().toString());
        container.setConcurrency(1);
        container.start();
    }

    // Request a seek to the end of each partition so that only new records are consumed
    private TopicPartitionOffset[] prepareTopicPartitionOffsets(List<Integer> partitions) {
        return partitions.stream()
                .map(p -> new TopicPartitionOffset(Constants.TEST_TOPIC, p, 0L, TopicPartitionOffset.SeekPosition.END))
                .toArray(TopicPartitionOffset[]::new);
    }
}
Both are Spring beans (singletons) managed through Java configuration.
The producer generates 3 messages every second and sends them to the 3 partitions of the test topic. I've used a Kafka UI tool to make sure that all the messages indeed arrive as expected. I use an @EventListener and @Async to make it happen concurrently.
Here is how I try to simulate the work:
@Slf4j
@SpringBootTest // kafka is available, omitted for brevity
public class MyTest {

    @Autowired
    MultiPartitionKafkaConsumerManager manager;

    @Test
    public void test_create_kafka_consumer_with_manual_partition_management() throws InterruptedException {
        log.info("Starting the test");
        sleep(5_000);
        log.info("Start listening on partition 0");
        manager.assignPartitions(List.of(0));
        sleep(10_000);
        log.info("Start listening on partition 0,2");
        manager.assignPartitions(List.of(0, 2));
        sleep(10_000);
        log.info("Do not listen on partition 0 anymore");
        manager.assignPartitions(List.of(2));
        sleep(10_000);
        log.info("Do not listen on partition 2 anymore - 0 partitions to listen");
        manager.assignPartitions(Collections.emptyList());
        sleep(10_000);
    }
}
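For clarity, these are the transitions I expect the four assignPartitions calls above to produce. The sketch is plain Java with no Kafka dependency; AssignmentTransitions and its methods are hypothetical names used only to spell out the expectation, not part of my actual code or of Spring Kafka.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper that only spells out expectations; it is not part of Spring Kafka.
final class AssignmentTransitions {

    // Partitions that become active and should start from their latest offset.
    static Set<Integer> toStart(Set<Integer> current, Set<Integer> next) {
        Set<Integer> result = new TreeSet<>(next);
        result.removeAll(current);
        return result;
    }

    // Partitions that should no longer deliver records.
    static Set<Integer> toStop(Set<Integer> current, Set<Integer> next) {
        Set<Integer> result = new TreeSet<>(current);
        result.removeAll(next);
        return result;
    }

    public static void main(String[] args) {
        // Same sequence of assignments as in the test.
        List<Set<Integer>> steps = List.of(Set.of(0), Set.of(0, 2), Set.of(2), Set.of());
        Set<Integer> current = Set.of();
        for (Set<Integer> next : steps) {
            System.out.println("start " + toStart(current, next) + ", stop " + toStop(current, next));
            current = next;
        }
    }
}
```

This prints "start [0], stop []", "start [2], stop []", "start [], stop [0]" and "start [], stop [2]". Note that my manager actually recreates the whole container on every call, so a partition that stays assigned (partition 0 at the second step) is also seeked to its end again.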
Logs show the following:
06:34:20.164 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Starting the test
06:34:25.169 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0
06:34:25.360 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:25.360 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:25.361 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664065360
06:34:25.405 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Subscribed to partition(s): cnp_multi_partition_test_topic-0
06:34:25.422 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:25.429 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.438 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0,2
06:34:35.445 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Unsubscribed all topics or patterns and assigned partitions
06:34:35.445 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:35.453 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9: Consumer stopped
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664075467
06:34:35.486 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Subscribed to partition(s): cnp_multi_partition_test_topic-0, cnp_multi_partition_test_topic-2
06:34:35.487 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:35.489 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.489 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:45.502 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 0 anymore
06:34:45.503 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Unsubscribed all topics or patterns and assigned partitions
06:34:45.503 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:45.510 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb: Consumer stopped
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664085527
06:34:45.551 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Subscribed to partition(s): cnp_multi_partition_test_topic-2
06:34:45.551 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:45.554 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:55.560 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 2 anymore - 0 partitions to listen
06:34:55.561 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Unsubscribed all topics or patterns and assigned partitions
06:34:55.562 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:55.576 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698: Consumer stopped
So I can see that the consumer is started, and it even tries to poll records internally, but I think I see a WakeupException being thrown and "swallowed" by a proxy. I don't understand why this happens.