I want to reset AerospikeSink Kafka Connector offsets, I'm doing it by deleting first the connector consumer group (connect-*
) offset, then re-create it.
When i'm re-creating with earliest
policy, its recreated with right offsets, but then, when the tasks status is changing from tasks = []
to RUNNING
tasks its continue processing from the point at which the previous instance of the connector reached, which blocks to read all the messages from kafka from start (I'm trying to read all the messges from Kafka again).
NOTE: Creating new Connector with new name not solving the problem.
After re-creating Connector with tasks = []
After re-creating Connector with tasks
on RUNNING
state:
Kafka Connect logs:
Until the tasks are moved to RUNNING
state:
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-8 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-9 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,720 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-7 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,722 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-9 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-6 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,737 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-8 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,917 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2
After the tasks are initialized:
2021-08-18 08:13:39,277 INFO WorkerSinkTask{id=recovery-connector-one-2} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-8=OffsetAndMetadata{offset=2015, leaderEpoch=null, metadata=''}, prism-bs-profile-services-9=OffsetAndMetadata{offset=1989, leaderEpoch=null, metadata=''}, prism-bs-profile-services-7=OffsetAndMetadata{offset=1938, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-2]
2021-08-18 08:13:39,281 INFO flushed 5964 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,281 INFO WorkerSinkTask{id=recovery-connector-one-1} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-4=OffsetAndMetadata{offset=1973, leaderEpoch=null, metadata=''}, prism-bs-profile-services-5=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}, prism-bs-profile-services-6=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,323 INFO flushed 7943 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO flushed 193577 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421647232, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:36,965 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Discovered group coordinator nycd-og-kafkacluster02.my-company.corp:9094 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,182 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Attempt to heartbeat with Generation{generationId=1, memberId='connector-consumer-recovery-connector-one-0-80393504-bb92-4d33-ac12-86e6259f8a8c', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [kafka-coordinator-heartbeat-thread | connect-recovery-connector-one]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Lost previously assigned partitions prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO flushed 175680 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets synchronously using sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Failing OffsetCommit request since the consumer is not part of an active group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 ERROR WorkerSinkTask{id=recovery-connector-one-0} Commit of offsets threw an unexpected exception for sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1452)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:362)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:439)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
at org.apache.kafka.clients.consumer.ConsumerRebalanceListener.onPartitionsLost(ConsumerRebalanceListener.java:198)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsLost(ConsumerCoordinator.java:331)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:694)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2021-08-18 08:14:37,212 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,772 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Finished assignment for group at generation 3: {connector-consumer-recovery-connector-one-0-7da6ea9b-28c5-40c4-9486-4c2d3d4c638f=Assignment(partitions=[profile-services-0, prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3]), connector-consumer-recovery-connector-one-2-49f84027-8b22-48ad-9e5b-c59bb82310bf=Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]), connector-consumer-recovery-connector-one-1-6aae273d-1d5f-42da-9c41-04dded122f1c=Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-8, prism-bs-profile-services-9, prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-4 to the committed offset FetchPosition{offset=1973, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=51}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-5 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=52}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-6 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3, profile-services-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-0 to the committed offset FetchPosition{offset=1986, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition profile-services-0 to the committed offset FetchPosition{offset=421647232, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=154}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-1 to the committed offset FetchPosition{offset=1999, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-2 to the committed offset FetchPosition{offset=2048, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-3 to the committed offset FetchPosition{offset=1922, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-8 to the committed offset FetchPosition{offset=2015, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-9 to the committed offset FetchPosition{offset=1989, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=48}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-7 to the committed offset FetchPosition{offset=1938, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
I want to reset the tasks internal metadata, Its not stored on __consumers_offsets
topic since when i'm resetting the offsets i'm writting NULL
to all relevant partitions, also its not on offset.storage.topic=aerospike-connect-zvi-connectors-offsets
which is empty (its sink connector, this used only by source connector)
I tried to exec
the kafka connect but didnt find any usefull internal files which can store the data. any ideas?
Thanks!
Connector Creation:
private[services] def createConnectorAsync(prevConnector: KafkaConnector): Future[Unit] = Future {
logger.debug(s"createConnectorAsync(${prevConnector.getMetadata.getName}) Triggered")
// Creating new Connector based on prevConnector
val connector = new KafkaConnectorBuilder()
.withApiVersion(prevConnector.getApiVersion)
.withApiVersion(prevConnector.getApiVersion)
.withNewStatus().withTasksMax(prevConnector.getStatus.getTasksMax).and
.withMetadata(
// Required for resetting the ResourceVersion, UID, etc.
new ObjectMetaBuilder()
.withName(prevConnector.getMetadata.getName)
.withLabels(prevConnector.getMetadata.getLabels)
.withAnnotations(prevConnector.getMetadata.getAnnotations)
.withNamespace(prevConnector.getMetadata.getNamespace)
.build()
)
.withSpec(
new KafkaConnectorSpecBuilder(prevConnector.getSpec)
// Re-create the connector with earliest offsets
.addToConfig("consumer.override.auto.offset.reset", "earliest")
.build()
)
.build()
connector.getSpec.setPause(false)
for ((name, value) <- prevConnector.getSpec.getAdditionalProperties.asScala) {
connector.getSpec.setAdditionalProperty(name, value)
}
Crds.kafkaConnectorOperation(client).create(connector)
// Waiting until new Connector is Running
Crds.kafkaConnectorOperation(client)
.withName(connector.getMetadata.getName)
.waitUntilCondition(connector => {
connector != null &&
ConnectorTasks(connector).exists(xs => xs.nonEmpty && xs.forall(_.state.equalsIgnoreCase("Running"))) &&
connector.getStatus != null && connector.getStatus.getConditions.stream().anyMatch(c => c.getType.equalsIgnoreCase("Ready") && c.getStatus.equalsIgnoreCase("True"))
}, config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)
}
via strimzi-api