Questions tagged [ktable]

A KTable is a continuously updating materialized view.

The KTable concept is used by Kafka Streams and ksqlDB (both also use the concept of a KStream). A KTable is a materialized view that is continuously updated.

A KTable can be populated in one of two ways: from a topic in Apache Kafka (i.e., each record in the topic is "upserted" into the KTable), or from the result of a continuous query over an input KStream or KTable.
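
The "upsert" behaviour can be illustrated without any Kafka dependency. The following is a minimal sketch in plain Java, not the Kafka Streams API: the class `KTableUpsertSketch` and its methods `apply` and `materialize` are hypothetical names introduced only for this illustration, and the table is modelled as an in-memory map. It assumes the usual changelog convention that a record with a null value (a tombstone) deletes its key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of KTable upsert semantics; not part of Kafka Streams.
public class KTableUpsertSketch {

    // Applies one record to the table: a non-null value inserts or
    // overwrites the key, a null value (tombstone) removes the key.
    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Replays an ordered sequence of {key, value} records into a fresh table.
    static Map<String, String> materialize(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            apply(table, record[0], record[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"alice", "Berlin"},
            {"bob", "Lima"},
            {"alice", "Rome"}, // overwrites the earlier value for "alice"
            {"bob", null}      // tombstone: removes "bob"
        };
        System.out.println(materialize(records)); // prints {alice=Rome}
    }
}
```

Only the latest value per key survives, which is the difference from a KStream, where every record would be kept as an independent event.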

72 questions
6
votes
0 answers

Kafka Streams: Failed to flush state store caused by java.lang.ClassCastException: cannot cast key to value

We have been using Kafka streams for a while but never got to write tests to cover our topologies. We decided to give it a go and use the Topology Test driver provided by the streams library. Unfortunately we hit an issue we cannot resolve. Here is…
Daniel Papukchiev
4
votes
2 answers

How to write KTable in a Kafka topic? Like we use "to()" in KStreams how to do that for KTable?

As there is no "to()" method for KTable, do we always need to convert it to a KStream before sending any message to a topic? Or how can we store a KTable in our topic?
3
votes
1 answer

Kafka streams: how to produce to a topic while aggregating?

I currently have some code that builds a KTable using aggregate: inputTopic.groupByKey().aggregate( Aggregator::new, (key, value, aggregate) -> { someProcessingDoneHere; return aggregate; }, …
user3923073
3
votes
1 answer

KSQL select one row per group that corresponds to having the least timestamp

In KSQL, is there a row_number kind of function that can be used in combination with a TUMBLING WINDOW to group by and get only the event with the least timestamp within each group?
Vikas J
3
votes
1 answer

kafka streams groupBy aggregate produces unexpected values

My question is about Kafka Streams Ktable.groupBy.aggregate and the resulting aggregated values. Situation: I am trying to aggregate minute events per day. I have a minute event generator (not shown here) that generates events for a few houses.…
Antonin
2
votes
0 answers

Insane throughput slowdown kafka streams using join (kstream & ktable) using jmh and TopologyTestDriver

I am trying to measure the relative throughput of my Kafka Streams app using jmh and TopologyTestDriver. This is on an M1 Mac, but all numbers are relative anyway. When piping directly from my input topic to my output topic: public class…
2
votes
1 answer

Kafka Ktable changelog (using toStream()) is missing some ktable updates when several messages with the same key arrive at the same time

I have an input stream that I use to create a KTable. Then I create an output stream from the KTable changelog, using the toStream() method. The problem is that the stream created by the toStream() method does not contain all the messages from the input…
Pijean
2
votes
0 answers

Kafka Ktable-Ktable join on 3 Ktables with custom serializers

I wish to perform a KTable-KTable join for 3 KTables, something like this: var joinedStream = person .join( address, Person::getAddress, valueJoiner, …
2
votes
1 answer

Kafka Topology Design: How to do sliding window join and emit events on timeout? [Hard]

I have a set of requirements as below: Message 'T' arrives, must wait for 5 seconds for corresponding message in 'A' to arrive (with same key). If it comes within 5 seconds, then send joined values and send downstream. If it does not come within 5…
2
votes
1 answer

KafkaStreams, Error Registering Avro Schema

Disclaimer: My experience with KafkaStreams is quite limited. I do not quite understand why I am getting an org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: and Schema being registered is incompatible with an…
Ivo
2
votes
1 answer

Kafka repartitioning ( for group by based on key)

When we apply a group by function on a stream based on some key, how does Kafka calculate this, given that the same key may be present in different partitions? I was looking at the through() function, which basically repartitions the data, but I don't understand what…
Sanjay
2
votes
2 answers

Kafka Ktable also streaming duplicate updates

Kafka Ktable also streaming duplicate updates. I want to process the Ktable (created with Kstream.reduce()) changelog stream, i.e., any change in value of the keys in the Ktable. But it seems even when the same key value pair is sent multiple times to…
2
votes
2 answers

Kafka Streams K-Table size monitoring

I have a stream topology which consumes from a topic and runs an aggregation and builds a KTable which is materialized into rocksDB. I have another application that consumes all events from that same topic daily, and sends tombstone messages for…
1
vote
1 answer

Unit test KafkaStreams gives IllegalArgumentException: Unknown topic

I have an app that uses KStream to read from Kafka, filter the data based on the header, and write to KTable. public Topology buildTopology() { KStream inputStream = builder.stream("topicname"); KStream
perplexedDev
1
vote
1 answer

KTable: how does that work behind the scenes?

I am trying to better understand Kafka Streams performance, and I would like to understand what happens when a KStream application starts and it uses a KTable with millions of records. Like... is there a GET/query operation to materialize ktable…