
So, we started a bunch of Kafka Streams applications without realizing that the default replication factor for their internal topics is 1.

We've since made the code changes (see "What should be the replication factor of changelog/repartition topics?").

However, I don't think that will help applications that are already deployed, nor will it alter internal topics that have already been created.

For example, I used kafkacat to list a handful of topics (filtered by the application.id prefix), and all of them have one replica.

Obviously, when a broker starts having issues (broker.id 11 or 21 here), the applications are not working well.

  topic "appid-KTABLE-SUPPRESS-STATE-STORE-0000000013-changelog" with 1 partitions:
    partition 0, leader 11, replicas: 11, isrs: 11
--
  topic "appid-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog" with 1 partitions:
    partition 0, leader 21, replicas: 21, isrs: 21
--
  topic "appid-KSTREAM-AGGREGATE-STATE-STORE-0000000009-changelog" with 1 partitions:
    partition 0, leader 11, replicas: 11, isrs: 11
--
  topic "appid-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog" with 1 partitions:
    partition 0, leader 21, replicas: 21, isrs: 21
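With several applications across several clusters, checking listings like the one above by hand gets tedious. Below is a minimal Java sketch that scans `kafkacat -L` output and reports topics with fewer replicas than a target. It assumes the line format shown above (`topic "..." with N partitions:` followed by `partition N, leader N, replicas: a,b,c, isrs: ...`); the class name `UnderReplicatedTopics` and the target of 3 replicas in `main` are illustrative choices, not part of kafkacat.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper: finds under-replicated topics in a kafkacat -L listing. */
final class UnderReplicatedTopics {
    // Assumed shape of the kafkacat -L lines, as in the listing above.
    private static final Pattern TOPIC =
            Pattern.compile("^\\s*topic \"([^\"]+)\" with \\d+ partitions:");
    private static final Pattern PARTITION =
            Pattern.compile("^\\s*partition \\d+, leader -?\\d+, replicas: ([\\d,]+), isrs:");

    /** Returns topics (in listing order) with at least one partition below minReplicas replicas. */
    static List<String> find(List<String> lines, int minReplicas) {
        Set<String> flagged = new LinkedHashSet<>();
        String currentTopic = null;
        for (String line : lines) {
            Matcher topic = TOPIC.matcher(line);
            if (topic.find()) {
                currentTopic = topic.group(1);
                continue;
            }
            Matcher partition = PARTITION.matcher(line);
            if (currentTopic != null && partition.find()) {
                // "11,21,31" -> 3 replicas; "21" -> 1 replica.
                int replicaCount = partition.group(1).split(",").length;
                if (replicaCount < minReplicas) {
                    flagged.add(currentTopic);
                }
            }
        }
        return new ArrayList<>(flagged);
    }

    public static void main(String[] args) {
        // Reads a kafkacat -L listing from stdin; 3 is an assumed target replication factor.
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        List<String> lines = in.lines().collect(Collectors.toList());
        find(lines, 3).forEach(System.out::println);
    }
}
```

For example, `kafkacat -b <broker> -L | java UnderReplicatedTopics.java` (Java 11+ single-file launch) would print the four changelog topics above, along with any other topic that has fewer than three replicas.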

I understand how to increase the replication factor (see "How to change the number of replicas of a Kafka topic?"), but my questions are:

  1. Do these numbers have a specific meaning other than the processor ordering of Kafka Streams?

  2. How many of these topics should I really be increasing the replication factor for (assuming I am doing it manually, and having to do it for multiple clusters)?

Also: resetting the Streams application to clean up the internal topics doesn't seem like a good option, because of how the applications write to downstream systems.

OneCricketeer

1 Answer


The number in appid-KTABLE-SUPPRESS-STATE-STORE-0000000013-changelog represents the processor node ID within the appid topology. A topology is built from many processor nodes, and each node is assigned a unique ID.
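To make the naming pattern concrete, here is a small Java sketch that splits such a topic name into its parts. It assumes names of the form `<application.id>-<OPERATOR-PREFIX>-<10-digit number>-<changelog|repartition>`, as in the listing in the question; as far as I know, the 10-digit number is a zero-padded counter that the topology builder increments as it names processors and stores, so it reflects the order in which operators were added and says nothing about partitions or brokers. `InternalTopicName` is a hypothetical helper, and it returns empty for names that don't follow this pattern, such as stores named explicitly via `Materialized.as(...)`.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that splits an auto-named Kafka Streams internal topic into its parts. */
final class InternalTopicName {
    // After stripping "<application.id>-": an upper-case operator prefix, a 10-digit
    // zero-padded counter value, and the internal-topic suffix.
    private static final Pattern GENERATED =
            Pattern.compile("^([A-Z]+(?:-[A-Z]+)*)-(\\d{10})-(changelog|repartition)$");

    final String operator; // e.g. "KTABLE-SUPPRESS-STATE-STORE"
    final int index;       // e.g. 13: counter value when the name was generated
    final String suffix;   // "changelog" or "repartition"

    private InternalTopicName(String operator, int index, String suffix) {
        this.operator = operator;
        this.index = index;
        this.suffix = suffix;
    }

    /** Returns empty for other applications' topics and for names without a generated number. */
    static Optional<InternalTopicName> parse(String applicationId, String topic) {
        String prefix = applicationId + "-";
        if (!topic.startsWith(prefix)) {
            return Optional.empty();
        }
        Matcher m = GENERATED.matcher(topic.substring(prefix.length()));
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new InternalTopicName(m.group(1), Integer.parseInt(m.group(2)), m.group(3)));
    }

    @Override
    public String toString() {
        return operator + " #" + index + " (" + suffix + ")";
    }
}
```

For example, `InternalTopicName.parse("appid", "appid-KTABLE-SUPPRESS-STATE-STORE-0000000013-changelog")` yields operator `KTABLE-SUPPRESS-STATE-STORE`, index 13 and suffix `changelog`.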

As long as you don't change the topology by adding or removing processor nodes, the application keeps using the same names and numbers for its repartition/changelog topics. If you do change the topology, the numbers can shift, so you have to reset the application and restart all the instances. Changing the replication factor won't affect the numbers.

Still, I would recommend resetting the application to clean up the old internal topics and re-running it with the updated replication factor configuration. Otherwise, to add replicas to the existing topics and distribute them across broker nodes, you will have to run a reassignment like the one below:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

You can find more details here: https://kafka.apache.org/documentation/#basic_ops_increase_replication_factor
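Writing the reassignment file by hand for every internal topic on every cluster is error-prone, so here is a minimal Java sketch that generates it. It follows the `{"version":1,"partitions":[{"topic":...,"partition":...,"replicas":[...]}]}` format from the Kafka documentation linked above. It keeps each partition's current replicas first (so the current leader stays the preferred leader) and appends further brokers round-robin until the target is reached. `ReassignmentJson`, the broker list (e.g. 11, 21 and a third broker 31) and the target factor are assumptions for illustration, not a Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the JSON read by kafka-reassign-partitions.sh --reassignment-json-file. */
final class ReassignmentJson {
    /**
     * Keeps the current replicas and appends other brokers, walking the broker list
     * round-robin from just after the current leader, until targetRf replicas are listed.
     */
    static List<Integer> expand(List<Integer> current, List<Integer> brokers, int targetRf) {
        if (targetRf > brokers.size()) {
            throw new IllegalArgumentException(
                    "only " + brokers.size() + " brokers for replication factor " + targetRf);
        }
        List<Integer> replicas = new ArrayList<>(current);
        // indexOf returns -1 for an unknown leader, so the walk then starts at broker 0.
        int start = current.isEmpty() ? 0 : brokers.indexOf(current.get(0)) + 1;
        for (int i = 0; i < brokers.size() && replicas.size() < targetRf; i++) {
            Integer candidate = brokers.get((start + i) % brokers.size());
            if (!replicas.contains(candidate)) {
                replicas.add(candidate);
            }
        }
        return replicas;
    }

    /** currentReplicas maps topic -> per-partition replica lists (list index = partition number). */
    static String toJson(Map<String, List<List<Integer>>> currentReplicas, List<Integer> brokers, int targetRf) {
        List<String> entries = new ArrayList<>();
        for (Map.Entry<String, List<List<Integer>>> topic : currentReplicas.entrySet()) {
            List<List<Integer>> partitions = topic.getValue();
            for (int p = 0; p < partitions.size(); p++) {
                String replicas = expand(partitions.get(p), brokers, targetRf).stream()
                        .map(String::valueOf)
                        .collect(Collectors.joining(","));
                // Kafka topic names only allow [a-zA-Z0-9._-], so no JSON escaping is needed.
                entries.add("{\"topic\":\"" + topic.getKey() + "\",\"partition\":" + p
                        + ",\"replicas\":[" + replicas + "]}");
            }
        }
        return "{\"version\":1,\"partitions\":[" + String.join(",", entries) + "]}";
    }
}
```

Writing the output to increase-replication-factor.json and running the command above with `--execute` (and later `--verify`) would then add the replicas. Round-robin from the leader is just one simple way to spread followers; any placement that respects rack layout and disk capacity works.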

Nishu Tayal
  • If each application is reset, then that would result in re-processing data, right? I'm not sure our downstream applications have built-in protection against that. – OneCricketeer Aug 02 '19 at 22:14
  • And so other than resetting the application, I would have to increase the replication factor for each and every intermediate topic? Or, for example, is the highest numbered topic the "final output" that I would care about? – OneCricketeer Aug 02 '19 at 22:41
  • 2
    You don't have to manually increase the replication factor. Instead, set the `StreamConfig.REPLICATION_FACTOR` configuration after resetting the application and rerun. Please be aware, all internal topics created during that topology will use the same config and you need to consider increased storage at broker side. – Nishu Tayal Aug 03 '19 at 21:02
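The configuration approach from the last comment can be sketched as follows. In a real application you would use the `StreamsConfig` constants (`APPLICATION_ID_CONFIG`, `BOOTSTRAP_SERVERS_CONFIG`, `REPLICATION_FACTOR_CONFIG`) and pass the properties to `new KafkaStreams(topology, props)`. This sketch uses the equivalent plain string keys so it compiles without kafka-streams on the classpath. `StreamsProps` is a hypothetical helper, and the broker address and factor of 3 in the usage note are placeholders.

```java
import java.util.Properties;

/** Hypothetical helper: Streams properties with a higher replication factor for internal topics. */
final class StreamsProps {
    static Properties withReplication(String applicationId, String bootstrapServers, int replicationFactor) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // StreamsConfig.REPLICATION_FACTOR_CONFIG: only applied when Streams creates an internal
        // topic, so topics that already exist keep their current replication factor.
        props.put("replication.factor", Integer.toString(replicationFactor));
        return props;
    }
}
```

For example, `StreamsProps.withReplication("appid", "broker:9092", 3)` gives properties whose `replication.factor` is `"3"`.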