I'm trying to learn Kafka using C# on my local machine. I had a producer/consumer/stream that work with strings and am now trying to allow for complex types using schema registry.
I'm attempting to register a class using the following console app:
static async Task Main(string[] args)
{
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:29092"
};
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
var schema = @"{
""type"": ""TotallyCoolCustomClass"",
""properties"": {
""FavouriteQuote"": {""type"": ""string""},
""FavouriteNumber"": {""type"": ""integer""}
}
}";
var subject = "SimpleTest";
var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
}
Whenever I run my schema registry app, I get an Http exception:
System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'
and docker reports this under my kafka instance:
WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)
My message is obviously small, and not breaking the default 100Mb - so I'm not sure why it thinks it is. Can anyone tell me why I'm unable to register a schema?
I've set up Kafka, Zookeeper and a Schema Registry using this:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:latest
depends_on:
- kafka
ports:
- 28081:28081
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
# SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081
And produce to it with this (though I think this is largely redundant for the question, as the schema isn't registered so the publish will never work):
public class KafkaPProducerHostedService : IHostedService
{
private readonly ILogger<KafkaPProducerHostedService> _logger;
private readonly IProducer<string, TotallyCoolCustomClass> _producer;
public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
{
_logger = logger;
var config = new ProducerConfig();
config.BootstrapServers = "localhost:29092";
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:28081/"
};
//video on serializers
_producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
.SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
.Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
int i = 0;
var rand = new Random();
while (true) {
Thread.Sleep(2000);
var customClass = new TotallyCoolCustomClass(rand);
var key = $"UpdatedKey-{i}";
await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
{
Key = key,
Value = customClass
},
cancellationToken);
Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
i++;
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_producer?.Dispose();
return Task.CompletedTask;
}
}