8

I am attempting to use <KStream>.process() with a TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer timeout interval after which Kafka considers said consumer to be defunct and releases the partition.

I've tried upping the frequency of poll and commit interval to avoid this:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

Unfortunately these errors are still occurring:

(lots of these)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

Followed by these:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

Clearly I need to be sending heartbeats back to the server more often. How?

My topology is:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String>  kt = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

The KTable is grouping values by key every 30 seconds. In Processor.init() I call context.schedule(30000).

DBProcessorSupplier provides an instance of DBProcessor. This is an implementation of AbstractProcessor where all the overrides have been provided. All they do is LOG so I know when each is being hit.

It's a pretty simple topology but it's clear I'm missing a step somewhere.


Edit:

I get that I can adjust this on the server side but Im hoping there is a client-side solution. I like the notion of partitions being made available pretty quickly when a client exits / dies.


Edit:

In an attempt to simplify the problem I removed the aggregation step from the graph. It's now just consumer->processor(). (If I send the consumer directly to .print() it works v quickly so I know it's ok). (Similarly If I output the aggregation (KTable) via .print() it seems ok too).

What I found was that the .process() - which should be calling .punctuate() every 30 seconds is actually blocking for variable lengths of time and outputting somewhat randomly (if at all).

Further:

I set the debug level to 'debug' and reran. Im seeing lots of messages:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

but a breakpoint in the .punctuate() function isn't getting hit. So it's doing lots of work but not giving me a chance to use it.

Community
  • 1
  • 1
ethrbunny
  • 10,379
  • 9
  • 69
  • 131

1 Answers1

10

A few clarifications:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is a lower bound on the commit interval, ie, after a commit, the next commit happens not before this time passed. Basically, Kafka Stream tries to commit ASAP after this time passed, but there is no guarantee whatsoever how long it will actually take to do the next commit.
  • StreamsConfig.POLL_MS_CONFIG is used for the internal KafkaConsumer#poll() call, to specify the maximum blocking time of the poll() call.

Thus, both values are not helpful to heartbeat more often.

Kafka Streams follows a "depth-first" strategy when processing record. This means, that after a poll() for each record all operators of the topology are executed. Let's assume you have three consecutive maps, than all three maps will be called for the first record, before the next/second record will get processed.

Thus, the next poll() call will be made, after all record of the first poll() got fully processed. If you want to heartbeat more often, you need to make sure, that a single poll() call fetches less records, such that processing all records takes less time and the next poll() will be triggered earlier.

You can use configuration parameters for KafkaConsumer that you can specify via StreamsConfig to get this done (see https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX, VALUE);

  • max.poll.records: if you decrease this value, less record will be polled
  • session.timeout.ms: if you increase this value, there is more time for processing data (adding this for completeness because it is actually a client setting and not a server/broker side configuration -- even if you are aware of this solution and do not like it :))

EDIT

As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and procuder configs within streams config. This avoids parameter conflicts as some parameter names are used for consumer and producer and cannot be distinguiesh otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);

One more thing to add: The scenario described in this question is a known issue and there is already KIP-62 that introduces a background thread for KafkaConsumer that send heartbeats, thus decoupling heartbeats from poll() calls. Kafka Streams will leverage this new feature in upcoming releases.

Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
  • I added entries for `config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");` and one for REQUEST_TIMEOUT_MS_CONFIG, "120000". The client sat for > 5 minutes before the `.process().init()` was called. No other activity. – ethrbunny Aug 30 '16 at 21:40
  • (cont'd) removed above entries and added `config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");`. No change in behavior. Client / consumer still does nothing. *BUT* if I change the `Name` parameter in my Tumbling Windows param that I pass to `.aggregateByKey()` I get action - and then promptly start getting the timeout errors again. It's clear that I'm not using this correctly. – ethrbunny Aug 30 '16 at 21:57
  • Can you share your code of `DBAggregateInit`, `DBAggregate`, `DBProcessorSupplier`, and `DBProcessor`? – Matthias J. Sax Aug 30 '16 at 22:08
  • They are stubbed out. Each function returns the default and logs to the console so I know when it's getting called. **DBAggregateInit** returns null. **ProcessorSupplier.get()** returns `new BatchProcessor();`. **DBAggregator.apply()** logs the values sent in (s, s2 and s3) to debug. That's it. – ethrbunny Aug 30 '16 at 22:16
  • The result of `DBAggregate#apply()` is going to be written into the changelog. What does it return? And what is `BatchProcessor` ? – Matthias J. Sax Aug 30 '16 at 22:44
  • See edit(s) inline. I've tried to simplify the issue. I'm not hitting the timeout error(s) at this point so it may be time to resolve this SO entry and start a new one. – ethrbunny Aug 31 '16 at 11:42
  • 1
    Im marking this as resolved. The timeouts can be alleviated with the above settings. My issues with `ProcessorContext` and `AbstractProcessor` are [here](http://stackoverflow.com/questions/39251997/kafka-kstream-using-abstractprocessor-with-a-window). – ethrbunny Aug 31 '16 at 14:12