
We have a Kafka queue with two consumers that both read from the same partition (fan-out scenario). One of those consumers should be the canary and process 1% of the messages, while the other processes the remaining 99%.

The idea is to make the decision based on a property of the message, e.g. the message ID or timestamp (e.g. mod 100), and accept or drop based on that, just with the logic reversed between canary and non-canary.
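
As a rough illustration only (consumer config, topic name, and group IDs are assumptions), the accept/drop decision on the canary side could look something like the sketch below; the non-canary consumer would invert the predicate:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CanaryConsumer {
    // Would have to be kept in sync with the non-canary consumer -- which is exactly the problem.
    private static final int CANARY_PERCENT = 1;

    static boolean isCanaryRecord(ConsumerRecord<String, String> record) {
        // Decide based on a stable property of the message, e.g. the key/ID modulo 100
        // (assumes a non-null key; floorMod avoids negative results for negative hash codes).
        String id = record.key();
        return id != null && Math.floorMod(id.hashCode(), 100) < CANARY_PERCENT;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed
        props.put("group.id", "canary-consumer");           // separate group.id per consumer, so both see every record
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic")); // assumed topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    if (isCanaryRecord(record)) {
                        process(record); // canary path: ~1% of messages
                    }
                    // the non-canary consumer would process only when !isCanaryRecord(record)
                }
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) { /* application logic */ }
}
```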

Now we are facing the issue of how to do this robustly, e.g. reconfigure the percentages while running without losing messages or processing them twice. It appears this escalates to a distributed consensus problem to keep the decision logic in sync, which we would very much like to avoid, even though we could just use ZooKeeper for that.

Is this a viable strategy, or are there better ways to do this? Possibly one that avoids consensus?

Update: Unfortunately the Kafka cluster is not under our control, and we cannot make any changes to it.

Update 2: Message latency is not a huge issue; a few hundred ms of added latency are okay and won't be noticed.

Michael Böckling
  • Can you use canary message payload to communicate config changes? – mazaneicha Mar 04 '20 at 19:01
  • Yes, I think we can do that, that's a good idea. Will have to think a bit about what this means in terms of coordination. – Michael Böckling Mar 04 '20 at 21:34
  • Why not use quotas? From the [docs](https://kafka.apache.org/documentation/#quotas): `Quotas overrides and defaults may be configured at (user, client-id)` and also `These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster`. You set the quota only for your canary consumer and then adjust it according to the result. I would not change the data itself; reading and discarding it is just not efficient. See also https://kafka.apache.org/documentation/#design_quotas – Paizo Mar 05 '20 at 10:26
  • Interesting idea! Unfortunately the Kafka cluster is run by a 3rd party, and we cannot change anything. Also, the message volume is still low, so the available options (network / cpu) will probably not work well. We would need an absolute number of messages quota or a "percentage of partition" quota. – Michael Böckling Mar 05 '20 at 12:33
  • 1
    @mazaneicha I'm thinking this won't help in avoiding the Two Generals' Problem. Every solution that distributes the decision making logic to two or more nodes has the problem that the node state cannot be guaranteed to be in sync. Using an L7 filter proxy such as Envoy would be ideal, but Kafka support is experimental and incomplete. – Michael Böckling Mar 05 '20 at 14:27
  • What if you used Kafka Streams branch predicate to perform `random.nextInt(100) == 99`? – OneCricketeer Mar 05 '20 at 15:16
  • Yeah, I'm starting to think we'll have to put a Kafka Connect or Kafka Streams instance in front of our consumers, but that means we need a separate queue to put the marked messages into. We don't get one from the 3rd-party Kafka, so we would have to host our own using AWS MSK. Skipping persistence by just using a L7 proxy and transforming messages in-flight would have been nice; maybe this will work in a few years, but Envoy and LinkerD aren't there yet. None of this is great. – Michael Böckling Mar 05 '20 at 15:39
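
For reference, the Kafka Streams branch approach suggested in the comments above could look roughly like this (topic names and serdes are assumptions; the random predicate mirrors the comment, and `branch()` has since been superseded by `split()` in newer Kafka Streams versions):

```java
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class CanarySplitter {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic"); // assumed source topic

        Random random = new Random();
        // branches[0]: ~1% of records for the canary, branches[1]: everything else
        KStream<String, String>[] branches = input.branch(
                (key, value) -> random.nextInt(100) == 99, // as suggested in the comment
                (key, value) -> true);
        branches[0].to("canary-topic"); // assumed
        branches[1].to("main-topic");   // assumed

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "canary-splitter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}
```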

1 Answer


I don't see any way to change the "sampling strategy" across two machines without "ignoring" or double-processing records. Since the different Kafka consumers could be at different positions in the partition, and could also get the new config at different times, you'd inevitably run into one of two scenarios:

  1. Double processing of the same record by both machines
  2. "Skipping" a record because neither machine thinks it should "own" it when it sees it.

I'd suggest a small change to your architecture instead:

  • Have the 99% machine (the non-canary) pick up all records, then decide for every record whether it wants to handle it or whether it belongs to the canary
  • If it belongs to the canary, send the record to a 2nd topic (from the 99% machine)
  • Canary machine only listens on the 2nd topic, and processes every arriving record

And now you have a pipeline setup where decisions are only ever made in one place and no records are missed or double-processed.
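
A minimal sketch of the forwarding consumer under these assumptions (topic names "input-topic" and "canary-topic", String keys/values, a hypothetical `belongsToCanary` predicate):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonCanaryForwarder {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092"); // assumed
        consumerProps.put("group.id", "non-canary");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092"); // assumed
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("input-topic")); // assumed source topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    if (belongsToCanary(record)) {
                        // Decision is made in exactly one place; the canary just consumes "canary-topic".
                        producer.send(new ProducerRecord<>("canary-topic", record.key(), record.value()));
                    } else {
                        process(record); // normal 99% path
                    }
                }
            }
        }
    }

    static boolean belongsToCanary(ConsumerRecord<String, String> r) {
        // ~1% of records; the percentage is reconfigurable here, on one machine only
        return r.key() != null && Math.floorMod(r.key().hashCode(), 100) == 0;
    }

    static void process(ConsumerRecord<String, String> r) { /* application logic */ }
}
```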

The obvious downside is somewhat higher latency on the canary machine. If you absolutely cannot tolerate the latency, you could push the decision of which topic to produce to upstream, to the producers (I don't know how feasible that is for you).

Variant in case a 2nd topic isn't allowed

If (as you've stated above) you can't have a 2nd topic, you could still make the decision only on the 99% machine, then for records that need to go to the canary, re-produce them into the origin partition with some sort of "marker" (either in the payload or as a Kafka header, up to you). The 99% machine will ignore any incoming records with a marker, and the canary machine will only process records with a marker.
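
A sketch of the header-based marker handling, assuming a record header named "canary" (the header name and helper names are made up for illustration):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MarkerHelpers {
    static final String MARKER_HEADER = "canary"; // assumed header name

    // Both machines check this: the 99% machine skips marked records,
    // the canary machine processes only marked records.
    static boolean hasCanaryMarker(ConsumerRecord<String, String> record) {
        return record.headers().lastHeader(MARKER_HEADER) != null;
    }

    // Called by the 99% machine for unmarked records it decides belong to the canary:
    // re-produce the record into the same topic with the marker header set.
    static void reProduceWithMarker(KafkaProducer<String, String> producer,
                                    ConsumerRecord<String, String> record) {
        ProducerRecord<String, String> marked =
                new ProducerRecord<>(record.topic(), record.key(), record.value());
        marked.headers().add(MARKER_HEADER, "1".getBytes(StandardCharsets.UTF_8));
        producer.send(marked);
    }
}
```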

Again, the major downside is added latency.

radai