2

I am creating a c# application to publish a message to Kafka. In the current version of my application I set up cluster locally using docker (confluentinc-cp and confluentinc-zookeeper). However to run Kafka efficiently, we decided to use Managed Streaming Apache Kafka (MSK) to run a Cloud native kafka cluster. There's already a cluster created by our data-engineering team. There's already an endpoint for private network (bootstrap server) and plaintext url for Zookeeper.

How can I change my ProducerConfig to use MSK instead of localhost

My producer code looks like this:

public class ProducerHostedService : IHostedService
{
    private readonly ILogger<ProducerHostedService> _logger;
    private readonly IProducer<Null, string> _producer;

    public ProducerHostedService(ILogger<ProducerHostedService> logger)
    {
        _logger = logger;
        var config = new ProducerConfig
        {
            //SecurityProtocol = SecurityProtocol.Ssl,
            
            BootstrapServers = "localhost:9092"
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }
    
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        for (var i = 0; i < 100; ++i)
        {
          var order = new OrderRequest()
                      {
                          CustomerId = i,
                          ProductId = i,
                          OrderId = i,
                          Quantity = 1,
                          Status = "New"
                      };
            string message = JsonSerializer.Serialize(order);
            _logger.LogInformation(message);
            // ProduceAsync creates a topic if not exists
            await _producer.ProduceAsync("test", new Message<Null, string>()
            {
                Value = message
            }, cancellationToken);
        }

        _producer.Flush(TimeSpan.FromSeconds(10));
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _producer?.Dispose();
        return Task.CompletedTask;
    }
}
  • What exactly happened when you replaced localhost with your actual cluster? Your other team should be able to provide you with any extra security configs – OneCricketeer Mar 20 '22 at 15:50
  • @OneCricketeer Thanks for your response. I replaced localhost:9092 with Bootstrap server private endpoint from MSK, and as instructions suggested I also set SecurityProtocol to ssl. But the producer couldn't connect to that endpoint. So you are probably right, the other team should provide me some other security configs. But I don't know what those config settings are called and how to set them in my code – PurpleGreen Mar 21 '22 at 09:31
  • I don't know what settings your MSK cluster uses either, but all of the properties for ProducerBuilder or ProducerConfig are documented on the Confluent-dotnet library site – OneCricketeer Mar 21 '22 at 15:08

0 Answers0