4

I have a stream of events i need to match against a ktable / changelog topic but the matching is done by pattern matching on a property of the ktable entries. so i cannot join the streams based on a key since i dont know yet which one is matching.

example:

ktable X:

{
  [abc]: {id: 'abc', prop: 'some pattern'},
  [efg]: {id: 'efg', prop: 'another pattern'}
}

stream A:

{ id: 'xyz', match: 'some pattern'}

so stream A should forward something like {match: 'abc'}

So i basically need to iterate over the ktable entries and find the matching entry by pattern matching on this property.

Would it be viable to create a global state store based on the ktable and then access it from the processor API and iterate over the entries?

I could also aggregate all the entries of the ktable into 1 collection and then join on a 'fake' key? But this seems also rather hacky.

Or am i just forcing something which is not really streams and rather just put it into a redis cache with the normal consumer API, which is also kinda awkward since i rather have it backed by rocksDB.

edit: i guess this is kinda related to this question

cudba
  • 41
  • 3

2 Answers2

1

A GlobalKTable won't work, because a stream-globalTable join allows you to extract a non-key join attribute from the stream -- but the lookup into the table is still based on the table key.

However, you could read the table input topic as a KStream, extract the join attribute, set it as key, and do an aggregation that returns a collection (ie, List, Set, etc). This way, you can do a stream-table join on the key, followed by a flatMapValues() (or flatMap()) that splits the join-result into multiple records (depending on how many records are in the collection of the table).

As long as your join attribute has not too many duplicates (for the table input topic), and thus the value side collection in the table does not grow too large, this should work fine. You will need to provide a custom value-Serde to (de)serialize the collection data.

Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
  • the problem here is, that i dont have an attribute which is exactly a key i could join on. the attribute match is determined by some processing (pattern matching) of an attribute. so basically, for each event of my Stream A, i need all data available of Ktable X in a collection (aggregated) so i can find the match. In other words, i need all entries of a ktable available as a collection in another stream – cudba Feb 10 '20 at 11:12
  • 1
    For this case, you would need to use the Processor API. Using a global state store, you can scan the entire store for each input record to find records with the corresponding pattern.. The DSL only support `==` as join condition. – Matthias J. Sax Feb 10 '20 at 15:40
  • Could you elaborate on the global state store ? Sounds not so flexible .... Is there no way to simply customize the join condition of the current foreign Key join ? I would like to have the same functionality as the foreign Key Join, however in my case the situation is that the foreign Key extractor can extract multiple records, and a match needs to happen if on the other side, one of the key match. This is the case were the data represent a graph, or if you want multi-valued properties – MaatDeamon Oct 11 '20 at 22:38
  • You can always customize everything by implementing your own operators using the Processor API. -- However, the DSL has a fixed set of pre-defined operators and there are only 1:1 PK-equi-joins and 1:n FK-equi-joins available atm. – Matthias J. Sax Oct 12 '20 at 00:47
0

Normally I would map the table data so I get the join key I need. We recently had a similar case, where we had to join a stream with the corresponding data in a KTable. In our case, the stream key was the first part of the table key, so we could group by that first key part and aggregate the results in a list. At the end it looked something like this.

final KTable<String, ArrayList<String>> theTable = builder
        .table(TABLE_TOPIC, Consumed.with(keySerde, Serdes.String()))
        .groupBy((k, v) -> new KeyValue<>(k.getFirstKeyPart(), v))
        .aggregate(
                ArrayList::new,
                (key, value, list) -> {
                    list.add(value);
                    return list;
                },
                (key, value, list) -> {
                    list.remove(value);
                    return list;
                },
                Materialized.with(Serdes.String(), stringListSerde));

final KStream<String, String> theStream = builder.stream(STREAM_TOPIC);

theStream
        .join(theTable, (streamEvent, tableEventList) -> tableEventList)
        .flatMapValues(value -> value)
        .map(this::doStuff)
        .to(TARGET_TOPIC);

I am not sure, if this is also possible for you, meaning, maybe it is possible for you to map the table data in some way to to the join.

I know this does not completely belong to your case, but I hope it might be of some help anyway. Maybe you can clarify a bit, how the matching would look like for your case.

Jan Held
  • 634
  • 4
  • 14
  • the matching is actually done based on geo data matching, so there is really no partial or any key i can use i guess, but thank you for the input! – cudba Feb 11 '20 at 06:30
  • 1
    I was just thinking of your question again, and the problem with kafka geo matching. I thought, maybe there is a way to simply cluster the data, so you could at least reduce the number of results, and then try to find the exact matches in that reduced result. I found this article, describing, how to cluster geo coordinates. Maybe it's of some help for you. – Jan Held Feb 14 '20 at 07:19