
I'm using `org.apache.kafka:kafka-streams:0.10.0.1`.

I'm attempting to work with a time-series-based stream, but the processor I attach via `KStream.process()` never seems to have its `punctuate()` callback triggered. (see here for reference)

In a KafkaStreams config I'm passing in this param (among others):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());

Here, `EventTimeExtractor` is a custom timestamp extractor (implementing `org.apache.kafka.streams.processor.TimestampExtractor`) that pulls the timestamp out of the JSON record data.
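
For illustration, a minimal sketch of what such an extractor might look like against the 0.10.0 `TimestampExtractor` interface (the JSON field name, the Jackson usage, and the fallback behavior are assumptions, not the actual code):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class EventTimeExtractor implements TimestampExtractor {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            try {
                // Assumes the value is a JSON string with an epoch-millis "timestamp" field.
                JsonNode json = MAPPER.readTree((String) record.value());
                return json.get("timestamp").asLong();
            } catch (Exception e) {
                // Fall back to the record's built-in timestamp on malformed data.
                return record.timestamp();
            }
        }
    }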

I would expect my extractor (derived from `TimestampExtractor`) to be called as each new record is pulled in. The stream in question carries 2 * 10^6 records/minute. I have `punctuate()` set to 60 seconds, and it never fires. I know the data crosses this span very frequently, since it's pulling old values to catch up.

In fact, it never gets called at all.

  • Is this the wrong approach to setting timestamps on KStream records?
  • Is this the wrong way to declare this configuration?
– ethrbunny

3 Answers


Update Nov 2017: Kafka Streams in Kafka 1.0 now supports punctuate() with both stream-time and processing-time (wall-clock time) behavior, so you can pick whichever behavior you prefer.
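
In 1.0 this is exposed through `ProcessorContext#schedule(long, PunctuationType, Punctuator)`; a brief sketch (the interval and callback body are illustrative):

    import org.apache.kafka.streams.processor.PunctuationType;

    // Inside Processor#init(ProcessorContext context), Kafka 1.0+:
    context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // Fires every 60s of wall-clock time, even when no records arrive.
    });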

Your setup seems correct to me.

What you need to be aware of: as of Kafka 0.10.0, the punctuate() method operates on stream-time (by default, i.e. with the default timestamp extractor, stream-time means event-time). Stream-time is advanced only when new data records come in, and how far it advances is determined by the timestamps associated with those new records.

For example:

  • Let's assume you have set punctuate() to be called every 1 minute = 60 * 1000 ms (note: 1 minute of stream-time). Now, if it happens that no data is received for the next 5 minutes, punctuate() will not be called at all -- even though you might expect it to be called 5 times. Why? Again, because punctuate() depends on stream-time, and stream-time is only advanced based on newly received data records. (See the sketch below.)
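
For context, here is a sketch of how scheduling looked in the 0.10.0 Processor API (illustrative code; `MyProcessor` and the comments are not from the question):

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class MyProcessor implements Processor<String, String> {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            // Request punctuate() every 60s -- of *stream-time*, not wall-clock time.
            context.schedule(60 * 1000L);
        }

        @Override
        public void process(String key, String value) {
            // Stream-time only advances as records arrive with newer extracted timestamps.
        }

        @Override
        public void punctuate(long streamTime) {
            // Only invoked once stream-time has advanced by >= 60s.
        }

        @Override
        public void close() {}
    }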

Might this be causing the behavior you are seeing?

Looking ahead: There's already an ongoing discussion in the Kafka project on how to make punctuate() more flexible, e.g. to trigger it not only based on stream-time (which defaults to event-time) but also based on processing-time.

– miguno
  • Are the records that your application is reading containing embedded timestamps at all? Are you using, for example, a 0.9 Kafka cluster or older version? – miguno Sep 20 '16 at 11:55
  • All server components are from the Confluent repo, installed within the last few weeks. The records do contain a timestamp field (records are JSON encoded). I wouldn't expect Kafka to pick out this field, hence the use of 'the extractor' (sounds v ominous). – ethrbunny Sep 20 '16 at 13:44
  • I'm thinking something is up with said Kafka cluster: 5 servers and 1 ZooKeeper node, all running on AWS. I'm trying to push data from 3 threads and barely getting 1M/min. Adding a second process (separate machine) *reduces* the total throughput. Network interfaces are all < ~30%, load is 1-2, and these are all 8-core / 32 GB machines; it should be screaming fast. The topic has 6 partitions with replication factor 2. – ethrbunny Sep 20 '16 at 13:49
  • Note to some distant observer: be sure to set the **file descriptor count** to something practical. On CentOS/AWS the default seems to be 1024, which isn't enough even for a small setup (I increased it to 65535). This was a big issue with my cluster. – ethrbunny Sep 20 '16 at 17:03
  • But your problem still exists, even after increasing `nofile`? – miguno Sep 21 '16 at 14:04
  • That's correct, no change. I'm getting more reliable callbacks from the `.punctuate` call, but there are a slew of separate questions about that. At least one thing is working as it should. – ethrbunny Sep 21 '16 at 22:01

Your approach seems to be correct. Compare the paragraph "Timestamp Extractor (timestamp.extractor)" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters

Not sure why your custom timestamp extractor is not used. Have a look into org.apache.kafka.streams.processor.internals.StreamTask. In the constructor there should be something like

TimestampExtractor timestampExtractor =
    (TimestampExtractor) config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);

Check if your custom extractor is picked up there or not...
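
One way to sanity-check this from the application side (a sketch; assumes your properties map is named `config`, as in the question):

    import org.apache.kafka.streams.StreamsConfig;

    // Print which extractor class Streams will actually instantiate (hypothetical check):
    StreamsConfig streamsConfig = new StreamsConfig(config);
    System.out.println(streamsConfig.getClass(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG));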

– Matthias J. Sax

I think this is another case of issues at the broker level. I went and rebuilt the cluster using instances with more CPU and RAM. Now I'm getting the results I expected.

Note to distant observer(s): if your KStream app is behaving strangely, take a look at your brokers and make sure they aren't stuck in GC and have plenty of 'headroom' for file handles, RAM, etc.

– ethrbunny