
I have a puzzling issue with one of my samza tasks. It works correctly except for messages on one partition. I have 9 partitions on the topic. If I send 1000 messages, I only receive about 890 of them.

I have checked with kafka-console-consumer with partition keys that I know don’t get processed by my samza job and the console consumer does see the message, so I know it’s getting written to the topic and that at least a vanilla consumer can see it just fine.

I have enabled debug logging on samza and there are many messages from the org.apache.samza.checkpoint.kafka.KafkaCheckpointManager that say:

Adding checkpoint Checkpoint [offsets={SystemStreamPartition [kafka, com.mycompany.indexing.document, 4]=448}] for taskName Partition 4

Partition 4 always says 448. Partition 0 has similar logs, but where it says 448, it is a steadily increasing number.

I’m happy to share whatever interesting configuration information would help narrow this down, but right now, I’m a bit mystified about what I would even share.

I’m running as ThreadJobFactory with:

  • samza-kafka_2.10 version 0.9.1

  • kafka_2.10 version 0.8.2.1 on the client

  • kafka broker 0.9.0.0

Update

I looked at an upstream samza job using the same partition key and found the problem on partition 4 upstream. Checking the samza checkpoint topic with kafkacat, I see the checkpoint for partition 4 not advancing. First I see:

{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96639","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47135","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49476","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62263","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52151","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58081","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47712","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45831","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81713

Then a minute later I see:

{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96624","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47115","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49462","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62252","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52134","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58063","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47696","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45817","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81722

The checkpoint for partition 4 doesn't advance past 2556. However, on the resource.mutation topic itself, the last offset on partition 4 is in the same range as the others, about 61000 as of now and growing.
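To spot a stuck partition without eyeballing the dumps, the two snapshots can be compared programmatically. The following is a minimal Python sketch, not part of Samza or kafkacat: the function names parse_checkpoints and unchanged_partitions are hypothetical, and the input is assumed to be kafkacat output in the format shown above (only three lines of each snapshot are reproduced). It flags partitions whose checkpointed offset is identical in both snapshots.

```python
import json


def parse_checkpoints(lines):
    """Map partition number -> checkpointed offset from kafkacat output lines."""
    offsets = {}
    for line in lines:
        line = line.strip()
        # Skip blank lines and kafkacat status lines ("% Reached end of topic ...").
        if not line or line.startswith("%"):
            continue
        for entry in json.loads(line).values():
            offsets[int(entry["partition"])] = int(entry["offset"])
    return offsets


def unchanged_partitions(before, after):
    """Return partitions whose checkpointed offset is the same in both snapshots."""
    return sorted(p for p in before if p in after and before[p] == after[p])


# Three lines from each of the two snapshots shown above.
first = [
    '{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96639","stream":"resource.mutation"}}',
    '{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}',
    '{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49476","stream":"resource.mutation"}}',
    "% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81713",
]
second = [
    '{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96624","stream":"resource.mutation"}}',
    '{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}',
    '{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49462","stream":"resource.mutation"}}',
    "% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81722",
]

stalled = unchanged_partitions(parse_checkpoints(first), parse_checkpoints(second))
print(stalled)  # prints [4]
```

An unchanged checkpoint is only a hint: a partition that received no new messages between the two snapshots would be flagged as well, so the result still has to be checked against the topic's latest offsets.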

There are no error messages or warning messages at all. It just stops consuming from partition 4.

jhericks

1 Answer


The issue was a message that exceeded the Kafka consumer's default fetch size, fetch.message.max.bytes (max.message.bytes is the topic-level limit on the broker side, not the consumer setting). Rather than raise any sort of error, the thread responsible for consuming that partition simply hung on that message, while the other partition threads continued along happily.

Once we configured systems.kafka.consumer.fetch.message.max.bytes to a large enough value to consume each of the messages on the partition and restarted the job, it picked up where it had left off and everything started working as expected.
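For illustration, the relevant line in the job's properties file might look like the fragment below. The value is an assumption chosen only as an example (10 MiB); the right number depends on the largest message the brokers accept for the topic, and the system name kafka matches the one in the SystemStreamPartition logs above.

```properties
# Assumed example value (10 MiB); it should be at least as large as the
# biggest message that can be written to the consumed topics.
systems.kafka.consumer.fetch.message.max.bytes=10485760
```

Samza hands properties under systems.kafka.consumer.* to the underlying Kafka consumer, so this raises the per-partition fetch size for every input stream on that system.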

jhericks