2

I want to reset AerospikeSink Kafka Connector offsets, I'm doing it by deleting first the connector consumer group (connect-*) offset, then re-create it. When i'm re-creating with earliest policy, its recreated with right offsets, but then, when the tasks status is changing from tasks = [] to RUNNING tasks its continue processing from the point at which the previous instance of the connector reached, which blocks to read all the messages from kafka from start (I'm trying to read all the messges from Kafka again).

NOTE: Creating new Connector with new name not solving the problem.

Before resetting offsets: enter image description here

After resetting offsets: enter image description here

After re-creating Connector with tasks = []

enter image description here

After re-creating Connector with tasks on RUNNING state:

enter image description here

Kafka Connect logs:

Until the tasks are moved to RUNNING state:

2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-8 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-9 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,720 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-7 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,722 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-9 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-6 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,737 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-8 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,917 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2

After the tasks are initialized:

2021-08-18 08:13:39,277 INFO WorkerSinkTask{id=recovery-connector-one-2} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-8=OffsetAndMetadata{offset=2015, leaderEpoch=null, metadata=''}, prism-bs-profile-services-9=OffsetAndMetadata{offset=1989, leaderEpoch=null, metadata=''}, prism-bs-profile-services-7=OffsetAndMetadata{offset=1938, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-2]
2021-08-18 08:13:39,281 INFO flushed 5964 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,281 INFO WorkerSinkTask{id=recovery-connector-one-1} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-4=OffsetAndMetadata{offset=1973, leaderEpoch=null, metadata=''}, prism-bs-profile-services-5=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}, prism-bs-profile-services-6=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,323 INFO flushed 7943 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO flushed 193577 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421647232, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:36,965 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Discovered group coordinator nycd-og-kafkacluster02.my-company.corp:9094 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,182 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Attempt to heartbeat with Generation{generationId=1, memberId='connector-consumer-recovery-connector-one-0-80393504-bb92-4d33-ac12-86e6259f8a8c', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [kafka-coordinator-heartbeat-thread | connect-recovery-connector-one]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Lost previously assigned partitions prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO flushed 175680 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets synchronously using sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Failing OffsetCommit request since the consumer is not part of an active group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 ERROR WorkerSinkTask{id=recovery-connector-one-0} Commit of offsets threw an unexpected exception for sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1452)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:362)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:439)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:71)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
    at org.apache.kafka.clients.consumer.ConsumerRebalanceListener.onPartitionsLost(ConsumerRebalanceListener.java:198)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsLost(ConsumerCoordinator.java:331)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:694)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
2021-08-18 08:14:37,212 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,772 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Finished assignment for group at generation 3: {connector-consumer-recovery-connector-one-0-7da6ea9b-28c5-40c4-9486-4c2d3d4c638f=Assignment(partitions=[profile-services-0, prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3]), connector-consumer-recovery-connector-one-2-49f84027-8b22-48ad-9e5b-c59bb82310bf=Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]), connector-consumer-recovery-connector-one-1-6aae273d-1d5f-42da-9c41-04dded122f1c=Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-8, prism-bs-profile-services-9, prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-4 to the committed offset FetchPosition{offset=1973, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=51}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-5 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=52}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-6 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3, profile-services-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-0 to the committed offset FetchPosition{offset=1986, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition profile-services-0 to the committed offset FetchPosition{offset=421647232, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=154}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-1 to the committed offset FetchPosition{offset=1999, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-2 to the committed offset FetchPosition{offset=2048, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-3 to the committed offset FetchPosition{offset=1922, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-8 to the committed offset FetchPosition{offset=2015, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-9 to the committed offset FetchPosition{offset=1989, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=48}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-7 to the committed offset FetchPosition{offset=1938, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]

I want to reset the tasks internal metadata, Its not stored on __consumers_offsets topic since when i'm resetting the offsets i'm writting NULL to all relevant partitions, also its not on offset.storage.topic=aerospike-connect-zvi-connectors-offsets which is empty (its sink connector, this used only by source connector)

I tried to exec the kafka connect but didnt find any usefull internal files which can store the data. any ideas?

Thanks!

Connector Creation:

private[services] def createConnectorAsync(prevConnector: KafkaConnector): Future[Unit] = Future {
    logger.debug(s"createConnectorAsync(${prevConnector.getMetadata.getName}) Triggered")

    // Creating new Connector based on prevConnector
    val connector = new KafkaConnectorBuilder()
      .withApiVersion(prevConnector.getApiVersion)
      .withApiVersion(prevConnector.getApiVersion)
      .withNewStatus().withTasksMax(prevConnector.getStatus.getTasksMax).and
      .withMetadata(
        // Required for resetting the ResourceVersion, UID, etc.
        new ObjectMetaBuilder()
          .withName(prevConnector.getMetadata.getName)
          .withLabels(prevConnector.getMetadata.getLabels)
          .withAnnotations(prevConnector.getMetadata.getAnnotations)
          .withNamespace(prevConnector.getMetadata.getNamespace)
          .build()
      )
      .withSpec(
        new KafkaConnectorSpecBuilder(prevConnector.getSpec)
          // Re-create the connector with earliest offsets
          .addToConfig("consumer.override.auto.offset.reset", "earliest")
          .build()
      )
      .build()

    connector.getSpec.setPause(false)
    for ((name, value) <- prevConnector.getSpec.getAdditionalProperties.asScala) {
      connector.getSpec.setAdditionalProperty(name, value)
    }

    Crds.kafkaConnectorOperation(client).create(connector)

    // Waiting until new Connector is Running
    Crds.kafkaConnectorOperation(client)
      .withName(connector.getMetadata.getName)
      .waitUntilCondition(connector => {
        connector != null &&
          ConnectorTasks(connector).exists(xs => xs.nonEmpty && xs.forall(_.state.equalsIgnoreCase("Running"))) &&
          connector.getStatus != null && connector.getStatus.getConditions.stream().anyMatch(c => c.getType.equalsIgnoreCase("Ready") && c.getStatus.equalsIgnoreCase("True"))
      }, config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)
  }

via strimzi-api

Zvi Mints
  • 1,072
  • 1
  • 8
  • 20

2 Answers2

4

As answered on your previous question, they're not in a per-connector topic

They are stored by connect-$name in the consumer groups (since sinks are consumers that read from Kafka to external systems) where name is what you set in the connector properties or are referring to a connector by via the Connect REST API. If you list all consumer groups, you'll see ones starting with connect-. Resetting sinks ought to be as simple as resetting a consumer group

And mentioned before that some Connectors may optionally override their group offsets with information stored elsewhere, and the only way to know this would be either inspecting the source code or the logs. In this case, resetting requires you to understand how that's done


As a minimal example, using FileSink connector

Create topic

kafka-topics.sh --create --topic test --replication-factor=1 --partitions=1 --bootstrap-server $BOOTSTRAP_ADDRESS

Populate with numbers 1..100

for i in {1..100}; do echo $i >> data.txt ; done
./kafka-console-producer.sh --topic test --broker-list $BOOTSTRAP_ADDRESS < data.txt

Create Sink with name=console-sink

curl -XPOST $CONNECT_API/connectors -H 'Content-Type: application/json' -d '{
    "name": "console-sink", 
    "config": { 
      "connector.class": "FileStreamSink",
      "tasks.max": 1,
      "topics": "test",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}'

... inspect Connect worker logs to see value 100 written

Inspect the connect-* consumer groups

kafka-consumer-groups.sh --list --bootstrap-server $BOOTSTRAP_ADDRESS  | grep -e '^connect-'
connect-console-sink
...

Describe the one I want and see that the end offset is 100, as expected.

kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
connect-console-sink test            0          -               100             -               connector-consumer-console-sink-0-46181998-e548-4f7d-a17f-155421d9ad00 /172.25.0.4     connector-consumer-console-sink-0

Then delete the connector and repeat the describe

curl -XDELETE $CONNECT_API/connectors/console-sink/
curl $CONNECT_API/connectors/console-sink
{"error_code":404,"message":"Connector console-sink not found"}%

Group still exists

kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS

Consumer group 'connect-console-sink' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-console-sink test            0          100             100             0               -               -               -

If I were to repost the connector, the message saying no active members goes away, but offsets don't change. Also, indicating the group remains.

Now, let's delete again to make sure the group is inactive

curl -XDELETE $CONNECT_API/connectors/console-sink/
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS

Consumer group 'connect-console-sink' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-console-sink test            0          100             100             0               -               -               -

Now I reset the offsets of test partition 0, to 42

kafka-consumer-groups.sh --reset-offsets --topic test:0 --to-offset 42 --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
connect-console-sink           test                           0          42

Describing again, we see that it's taken into account

kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS

Consumer group 'connect-console-sink' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-console-sink test            0          42              100             58              -               -               -

Posting the connector one more time with the same name... And inspecting the logs, we see that the messages start printing from after 42.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • reset consumer group with id `connect-$name` is not working because its probably store it on Internal topic settings, take a look on our previous conversation – Zvi Mints Aug 13 '21 at 19:04
  • List all your consumer groups, you'll see what I mean. Also answered before, deleting connectors does absolutely nothing to their offsets – OneCricketeer Aug 13 '21 at 19:04
  • after deleting the connector i'm deleting the offsets, but in general if i'm deleting the consumer group, why does its not reseting the offsets? where its stored when we register new consumer with same group.id? `adminClient.deleteConsumerGroupOffsets(groupId, partitionsToOffsets.keySet.asJava).all().get(config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)` – Zvi Mints Aug 13 '21 at 19:09
  • Anyways deleting the connector, then i checked that group.id is not exists, then i'm delete consumer group offsets (Where its stored if i'm not deleting the consumer group offsets?) and then i re-create the connector its still with same offsets, thats the problem, listing the consumer groups will surely show all my connectors with prefix connect which is not the question i asked – Zvi Mints Aug 13 '21 at 19:11
  • It's all stored in `__consumer_offsets` topic, as with all consumers. Please [refer to KIP-199](https://cwiki.apache.org/confluence/display/KAFKA/KIP-199%3A+Add+Kafka+Connect+offset+tool) - _Kafka Connect uses normal Kafka consumers' ability to track their own offsets for sink connectors, so **it is possible (albeit not ideal) to use the existing Kafka consumer offsets tool to view and manage those offsets**_. And again, I claim no knowledge about how the **Aerospike** connector works, but this is exactly how it works for all the Confluent ones (S3, HDFS, Elasticsearch, etc) – OneCricketeer Aug 13 '21 at 19:13
  • for some reason, i cannot see any data on `__consumer_offsets` topic. so thats the question - how can i find where its stored, if i knew where it stored i could just delete it.... – Zvi Mints Aug 13 '21 at 19:19
  • Are you asking how to [read the offsets topic](https://stackoverflow.com/questions/33925866/kafka-how-to-read-from-consumer-offsets-topic)? Or are you saying that topic doesn't exist/is empty? – OneCricketeer Aug 13 '21 at 19:29
  • Its looks like empty, but i think that the function that i wrote before (`adminClient.deleteConsumerGroupOffsets(groupId, partitionsToOffsets.keySet.asJava).all().get(config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)`) should clear this topic also, its must to delete the offsets but its not, also altering it to `OffsetSpec.earlister` is not working with `adminClient.alterConsumerGroupOffsets` – Zvi Mints Aug 13 '21 at 19:43
  • My Question is : Why when i'm using alter or delete consumer group offset with admin client its clear the offsets (i can see it via the KafkaHQ) but then we the connector is re-created its change the offset from 0 to the original offset – Zvi Mints Aug 13 '21 at 19:44
  • I don't have an answer why the AdminClient doesn't work as you expect, as I have no experience with it. I updated my answer with a full example using only CLI tools, as this is what admins usually use to manage offsets – OneCricketeer Aug 13 '21 at 19:59
  • And I'll say one more time that any connector can maintain its own state as consumer group management is largely _optional_ for any Kafka consumer – OneCricketeer Aug 13 '21 at 20:05
  • I don't know about partition 0 with the high lag, but if you have like 10 tasks or so, consuming 2000 records for each task is going to be very fast (look at the sink system for duplicates or its logs if things are actively written after you reset the connector). As far as the config goes, you use the /config Connect API endpoint – OneCricketeer Aug 15 '21 at 12:15
  • thanks, i re-created the connector with `.addToConfig("consumer.override.auto.offset.reset", "earliest")` but its still taken from the commited offsets, i'm deleting the groupOffsets before, i'm also tried to delete the whole group, any ideas what should i add? – Zvi Mints Aug 15 '21 at 12:21
  • You should actually consume from the offsets topic to verify the group is really deleted. And I'll repeat myself that your specific connector does not need to use Kafka group management, and you need to read its source code or speak to the maintainers of it to understand if it does – OneCricketeer Aug 15 '21 at 12:39
  • kafka connector is not using kafka group management, it's the recovery process that manages it all, will check the source code, btw can you give me a quick explanation about difference between deleting the whole group and group offsets? since for deleting the group offsets the group should not be exists not? or only consumers? – Zvi Mints Aug 15 '21 at 12:44
  • My go-to answer is that group deletion just isn't possible. Anyone I've talked to that tells me the API exists constantly runs into issues when trying to use it – OneCricketeer Aug 15 '21 at 12:49
  • @OneCrickeeter Hey, Do you think its possible that in 1 minutes its process and read `1819520` messages from Kafka? since from to 0, then its not log anything and then commit 1819520 after 1 minute, maybe he could process it in this time and i just don't see the logs? – Zvi Mints Aug 19 '21 at 22:08
  • Almost 2M messages in a minute? Doubtful, and you'd definitely see logs for something since the default `max.poll.records` is only 500. Have you been in contact with Aerospike team about where/how/if their connectors store offsets in the database, yet? – OneCricketeer Aug 20 '21 at 00:07
  • They are checking, meantime I tried with FileStreamSink and it’s prints, means or the connector is really reading messages very fast or indeed it’s store on other place – Zvi Mints Aug 20 '21 at 09:03
  • Hey OneCricketeer - I changed the log level to all loggers to debug with REST Api to `http://localhost:62053/admin/loggers/root` endpoint, but i still not seeing the messages which the task is reading from the topic (unlike `FileStreamSink` which print all messages) - any ideas why should i change also? i want to see how many messages are written from Kafka topic before commited – Zvi Mints Aug 21 '21 at 17:50
  • It's not possible to know what log messages a specific connector has without seeing the source code – OneCricketeer Aug 21 '21 at 19:20
2

If you're using the Aerospike Connector from jcustenborder, it stores offsets in Aerospike.

You'll need to delete it from Aerospike. The key is created from Aerospike namespace, the Aerospike set name, and the topic name and partition in format topic-partitionNumber e.g., test_topic-5.

Liam Clarke
  • 375
  • 1
  • 6
  • Hey @Liam Clarke, thanks but i'm using `com.aerospike.connect.kafka.inbound.AerospikeSinkConnector` – Zvi Mints Aug 20 '21 at 09:27
  • 1
    @Zvi I'd be surprised if they didn't base their code off his, though – OneCricketeer Aug 20 '21 at 12:09
  • I cant find it on the set name which the Aerospike Sink is configured - maybe you have other ideas? – Zvi Mints Aug 20 '21 at 18:38
  • Another solution is to re-create connector with new configurations, maybe you know which configurations need to be changed (changing metadata.name is not enough) – Zvi Mints Aug 21 '21 at 01:21
  • @Zvi It's just the`name` field (in Connector config, not k8s manifest) that controls whether a config is new or not – OneCricketeer Aug 21 '21 at 15:46
  • But the connector config is created by the connector resource no? so i can do it programmatically - i want to change the name by `fabric8io` client – Zvi Mints Aug 21 '21 at 15:51