8

See Update below to show potential workaround

Our application consumes 2 topics as KTables, performs a left join, and outputs to a topic. During testing, we found that this works as expected when our output topic has only 1 partition. When we increase the number of partitions, we notice that the number of messages that get produced to the output topic decreases.

We tested this theory with multiple partition configurations prior to starting the app. With 1 partition, we see 100% of the messages. With 2, we see some messages (less than 50%). With 10, we see barely any (less than 10%).

Because we are left joining, every single message that is consumed from Topic 1 should get written to our output topic, but we're finding that this is not happening. It seems like messages are getting stuck in the "intermediate" topics created from the foreign key join of the Ktables, but there are no error messages.

Any help would be greatly appreciated!

Service.java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}

build.gradle

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

...

ext {
    set('springCloudVersion', "Hoxton.SR6")
}

...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'

Note: We are excluding the org.apache.kafka dependencies due to a bug in the versions included in spring-cloud-stream

application.yml

spring:
  application:
    name: app-name
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2

Test Scenario:

To provide a concrete example, if I publish the following 3 messages to Topic 1:

{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}

The output topic will only receive 2 messages.

{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}

What happened to the other 2? It seems certain key/value pairs are just unable to get written to the output topic. Retrying these "lost" messages does not work either.

Update:

I was able to get this functioning properly by consuming Topic 1 as a KStream instead of a KTable and calling toTable() before going on to do the KTable-KTable join. I am still not sure why my original solution does not work, but hopefully this workaround can shed some light on the actual issue.

@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}
Mario P.
  • 148
  • 9
  • It's weird that using `KSteam#toTable()` changes anything. Can you maybe share the TopologyDescription of both programs to compare them? Could shed some light. – Matthias J. Sax Jul 16 '20 at 20:25
  • @MatthiasJ.Sax It turns out that the combination of `KStream#map()` and `KStream#toTable()` is what does the trick when using more than one partition. Just to reiterate, this works as expected with 1 partition, but when we try more than one, it only works if we consume as a KStream then force it to re-partition by mapping the keys/values. – Mario P. Jul 29 '20 at 15:51

5 Answers5

3

Given the description of the problem, it seems that the data in the (left) KTable input topic is not correctly partitioned by its key. For a single partitioned topic, well, there is only one partition and all data goes to this one partition and the join result is complete.

However, for a multi-partitioned input topic, you need to ensure that the data is partitioned by key, otherwise, two records with the same key might end up in different partitions and thus the join fails (as the join is done on a per-partition basis).

Note that even if a foreign-key join does not require that both input topics are co-partitioned, it is still required that each input topic itself is partitioned by its key!

If you use a map().toTable() you basically trigger an internal repartitioning of the data that ensures that the data gets partitioned by the key, and this fixes the problem.

Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
  • 1
    I'm working on the project with @Mario P. I believe you're right that it's a partitioning issue; I'm just not sure how the data in the left topic would be incorrectly partitioned by key. For example, if I'm using the confluent CLI console producer, do I need to do anything to ensure that it is partitioning by key correctly? It appears that messages are being put to the left topic evenly among the partitions. We were able to follow this [tutorial](https://kafka-tutorials.confluent.io/foreign-key-joins/kstreams.html), but when we changed the key type from primitive to avro, it no longer worked. – monker Aug 03 '20 at 14:42
  • 1
    From the top of my head I would assume that the console producer does partition by key -- however, Avro messages (in conjunction with Confluent SR) have a "header" that encode the schema ID and this header might "mess" up the partitioning. -- What exactly do you do in your `map()` step? – Matthias J. Sax Aug 03 '20 at 15:09
  • 1
    We do this: `map(KeyValue::new)`. So we aren't actually changing the key or the value at all, just reinstantiating. – monker Aug 03 '20 at 15:15
  • If you don't change the data, I assume the the console producer uses a different default partitioning strategy compared to Kafka Streams... – Matthias J. Sax Aug 03 '20 at 16:18
  • That may be true @MatthiasJ.Sax. Though, we did experience the same behavior when deploying our full stack of applications. It is worth noting that some of our applications use the kafka-binder and others we use the kafka-streams-binder. We considered that these binders may use slightly different partitioning strategies, so at one point we tried out using a custom partitioning strategy in each app, but it still yielded the same behavior. – Mario P. Aug 03 '20 at 17:48
  • 1
    If you use a custom partitioner for the binders, did you pass those custom partitioners to Kafka Streams, too? -- You would need to reconfigure the internal `Producer` to use the same partitioner, otherwise, the internal foreign-key join subscription/respond topic would use a different partitioning -- or make sure that you use `DefaultPartitioner` in your binders. – Matthias J. Sax Aug 05 '20 at 02:08
  • 1
    It's a know issue: https://issues.apache.org/jira/browse/KAFKA-9302 – Matthias J. Sax Aug 05 '20 at 02:15
  • 2
    You are definitely correct that our issue is related to the partitioning of the keys. We changed our architecture a bit so now we are joining KTables from producers which both use the kafka-streams-binder. It seems that our previous architecture was a problem because kafka-binder and kafka-streams-binder must have slightly different default partitioning strategies. I will select your post as the answer since it provides some insight into the root cause of our issue. Thanks @MatthiasJ.Sax – Mario P. Aug 05 '20 at 19:01
0

I had a similar issue. I have two incoming KStreams, which I converted to KTables, and performed a KTable-KTable FK join. Kafka streams produced absolutely no records, the joined were never performed.

Repartitioning the KStreams didn't work for me. Instead I had to manually set the partition size to 1.

Here's a stripped down example of what doesn't work:

Note I'm using Kotlin, with some extension helper functions

fun enrichUsersData(
  userDataStream: KStream<UserId, UserData>,
  environmentDataStream: KStream<RealmId, EnvironmentMetaData>,
) {

  // aggregate all users on a server into an aggregating DTO
  val userDataTable: KTable<ServerId, AggregatedUserData> =
    userDataStream
      .groupBy { _: UserId, userData: UserData -> userData.serverId }
      .aggregate({ AggregatedUserData }) { serverId: ServerId, userData: UserData, usersAggregate: AggregatedUserData ->
        usersAggregate
          .addUserData(userData)
          .setServerId(serverId)
        return@aggregate usersAggregate
      }

  // convert all incoming environment data into a KTable
  val environmentDataTable: KTable<RealmId, EnvironmentMetaData> =
    environmentDataStream
      .toTable()

  // Now, try to enrich the user's data with the environment data
  // the KTable-KTable FK join is correctly configured, but...
  val enrichedUsersData: KTable<ServerId, AggregatedUserData> =
    userDataTable.join(
      other = environmentDataTable,
      tableJoined = tableJoined("enrich-user-data.join"),
      materialized = materializedAs(
        "enriched-user-data.store",
        jsonMapper.serde(),
        jsonMapper.serde(),
      ),
      foreignKeyExtractor = { usersData: AggregatedUserData -> usersData.realmId },
    ) { usersData: AggregatedUserData, environmentData: EnvironmentMetaData ->
      usersData.enrichUserData(environmentData)
      // this join is never called!!
      return@join usersData
    }
}

If I manually set the partition size to 1, then it works.

fun enrichUsersData(
  userDataStream: KStream<UserId, UserData>,
  environmentDataStream: KStream<RealmId, EnvironmentMetaData>,
) {

  // manually set the partition size to 1 *before* creating the table
  val userDataTable: KTable<ServerId, AggregatedUserData> =
    userDataStream
      .repartition(
        repartitionedAs(
          "user-data.pre-table-repartition",
          jsonMapper.serde(),
          jsonMapper.serde(),
          numberOfPartitions = 1,
        )
      )
      .groupBy { _: UserId, userData: UserData -> userData.serverId }
      .aggregate({ AggregatedUserData }) { serverId: ServerId, userData: UserData, usersAggregate: AggregatedUserData ->
        usersAggregate
          .addUserData(userData)
          .setServerId(serverId)
        return@aggregate usersAggregate
      }

  // again, manually set the partition size to 1 *before* creating the table
  val environmentDataTable: KTable<RealmId, EnvironmentMetaData> =
    environmentDataStream
      .repartition(
        repartitionedAs(
          "environment-metadata.pre-table-repartition",
          jsonMapper.serde(),
          jsonMapper.serde(),
          numberOfPartitions = 1,
        )
      )
      .toTable()

  // this join now works as expected!
  val enrichedUsersData: KTable<ServerId, AggregatedUserData> =
    userDataTable.join(
      other = environmentDataTable,
      tableJoined = tableJoined("enrich-user-data.join"),
      materialized = materializedAs(
        "enriched-user-data.store",
        jsonMapper.serde(),
        jsonMapper.serde(),
      ),
      foreignKeyExtractor = { usersData: AggregatedUserData -> usersData.realmId },
    ) { usersData: AggregatedUserData, environmentData: EnvironmentMetaData ->
      usersData.enrichUserData(environmentData)
      return@join usersData
    }
}
aSemy
  • 5,485
  • 2
  • 25
  • 51
0

We had the same issue let me explain the reasons and the fix. We use Debezium as a CDC to process changes in Oracle with Kafka Streams. We use avro schemas.

The schema generated by Kafka Connect for our CardKey looks like this:

{
    "type": "record",
    "name": "CardKey",
    "namespace": "hu.erste.slcard",
    "fields": [
        {
            "name": "NCRD",
            "type": [
                "null",
                "string"
            ],
            "default": null
        }
    ],
    "connect.name": "hu.erste.slcard.CardKey"
}

If I hexdump the card topic a record's key looks like this:

00 00 00 00 03 02 20 34 32 38 39 34 32 30 31 30 |...... 428942010|

In the AbstractKafkaAvroSerializer class's serializeImpl() method we can see that the first 0 is constant and that's followed by the schema id stored on 4 bytes:

out.write(0); out.write(ByteBuffer.allocate(4).putInt(id).array());

When Kafka Streams generates internal topics it generates the schema on its own. The above topic should be co-partitioned with the subscription-response topic to let the foreign-key join happen. Learn more on this here:https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/

Let's see what Kafka Streams generates for us as a key for subscription-response-topic:

00 00 00 00 0d 02 20 34 32 38 39 34 32 30 31 30 |...... 428942010|

It looks like the schema id is different and when we check the schema we can see why:

{
    "type": "record",
    "name": "CardKey",
    "namespace": "hu.erste.slcard",
    "fields": [
        {
            "name": "NCRD",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ],
            "default": null
        }
    ],
    "connect.name": "hu.erste.slcard.CardKey"
}

So instead of null and string types we have an additional "avro.java.string": "String" in the schema. As the schema is different its id will be also different in the schema registry when we register it, therefore the difference in the message keys-> different hashes-> different partitions-> no co-partitioning

There is a related opened bug in Avro codebase: https://issues.apache.org/jira/browse/AVRO-2838

Luckily there is a workaround for this based on this: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer

"The avro-maven-plugin generated code adds Java-specific properties such as "avro.java.string":"String", which may prevent schema evolution. You can override this by setting avro.remove.java.properties=true in the Avro serializer configurations."

Adding the following to the KafkaStreamsConfiguration solves the problem:

props.put(AVRO_REMOVE_JAVA_PROPS_CONFIG, true);

Repartition/map the stream workes as it creates an internal topic with the same key schema as above and that will have the same id as the subscription-response-topic so co-partitioning is not violated.

Changing the partition number to 1 also works as with 1 partition you're always co-partitioned.

-1

Selecting the key on joined topic might help. Partition configuration of topics should be same.

return (topicOne, topicTwo) ->
        topicOne
            .leftJoin(topicTwo,
                value -> MyOtherKey.newBuilder()
                    .setFieldA(value.getFieldA())
                    .setFieldB(value.getFieldB())
                    .build(),
                this::enrich)
            .toStream().selectKey((key, value) -> key);
  • 1
    Thanks for commenting. I tried selectKey as suggested but that didn't change the behavior. For whatever reason, certain messages are just getting dropped no matter what. I will add some test scenarios to my original post for clarity. – Mario P. Jul 13 '20 at 22:46
  • Is the partition size for both the topics you materialized same ? Also , does your behavior differ based on whether you output to an app or console consumer ? – user3500408 Jul 13 '20 at 23:16
  • 1
    Yes, each topic has the same number of partitions. The only number that functions correctly is 1, and each additional partition seems to increase the chance of dropping the record. The behavior does not change whether we browse the topic using Control Center, output from the app, or consume any other way. – Mario P. Jul 13 '20 at 23:31
-1

This is a strange issue, I have never heard of a number of output topic partitions controlling the data write frequency. However I do know that toStream() writes the data to downstream only when the cache is full, so try setting cache.max.bytes.buffering = 0. Also, KTable keeps only the latest record for each key, so if you have multiple values against the same key, only latest value would stay and be written downstream.

Dharman
  • 30,962
  • 25
  • 85
  • 135
  • 1
    I gave that a try, but no luck. This is definitely an odd problem, so I added a test scenario to my post to clarify what is going on. Thanks for commenting. – Mario P. Jul 13 '20 at 23:20