I am trying to run a simple unit test using the @EmbeddedKafka annotation. For reference, I am following the Spring documentation: https://docs.spring.io/spring-kafka/reference/html/#embedded-kafka-annotation

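import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(brokerProperties = "log.dir=/kafka-logs", partitions = 1,
    topics = {"dare_policy_created"})
@Slf4j
public class ConsumerTest {

  @Autowired
  private EmbeddedKafkaBroker embeddedKafka;

  @Test
  public void someTest() {
    // Build consumer properties that point at the embedded broker.
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
    Consumer<Integer, String> consumer = cf.createConsumer();
    // Subscribe to the embedded topic and poll for records.
    this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "dare_policy_created");
    ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
    //assertThat(replies.count()).isGreaterThanOrEqualTo(1);
  }
}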

I tried setting log.dir explicitly with @EmbeddedKafka(brokerProperties = "log.dir= ") because I was getting an error when running the test.

I tried:

  • log.dir=/kafka-logs
  • log.dir=real_path_to_my_project/kafka-logs
  • ...

But every time I run the test, I get this error:

kafka.server.LogDirFailureChannel.error - Failed to create or validate data directory /kafka-logs java.io.IOException: Failed to load /kafka-logs during broker startup

kafka.log.LogManager.fatal - Shutdown broker because none of the specified log dirs from /kafka-logs can be created or validated
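One quick thing to rule out with an error like this is plain filesystem permissions, since a path such as /kafka-logs sits at the filesystem root and is often not writable for a normal user. The sketch below is only a diagnostic under that assumption: the class name LogDirCheck and the method isUsableLogDir are hypothetical, it uses only the JDK, and it does not reproduce Kafka's own directory validation, so a "usable" result does not guarantee that the broker will start.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LogDirCheck {

    // Returns true if the directory exists (or can be created) and a file
    // can be written inside it. Note: this creates the directory if missing.
    static boolean isUsableLogDir(Path dir) {
        try {
            Files.createDirectories(dir); // no-op if the directory already exists
            Path probe = Files.createTempFile(dir, "probe", ".tmp");
            Files.delete(probe);
            return true;
        } catch (IOException | SecurityException e) {
            System.out.println("Cannot use " + dir + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Check the paths given on the command line, or a directory under
        // the JVM's temp dir when no argument is supplied.
        String[] candidates = args.length > 0
                ? args
                : new String[] {
                    Paths.get(System.getProperty("java.io.tmpdir"), "kafka-logs-check").toString()
                };
        for (String candidate : candidates) {
            Path dir = Paths.get(candidate);
            System.out.println(dir + " usable: " + isUsableLogDir(dir));
        }
    }
}
```

Running it with /kafka-logs as the argument probes the path from the question. If the directory turns out to be usable and the broker still fails, the cause is likely elsewhere.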
mazaneicha
DALDOUL
  • I too am having this issue. I got a little further by adding a meta.properties file to the log dir. Kafka tried to start but then failed with: java.lang.NoSuchMethodError: org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(Lorg/apache/kafka/common/network/ListenerName;ZLorg/apache/kafka/common/security/auth/SecurityProtocol;Lorg/apache/kafka/common/config/AbstractConfig;Lorg/apache/kafka/common/security/authenticator/CredentialCache;Lorg/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache;)Lorg/apache/kafka/common/network/ChannelBuilder; – cSlater Oct 21 '19 at 14:08
  • I'm facing the same error too (+1). Did you find a solution? – ADJ Nov 01 '19 at 05:15

1 Answer

I was able to resolve the issue by removing an explicit dependency on kafka-clients. I had the following dependency in my pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
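A NoSuchMethodError like the one quoted in the comments usually suggests that two Kafka jars of different versions ended up on the test classpath, which would explain why removing the pinned kafka-clients version helps. As a rough way to check this, the sketch below prints which jar a class was loaded from. The class name WhichJar and the method locate are hypothetical, it uses only the JDK, and the two default class names are taken from the stack trace and log output above; it needs to run with the same classpath as the test to be informative.

```java
import java.security.CodeSource;

public class WhichJar {

    // Describes where a class was loaded from, or why that could not be determined.
    static String locate(String className) {
        try {
            // Load without running static initializers.
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "(bootstrap or unknown location)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath: " + e + ")";
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.kafka.common.network.ChannelBuilders", // shipped in kafka-clients
            "kafka.server.KafkaServer"                         // shipped in the broker jar
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the two jar file names show different Kafka versions, the explicit kafka-clients version is probably overriding the one that the embedded broker expects.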
ADJ