I'm writing a group of tests using the spring KafkaEmbedded test util. The tests each individually stand up an embedded kafka instance, produce events and assert the resultant downstream events.
The tests consistently pass when run in an IDE (e.g. IntelliJ), however the tests fail intermittently (approximately 50% of the time, no regularity) when ran with SBT. In the event of a test failure, I see the following error:
o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes
Additionally, I see many INFO logs that report missing ZooKeeper nodes, for instance:
o.a.z.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x164a50fe9fd0001 type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers
These logs DO NOT appear on successful tests. When I say "I see many INFO logs" I mean MANY, around 40 such logs, with some of the node paths being nested in previously reported node paths.
Research suggests the error log is innocent, and I'd like to think that the info logs are too, but they stand alone on test failure.
Update 7/17:
KStreams/Producers/Consumers Configurations:
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
Embedded Kafka startup:
@Rule
public KafkaEmbedded kafka = new KafkaEmbedded(getNumKafkaServers(), true, getNumPartitionsPerTopic(), getTopics().keySet().toArray(new String[0]));
Test Before:
@Before
public void before() {
// build KStreams and start topology
KStreamBuilder kStreamBuilder = new KStreamBuilder();
buildStream(kStreamBuilder);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
}
Test After:
@After
public void after() {
kafka.destroy();
FileUtils.deleteDirectory(new File(streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG)));
}
Update 7/17:
More context, this is a Spring project and each test is annotated as such:
@SpringBootTest(classes = <this-test-class>.class)
@ActiveProfiles("test")
@RunWith(SpringRunner.class)