I have a few integration tests for my application that connect to a local Kafka instance. I am using the Java KafkaServer API to create the local instance on demand when the test runs in a way similar to the accepted answer from this question:

How can I instanciate a Mock Kafka Topic for junit tests?

Each of my tests passes when run in isolation. The problem is that my tests use the same Kafka topics, and I would like each test to start with those topics containing no messages. However, when I run the tests in series, every test after the first fails with this error when it tries to recreate the topics it needs:

kafka.common.TopicExistsException: Topic "test_topic" already exists.
    at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:187)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)

Each test creates and shuts down its own EmbeddedZookeeper and KafkaServer. I have also tried deleting the 'brokers/topics' path from ZK as well as the KafkaServer's logDirs at the end of each test. Somehow the topics from the first test still survive into the second.

What can I do at the end of each test to make sure that the topics it uses do not interfere with tests that run after it?

Joseph Downing
  • I've also tried enabling 'delete.topic.enable' on the broker and sending a TopicCommand.deleteTopic command. The logs report the delete is successful but the errors continue. – Joseph Downing Jan 26 '16 at 22:30
  • It would be helpful if you could provide the version of Kafka that you are using – Nautilus Jan 27 '16 at 09:13

1 Answer

I eventually got it to work.

Instead of cleaning up after each test, I changed the tests to clean up before they ran.

There were two cleanup steps that I needed to do.

The first was to delete the broker's data directory before starting KafkaServer.

    // Wipe any log data left behind by a previous test run
    String dataDirectory = 'tmp/kafka'
    FileUtils.deleteDirectory(FileUtils.getFile(dataDirectory))

    // Point the broker at the now-empty directory and allow topic deletion
    Properties props = TestUtils.createBrokerConfig(BROKER_ID, port, true)
    props.put('log.dir', dataDirectory)
    props.put('delete.topic.enable', 'true')

    // Start the embedded broker
    KafkaConfig config = new KafkaConfig(props)
    Time mock = new MockTime()
    kafkaServer = TestUtils.createServer(config, mock)
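
If you are in plain Java without Commons IO on the classpath, the directory wipe in the first step could be sketched with java.nio.file alone. This is only an illustration of the "clean up before the broker starts" idea, not part of my original setup: the class name DataDirCleaner and the method deleteRecursively are made up for this example, and it does nothing Kafka-specific.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical helper (not part of Kafka or Commons IO).
final class DataDirCleaner {

    // Deletes root and everything under it; does nothing if root is absent.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return; // first test run: nothing to clean up yet
        }
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException {
                Files.delete(file); // log segments, index files, checkpoints
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc)
                    throws IOException {
                if (exc != null) {
                    throw exc; // surface traversal failures instead of hiding them
                }
                Files.delete(dir); // directory is empty once its children are gone
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
```

You would call something like DataDirCleaner.deleteRecursively(Paths.get("tmp/kafka")) before building the broker config, in the same position as the FileUtils.deleteDirectory call above.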

The second was to delete the topic path recursively in ZooKeeper before sending the createTopic command.

    // Remove any stale topic metadata left in ZooKeeper by an earlier test
    zkClient.deleteRecursive(ZkUtils.getTopicPath(topicName))

    // Recreate the topic from scratch
    List<String> arguments = ['--topic', topicName, '--partitions', '1', '--replication-factor', '1']
    TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments as String[]))

I tried a number of similar approaches, but only this exact combination worked.

Note that the code is Groovy and not Java.

Joseph Downing