See the Update below for a potential workaround.
Our application consumes two topics as KTables, performs a left join, and writes the result to an output topic. During testing we found that this works as expected when the output topic has only 1 partition; when we increase the number of partitions, the number of messages produced to the output topic drops.
We tested this theory with multiple partition counts, each configured before starting the app. With 1 partition, we see 100% of the messages. With 2, we see some of them (less than 50%). With 10, we see barely any (less than 10%).
Because we are left joining, every message consumed from Topic 1 should be written to our output topic, but we are finding that this is not happening. Messages seem to get stuck in the internal topics that the foreign-key join of the KTables creates (the subscription-registration and subscription-response topics), yet there are no error messages.
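Those internal topics can be inspected directly. Below is a minimal sketch using the Kafka AdminClient; the broker address is a placeholder, and the SUBSCRIPTION filter matches the naming pattern of the foreign-key join topics:

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Lists the foreign-key join's internal topics so their contents can be
// checked with standard tooling. Broker address is a placeholder.
static void listFkJoinTopics() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        admin.listTopics().names().get().stream()
                .filter(name -> name.contains("SUBSCRIPTION"))
                .forEach(System.out::println);
    }
}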
Any help would be greatly appreciated!
Service.java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            // foreign-key extractor: derives the MyOtherKey to join on
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}
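The enrich method referenced above is an ordinary ValueJoiner. For completeness, here is a minimal sketch of what it could look like, assuming MyEnrichedValue is a generated Avro class with a builder like the others (setEnrichment/getEnrichment are hypothetical field accessors):

private MyEnrichedValue enrich(MyValue value, MyOtherValue otherValue) {
    // With a left join, otherValue is null when no match exists for the foreign key
    return MyEnrichedValue.newBuilder()
            .setFieldA(value.getFieldA())
            .setFieldB(value.getFieldB())
            .setEnrichment(otherValue == null ? null : otherValue.getEnrichment())
            .build();
}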
build.gradle
plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
    set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
Note: We are excluding the org.apache.kafka dependencies because of a bug in the versions pulled in by spring-cloud-stream.
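For reference, the exclusion looks roughly like the following sketch; the exact modules and the pinned replacement versions depend on the bug being worked around (the 2.5.0 versions here are illustrative):

configurations.all {
    // Drop the Kafka artifacts pulled in transitively by spring-cloud-stream
    exclude group: 'org.apache.kafka', module: 'kafka-streams'
    exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
dependencies {
    // Re-add explicitly pinned versions
    implementation 'org.apache.kafka:kafka-streams:2.5.0'
    implementation 'org.apache.kafka:kafka-clients:2.5.0'
}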
application.yml
spring:
  application:
    name: app-name
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2
Test Scenario:
To provide a concrete example, if I publish the following 4 messages to Topic 1:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
the output topic only receives 2 messages:
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
What happened to the other 2? It seems that certain key/value pairs simply cannot be written to the output topic; republishing these "lost" messages does not work either.
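For context, the messages above were produced with the key mirroring the value. A minimal sketch of such a publisher, assuming a KafkaTemplate<MyKey, MyValue> configured with the Confluent Avro serializers (and assuming MyKey carries the same two fields):

// Publishes the four test records; key and value carry identical fields here
for (int i = 1; i <= 4; i++) {
    MyKey key = MyKey.newBuilder().setFieldA(i).setFieldB(i).build();
    MyValue value = MyValue.newBuilder().setFieldA(i).setFieldB(i).build();
    kafkaTemplate.send("topic1", key, value);
}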
Update:
I was able to get this functioning properly by consuming Topic 1 as a KStream instead of a KTable and calling toTable() before going on to do the KTable-KTable join. I am still not sure why the original solution did not work, but hopefully this workaround can shed some light on the actual issue.
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    // map() is a key-changing operation, so toTable() routes the
                    // records through a repartition topic before materializing them
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}