I have a single-node Kafka instance running locally via docker-compose.
(system: Mac/Arm64, image: wurstmeister/kafka:2.13-2.6.0)
I want to use kafkacat (kcat, installed via Homebrew) to instantly produce and consume messages to and from Kafka.
Here is a minimal script:
#!/usr/bin/env bash
NUM_MESSAGES=${1:-3} # use arg1, or default to 3
KCAT_ARGS="-q -u -c $NUM_MESSAGES -b localhost:9092 -t unbuffered"
log() { echo "$*" 1>&2; } # log to stderr so stdout stays clean for the pipe
producer() {
  log "starting producer"
  # emit one message per second into a single long-running kcat producer
  for i in $(seq 1 "$NUM_MESSAGES"); do
    echo "msg $i"
    log "produced: msg $i"
    sleep 1
  done | kcat $KCAT_ARGS -P
}
consumer() {
  log "starting consumer"
  # start at the end of the topic and log each message as it arrives
  kcat $KCAT_ARGS -C -o end | while read -r line; do
    log "consumed: $line"
  done
}
producer &
consumer &
wait
I would expect (roughly) the following output:
starting producer
starting consumer
produced: msg 1
consumed: msg 1
produced: msg 2
consumed: msg 2
produced: msg 3
consumed: msg 3
However, the produced and consumed messages come out fully batched into two groups, even though the consumer and the producer are running in parallel:
starting producer
starting consumer
produced: msg 1
produced: msg 2
produced: msg 3
consumed: msg 1
consumed: msg 2
consumed: msg 3
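To make the batching measurable instead of judging it by eye, the `log` helper from the script could prefix each line with the wall-clock time. This is only a sketch: `log_ts` is a hypothetical name that is not part of the original script. If the messages really are delivered as one batch, the "consumed" lines should carry nearly identical timestamps.

```shell
#!/usr/bin/env bash
# Hypothetical drop-in replacement for log(): prefix each message with the
# current wall-clock time (HH:MM:SS) and write it to stderr.
log_ts() { printf '%s %s\n' "$(date +%T)" "$*" 1>&2; }

log_ts "starting producer"
```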
Here are the kcat options and Kafka producer properties (with values) that I have already tried in order to change the producer behavior.
# kcat options having no effect on the test case
-u # unbuffered output
-T # act like `tee` and echo input
# kafka properties having no effect on the test case
-X queue.buffering.max.messages=1
-X queue.buffering.max.kbytes=1
-X batch.num.messages=1
-X queue.buffering.max.ms=100
-X socket.timeout.ms=100
-X max.in.flight.requests.per.connection=1
-X auto.commit.interval.ms=100
-X request.timeout.ms=100
-X message.timeout.ms=100
-X offset.store.sync.interval.ms=1
-X message.copy.max.bytes=100
-X socket.send.buffer.bytes=100
-X linger.ms=1
-X delivery.timeout.ms=100
None of the options above had any effect on the pipeline.
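For reference, this is roughly how a single property can be tried in isolation. It is a minimal sketch: `kcat_cmd_for` is a hypothetical helper that only prints the producer command line for one `-X` property, and the three properties shown are just a subset of the list above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the kcat producer command line for one property,
# so each -X setting can be tested on its own.
kcat_cmd_for() {
  # $1 is a single property in key=value form
  echo "kcat -q -u -b localhost:9092 -t unbuffered -P -X $1"
}

# Print one command line per property (subset of the properties listed above).
for prop in queue.buffering.max.ms=100 batch.num.messages=1 linger.ms=1; do
  kcat_cmd_for "$prop"
done
```

Each printed line can then be used in place of `kcat $KCAT_ARGS -P` in the producer function.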
What am I missing?
Edit: It seems to be a flushing issue with either kcat or librdkafka. Maybe the -X properties are not used correctly.
Here are the current observations (will edit them as I learn more):
When sending a larger payload of 10000 messages with a smaller delay in the script, kcat produces several batches of messages. The batching seems to be size-based, but not configurable by any of the -X options.
The batches are then also correctly picked up by the consumer, so it must be a producer issue.
I also tried the script in Docker with the current kafkacat from the Alpine repos. This one seems to flush a bit earlier, with less data needed to fill the "hidden" buffer. The -X options also had no effect.
Also, the -X properties do seem to be checked: if I set out-of-range values, kcat (or maybe librdkafka) will complain. However, setting low values for any of the timeout and buffer size values has no effect.
When calling kcat for every message (which is a bit of an overkill), the messages are produced instantly.
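The per-message variant from the last observation looks roughly like this. It is a sketch under assumptions: `produce_each` is a hypothetical function name, and the `KCAT` variable is a hypothetical override added only so the loop can be dry-run without a broker. Presumably it works because each short-lived kcat process flushes when it reaches the end of its input.

```shell
#!/usr/bin/env bash
# Sketch of the per-message workaround: start one kcat process per message
# instead of piping all messages into a single long-running producer.
# KCAT is a hypothetical override for dry runs; it defaults to the real kcat.
KCAT=${KCAT:-kcat}

produce_each() {
  # Read messages from stdin, one per line, and produce each one separately.
  while IFS= read -r msg; do
    printf '%s\n' "$msg" | "$KCAT" -q -b localhost:9092 -t unbuffered -P
  done
}

# Usage: for i in 1 2 3; do echo "msg $i"; sleep 1; done | produce_each
```

The cost is one process start and one broker connection per message, which is why this only serves as a diagnostic and not as a real solution.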
The question remains:
How do I tell a Kafka-pipeline to instantly produce my first message?
If you have an example in Go, this would also help, since I am having similar observations with a small Go program using kafka-go. I may post a separate question if I can strip that down to a postable format.
UPDATE: I tried using a bitnami image on a pure Linux host. Producing and consuming via kafkacat works as expected on this system. I will post an answer once I know more.