
I am using the Kafka Testcontainers module with JUnit 5. How can I delete data from the Kafka container after each test, so that I don't have to destroy and recreate the container every time?

  • Testcontainers version - 1.6.2
  • Docker Kafka Image Name - confluentinc/cp-kafka:5.2.1
OneCricketeer
tuk
  • You could create a folder and mount it as a volume, but you would also need to delete the data in the attached ZooKeeper container, since that's where topic information is stored – OneCricketeer May 29 '22 at 11:52
  • I am using Testcontainers as mentioned [here](https://www.testcontainers.org/modules/kafka/). I am not using an external ZooKeeper. Wouldn't I have to delete and attach a new volume, which requires restarting the container? – tuk May 29 '22 at 14:37

2 Answers


Make the container variable static.

Containers declared as static fields will be shared between test methods. They will be started only once before any test method is executed and stopped after the last test method has executed.

https://www.testcontainers.org/test_framework_integration/junit_5/

Make sure that you don't share state between tests, though. For example, if you want to test creating a topic, producing to it, consuming from it, and deleting it, all of those steps should be in one test, although that test can call separate non-test helper methods.

That said, each test should ideally use a unique topic name, perhaps one that describes the test.

Also, as documented, the JUnit 5 integration does not support parallel test execution.
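A minimal Java sketch of the unique-topic-name advice, assuming the name is derived from the test's display name. `TopicNames` and `uniqueTopic` are hypothetical helpers, not part of Testcontainers or Kafka. The sketch replaces characters Kafka does not accept in topic names (anything other than ASCII letters, digits, `.`, `_` and `-`), appends a random UUID so tests sharing one static container never collide, and keeps the result within Kafka's 249-character limit.

```java
import java.util.UUID;

// Hypothetical helper: builds a per-test topic name for a Kafka container shared across tests.
final class TopicNames {
    // Kafka rejects topic names longer than 249 characters.
    private static final int MAX_TOPIC_LENGTH = 249;

    private TopicNames() {
    }

    // Returns "<sanitized test name>-<random UUID>", truncating the prefix if needed.
    static String uniqueTopic(String testName) {
        // Replace every character Kafka does not allow in topic names with '-'.
        String prefix = testName.replaceAll("[^a-zA-Z0-9._-]", "-");
        // "-" plus the 36-character UUID string.
        String suffix = "-" + UUID.randomUUID();
        int maxPrefix = MAX_TOPIC_LENGTH - suffix.length();
        if (prefix.length() > maxPrefix) {
            prefix = prefix.substring(0, maxPrefix);
        }
        return prefix + suffix;
    }
}
```

In a JUnit 5 test, the name could come from an injected `TestInfo`, for example `TopicNames.uniqueTopic(testInfo.getDisplayName())`.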

OneCricketeer
  1. A simple way to clean up Kafka is to delete all topics, which is faster than creating a new Kafka container. I have no experience with Kafka Streams; it may need additional cleanup.

See also: Is there a way to purge the topic in Kafka?

C# client example:

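using System;
using System.Linq;
using Confluent.Kafka;

// bootstrapServers: the Kafka container's bootstrap address (assumed to be defined elsewhere)
using var client = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = bootstrapServers }
    )
    .Build();

var metadata = client.GetMetadata(TimeSpan.FromSeconds(5));
var topics = metadata.Topics
    .Select(e => e.Topic)
    .Where(e => !e.StartsWith("__")) // Ignore internal topics such as __consumer_offsets
    .ToArray();

if (topics.Length != 0)
{
    await client.DeleteTopicsAsync(topics);
}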
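For the Java setup in the question, the same cleanup could look like the sketch below. To keep it runnable without a broker, the admin calls sit behind a hypothetical `TopicAdmin` interface; with kafka-clients, an adapter could implement `listTopicNames()` with `admin.listTopics().names().get()` and `deleteTopics(...)` with `admin.deleteTopics(topics).all().get()` on an `Admin` (or older `AdminClient`) instance. Like the C# version, it skips names starting with `__`.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical abstraction over the Kafka admin client, so the cleanup logic runs without a broker.
interface TopicAdmin {
    Set<String> listTopicNames();

    void deleteTopics(Collection<String> topics);
}

final class KafkaCleanup {
    private KafkaCleanup() {
    }

    // Deletes every topic except internal ones (names starting with "__")
    // and returns the names it asked the admin client to delete.
    static List<String> deleteUserTopics(TopicAdmin admin) {
        List<String> userTopics = admin.listTopicNames().stream()
                .filter(name -> !name.startsWith("__"))
                .sorted()
                .collect(Collectors.toList());
        if (!userTopics.isEmpty()) {
            admin.deleteTopics(userTopics);
        }
        return userTopics;
    }
}
```

Calling `KafkaCleanup.deleteUserTopics(...)` from an `@AfterEach` method would run it after every test while the static container stays up.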

  2. For parallel execution, you can create a pool of containers, provided the same basic configuration suits all tests. (If several configurations are needed, let callers pass a key identifying the required configuration, and store the pools in a key-value structure: key = configuration type, value = pool of containers.)

In C# it can be done like this:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class PoolManager<T>
{
    private readonly Channel<T> _buffer;
    private readonly Func<T, ValueTask<T>> _resetAction;

    // .. other code (init pool)

    public async Task<PoolItem> GetAsync()
    {
        // Take a free container, or wait until one is returned to the pool
        var value = await _buffer.Reader.ReadAsync();
        return new PoolItem(this, value);
    }

    public record PoolItem : IAsyncDisposable
    {
        private readonly PoolManager<T> _poolManager;
        public T Value { get; }

        public PoolItem(PoolManager<T> poolManager, T value)
        {
            _poolManager = poolManager;
            Value = value;
        }

        public async ValueTask DisposeAsync()
        {
            // Reset the container state (clear the database or broker) before returning it
            var value = await _poolManager._resetAction(Value);
            if (!_poolManager._buffer.Writer.TryWrite(value))
            {
                throw new InvalidOperationException("Could not return the item to the pool.");
            }
        }
    }
}
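A rough Java equivalent of the pool, assuming the pooled items (for example, started containers) are created up front. `ResettingPool` and `Lease` are hypothetical names. A `BlockingQueue` plays the role of the C# `Channel`: `acquire()` blocks until an item is free, and closing the `Lease` (for example via try-with-resources) runs the reset action before putting the item back.

```java
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Hypothetical fixed-size pool whose items are reset before they are handed out again.
final class ResettingPool<T> {
    private final BlockingQueue<T> available;
    private final UnaryOperator<T> resetAction;

    // items must be non-empty; all of them start out available.
    ResettingPool(Collection<T> items, UnaryOperator<T> resetAction) {
        this.available = new ArrayBlockingQueue<>(items.size(), false, items);
        this.resetAction = resetAction;
    }

    // Takes a free item, waiting until another test returns one if necessary.
    Lease<T> acquire() throws InterruptedException {
        return new Lease<>(this, available.take());
    }

    // Resets the item (e.g. deletes all topics) and puts it back into the pool.
    private void release(T item) {
        T clean = resetAction.apply(item);
        if (!available.offer(clean)) {
            throw new IllegalStateException("Pool is full; was an item returned twice?");
        }
    }

    // A borrowed item; close() returns it to the pool exactly once.
    static final class Lease<E> implements AutoCloseable {
        private final ResettingPool<E> pool;
        private final E value;
        private boolean closed;

        private Lease(ResettingPool<E> pool, E value) {
            this.pool = pool;
            this.value = value;
        }

        E value() {
            return value;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                pool.release(value);
            }
        }
    }
}
```

For several configurations, one pool per configuration key (for example in a `Map<String, ResettingPool<T>>`) matches the key-value layout described above. As written, if the reset action throws, the item is not returned, so later `acquire()` calls may wait forever when no other item is free.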
cccc1808