Kafka added a new feature that allows connectors to use a regex for topic subscription; however, it seems that data from topics created after the connector has started is not consumed until the connector is restarted. We need to dynamically add new topics and have the connector consume them based on the regex defined in the connector's properties. How can this be achieved?

Example:

- regex: topic-.*
- existing topics: topic-1, topic-2

If I introduce a new topic, topic-3, how can I make the connector consume its data without restarting it?
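For reference, a sink connector configured this way might look like the sketch below (the connector name, class, and file setting are placeholders; `topics.regex` is the standard Kafka Connect property for regex subscriptions in sink connectors):

```properties
# Standalone-mode connector properties; everything except topics.regex is a placeholder
name=my-sink-connector
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
# Consume every topic matching the regex, e.g. topic-1 and topic-2
topics.regex=topic-.*
file=/tmp/sink-output.txt
```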

- As discussed on the [Slack](http://cnfl.io/slack) group, Kafka Connect only processes the regex when the connector starts. If the results of the regex change after that, the connector will not pick up the new results. How frequently do you expect to add new topics? – Robin Moffatt Jul 30 '18 at 19:57
- Currently a topic may be added biweekly or once a month, but we are sure the frequency will increase. What I am looking for is whether there is an alternative approach for this use case, and what the implications of restarting the connector each time would be. I understand that Kafka Connect has offset management and guarantees delivery of messages; still, it would be helpful if you could point to resources on the effects of connector restarts in a production environment. – Pratik Gaglani Jul 31 '18 at 03:46
- Why not just make a new connector? That will scale better and be more fault tolerant than having only max N tasks trying to consume from an endlessly growing number of topics. – OneCricketeer Jul 31 '18 at 04:28
2 Answers
Following on from the ideas other folks have already given in the comments, what you basically need is a mechanism that detects that a new topic has been introduced and that the connector needs to be cleanly restarted.

I would do something like this:

1> Send a specific type of control message on an already-subscribed topic (e.g. topic-1). When such a message is received, the code should hold all new message polling and wait for all offset commits to complete.
2> Then break out of the polling loop and remove the subscriptions from your consumer (consumer.unsubscribe()).
3> After that, repeat the usual flow that subscribed to the regex topics at the beginning; the new topic will now be part of the regex match. A sketch of this flow is given below.

Keep in mind that the commits are important: if you restart the connector in haste, you may get duplicates. Also, it is kind of obvious, but do not change the group.id, and keep auto.offset.reset as 'latest'.
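A minimal sketch of that flow using the plain Java consumer API (the broker address, group id, and the control-message convention are assumptions of this sketch, not part of the answer above):

```java
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RegexResubscribeConsumer {

    // Control message payload that signals "a new topic was added";
    // this convention is an assumption of the sketch.
    private static final String RESUBSCRIBE_SIGNAL = "RESUBSCRIBE";
    private static final Pattern TOPIC_PATTERN = Pattern.compile("topic-.*");

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "my-group");                  // keep the same group.id across resubscribes
        props.put("auto.offset.reset", "latest");
        props.put("enable.auto.commit", "false");           // commit manually before resubscribing
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(TOPIC_PATTERN);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                boolean resubscribe = false;
                for (ConsumerRecord<String, String> record : records) {
                    if (RESUBSCRIBE_SIGNAL.equals(record.value())) {
                        resubscribe = true;                 // step 1: control message received
                    } else {
                        process(record);
                    }
                }
                // Commit everything polled so far, so resubscribing causes no duplicates.
                consumer.commitSync();
                if (resubscribe) {
                    consumer.unsubscribe();                 // step 2: drop current subscriptions
                    consumer.subscribe(TOPIC_PATTERN);      // step 3: regex now matches the new topic
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s: %s%n", record.topic(), record.value());
    }
}
```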

The Kafka consumer has an option, metadata.max.age.ms: the interval of time after which the consumer refreshes its topic metadata. Lowering it can help if you don't need true real-time pickup of new topics. See also: kafka consumer to dynamically detect topics added.

In /etc/kafka-connect/kafka-connect.properties you should specify consumer.metadata.max.age.ms=1000 for a refresh every 1 second.
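The same setting works for a plain consumer with a pattern subscription, because each metadata refresh re-evaluates the regex against the current topic list. A minimal sketch (the broker address and group id are placeholders):

```java
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MetadataRefreshConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "my-group");                 // placeholder group id
        props.put("metadata.max.age.ms", "1000");          // refresh topic metadata every second
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // With a pattern subscription, each metadata refresh re-evaluates the regex,
            // so a newly created topic-3 is picked up within ~1 second plus a rebalance.
            consumer.subscribe(Pattern.compile("topic-.*"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.printf("%s: %s%n", r.topic(), r.value()));
            }
        }
    }
}
```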
