
We have installed 3 Kafka broker machines on RHEL 7.6 Linux.

Kafka version is 2.7.x.

Each Kafka broker has 8 JBOD disks, as we can see from the following df -h details:

df -h

/dev/sdc                    1.7T  929G  748G  56% /kafka/kafka_logs2
/dev/sdd                    1.7T  950G  727G  57% /kafka/kafka_logs3
/dev/sde                    1.7T  999G  678G  60% /kafka/kafka_logs4
/dev/sdf                    1.7T  971G  706G  58% /kafka/kafka_logs5
/dev/sdg                    1.7T  1.1T  563G  67% /kafka/kafka_logs6
/dev/sdh                    1.7T  962G  714G  58% /kafka/kafka_logs7
/dev/sdi                    1.7T  1.1T  621G  63% /kafka/kafka_logs8

As we can see above, disk /kafka/kafka_logs6 is 67% used, while /kafka/kafka_logs2 is only at 56%.

After a short investigation we found that the partitions of a topic are not spread in equal numbers across the disks.

For example, let's take the topic cars_costs.ml, which has 100 partitions.

Now let's look at the JBOD disks: we have only 11 partitions of topic cars_costs.ml on disk /kafka/kafka_logs2, but 21 partitions of the same topic on disk /kafka/kafka_logs6.

So we do not understand why Kafka places different numbers of partitions on the JBOD disks.

So, just to summarize the number of partitions per disk (a counting sketch follows the table):

disk                   number of partitions (cars_costs.ml)
/kafka/kafka_logs2   - 11
/kafka/kafka_logs3   - 13
/kafka/kafka_logs4   - 20
/kafka/kafka_logs5   - 14
/kafka/kafka_logs6   - 21
/kafka/kafka_logs7   - 10
/kafka/kafka_logs8   - 11
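
For reference, counts like these can be reproduced on each broker, since every partition is a directory named <topic>-<partition> inside the log dirs. A minimal sketch, assuming the mount points from the df output above and the standard partition directory naming:

# count the cars_costs.ml partition directories on every JBOD disk of this broker
for dir in /kafka/kafka_logs*; do
  echo "$dir - $(ls -d "$dir"/cars_costs.ml-* 2>/dev/null | wc -l)"
done

Alternatively, Kafka 2.7 ships kafka-log-dirs.sh, which asks the brokers themselves for the replica-to-log-dir mapping (the bootstrap address is a placeholder):

kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list cars_costs.ml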

Useful parameters already set in server.properties:

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
default.replication.factor=3
broker.rack=/default-rack

The full set of parameters is:

more server.properties
auto.create.topics.enable=false
auto.leader.rebalance.enable=true
background.threads=10
log.retention.bytes=-1
log.retention.hours=48
delete.topic.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
log.dir=/kafka/kafka-logs2,/kafka/kafka-logs3 ...............
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=1000
log.flush.offset.checkpoint.interval.ms=60000
log.flush.scheduler.interval.ms=9223372036854775807
log.flush.start.offset.checkpoint.interval.ms=60000
compression.type=producer
log.roll.jitter.hours=0
log.segment.bytes=1073741824
log.segment.delete.delay.ms=60000
message.max.bytes=1000012
min.insync.replicas=1
num.io.threads=10
num.network.threads=48
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offset.metadata.max.bytes=4096
offsets.commit.required.acks=-1
offsets.commit.timeout.ms=5000
offsets.load.buffer.size=5242880
offsets.retention.check.interval.ms=600000
offsets.retention.minutes=10080
offsets.topic.compression.codec=0
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.topic.segment.bytes=104857600
queued.max.requests=1000
quota.consumer.default=9223372036854775807
quota.producer.default=9223372036854775807
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=30000
request.timeout.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.max.timeout.ms=900000
transaction.state.log.load.buffer.size=5242880
transaction.state.log.min.isr=2
transaction.state.log.num.partitions=50
transaction.state.log.replication.factor=3
transaction.state.log.segment.bytes=104857600
transactional.id.expiration.ms=604800000
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=600000
zookeeper.max.in.flight.requests=10
zookeeper.session.timeout.ms=600000
zookeeper.set.acl=false
broker.id.generation.enable=true
connections.max.idle.ms=600000
connections.max.reauth.ms=0
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
controller.socket.timeout.ms=30000
default.replication.factor=3
delegation.token.expiry.time.ms=86400000
delegation.token.max.lifetime.ms=604800000
delete.records.purgatory.purge.interval.requests=1
fetch.purgatory.purge.interval.requests=1000
group.initial.rebalance.delay.ms=3000
group.max.session.timeout.ms=1800000
group.max.size=2147483647
group.min.session.timeout.ms=6000
log.cleaner.backoff.ms=15000
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.delete.retention.ms=86400000
log.cleaner.enable=true
log.cleaner.io.buffer.load.factor=0.9
log.cleaner.io.buffer.size=524288
log.cleaner.io.max.bytes.per.second=1.7976931348623157e308
log.cleaner.max.compaction.lag.ms=9223372036854775807
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.min.compaction.lag.ms=0
log.cleaner.threads=1
log.cleanup.policy=delete
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.message.timestamp.difference.max.ms=9223372036854775807
log.message.timestamp.type=CreateTime
log.preallocate=false
log.retention.check.interval.ms=300000
max.connections=2147483647
max.connections.per.ip=2147483647
max.incremental.fetch.session.cache.slots=1000
num.partitions=1
producer.purgatory.purge.interval.requests=1000
queued.max.request.bytes=-1
replica.fetch.backoff.ms=1000
replica.fetch.max.bytes=1048576
replica.fetch.response.max.bytes=10485760
reserved.broker.max.id=1500
transaction.abort.timed.out.transaction.cleanup.interval.ms=60000
transaction.remove.expired.transaction.cleanup.interval.ms=3600000
zookeeper.sync.time.ms=2000
broker.rack=/default-rack
  • Kafka allocating partitions does not take disk space or load into account, but should balance the number of partitions evenly. However, a topic should be distributed across all three of your brokers round-robin and if I understand correctly the 100 partitions are on just one of the brokers. Could you say what you configured as `replication.factor` and `broker.rack`? And maybe find out the number of partitions on the other two brokers? – maow Oct 26 '21 at 14:07
  • Yes, I will check that soon. – jessica Oct 26 '21 at 14:08
  • See the update; I added the relevant parameters. – jessica Oct 26 '21 at 14:18
  • Let me know if you need additional info. – jessica Oct 26 '21 at 14:18
  • As a sidenote: I would recommend configuring `min.insync.replicas=2`. There is a very nice discussion in https://stackoverflow.com/questions/48825755/how-does-kafka-handle-network-partitions why that might be safer. – maow Oct 26 '21 at 14:44
  • Let me know: if we set only 2 replicas, can that help the reassign-partitions? – jessica Oct 26 '21 at 16:28
  • FYI, we have run the kafka-reassign-partitions.sh script for the topic more than 4 times, but still without good results. – jessica Oct 26 '21 at 16:30

2 Answers


I looked it up a bit, and it seems this is a known behavior of Kafka on JBOD disks.

https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAA+BczTLvZND4MGsG-LBM-wutzTNy3CXKLRRjo_55Xp00fwXLw@mail.gmail.com%3E

There are even three KIPs for this.

In short: yes, the assignment of partitions to disks is not balanced, but as an administrator you can reassign them, e.g. with the kafka-reassign-partitions.sh script. This is also very useful in case you have an unbalanced load on your partitions and need to reflect that in the assignment to disks.
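
As a rough sketch of what such a reassignment can look like: since KIP-113 the reassignment JSON accepts an optional log_dirs list (one entry per replica; "any" leaves the choice to the broker), so replicas can be pinned to specific disks. The broker IDs, bootstrap address and target directory below are placeholders, not values from the question:

cat > move-cars_costs.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "cars_costs.ml", "partition": 0,
      "replicas": [1, 2, 3],
      "log_dirs": ["/kafka/kafka_logs2", "any", "any"] }
  ]
}
EOF

kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
  --reassignment-json-file move-cars_costs.json --execute

kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
  --reassignment-json-file move-cars_costs.json --verify

Note that moving a replica between directories on the same broker goes through the AlterReplicaLogDirs API, so the tool has to be run with --bootstrap-server rather than the old --zookeeper option.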

And of course, if you have Confluent Platform, it takes care of that for you: https://docs.confluent.io/platform/current/kafka/rebalancer/index.html

You live and learn...

maow
  • Thank you for the answer, but we have already used the kafka-reassign-partitions.sh script, without good results. So do I understand correctly that if the script does not balance the partitions across the disks, then we have no other solution? – jessica Oct 26 '21 at 16:04
  • Do you think we need to set min.insync.replicas=2 and then run kafka-reassign-partitions.sh again? Maybe then we will get good results? – jessica Oct 26 '21 at 16:05
  • About min.insync.replicas=2: by default we use 3. We are worried about setting it to 2 because then we actually have only one backup, so maybe it is a little risky; but if you think that setting min.insync.replicas=2 helps the reassign-partitions, then we can consider this option. – jessica Oct 26 '21 at 16:19
  • No, `min.insync.replicas` will not help with that. As for "if the script does not balance the partitions across the disks, then we have no other solution – am I right?": I am afraid I don't have any other solution then :( – maow Oct 26 '21 at 19:17

I would recommend trying Cruise Control. Cruise Control was created to solve exactly this kind of balancing issue, and it can help balance the load across disks or brokers.
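
For illustration, Cruise Control is driven through a REST API: ask for a dry-run proposal first, then apply the rebalance. The host/port and the rebalance_disk flag (intra-broker, disk-to-disk movement, which needs JBOD support enabled in Cruise Control) are assumptions to check against your version:

# ask Cruise Control for a dry-run rebalance proposal first
curl -X POST "http://cruise-control-host:9090/kafkacruisecontrol/rebalance?dryrun=true"

# then apply it, including intra-broker disk balancing (rebalance_disk is an assumption to verify)
curl -X POST "http://cruise-control-host:9090/kafkacruisecontrol/rebalance?dryrun=false&rebalance_disk=true"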

Hao