I have a use-case where it is paramount not to continue until all consumer records in a KafkaConsumer
have been fetched. In this use-case nothing will be going into the pipeline. What is the proper way to ensure that there is absolutely, positively nothing left to fetch?

-
Are you sure this is the right way to be doing it? Seems like you're making a streaming solution into a batch one? – Robin Moffatt Nov 07 '17 at 10:45
-
It's for when (if) our gateway has gone down and the messages haven't yet made it to Cassandra; then we want to consume the messages in the queue to find our last issued revision number. – Filip Allberg Nov 07 '17 at 11:00
-
If you know that no new data is appended, you can get the end of the log via `Consumer#endOffsets` and terminate reading when `Consumer#position` reaches the end. – Matthias J. Sax Nov 08 '17 at 09:52
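The `endOffsets`/`position` idea from the comment above can be sketched as follows (a hypothetical helper, not from the thread). In a real consumer loop you would snapshot `consumer.endOffsets(consumer.assignment())` once before draining, then after each `poll` compare `consumer.position(tp)` for every partition against that snapshot. The check itself is pure, so it is shown here with plain partition numbers standing in for `org.apache.kafka.common.TopicPartition`:

```java
import java.util.Map;

// Pure drain check: the consumer has caught up once every partition's
// current position has reached the end offset snapshotted before the
// drain started. Only safe if nothing is producing while this runs,
// which the question guarantees.
public class DrainCheck {
    public static boolean caughtUp(Map<Integer, Long> positions,
                                   Map<Integer, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(e -> positions.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }
}
```

Unlike the empty-poll heuristic in the answers below this is deterministic: it terminates exactly when the snapshotted end of the log is reached, regardless of how slow the brokers are.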
2 Answers
Kafka is designed to handle infinite streams of data, so "consuming everything" really means only that nobody has sent any data for some period of time - 1 minute, 1 hour, etc.; it's up to you.
You can use something like this (Java):
int emptyCount = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);
    if (records.isEmpty()) {
        emptyCount++;
        if (emptyCount >= 100) {
            break;  // 100 consecutive empty polls of 500 ms: treat the log as drained
        }
        continue;
    }
    emptyCount = 0;
    // ...process records...
}
You can tune the `poll` timeout and the number of empty cycles to reach the necessary wait period (with the values above, roughly 50 seconds of silence).
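The stop condition in that loop can be pulled out into a small helper so it can be unit-tested without a broker (a sketch; the class name is illustrative, not part of the original answer):

```java
// Tracks consecutive empty polls and signals when the configured limit is
// reached - the same logic as the loop in the answer, isolated from Kafka.
public class EmptyPollTracker {
    private final int limit;
    private int emptyCount = 0;

    public EmptyPollTracker(int limit) {
        this.limit = limit;
    }

    // Call once per poll; a non-empty poll resets the counter, and the
    // method returns true once `limit` consecutive polls were empty.
    public boolean shouldStop(boolean pollWasEmpty) {
        emptyCount = pollWasEmpty ? emptyCount + 1 : 0;
        return emptyCount >= limit;
    }
}
```

In the loop you would call `tracker.shouldStop(records.isEmpty())` after each `poll` and break when it returns true.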

Alex Ott
-
Hm, this might work. Right now I'm looking at `seekToBeginning`/`seekToEnd` and then using `position` to check whether the two are the same, treating that as the stream being empty, but this might be semantically incorrect, I don't know. – Filip Allberg Nov 07 '17 at 11:01
-
What is the semantic difference between doing this and calling poll with a long timeout? As said, nothing is being written to Kafka while this check is being performed. – Filip Allberg Nov 07 '17 at 11:19
-
If you call `poll` with timeout longer than heartbeat, then your consumer will be considered dead. – Alex Ott Nov 07 '17 at 11:28
-
Please note that data stays in the topic until it expires - that can be days by default – Alex Ott Nov 07 '17 at 11:28
-
I just need to fetch everything that was in the message queue (primary concern: guaranteeing that everything is fetched). The scenario I'm trying to solve is: gateway (GW) issues a revision number, sends it to Kafka, GW dies and consumes _everything_ held in Kafka on start-up to ensure that it doesn't re-issue an already issued revision number. The GW might go down and back up in milliseconds so I just want to be sure that I capture the last thing sent from the GW and am concerned it might not have made it to Kafka. So "emptiness" might be the wrong concept. – Filip Allberg Nov 07 '17 at 11:35
-
@AlexOtt "If you call poll with timeout longer than heartbeat, then your consumer will be considered dead." -- this is only true for older Kafka versions. Compare: https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0/39759329#39759329 – Matthias J. Sax Nov 08 '17 at 09:50
If you are using `kafka-console-consumer`, you can specify the `--timeout-ms` argument to define how long it waits before concluding that no more messages are coming:
--timeout-ms <Integer: timeout_ms>   If specified, exit if no message is
                                     available for consumption for the
                                     specified interval.
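A typical invocation might look like this (a sketch; the broker address and topic name are placeholders):

```shell
# Drain the topic from the beginning, then exit after 10 seconds
# with no new messages. Adjust broker address and topic to taste.
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning \
  --timeout-ms 10000
```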

cakraww