7

The Kafka Connector can make use of a primary key and a timestamp to determine which rows need to be processed.

I'm looking for a way to reset the Connector so that it will process from the beginning of time.

user2122031
  • 581
  • 5
  • 11
  • Is this running in distributed mode or standalone? In standalone you can just remove the offset file I believe. In distributed mode, the easiest thing to do might be to change the connector name. Either way, you'll get duplicate data going to Kafka if you do this just for your awareness. – dawsaw Mar 25 '17 at 11:10
  • It will run in distributed mode, because it will require a large cluster to process all the databases that we need to connect to. Yes, that makes sense. I'm just trying to understand HOW it could be done so we can figure out the data issues. – user2122031 Mar 26 '17 at 15:04

2 Answers

8

Because the requirement is to run in distributed mode, the easiest thing to do is to update the connector name to a new value. This will prompt a new entry to be made into the connect-offsets topic as it looks like a totally new connector. Then the connector should start reading again as if nothing has been written to Kafka yet. You could also manually send a tombstone message to the key in the connect-offsets topic associated with that particular connector, but renaming is much easier than dealing with that. This method applies to all source connectors, not only the JDBC one described here.
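In practice the rename can be scripted against the Kafka Connect REST API: fetch the old config, delete the connector, and re-create it under a new name. A minimal sketch, assuming a Connect worker on `localhost:8083` and a connector called `jdbc-source` (both placeholders for your environment):

```shell
# Save the existing config (the returned map includes a "name" field
# that must be changed to match the new connector name).
curl -s localhost:8083/connectors/jdbc-source/config > old-config.json

# Rewrite the name so it no longer matches any entry in connect-offsets.
sed 's/"jdbc-source"/"jdbc-source-v2"/' old-config.json > new-config.json

# Remove the old connector and create the renamed one.
curl -s -X DELETE localhost:8083/connectors/jdbc-source
curl -s -X PUT -H "Content-Type: application/json" \
     --data @new-config.json \
     localhost:8083/connectors/jdbc-source-v2/config
```

Because the offsets topic is keyed by connector name, the renamed connector finds no committed offset and starts from scratch.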

dawsaw
  • 2,283
  • 13
  • 10
  • I have a similar requirement: I need to reprocess binlog messages from a certain point in time, e.g. moving the offset back to last week. Can this be achieved? I am using the Debezium connector. – Renukaradhya Apr 13 '17 at 02:13
  • 1
    Wouldn't it be possible to update the connector configuration to bulk mode, let it run, and change it back to timestamp mode? – Ickster Jul 26 '18 at 16:23
  • Yes. It is possible to switch from timestamp to bulk mode by updating the connector config. However, from the connector's point of view this change is like a completely new task. – pushpavanthar Oct 26 '19 at 13:04
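The bulk-and-back trick from the comments amounts to two config updates through the Connect REST API. A rough sketch, where the worker URL, connector name, and JDBC settings are all placeholders (property names are from the Confluent JDBC source connector):

```shell
# Switch the connector to bulk mode so the next poll re-reads every row.
curl -s -X PUT -H "Content-Type: application/json" \
     localhost:8083/connectors/jdbc-source/config \
     -d '{
           "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
           "connection.url": "jdbc:postgresql://db:5432/demo",
           "topic.prefix": "demo-",
           "mode": "bulk"
         }'

# After the bulk pass completes, switch back to incremental loading.
curl -s -X PUT -H "Content-Type: application/json" \
     localhost:8083/connectors/jdbc-source/config \
     -d '{
           "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
           "connection.url": "jdbc:postgresql://db:5432/demo",
           "topic.prefix": "demo-",
           "mode": "timestamp",
           "timestamp.column.name": "updated_at"
         }'
```

Note that bulk mode re-queries the whole table on every poll interval, so switch back promptly or you will keep re-ingesting the full table.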
3

I got a bit tired of renaming the connector every time during development, so I started using the tombstone method. This method can be used for any source connector.

First check the format of the key/value of the connector:

kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-connect-offsets --from-beginning --property print.key=true
["demo",{"query":"query"}] {"timestamp_nanos":542000000,"timestamp":1535768081542}
["demo",{"query":"query"}] {"timestamp_nanos":171831000,"timestamp":1540435281171}
["demo",{"query":"query"}] {"timestamp_nanos":267775000,"timestamp":1579522539267}

Create the tombstone message by sending the key without any value:

echo '["demo",{"query":"query"}]#' | kafka-console-producer --bootstrap-server localhost:9092 --topic kafka-connect-offsets --property "parse.key=true" --property "key.separator=#"

Now restart or recreate the connector and it will start producing messages again.
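The restart itself can also be done through the Connect REST API. A sketch, assuming a worker on `localhost:8083` and a connector named `demo` (both placeholders):

```shell
# Restart the connector so it re-reads its (now tombstoned) offset.
curl -s -X POST localhost:8083/connectors/demo/restart

# Restarting an individual task also works:
curl -s -X POST localhost:8083/connectors/demo/tasks/0/restart
```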

Be very careful with this in production unless you really know what you're doing. There's some more information here: https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/

Marcel
  • 983
  • 10
  • 8