
I'm trying to measure the relative throughput of my Kafka Streams app using JMH and TopologyTestDriver. This is on an M1 Mac, but all the numbers are relative anyway. When piping directly from my input topic to my output topic:

public class EventTopologyMulti {
  public static StreamsBuilder topology() {
    StreamsBuilder builder = new StreamsBuilder();

    // Main topic
    KStream<String, Event> mainEvents = builder.<String, Event>stream("event_ingestion");

    mainEvents.to("enriched_events");
    return builder;
  }
}

& then running jmh I get:

TopologyBenchMulti.benchmarkTopology  thrpt    4  34548.895 ± 6484.374  ops/s

Spectacular - love it - all good.

BUT as soon as I join with one table it all goes horribly wrong:

public class EventTopologyMulti {
  public static StreamsBuilder topology() {
    StreamsBuilder builder = new StreamsBuilder();

    // Main topic
    KStream<String, Event> mainEvents = builder.<String, Event>stream("event_ingestion");
     
    // Topic to table-ize & join
    KStream<String, Event> identifyStream = builder.<String, Event>stream("identify_event");

    // Create the table
    KTable<String, Event> identifyTable = identifyStream.toTable();

    // Create the ValueJoiner
    final EventWithIdentify identifyJoiner = new EventWithIdentify();

    // Do the join & send to final topic
    mainEvents.leftJoin(identifyTable, identifyJoiner).to("enriched_events");

    return builder;
  }
}

And I run jmh again WOW:

TopologyBenchMulti.benchmarkTopology  thrpt    4  22.934 ± 2.394  ops/s

!!! Went from 35k -> 23 ops/s????
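
To put the two scores on a per-call scale, here is a minimal sketch that just inverts the JMH throughput numbers reported above. `ThroughputCompare` and its methods are hypothetical helper names for illustration, not part of the benchmark, and the result says nothing about where the time is actually spent:

```java
import java.util.Locale;

// Hypothetical helper (not part of the benchmark): converts JMH thrpt scores
// into time per call and a relative slowdown factor.
class ThroughputCompare {

  // Seconds spent per benchmark call at a given throughput in ops/s.
  static double secondsPerOp(double opsPerSecond) {
    return 1.0 / opsPerSecond;
  }

  // How many times slower the second score is compared to the first.
  static double slowdownFactor(double fastOpsPerSecond, double slowOpsPerSecond) {
    return fastOpsPerSecond / slowOpsPerSecond;
  }

  public static void main(String[] args) {
    double passThrough = 34548.895; // JMH score without the join
    double withJoin = 22.934;       // JMH score with the leftJoin

    System.out.printf(Locale.ROOT, "pass-through: %.1f us per pipeInput%n",
        secondsPerOp(passThrough) * 1e6);
    System.out.printf(Locale.ROOT, "with join:    %.1f ms per pipeInput%n",
        secondsPerOp(withJoin) * 1e3);
    System.out.printf(Locale.ROOT, "slowdown:     %.0fx%n",
        slowdownFactor(passThrough, withJoin));
  }
}
```

So each pipeInput call goes from roughly 29 microseconds to roughly 44 milliseconds, about a 1500x slowdown.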

YES, both topics use the same key for each record. Yes, the topics are co-partitioned (although when running via TopologyTestDriver that doesn't matter). Both topics carry values of the same shape and are keyed with the same value, so the join should be straightforward and not require any extra processing.

Here is my value joiner - that does nothing:

public class EventWithIdentify implements ValueJoiner<Event, Event, Event> {
  public Event apply(Event event, Event identifyEvent) {
    return event;
  }
}

Note I get this kind of massive slowdown whether it's join or leftJoin. I also get the same slowdown if I send all Events to a single topic, branch them (using .split().branch(...)), convert one branch to a KTable, and then do the leftJoin.

Funnily enough, I get an even larger slowdown if I pass a Materialized when converting the KStream to a KTable (.toTable(Materialized.as("identify-table")))!!

What the heck is going on? Sure I'd expect some slowdown but not this much!!!

For completeness here is the jmh test code:

public class TopologyBenchMulti {

  private static final String SCHEMA_REGISTRY_SCOPE = TopologyBenchMulti.class.getName();
  private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE;
  private static TopologyTestDriver testDriver;

  @State(org.openjdk.jmh.annotations.Scope.Thread)
  public static class MyState {
    public TestInputTopic<String, Event> mainEventsTopic;
    public TestInputTopic<String, Event> identifyTopic;
    public Event regStartedEvent;

    @Setup(Level.Trial)
    public void setupState() {
      Properties props = new Props().props;
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
      props.put("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);

      // build the topology
      Topology topology = EventTopologyMulti.topology().build();

      // create a test driver. we will use this to pipe data to our topology
      testDriver = new TopologyTestDriver(topology, props);

      // create the avro value serde
      Serde<Event> avroEventSerde = new SpecificAvroSerde<>();
      Map<String, String> config = Map.of("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
      avroEventSerde.configure(config, false);

      // create the test topics
      mainEventsTopic =
          testDriver.createInputTopic(
              "event_ingestion", Serdes.String().serializer(), avroEventSerde.serializer());

      identifyTopic =
          testDriver.createInputTopic(
              "identify_event", Serdes.String().serializer(), avroEventSerde.serializer());

      Event identifyEvent =
          Event.newBuilder()
              .setId("foofie")
              .setTimestamp(Instant.now())
              .setEventName(EventName.Identify)
              .setEvent(Identify.newBuilder().setUserId("userid-99").build())
              .build();
      identifyTopic.pipeInput("anonymous id", identifyEvent);

      regStartedEvent =
          Event.newBuilder()
              .setId("goofie")
              .setTimestamp(Instant.now())
              .setEventName(EventName.RegistrationStarted)
              .setEvent(
                  RegistrationStarted.newBuilder()
                      .setAuthenticationMethod(RegistrationMethod.email)
                      .build())
              .build();
    }

    @TearDown(Level.Trial)
    public void tearDown() {
      testDriver.close();
    }
  }

  @Benchmark
  @BenchmarkMode(Mode.Throughput)
  @OutputTimeUnit(TimeUnit.SECONDS)
  public void benchmarkTopology(MyState state) {
    state.mainEventsTopic.pipeInput("anonymous id", state.regStartedEvent);
  }
}

And finally in my build.gradle:

jmh {
  iterations = 4
  benchmarkMode = ['thrpt']
  threads = 1
  fork = 1
  timeOnIteration = '3s'
  resultFormat = 'TEXT'
  profilers = []
  warmupIterations = 3
  warmup = '1s'
}

It can't actually be this slow just from adding one join on one table, can it?? Is it TopologyTestDriver, or is it me???

Losing hair over this thanks for any input!!!!

  • Please put the complete code of your benchmark – Sergey Tsypanov Feb 27 '23 at 15:06
  • updated to add all jmh code thanks! – user1883202 Feb 27 '23 at 16:37
  • Well, your benchmark seems to be correct, so I'd suggest using JMH's GCProfiler and JavaFlightRecorderProfiler to find out what is slowing down your application. – Sergey Tsypanov Feb 28 '23 at 08:30
  • Could you try to use an in-memory store (`Stores.inMemoryKeyValueStore("name");`)? I remember seeing some performance problems with RocksDB on Mac – Lucas Brutschy Mar 01 '23 at 10:58
  • I’m struggling with the same issue in a service with three topics consumed as KTables and joined via foreign key. Did you find any solution or at least improvement? A bit more details can be found in https://stackoverflow.com/questions/74876325/kafka-streams-ktable-ktable-non-key-join-performance-on-skewed-tables – Andras Hatvani May 07 '23 at 10:43
  • In your case using Kafka Streams’ cogrouping feature might be a relief since your topics use the same key and are co-partitioned. – Andras Hatvani May 07 '23 at 10:45
