1

I run Kafka with Docker,

First I created a topic like this,

docker exec -t kafka kafka-topics --bootstrap-server :9092 --create --topic kafka-test --partitions 5 --replication-factor 1

As you see, there are 5 partitions, I used the code below

Kafka C# Codes

I think this code probably produces messages but for only Partition 3,

it works but divides data for all partitions so not only for partition 3... and some of them are below like P2 and P0 (Partition2 - Partition0)

BUT I just want to produce messages for only specific partition...

Do I miss something ? or Do I get something wrong?

Consumer Codes below

enter image description here

This the picture of Consumer..

Kafka values

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
serhatyt
  • 180
  • 2
  • 14

1 Answers1

4

KafkaNet seems to be an abandoned project.

Switch to confluent-kafka-dotnet, and you have a Produce method that accepts a TopicPartition as an argument, which you can use instead of Meta attribute on the Message itself (which should only be a key+value+headers).

ProducerConfig config = new ProducerConfig { BootstrapServers = bootstrapServers, ClientId = Dns.GetHostName() };
using (var producer = new ProducerBuilder<Null, string>(config).Build()) { 
    // send to partition 3
    var topicPart = new TopicPartition("topic", new Partition(3)); 
    var result = await producer.ProduceAsync(topicPart, new Message<Null, string> { Value = message }); 
}

Also - Kafka is not an HTTP service; remove the protocol from your Uri variable.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • var result = await producer.ProduceAsync(topic, new Message { Value=message }); I use it like this, but there are 2 others attributes key and headers in Message, Should I use them to do it? I thought ProduceAsync has got a TopicPartition which is only get; , so I cannot use it – serhatyt Jun 02 '22 at 07:23
  • Not sure I understand the question. What is your `topic` parameter? A string or a TopicPartition instance? Otherwise, Kafka will hash the `key` to determine partitions more dynamically. Headers are not used for partitioning – OneCricketeer Jun 02 '22 at 16:11
  • I used Topic as 'string' but when I want to change it to TopicPartition, it gives me an error... I just want to specify the partition. Thank you for helping... ------ ProducerConfig config = new ProducerConfig { BootstrapServers = bootstrapServers, ClientId = Dns.GetHostName() }; ----- using (var producer = new ProducerBuilder(config).Build()) { Partition part = new Partition(3); var topicPart = new TopicPartition(topic, part); var result = await producer.ProduceAsync(topicPart, new Message { Value = message }); } – serhatyt Jun 03 '22 at 08:28
  • You can't specify "just the partition"; a topic name and at least a message value are also needed. What does the error say? – OneCricketeer Jun 03 '22 at 13:38
  • ERROR IS LIKE THIS ---> Confluent.Kafka.ProduceException`2: 'Local: Unknown partition' --------------- And also like this ---> [External Code] Kafka_Producer_API.Controllers.ProducerController.SendOrderRequest(string, string) / ProducerController.cs – serhatyt Jun 03 '22 at 13:50
  • Error seems self explanatory. How many partitions does your topic actually have when you created it? – OneCricketeer Jun 03 '22 at 13:52
  • 1
    thank you so much, I missed the number of partitions, but solved now :) – serhatyt Jun 03 '22 at 14:10