
I'm writing a group of tests using the Spring KafkaEmbedded test utility. Each test stands up its own embedded Kafka instance, produces events, and asserts on the resulting downstream events.

The tests consistently pass when run in an IDE (e.g. IntelliJ), but they fail intermittently (approximately 50% of the time, with no regular pattern) when run with SBT. When a test fails, I see the following error:

o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes

Additionally, I see many INFO logs that report missing ZooKeeper nodes, for instance:

o.a.z.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x164a50fe9fd0001 type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers

These logs DO NOT appear on successful tests. When I say "I see many INFO logs" I mean MANY, around 40 such logs, with some of the node paths being nested in previously reported node paths.

Research suggests the error log is innocent, and I'd like to think that the info logs are too, but they stand alone on test failure.

Update 7/17:

KStreams/Producers/Consumers Configurations:

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

Embedded Kafka startup:

@Rule
public KafkaEmbedded kafka = new KafkaEmbedded(getNumKafkaServers(), true, getNumPartitionsPerTopic(), getTopics().keySet().toArray(new String[0]));

Test Before:

@Before
public void before() {
    // build KStreams and start topology
    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    buildStream(kStreamBuilder);
    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
    kafkaStreams.start();
}

Test After:

@After
public void after() {
    kafka.destroy();
    FileUtils.deleteDirectory(new File(streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG)));
}

Update 7/17:

More context: this is a Spring project, and each test is annotated as follows:

@SpringBootTest(classes = <this-test-class>.class)
@ActiveProfiles("test")
@RunWith(SpringRunner.class)
Freestyle076
  • Update, discovered that the INFO logs appear on successful tests when run via the IDE. No ERROR logs however. – Freestyle076 Jul 16 '18 at 22:45
  • Additional update, I can't seem to get the error to happen when running it alone via "sbt testOnly". So, the error only occurs when tested in conjunction with other integration tests – Freestyle076 Jul 16 '18 at 22:52
  • Show, please, at least one test you write with such an approach. – Artem Bilan Jul 17 '18 at 01:00
  • Sorry @ArtemBilan, unfortunately the tests are written on top of a proprietary test framework which I cannot share. Without it, the tests wouldn't be of much use. I can, however, show configurations and startup/teardown code. Since this happens on many tests of the same type, this common code should prove useful. – Freestyle076 Jul 17 '18 at 15:29
  • Well, my point is that you have to close Kafka resources after each test class. That is exactly what is done via `@ClassRule` on the `public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, ...);` property in our tests in the project: https://github.com/spring-projects/spring-kafka/tree/master/spring-kafka/src/test/java/org/springframework/kafka – Artem Bilan Jul 17 '18 at 15:36
  • @ArtemBilan I like where you're headed. Since it only happens on consecutive tests, it certainly seems like setup/teardown is involved in the issue. As you can see, we decided to use a `@Rule` instead of a `@ClassRule`. AFAICT the only semantic difference is that the external resource will be created/destroyed around each test, rather than around each test class. – Freestyle076 Jul 17 '18 at 15:44
  • OK. And how does it work? – Artem Bilan Jul 17 '18 at 16:02
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/176169/discussion-between-freestyle076-and-artem-bilan). – Freestyle076 Jul 17 '18 at 16:03
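
The `@Rule` versus `@ClassRule` distinction raised in the comments can be illustrated with a small plain-Java simulation. This is only a sketch of the lifecycle ordering, not the real JUnit or spring-kafka machinery: `RuleLifecycleSketch`, `FakeBroker`, and both `runWith...` methods are hypothetical names invented for illustration, and no Kafka or JUnit classes are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of JUnit 4 rule lifecycles; it uses neither JUnit nor Kafka.
public class RuleLifecycleSketch {

    // Hypothetical stand-in for an external resource such as an embedded broker.
    static final class FakeBroker {
        private final List<String> log;

        FakeBroker(List<String> log) {
            this.log = log;
        }

        void start() {
            log.add("start");
        }

        void stop() {
            log.add("stop");
        }
    }

    // @Rule-like behaviour: a fresh resource is started and stopped around every test method.
    static List<String> runWithPerTestRule(int testCount) {
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= testCount; i++) {
            FakeBroker broker = new FakeBroker(log);
            broker.start();
            try {
                log.add("test" + i); // the test body would run here
            } finally {
                broker.stop();
            }
        }
        return log;
    }

    // @ClassRule-like behaviour: one resource is started once and shared by all test methods.
    static List<String> runWithPerClassRule(int testCount) {
        List<String> log = new ArrayList<>();
        FakeBroker broker = new FakeBroker(log);
        broker.start();
        try {
            for (int i = 1; i <= testCount; i++) {
                log.add("test" + i); // the test body would run here
            }
        } finally {
            broker.stop();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runWithPerTestRule(2));  // [start, test1, stop, start, test2, stop]
        System.out.println(runWithPerClassRule(2)); // [start, test1, test2, stop]
    }
}
```

With a per-test rule, every test pays for a full start/stop cycle, so anything that outlives a test (for example, a client that was started in `@Before` but never closed) may still be running when the next instance comes up. Whether that explains the intermittent failures here is not established by the thread.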
