I have a puzzling issue with one of my Samza tasks. It works correctly except for messages on one partition. The topic has 9 partitions, and if I send 1000 messages, I only receive about 890 of them.
Using kafka-console-consumer, I checked messages with partition keys that I know my Samza job does not process. The console consumer does see them, so I know they are being written to the topic and that at least a vanilla consumer can read them just fine.
I have enabled debug logging on Samza, and there are many messages from org.apache.samza.checkpoint.kafka.KafkaCheckpointManager that say:
Adding checkpoint Checkpoint [offsets={SystemStreamPartition [kafka, com.mycompany.indexing.document, 4]=448}] for taskName Partition 4
The offset logged for partition 4 is always 448. Partition 0 produces similar log lines, but its offset is a steadily increasing number.
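To track these offsets across many debug lines, the log message can be parsed mechanically. This is a minimal sketch that assumes the log lines have exactly the shape quoted above; `checkpoint_offsets` and `CHECKPOINT_RE` are hypothetical helper names, not part of Samza.

```python
import re

# Matches entries such as:
#   SystemStreamPartition [kafka, com.mycompany.indexing.document, 4]=448
# Assumption: the debug line format is the one quoted in this post.
CHECKPOINT_RE = re.compile(
    r"SystemStreamPartition \[(?P<system>[^,\]]+), (?P<stream>[^,\]]+), "
    r"(?P<partition>\d+)\]=(?P<offset>\d+)"
)


def checkpoint_offsets(log_line):
    """Return (system, stream, partition, offset) tuples found in one log line."""
    return [
        (m.group("system"), m.group("stream"),
         int(m.group("partition")), int(m.group("offset")))
        for m in CHECKPOINT_RE.finditer(log_line)
    ]


line = (
    "Adding checkpoint Checkpoint [offsets={SystemStreamPartition "
    "[kafka, com.mycompany.indexing.document, 4]=448}] for taskName Partition 4"
)
print(checkpoint_offsets(line))
```

Collecting these tuples over time makes it easy to see which partitions report the same offset on every checkpoint.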
I’m happy to share whatever interesting configuration information would help narrow this down, but right now, I’m a bit mystified about what I would even share.
I’m running as ThreadJobFactory with:
samza-kafka_2.10 version 0.9.1
kafka_2.10 version 0.8.2.1 on the client
kafka broker 0.9.0.0
Update
I looked at an upstream samza job using the same partition key and found the problem on partition 4 upstream. Checking the samza checkpoint topic with kafkacat, I see the checkpoint for partition 4 not advancing. First I see:
{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96639","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47135","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49476","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62263","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52151","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58081","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47712","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45831","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81713
Then a minute later I see:
{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96624","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47115","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49462","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62252","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52134","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58063","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47696","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45817","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81722
The checkpointed offset for partition 4 doesn't advance past 2556. However, looking at the actual resource.mutation topic, the last offset on partition 4 is in a similar range to the others: about 61000 as of now and growing.
There are no error messages or warning messages at all. It just stops consuming from partition 4.
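The manual comparison of the two kafkacat dumps can be automated. The following is a rough sketch, assuming each checkpoint message is a single JSON object in the format shown above; `parse_checkpoint` and `unchanged_partitions` are hypothetical helper names, and the sample data is a subset of the lines from the two dumps.

```python
import json


def parse_checkpoint(lines):
    """Map partition number -> checkpointed offset from kafkacat output lines."""
    offsets = {}
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip kafkacat status lines such as "% Reached end of topic ..."
        for entry in json.loads(line).values():
            offsets[int(entry["partition"])] = int(entry["offset"])
    return offsets


def unchanged_partitions(first, second):
    """Partitions whose checkpointed offset is identical in both snapshots."""
    return sorted(p for p in first if p in second and first[p] == second[p])


# Subset of the two dumps shown above (partitions 6, 4 and 0).
first_dump = [
    '{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96639","stream":"resource.mutation"}}',
    '{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}',
    '{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49476","stream":"resource.mutation"}}',
    "% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81713",
]
second_dump = [
    '{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96624","stream":"resource.mutation"}}',
    '{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}',
    '{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49462","stream":"resource.mutation"}}',
    "% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81722",
]

stuck = unchanged_partitions(parse_checkpoint(first_dump), parse_checkpoint(second_dump))
print(stuck)
```

The check deliberately tests for equality rather than for "did not increase", so it only flags partitions whose checkpoint is exactly the same in both snapshots.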