
I'm having a problem using KStream joins. What I do is: from one topic I separate three different types of messages into new streams. Then I do an inner join with two of the streams, which creates another stream, and finally I do a last left join between that new stream and the remaining stream.

The joins have a window time of 30 seconds.

This is done to filter out some of the messages which are overridden by others.

I'm running this application on Kubernetes, and the disk usage of the pods grows indefinitely until the pod crashes.

I've realized that this is because the joins store data locally in the tmp/kafka-streams directory.

The directories are called KSTREAM-JOINTHIS... and KSTREAM-OUTEROTHER...

These store SST files from RocksDB, and they grow indefinitely.

My understanding is that, since I'm using a window time of 30 seconds, this data should be flushed out after that time, but it is not.

I also changed WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG to 10 minutes to see if that makes a difference, but it did not.

I need to understand how this can be changed.
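For reference, a minimal sketch of the topology described above (the topic names, key/value types, branch predicates, and joiner logic are all placeholders, not my actual code):

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class TopologySketch {
    @SuppressWarnings("unchecked")
    public static void build(StreamsBuilder builder) {
        KStream<String, String> source = builder.stream("input-topic");

        // Split the topic into three streams by message type
        // (the predicates here are placeholders).
        KStream<String, String>[] branches = source.branch(
                (key, value) -> value.startsWith("typeA"),
                (key, value) -> value.startsWith("typeB"),
                (key, value) -> value.startsWith("typeC"));

        // Inner join two of the streams with a 30-second window...
        KStream<String, String> joined = branches[0].join(
                branches[1],
                (left, right) -> right,               // later message overrides
                JoinWindows.of(Duration.ofSeconds(30)));

        // ...then left join the result with the remaining stream.
        joined.leftJoin(
                branches[2],
                (left, right) -> right != null ? right : left,
                JoinWindows.of(Duration.ofSeconds(30)))
              .to("output-topic");
    }
}
```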

kambo

1 Answer


It is not the window size that determines your storage requirement, but the join's grace period: to handle out-of-order records, data is stored longer than the window size. In newer versions, you are required to always specify the grace period via JoinWindows.ofTimeDifferenceAndGrace(...). In older versions, you can set the grace period via JoinWindows.of(...).grace(...) -- if not set, it defaults to 24 hours.
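As a sketch, here are both API generations side by side (the 5-minute grace value is just an illustrative assumption; pick one that covers your maximum out-of-order delay):

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

public class JoinWindowExamples {
    // Kafka Streams 3.0+: the grace period must be given explicitly.
    static final JoinWindows NEW_API = JoinWindows.ofTimeDifferenceAndGrace(
            Duration.ofSeconds(30),   // max time difference between joined records
            Duration.ofMinutes(5));   // how long to keep data for out-of-order records

    // Older versions: without .grace(...), the grace period defaults to 24 hours,
    // so RocksDB keeps join data for a full day despite the 30-second window.
    static final JoinWindows OLD_API = JoinWindows.of(Duration.ofSeconds(30))
            .grace(Duration.ofMinutes(5));
}
```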

The config WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG configures how long data is stored in the cluster. Thus, you might want to reduce it too, but it does not help to reduce client-side storage requirements.
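For completeness, setting that config would look like this (shown with the literal key string; double-check the constant against your Kafka version):

```java
import java.util.Properties;

public class RetentionConfig {
    public static Properties props() {
        Properties props = new Properties();
        // Same key as StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG.
        // Note: this only affects broker-side changelog retention, not local RocksDB size.
        props.put("windowstore.changelog.additional.retention.ms",
                  String.valueOf(10 * 60 * 1000L));   // 10 minutes
        return props;
    }
}
```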

Matthias J. Sax
  • Thank you, this worked much better! I can now see that it's stable :) Just another quick one: Is there any best practice on how long the window and retention should be? Right now I've tried JoinWindow.of(30sec).until(6min) – kambo Mar 18 '19 at 08:42
  • This is application dependent and you need to use your own reasoning. It also depends on the maximum "delay" of your data, i.e., the "degree of out-of-order". – Matthias J. Sax Mar 18 '19 at 17:11
  • Ok, thank you. When performance testing, the size is still quite high, but it stabilizes after a while. Is there a way to keep the memory of the stores down? Or do I have to scale the application and partitions? – kambo Mar 19 '19 at 12:02
  • Well, for the join all records must be buffered within the retention period -- so knowing your data rate, retention time, and record size, you can estimate the required memory. Besides reducing retention time, I cannot think of anything else. Horizontal scaling should allow you to reduce the memory footprint per instance, of course. – Matthias J. Sax Mar 19 '19 at 17:16
  • @MatthiasJ.Sax Where is this ("You can reduce the retention time by passing in Materialized.as(null).withRetention(...) into your join(...) operators.") in the API? I can't see it. – JL_SO Jan 27 '22 at 09:23
  • My bad. For a stream-stream join there is no `Materialized` parameter... And the retention time is the same as the grace period of the window, which you can set via `JoinWindows.ofTimeDifferenceAndGrace(...)` (or `JoinWindows.of(...).grace(...)` in older versions, for which grace defaults to 24h). – Matthias J. Sax Jan 28 '22 at 03:26