I'm trying to measure the relative throughput of my Kafka Streams app using jmh and TopologyTestDriver. This is on an M1 Mac, but all the numbers are relative anyway, so eh.
Anyway, when piping directly from my input topic to my output topic:
    public class EventTopologyMulti {
        public static StreamsBuilder topology() {
            StreamsBuilder builder = new StreamsBuilder();
            // Main topic
            KStream<String, Event> mainEvents = builder.<String, Event>stream("event_ingestion");
            mainEvents.to("enriched_events");
            return builder;
        }
    }
and then running jmh, I get:
    TopologyBenchMulti.benchmarkTopology thrpt 4 34548.895 ± 6484.374 ops/s
Spectacular - love it - all good.
BUT as soon as I join with one table it all goes horribly wrong:
    public class EventTopologyMulti {
        public static StreamsBuilder topology() {
            StreamsBuilder builder = new StreamsBuilder();
            // Main topic
            KStream<String, Event> mainEvents = builder.<String, Event>stream("event_ingestion");
            // Topic to table-ize & join (same topic name the benchmark pipes into)
            KStream<String, Event> identifyStream = builder.<String, Event>stream("identify_event");
            // Create the table
            KTable<String, Event> identifyTable = identifyStream.toTable();
            // Create the ValueJoiner
            final EventWithIdentify identifyJoiner = new EventWithIdentify();
            // Do the join & send to final topic
            mainEvents.leftJoin(identifyTable, identifyJoiner).to("enriched_events");
            return builder;
        }
    }
And when I run jmh again, WOW:
    TopologyBenchMulti.benchmarkTopology thrpt 4 22.934 ± 2.394 ops/s
!!! Went from 35k -> 23 ops/s????
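To put those two scores in per-call terms, here is a quick back-of-the-envelope conversion. It is only arithmetic on the two numbers quoted above; the class and method names (`ThroughputMath`, `slowdownFactor`, `microsPerOp`) are made up for illustration and are not part of jmh or Kafka Streams.

```java
// Back-of-the-envelope conversion of the two jmh throughput scores quoted above.
// ThroughputMath and its method names are illustrative only (not a jmh or Kafka Streams API).
class ThroughputMath {
    // How many times slower the second score is compared to the first.
    static double slowdownFactor(double baselineOpsPerSec, double slowOpsPerSec) {
        return baselineOpsPerSec / slowOpsPerSec;
    }

    // Average wall-clock time per benchmark invocation, in microseconds.
    static double microsPerOp(double opsPerSec) {
        return 1_000_000.0 / opsPerSec;
    }

    public static void main(String[] args) {
        double passThrough = 34548.895; // ops/s, input topic piped straight to output topic
        double withJoin = 22.934;       // ops/s, with the KTable join added

        System.out.printf("slowdown factor: %.0fx%n", slowdownFactor(passThrough, withJoin));
        System.out.printf("pass-through:    %.1f us per pipeInput%n", microsPerOp(passThrough));
        System.out.printf("with join:       %.1f ms per pipeInput%n", microsPerOp(withJoin) / 1000.0);
    }
}
```

That works out to roughly a 1500x slowdown: about 29 microseconds per pipeInput call for the pass-through versus about 44 milliseconds per call with the join.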
YES, both topics use the same key value for each record. Yes, the topics are co-partitioned (although when running via TopologyTestDriver that doesn't matter). Both topics ingest the same shape of value and are keyed with the same value, so it should be super easy/straightforward and not require any extra processing.
Here is my value joiner - that does nothing:
    public class EventWithIdentify implements ValueJoiner<Event, Event, Event> {
        @Override
        public Event apply(Event event, Event identifyEvent) {
            return event;
        }
    }
Note I get this kind of massive slowdown whether it's join or leftJoin.
Also, I get the same slowdown if I send all Events to the same topic, branch them (using .split().branch(...)), convert one branch to the KTable, and then do the leftJoin.
Funnily enough, I get an even larger slowdown if I use Materialized.as("identify-table") when converting the KStream to a KTable (.toTable(Materialized.as("identify-table")))!!
What the heck is going on? Sure I'd expect some slowdown but not this much!!!
For completeness, here is the jmh test code:
    public class TopologyBenchMulti {
        private static final String SCHEMA_REGISTRY_SCOPE = TopologyBenchMulti.class.getName();
        private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE;
        private static TopologyTestDriver testDriver;

        @State(org.openjdk.jmh.annotations.Scope.Thread)
        public static class MyState {
            public TestInputTopic<String, Event> mainEventsTopic;
            public TestInputTopic<String, Event> identifyTopic;
            public Event regStartedEvent;

            @Setup(Level.Trial)
            public void setupState() {
                Properties props = new Props().props;
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
                props.put("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
                // build the topology
                Topology topology = EventTopologyMulti.topology().build();
                // create a test driver. we will use this to pipe data to our topology
                testDriver = new TopologyTestDriver(topology, props);
                // create the avro value serde
                Serde<Event> avroEventSerde = new SpecificAvroSerde<>();
                Map<String, String> config = Map.of("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
                avroEventSerde.configure(config, false);
                // create the test topics
                mainEventsTopic =
                    testDriver.createInputTopic(
                        "event_ingestion", Serdes.String().serializer(), avroEventSerde.serializer());
                identifyTopic =
                    testDriver.createInputTopic(
                        "identify_event", Serdes.String().serializer(), avroEventSerde.serializer());
                Event identifyEvent =
                    Event.newBuilder()
                        .setId("foofie")
                        .setTimestamp(Instant.now())
                        .setEventName(EventName.Identify)
                        .setEvent(Identify.newBuilder().setUserId("userid-99").build())
                        .build();
                identifyTopic.pipeInput("anonymous id", identifyEvent);
                regStartedEvent =
                    Event.newBuilder()
                        .setId("goofie")
                        .setTimestamp(Instant.now())
                        .setEventName(EventName.RegistrationStarted)
                        .setEvent(
                            RegistrationStarted.newBuilder()
                                .setAuthenticationMethod(RegistrationMethod.email)
                                .build())
                        .build();
            }

            @TearDown(Level.Trial)
            public void tearDown() {
                testDriver.close();
            }
        }

        @Benchmark
        @BenchmarkMode(Mode.Throughput)
        @OutputTimeUnit(TimeUnit.SECONDS)
        public void benchmarkTopology(MyState state) {
            state.mainEventsTopic.pipeInput("anonymous id", state.regStartedEvent);
        }
    }
And finally, in my build.gradle:
    jmh {
        iterations = 4
        benchmarkMode = ['thrpt']
        threads = 1
        fork = 1
        timeOnIteration = '3s'
        resultFormat = 'TEXT'
        profilers = []
        warmupIterations = 3
        warmup = '1s'
    }
It can't actually be this slow just from adding one join on one table?? Is it TopologyTestDriver, or is it me???
Losing hair over this, thanks for any input!!!!