
I have a Kafka topic where I send location events (key=user_id, value=user_location). I am able to read and process it as a KStream:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted for clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

That works well, but I'd like to have a KTable with the last known position of each user. How could I do it?

I am able to do it writing to and reading from an intermediate topic:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");

Is there a simple way to obtain a KTable from a KStream? This is my first app using Kafka Streams, so I'm probably missing something obvious.

Guido

1 Answer

Update:

In Kafka 2.5, a new method, KStream#toTable(), will be added that provides a convenient way to transform a KStream into a KTable. For details, see KIP-523: https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
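As a sketch of what that looks like with the newer StreamsBuilder API (Kafka 2.5+; topic and store names here are illustrative), the conversion becomes a one-liner:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Location> locations = builder.stream("location_topic");

// Kafka 2.5+: convert the stream directly; for each key,
// the latest value seen becomes the current table entry
KTable<String, Location> table =
        locations.toTable(Materialized.as("location-store"));
```

No intermediate topic and no dummy aggregation are needed; Kafka Streams materializes the table (and its changelog) for you.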

Original Answer:

There is no straightforward way to do this at the moment. Your approach is absolutely valid, as discussed in the Confluent FAQ: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This is the simplest approach with regard to the code. However, it has the disadvantages that (a) you need to manage an additional topic and that (b) it results in additional network traffic because data is written to and re-read from Kafka.

There is one alternative, using a "dummy-reduce":

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            return newValue; // ignore the aggregate: last write wins per key
        }
    },
    "dummy-aggregation-store");

This approach is somewhat more complex with regard to the code compared to option 1 but has the advantage that (a) no manual topic management is required and (b) re-reading the data from Kafka is not necessary.
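The effect of this "dummy" reducer is simply last-write-wins per key. Outside of Kafka, the same semantics can be sketched with a plain HashMap (the Location record below is a minimal stand-in for the value type in the question, not Kafka Streams code): each incoming record overwrites the previous value for its key, which is exactly what the reduce above does inside the KTable's state store.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LastKnownPosition {
    // Minimal stand-in for the Location value type used in the question
    record Location(double lat, double lon) {}

    // Replays (user_id, location) events in arrival order; the reducer
    // (agg, newValue) -> newValue keeps only the newest value per key,
    // mirroring what the dummy reduce does in the KTable's state store.
    static Map<String, Location> lastKnown(List<Map.Entry<String, Location>> events) {
        Map<String, Location> table = new HashMap<>();
        for (Map.Entry<String, Location> e : events) {
            table.merge(e.getKey(), e.getValue(), (agg, newValue) -> newValue);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Location> table = lastKnown(List.of(
                Map.entry("alice", new Location(40.4, -3.7)),
                Map.entry("bob",   new Location(51.5, -0.1)),
                Map.entry("alice", new Location(41.4,  2.2))   // alice moves
        ));
        System.out.println(table.get("alice")); // only the latest position remains
    }
}
```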

Overall, you need to decide for yourself which approach you prefer:

In option 2, Kafka Streams will create an internal changelog topic to back up the KTable for fault tolerance. Thus, both approaches require some additional storage in Kafka and result in additional network traffic. Overall, it’s a trade-off between slightly more complex code in option 2 versus manual topic management in option 1.

Matthias J. Sax
  • I am trying to use your approach for constructing a `KTable` out of a `KStream` by doing a dumb `groupByKey`, but the method `groupByKey` cannot be resolved. Do you have any idea what may be going wrong? (I am new to the Java ecosystem and to Kafka) – LetsPlayYahtzee Mar 23 '17 at 17:10
  • 4
    What's your Streams version? For older version, it should be `stream.reduceByKey(...)` instead of `stream.groupByKey().reduce(...)`. See http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation – Matthias J. Sax Mar 23 '17 at 18:47
  • 1
    I thought I was using the newest version but I was using the `0.10.0` while looking at the docs for the `0.10.1` version. So I fixed it :) thnx – LetsPlayYahtzee Mar 23 '17 at 19:18
  • 2
    Using your "dummy-reduce" how would you ever tombstone an entry in the resulting ktable? My understanding is that the reduce will simply ignore any null values. Update: I see your comment on another thread indicating the use of a "surrogate" which is indeed what I have done in the past. https://stackoverflow.com/questions/50708252/tombstone-messages-not-removing-record-from-ktable-state-store – AFrieze Jul 25 '18 at 20:04
  • Which of the above options is closer to best practice, though? – Mujtaba Faizi Jan 10 '20 at 05:02