I just started using Kafka and hit the following rookie error:
'Value cannot be null.
Parameter name: Value serializer not specified and there is no default serializer defined for type ActMessage.'
It happens when trying to send a class object,…
I am trying to create an Kafka client app (both producer and consumer) using AWS managed Kafka instance (MSK). Also broker to broker communication and client to broker communication is configured as secure via TLS in cluster. CA is AWS Private CA as…
I have a bunch of services that are integrated via Apache Kafka, and each of the services has their consumers and producers, but im facing slowing consuming rate like there's something slowing the consuming when get so much load into the…
I'm trying to publish a message on a Kafka topic from the control center for initial testing. I need to include a header in the message. Can this be done from the control center?
I am working in C# and have the following:
consumer.Subscribe( new string[] {"topic1", "topic2"});
var cr = consumer.Consume();
console.Write($"key {cr.Message.Key}\r\nvalue {cr.Message.Value}");
How do I get the topic of a specific message when I…
I'm trying to get the Kafka offset using Confluent Kafka.
This is the code I'm using to obtain it:
var offsetPosition = consumer.Position(new TopicPartition(topicConfiguration.Topic, topicConfiguration.Partition));
It always gives me a value of…
I am runnning an Xunit functional test within a docker compose stack based on Debian 3.1-buster with Confluent Kafka .NET Client v1.5.3 connecting to broker confluentinc/cp-kafka:6.0.1. I am fairly new to Kafka....
The architecture is illustrated…
I am getting some random timeout errors while publishing messages using Confluent.Kafka. The application runs in a Kubernetes cluster and is built using the .NET 6 framework.
When the default timeout (60000ms) is reached, the message is successfully…
I am creating a c# application to publish a message to Kafka. In the current version of my application I set up cluster locally using docker (confluentinc-cp and confluentinc-zookeeper). However to run Kafka efficiently, we decided to use Managed…
We are noticing a weird issue with one of our prod topics(6 partitions) where our consumer (dotnet core, only 1 instance) is only able to read from 3 partitions (0,1,3). This is obviously impacting application behavior as the consumer is missing the…
I have a consumer and want to subscribe to two different topics with different schema. One of this topic I need to configure like below:
var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig
{
Url =…
In continuation to my previous question C# Confluent.Kafka SetValueDeserializer object deserialization, I have tried creating my custom deserializer to deserialize protobuf message but getting this error:
System.InvalidOperationException: 'Type is…
In my consumer, I want to deserialize Kafka protobuf message. The key is of string type but message value is a protobuf object. I know I have to create my own custom deserializer for message value but no idea how can I create one. Here is my…
We have multiple consumers that are subscribed to one topic (for now) with 10 partitions and are part of one consumer group.
Since these consumers are constantly listening to messages, we decided to use the worker service template in .NET Core 3.1.…