I'm using org.apache.kafka:kafka-streams:0.10.0.1.
I'm working with a time-series stream whose processor, attached via KStream.process(), never seems to have its punctuate() callback triggered. (see here for reference)
In the KafkaStreams config I'm passing in this parameter (among others):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
Here, EventTimeExtractor is a custom timestamp extractor (it implements org.apache.kafka.streams.processor.TimestampExtractor) that extracts the timestamp information from JSON data.
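As a rough sketch of what the parsing step inside such an extractor could look like, here is a standalone helper that uses only the Java standard library. The class name EventTimeParser, the JSON field name "timestamp", and the epoch-millisecond encoding are all assumptions made for illustration; the actual JSON layout is not shown in this post.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pulls an epoch-millisecond timestamp out of a JSON string.
// The field name "timestamp" is an assumption, not taken from the real data.
class EventTimeParser {
    private static final Pattern TIMESTAMP_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    // Returns the parsed timestamp, or the fallback when the field is
    // missing, the input is null, or the number does not fit in a long.
    static long extractTimestamp(String json, long fallback) {
        if (json == null) {
            return fallback;
        }
        Matcher matcher = TIMESTAMP_FIELD.matcher(json);
        if (!matcher.find()) {
            return fallback;
        }
        try {
            return Long.parseLong(matcher.group(1));
        } catch (NumberFormatException e) {
            return fallback;
        }
    }
}
```

In the extractor itself, the extract(ConsumerRecord<Object, Object> record) method would presumably pass the record's value (as a string) to this helper, with something like record.timestamp() as the fallback. A regex is used here only to keep the sketch dependency-free; a real JSON parser would be more robust.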
I would expect this to call my object (derived from TimestampExtractor) when each new record is pulled in. The stream in question carries about 2 * 10^6 records / minute. I have punctuate() scheduled for every 60 seconds and it never fires. I know the record timestamps advance past this span very frequently, since the stream is pulling old values to catch up.
In fact it never gets called at all.
- Is this the wrong approach to setting timestamps on KStream records?
- Is this the wrong way to declare this configuration?