
After a Kafka topic has been created by a producer or an administrator, how would you change the number of replicas of this topic?

GuruPo



To increase the number of replicas for a given topic you have to:

1. Specify the extra replicas in a custom reassignment JSON file

For example, you could create increase-replication-factor.json and put this content in it:

{"version":1,
  "partitions":[
     {"topic":"signals","partition":0,"replicas":[0,1,2]},
     {"topic":"signals","partition":1,"replicas":[0,1,2]},
     {"topic":"signals","partition":2,"replicas":[0,1,2]}
]}

2. Use the file with the --execute option of the kafka-reassign-partitions tool

[or kafka-reassign-partitions.sh - depending on the kafka package]

For example:

$ kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

3. Verify the replication factor with the kafka-topics tool

[or kafka-topics.sh - depending on the kafka package]

 $ kafka-topics --zookeeper localhost:2181 --topic signals --describe

Topic:signals   PartitionCount:3    ReplicationFactor:3 Configs:retention.ms=1000000000
Topic: signals  Partition: 0    Leader: 2   Replicas: 0,1,2 Isr: 2,0,1
Topic: signals  Partition: 1    Leader: 2   Replicas: 0,1,2 Isr: 2,0,1
Topic: signals  Partition: 2    Leader: 2   Replicas: 0,1,2 Isr: 2,0,1

See also: the part of the official documentation that describes how to increase the replication factor.
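Writing this file by hand gets tedious for topics with many partitions. Also, giving every partition the same replica order makes the same broker the preferred leader (the first replica) for all of them, which a comment below asks about. Below is a minimal bash sketch that prints the same JSON structure for any number of partitions and rotates the broker list per partition. `make_reassignment_json` is a hypothetical helper, not part of Kafka; it assumes the broker IDs you pass exist and that the optional replication factor does not exceed the number of brokers listed.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print a reassignment JSON for partitions 0..N-1 of a topic.
# Arguments: topic, partition count, comma-separated broker IDs, optional RF
# (defaults to the number of brokers listed; must not exceed it).
make_reassignment_json() {
    local topic="$1" partitions="$2" brokers="$3"
    local -a ids
    IFS=',' read -r -a ids <<<"$brokers"
    local n=${#ids[@]}
    local rf="${4:-$n}"
    local p i sep replicas
    echo '{"version":1,'
    echo '  "partitions":['
    for ((p = 0; p < partitions; p++)); do
        # Rotate the broker list by p, so the first (preferred-leader) replica
        # moves from broker to broker across partitions
        replicas=""
        for ((i = 0; i < rf; i++)); do
            replicas+="${replicas:+,}${ids[(p + i) % n]}"
        done
        # No trailing comma after the last partition
        sep=","
        if ((p == partitions - 1)); then sep=""; fi
        echo "     {\"topic\":\"$topic\",\"partition\":$p,\"replicas\":[$replicas]}$sep"
    done
    echo ']}'
}
```

For example, `make_reassignment_json signals 3 0,1,2 > increase-replication-factor.json` writes a file equivalent to the one above, except that partitions 1 and 2 get the replica orders [1,2,0] and [2,0,1].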

Łukasz Dumiszewski
  • Programmatically, where should the replication factor be set, i.e. on the consumer side or the producer side? – BdEngineer Jun 15 '17 at 05:29
  • Kafka-reassign-partitions can generate a suggestion on partitions to reassign by specifying `--generate` and `--topics-to-move-json-file`; however, the documentation doesn't explain the file's contents well: `{ "topics": [ { "topic": "YOUR_TOPIC_NAME_1" }, { "topic": "YOUR_TOPIC_NAME_2" } ], "version": 1 }` The command then looks like `kafka-reassign-partitions.sh --zookeeper #.#.#.#:2181,#.#.#.#:2181,#.#.#.#:2181 --broker-list #,#,# --topics-to-move-json-file reassignment.topics.json --generate` – andyfeller Sep 28 '17 at 12:44
  • Does step 2 (`kafka-reassign-partitions`) cause any downtime? I have some topics with a replication factor of 1 (the default; I forgot to specify it when creating them), and I'm wondering if my producers will get errors while the partitions are reassigned. – mmrobins Sep 12 '18 at 22:13
  • How can this be done automatically? The first step forces me to hand-craft the file, which requires knowing the existing numbers of brokers and partitions. – beatrice Apr 20 '20 at 14:34
  • @beatrice In Kafka, topic management is currently a semi-manual process. You can get information about partitions with ./kafka-topics.sh --zookeeper zkhost:2181 --describe. For broker information, use zookeeper-shell.sh zkhost:2181 ls /brokers/ids – c0der512 Apr 24 '20 at 15:59
  • Is it okay to use the same replica order for all partitions? I mean that the preferred leader will be the same in this case. Can this cause any performance issue? – beatrice Nov 09 '20 at 15:26

You can also use kafkactl for this:

# first run with --validate-only to see what kafkactl will do
kafkactl alter topic my-topic --replication-factor 2 --validate-only

# then do the replica reassignment
kafkactl alter topic my-topic --replication-factor 2

Note that the Kafka API that kafkactl is using for this is only available for Kafka ≥ 2.4.0.

Disclaimer: I am a contributor to this project.

D-rk

Edit: I was proven wrong; please see the excellent answer from Łukasz Dumiszewski.

I'm leaving my original answer for completeness for now.



I don't think you can. Normally it would be something like

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic test2 --replication-factor 3

but it says

Option "[replication-factor]" can't be used with option"[alter]"

It is funny that you can change the number of partitions on the fly (which is often a hugely destructive action at runtime), but you cannot increase the replication factor, which should be transparent. But remember, it is 0.10, not 10.0... See this enhancement request: https://issues.apache.org/jira/browse/KAFKA-1543

OneCricketeer
Artur Biesiadowski
  • Thanks for your timely response. So you mean there is no need, or it is unusual, to change the replicas while Kafka is running, right? Or, if I do want to change it, I can install the patch from the link you pasted, right? – GuruPo Jun 22 '16 at 07:34
  • I don't think this patch will work out of the box; it was written against a version from a year ago. On top of that, it does not work dynamically: it just changes metadata, so from what I understand, you would need to restart the entire cluster for it to take effect. It might be easier to just delete and recreate the topic if you are taking your entire system down anyway. As for a real solution (adding replicas while the system is running), I don't think anybody is working on that, and it would certainly take some effort to add (though it should be possible with the current architecture). – Artur Biesiadowski Jun 23 '16 at 07:24
  • The replication factor cannot be changed that way; you need to specify the --replicas option – Mansur Ul Hasan Sep 16 '21 at 12:24
  • I am not sure if this answer is serving any purpose anymore? Does it make sense to just delete it? It is just a link to the correct answer, which would come at the top if this answer is deleted (this answer is the accepted answer due to which SO shows this before the relevant answer by Łukasz Dumiszewski). – akki Oct 08 '21 at 15:08

Łukasz Dumiszewski's answer is correct but manually generating that file is a bit hard. Luckily there are some easy ways to achieve what @Łukasz Dumiszewski said.

  • If you are using the kafka-manager tool, from version 2.0.0.2 you can change the replication factor in the Generate Partition Assignment section of a topic view. Then click Reassign Partitions to apply the generated partition assignment (if you select a different replication factor, you will get a warning, but you can click Force Reassign afterward).

  • If you have Ruby installed, you can use this helper script.

  • If you prefer Node.js, you can generate the file with this gist too.
  • This was the easiest method for me since I have a running Kafka Manager instance. It is worth mentioning that you have to "force the reassignment" twice: when you get the warning, you have to click the link "Try force running", which takes you back to the topic overview screen. There will now be a modified button, "Force reassign partitions", which will do the magic. Thanks for the hint! – jurgispods Jun 13 '19 at 14:30

The scripted answer of @Дмитрий-Шепелев did not include a solution for topics with multiple partitions. This updated version does:

#!/bin/bash

brokerids="1,2,3"
topics=$(kafka-topics --list --zookeeper zookeeper:2181)

while read -r line; do lines+=("$line"); done <<<"$topics"
echo '{"version":1,
  "partitions":['
for t in $topics; do
    sep=","
    # One "Partition:" line per partition (the summary line says "PartitionCount:")
    pcount=$(kafka-topics --describe --zookeeper zookeeper:2181 --topic "$t" | grep -c 'Partition:')
    for i in $(seq 0 $((pcount - 1))); do
        # No trailing comma after the last partition of the last topic
        if [ "${t}" == "${lines[-1]}" ] && [ "$((pcount - 1))" == "$i" ]; then sep=""; fi
        # Every broker in brokerids, in random order
        randombrokers=$(echo "$brokerids" | tr ',' '\n' | shuf | paste -sd, -)
        echo "    {\"topic\":\"${t}\",\"partition\":${i},\"replicas\":[${randombrokers}]}$sep"
    done
done

echo '  ]
}'

Note: it also randomizes the broker order and assigns every broker listed in brokerids to each partition, so the replication factor equals the number of broker IDs listed. Make sure the broker IDs in the script are correctly defined.

Execute as follows:

$ ./reassign.sh > reassign.json
$ kafka-reassign-partitions --zookeeper zookeeper:2181 --reassignment-json-file reassign.json --execute
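As written, the script gives each partition every broker in brokerids (in random order), so the replication factor equals the number of listed brokers. To get fewer replicas than brokers, one option is to keep only the first few entries of the shuffled list. A minimal sketch, assuming GNU coreutils (`shuf`) and a brokerids list without duplicates; `pick_replicas` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print RF distinct broker IDs, chosen at random from a
# comma-separated list, joined with commas (e.g. "3,1").
pick_replicas() {
    local brokerids="$1" rf="$2"
    # One ID per line, shuffle, keep the first RF, then join with commas
    echo "$brokerids" | tr ',' '\n' | shuf | head -n "$rf" | paste -sd, -
}
```

Inside the loop, `randombrokers=$(pick_replicas "$brokerids" 2)` would then assign two replicas per partition. Random placement does not balance leaders or disk usage, so review the generated JSON before running `--execute`.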
Filidor Wiese
  • pcount did not work for me (using Kafka 2.5.0). I rewrote it as: pcount=$(/opt/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic $t | grep 'Partition:' | wc -l) – dovka Aug 21 '21 at 20:22

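This script may help if you want to change the replication factor for all topics. Note that it only writes an entry for partition 0 of each topic and always assigns brokers 0, 1 and 2: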

#!/bin/bash

topics=`kafka-topics --list --zookeeper zookeeper:2181`

while read -r line; do lines+=("$line"); done <<<"$topics"
echo '{"version":1,
  "partitions":[' > tmp.json
for t in $topics; do 
    if [ "${t}" == "${lines[-1]}" ]; then
        echo "    {\"topic\":\"${t}\",\"partition\":0,\"replicas\":[0,1,2]}" >> tmp.json
    else
        echo "    {\"topic\":\"${t}\",\"partition\":0,\"replicas\":[0,1,2]}," >> tmp.json
    fi
done

echo '  ]
}' >> tmp.json

kafka-reassign-partitions --zookeeper zookeeper:2181 --reassignment-json-file tmp.json --execute
  • Could you please explain why this should work? Please add some detail to this answer so that people with similar problems can use this as well! Thank you! – creyD Jun 21 '19 at 13:07
  • I wrote this script when I discovered that the developers on my team had created topics without replicas. We did not use many partitions. Not everyone knew that by default Kafka sets the replication factor to 1. High fault tolerance was the main requirement for the cluster. – Дмитрий Шепелев Jun 22 '19 at 15:35
  • Why should this not work? The script gets all topics, builds the JSON and applies it. – Дмитрий Шепелев Jun 22 '19 at 15:39
  • This isn't very safe, as it assumes you only have 3 equally sized brokers that have even distribution already – OneCricketeer Jul 30 '19 at 16:17
  • Isn't that obvious from the script text? The first rule of a network administrator is not to run obscure scripts. :) How can I say that a cluster is highly available if I do not have 3 brokers of the same size? I know my script is not a silver bullet, but in my situation it helped. If your situation is different, you can fix my script as you wish or use another one. – Дмитрий Шепелев Aug 09 '19 at 13:02
  • Because if you have 10 brokers of the same size, this script loads all the data onto just three of them. Or you might have three brokers that aren't numbered 0, 1 and 2. Some explanation here would be useful. As it stands, it's not useful for more complicated clusters. – Scott Carey Aug 26 '22 at 10:05

If you have a lot of partitions, using kafka-reassign-partitions to generate the JSON file required by Łukasz Dumiszewski's answer (and the official documentation) can save time. Here is an example of replicating a 64-partition topic from 1 to 2 servers without having to specify all the partitions:

expand_topic=TestTopic
current_server=111
new_servers=111,222
echo '{"topics": [{"topic":"'${expand_topic}'"}], "version":1}' > /tmp/topics-to-expand.json
/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file /tmp/topics-to-expand.json --broker-list "${current_server}" --generate | tail -1 | sed s/\\[${current_server}\\]/\[${new_servers}\]/g | tee /tmp/topic-expand-plan.json
/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/topic-expand-plan.json --execute
/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic ${expand_topic}

Outputs:

Topic:TestTopic PartitionCount:64   ReplicationFactor:2 Configs:retention.ms=6048000
    Topic: TestTopic    Partition: 0    Leader: 111 Replicas: 111,222   Isr: 111,222
    Topic: TestTopic    Partition: 1    Leader: 111 Replicas: 111,222   Isr: 111,222
    ....
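The sed step above relies on unquoted backslash escaping, which is easy to break when adapting the command. A quoted variant, as a minimal sketch: `expand_replicas` is a hypothetical name, it rewrites only exact `"replicas":[OLD]` arrays, and it assumes the generated plan is compact JSON with no space after the colon, as in the samples on this page.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a reassignment plan on stdin and replace every
# exact "replicas":[OLD] array with "replicas":[NEW].
expand_replicas() {
    local old="$1" new="$2"
    # \[ matches a literal "["; an unescaped "]" is literal in a basic regex
    sed "s/\"replicas\":\[$old]/\"replicas\":[$new]/g"
}
```

It would slot into the pipeline in place of the sed call: `... --generate | tail -1 | expand_replicas "${current_server}" "${new_servers}" | tee /tmp/topic-expand-plan.json`.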
MilesHampson

1. Write all topic names to a JSON file (save the script as alltopics.sh)

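#!/bin/bash
# Print a topics-to-move JSON that lists every topic in the cluster
topics=$(kafka-topics.sh --zookeeper localhost:2181 --list)

while read -r line; do lines+=("$line"); done <<<"$topics"
echo '{"version":1,
 "topics":['
for t in $topics; do
    sep=","
    # No comma after the last topic, otherwise the JSON is invalid
    if [ "$t" == "${lines[-1]}" ]; then sep=""; fi
    echo "     { \"topic\": \"$t\" }$sep"
done

echo '  ]
}'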

bash alltopics.sh > alltopics.json

2. Run kafka-reassign-partitions.sh to generate a rebalanced assignment

kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "0,1,2" --generate --topics-to-move-json-file alltopics.json > reassign.json

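3. Clean up reassign.json: it contains both the current and the proposed assignment, so keep only the proposed JSON. Note that --generate keeps each partition's current replication factor, so edit the replica lists if you want more replicas.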

4. Run kafka-reassign-partitions.sh to rebalance topics

kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute
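Step 3 can be scripted. A minimal sketch, assuming the --generate output consists of a "Current partition replica assignment" header followed by a JSON line, then a "Proposed partition reassignment configuration" header followed by the proposed JSON on a single line; `extract_proposed` is a hypothetical helper name, and the header pattern may need adjusting for your Kafka version:

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `kafka-reassign-partitions.sh --generate` output on
# stdin and print only the JSON line that follows the "Proposed ..." header.
extract_proposed() {
    awk '/^Proposed partition reassignment configuration/ { getline; print; exit }'
}
```

Used in step 2, this would write only the proposed plan: `kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "0,1,2" --generate --topics-to-move-json-file alltopics.json | extract_proposed > reassign.json`.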
bhargav joshi

To change the replicas of a topic, first check its current partitions and replicas:

./kafka-topics.sh --describe --zookeeper prod-az-p1-zk01.<domain>.prod:2181 --topic test2

Next, identify the brokers that should hold the replicas. Adding replicas requires a partition reassignment, so create a JSON file (/tmp/increase-replication-factor.json in this example) that lists, for each partition of the topic, the brokers that should hold its replicas:

    {"version":1,
    "partitions":[
     {"topic":"test2","partition":0,"replicas":[0,10]},
     {"topic":"test2","partition":1,"replicas":[10,20]}
    ]}

Finally, run the reassignment:

./kafka-reassign-partitions.sh --zookeeper prod-az-p1-zk01.<domain>.prod:2181 --reassignment-json-file /tmp/increase-replication-factor.json --execute

To verify:

[root@prod-az-p2-kafka02 bin]# ./kafka-topics.sh --describe --zookeeper prod-az-p1-zk01.<domain>.prod:2181 --topic test2
Topic: test2    TopicId: -LoL36ztSeyC8rzvnp4YMw PartitionCount: 2   ReplicationFactor: 2    Configs:
    Topic: test2    Partition: 0    Leader: 10  Replicas: 0,10  Isr: 10
    Topic: test2    Partition: 1    Leader: 20  Replicas: 10,20 Isr: 20,10
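In the output above, partition 0 lists replicas 0,10 but only 10 in its ISR, so broker 0 was not yet in sync when the command ran. kafka-topics --describe also accepts --under-replicated-partitions to show only such partitions. To check saved describe output instead, here is a minimal awk-in-bash sketch; `lagging_partitions` is a hypothetical helper, and it assumes the `Key: value` field layout shown above:

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print "topic partition" for every partition whose ISR has fewer members than
# its replica list, i.e. replicas that are not in sync yet.
lagging_partitions() {
    awk '
    /Partition:/ {
        topic = part = reps = isr = ""
        for (i = 1; i < NF; i++) {
            if ($i == "Topic:") topic = $(i + 1)
            else if ($i == "Partition:") part = $(i + 1)
            else if ($i == "Replicas:") reps = $(i + 1)
            else if ($i == "Isr:") isr = $(i + 1)
        }
        if (split(isr, a, ",") < split(reps, b, ",")) print topic, part
    }'
}
```

The summary line is skipped because it contains "PartitionCount:" rather than "Partition:".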
Mansur Ul Hasan

This script will generate the JSON for kafka-reassign-partitions.sh and feed it into that script to increase the replication factor. The new set of replicas will:

  • Keep the current replicas
  • Add new unique brokers (this will prevent unneeded data migrations)

This script was tested with the Kafka 2.8.0 scripts. Only the variables at the top of the file need to be modified.

#!/bin/bash

KAFKA_BIN="./bin"
KAFKA_CONNECTION_ARGS="--bootstrap-server localhost:9094"

broker_ids="1,2,3"
topic="topic_foobar"
new_replication_factor=3 # New replication factor


reassignment_file="./reassignment.json"


#~~~~ Don't change anything after this line ~~~~#


# Generate a list of "partition|replicas"
topic_data="$("$KAFKA_BIN/kafka-topics.sh" $KAFKA_CONNECTION_ARGS --describe --topic "$topic" | tail -n +2 | sed -E 's/.*Partition:\s+([0-9]+).*Replicas:\s+([0-9,]+).*/\1|\2/g')"
partition_count=$(echo "$topic_data" | wc -l)

echo '{
    "version": 1,
    "partitions": [' > "$reassignment_file"


log_dirs="$(yes '"any"' | head -n $new_replication_factor | sed -e ':a;N;$!ba;s/\n/,/g')"
obj_sep=","
while read -r partition_data; do
    partition=$(echo "$partition_data" | cut -d '|' -f 1)
    replicas=$(echo "$partition_data" | cut -d '|' -f 2)

    # Randomize the replicas (using this list as a queue)
    random_replicas="$(echo $broker_ids | tr "," "\n" | shuf)"
    
    # Loop until the replicas has desired RF - 1 commas
    while [ "$(echo "$replicas" | tr -dc , | wc -c)" != $((new_replication_factor-1)) ]; do
        # Pick the next replica, add it to the list if it isn't already there, otherwise advance the queue
        next_replica="$(echo "$random_replicas" | head -1)"
        # Compare whole IDs, so broker 1 is not mistaken for broker 11
        if [[ ",$replicas," != *",$next_replica,"* ]]; then
            replicas="$replicas,$next_replica"
        else
            random_replicas="$(echo "$random_replicas" | tail -n +2)"
        fi
    done
    
    # Don't add a comma on the last object
    if [ "$((partition_count-1))" == "$partition" ]; then obj_sep=""; fi
    
    echo '      {
            "topic": "'"$topic"'",
            "partition": '"$partition"',
            "replicas": ['"$replicas"'],
            "log_dirs": ['"$log_dirs"']
        }'$obj_sep >> "$reassignment_file"
done < <(echo "$topic_data")

echo '  ]
}' >> "$reassignment_file"


cat "$reassignment_file"
read -p "Apply the above reassignment? (Ctrl-C to exit): "


"$KAFKA_BIN/kafka-reassign-partitions.sh" $KAFKA_CONNECTION_ARGS --execute --reassignment-json-file "$reassignment_file"
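The core of the script above (keep the existing replicas, then append new, distinct brokers until the target replication factor is reached) can be isolated as a small function that is easy to test. A minimal sketch; `extend_replicas` is a hypothetical name, and unlike the script it walks the candidate brokers in the given order instead of shuffling them, and returns a shorter list if there are not enough candidates:

```shell
#!/usr/bin/env bash
# Hypothetical helper: keep a partition's current replicas and append brokers
# from a candidate list, in the given order, until RF replicas are reached.
# Prints the new comma-separated replica list.
extend_replicas() {
    local current="$1" candidates="$2" rf="$3"
    local -a reps cands
    IFS=',' read -r -a reps <<<"$current"
    IFS=',' read -r -a cands <<<"$candidates"
    local b r found
    for b in "${cands[@]}"; do
        (( ${#reps[@]} >= rf )) && break
        # Exact ID comparison, so broker 1 is not mistaken for broker 11
        found=0
        for r in "${reps[@]}"; do
            [[ "$r" == "$b" ]] && found=1
        done
        (( found )) || reps+=("$b")
    done
    # Join the array with commas
    local IFS=','
    echo "${reps[*]}"
}
```

For example, `extend_replicas 2 1,2,3 3` prints `2,1,3`. To place new replicas randomly, as the script does, pass a shuffled candidate list such as `$(echo 1,2,3 | tr ',' '\n' | shuf | paste -sd, -)`.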

The answer by Łukasz Dumiszewski is correct, but it leaves open the question of how best to generate the topic assignment JSON files that kafka-reassign-partitions needs as input.

I like to use the DataDog topicmappr tool to create the topic re-assignments in an intelligent way. The tool is deterministic, inspects the current layout, and can optimize it in various configurable ways.

For example:

topicmappr rebuild --brokers "-2" --topics .\* --topics-exclude __.\* \
  --replication 2 --optimize-leadership --force-rebuild --skip-no-ops \
  --out-path remaps/ --zk-addr $zk

would rebalance all topics (excluding topics starting with "__") with a replication factor of 2, optimize leadership so that leadership for the given topics is spread evenly across the available brokers, force a map rebuild, skip anything that hasn't changed, and output all the resulting JSONs to the remaps directory.

The tool can optimize partition placement for even storage (for unbalanced partitions) or partition counts, is rack-aware, and has various other useful options.

The tool is completely safe to use, as all it does is output a summary of everything it is doing, and the JSONs needed for the remapping. It doesn't make any changes itself.

Raman

To increase the number of replicas for a given topic you have to:

1. Add partitions to the existing topic with the command below (let us say, increase from 2 to 3):

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-to-increase --partitions 3

2. Specify the extra replicas in a custom reassignment json file

For example, you could create increase-replication-factor.json and put this content in it:

{"version":1,
  "partitions":[
     {"topic":"topic-to-increase","partition":0,"replicas":[0,1,2]},
     {"topic":"topic-to-increase","partition":1,"replicas":[0,1,2]},
     {"topic":"topic-to-increase","partition":2,"replicas":[0,1,2]}
]}

3. Use the file with the --execute option of the kafka-reassign-partitions tool

bin/kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

4. Verify the replication factor with the kafka-topics tool

bin/kafka-topics --zookeeper localhost:2181 --topic topic-to-increase --describe

SivaPhani
  • In case anyone wonders why this is so horribly wrong... a PARTITION is _not_ a REPLICA, and you cannot undo changing the number of partitions on a topic: even if the topic is empty, Kafka will not allow you to reduce the number of partitions again. – Samuel Åslund Mar 24 '22 at 17:51