Questions tagged [confluent-kafka-dotnet]

A Kafka .NET client, written by Confluent.

Source: https://github.com/confluentinc/confluent-kafka-dotnet

138 questions
7
votes
1 answer

Kafka Producer Error: ' Value serializer not specified and there is no default serializer defined for type ...'

I just started using Kafka and hit the following rookie error: 'Value cannot be null. Parameter name: Value serializer not specified and there is no default serializer defined for type ActMessage.' It happens when trying to send a class object,…
MiguelSlv
4
votes
0 answers

SSL Authentication for Managed Kafka AWS (MSK) broker validation failure

I am trying to create a Kafka client app (both producer and consumer) using an AWS managed Kafka instance (MSK). Both broker-to-broker and client-to-broker communication are secured via TLS in the cluster. The CA is AWS Private CA as…
4
votes
0 answers

Which is the best configuration for a Kafka consumer to increase throughput

I have a bunch of services that are integrated via Apache Kafka, and each of the services has its own consumers and producers, but I'm seeing a slow consumption rate, as if something is throttling the consumers when a heavy load comes into the…
3
votes
1 answer

Kafka client unit testing for dotnet

I'm looking for assistance in writing unit tests for Kafka in .NET. Confluent seems to provide mocks for producers and consumers in Java only.
ComeIn
3
votes
0 answers

Publish Message from Confluent Control Center with Header

I'm trying to publish a message to a Kafka topic from Control Center for initial testing. I need to include a header in the message. Can this be done from Control Center?
3
votes
1 answer

When reading multiple topics in kafka how can I tell the topic of a specific message?

I am working in C# and have the following: consumer.Subscribe(new string[] {"topic1", "topic2"}); var cr = consumer.Consume(); Console.Write($"key {cr.Message.Key}\r\nvalue {cr.Message.Value}"); How do I get the topic of a specific message when I…
3
votes
1 answer

Confluent Kafka returns -1001 for offset position

I'm trying to get the Kafka offset using Confluent Kafka. This is the code I'm using to obtain it: var offsetPosition = consumer.Position(new TopicPartition(topicConfiguration.Topic, topicConfiguration.Partition)); It always gives me a value of…
runnerpaul
3
votes
0 answers

Why Is Kafka .NET Client Consumer Blocking With confluent/cp-kafka? Why Does It Consume Correctly With lensesio:fast-data-dev?

I am running an xUnit functional test within a docker compose stack based on Debian 3.1-buster with Confluent Kafka .NET Client v1.5.3 connecting to broker confluentinc/cp-kafka:6.0.1. I am fairly new to Kafka. The architecture is illustrated…
2
votes
0 answers

Random "Timed out ProduceRequest in flight" messages

I am getting some random timeout errors while publishing messages using Confluent.Kafka. The application runs in a Kubernetes cluster and is built using the .NET 6 framework. When the default timeout (60000ms) is reached, the message is successfully…
Catalin
2
votes
0 answers

How to use Amazon MSK in C# code to publish a message to Kafka

I am creating a C# application to publish a message to Kafka. In the current version of my application I set up a cluster locally using Docker (confluentinc-cp and confluentinc-zookeeper). However, to run Kafka efficiently, we decided to use Managed…
2
votes
2 answers

Kafka consumer not able to read from all available partitions

We are noticing a weird issue with one of our prod topics (6 partitions) where our consumer (.NET Core, only 1 instance) is only able to read from 3 partitions (0, 1, 3). This is obviously impacting application behavior as the consumer is missing the…
2
votes
0 answers

How to subscribe to two different topics with different schemas in C# Confluent Kafka?

I have a consumer and want to subscribe to two different topics with different schemas. One of these topics I need to configure as below: var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url =…
2
votes
1 answer

C# Confluent.Kafka unable to deserialize protobuf message using Protobuf-net

As a continuation of my previous question, "C# Confluent.Kafka SetValueDeserializer object deserialization", I tried creating a custom deserializer to deserialize a protobuf message, but I am getting this error: System.InvalidOperationException: 'Type is…
Ali Shahzad
2
votes
3 answers

C# Confluent.Kafka SetValueDeserializer object deserialization

In my consumer, I want to deserialize a Kafka protobuf message. The key is of string type but the message value is a protobuf object. I know I have to create my own custom deserializer for the message value, but I have no idea how to create one. Here is my…
Ali Shahzad
2
votes
0 answers

Running Multiple Kafka Consumers as Worker Services

We have multiple consumers that are subscribed to one topic (for now) with 10 partitions and are part of one consumer group. Since these consumers are constantly listening to messages, we decided to use the worker service template in .NET Core 3.1.…