
I'm trying to learn Kafka using C# on my local machine. I had a producer, consumer and stream working with strings, and am now trying to support complex types using Schema Registry.

I'm attempting to register a class using the following console app:

    static async Task Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            Url = "http://localhost:29092"
        };

        var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

        var schema = @"{
            ""type"": ""TotallyCoolCustomClass"",
            ""properties"": {
                ""FavouriteQuote"": {""type"": ""string""},
                ""FavouriteNumber"": {""type"": ""integer""}
            }
        }";

        var subject = "SimpleTest";
        var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
    }

Whenever I run my schema registry app, I get an HTTP exception:

    System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'

and Docker reports this in the logs of my Kafka container:

    WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
    org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)

My message is obviously small and nowhere near the default 100 MB limit, so I'm not sure why Kafka thinks it is that large. Can anyone tell me why I'm unable to register a schema?

I've set up Kafka, Zookeeper and a Schema Registry using this:

    version: '2'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        ports:
          - 22181:2181

      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 29092:29092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

      schema-registry:
        image: confluentinc/cp-schema-registry:latest
        depends_on:
          - kafka
        ports:
          - 28081:28081
        environment:
          SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
          SCHEMA_REGISTRY_HOST_NAME: schema-registry
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
          # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081

And produce to it with this (though I think this is largely redundant for the question, as the schema isn't registered so the publish will never work):

    public class KafkaPProducerHostedService : IHostedService
    {
        private readonly ILogger<KafkaPProducerHostedService> _logger;
        private readonly IProducer<string, TotallyCoolCustomClass> _producer;

        public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
        {
            _logger = logger;
            var config = new ProducerConfig();
            config.BootstrapServers = "localhost:29092";

            var schemaRegistryConfig = new SchemaRegistryConfig
            {
                Url = "http://localhost:28081/"
            };

            //video on serializers
            _producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
                .SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
                .Build();
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            int i = 0;

            var rand = new Random();

            while (true) {
                Thread.Sleep(2000);
                var customClass = new TotallyCoolCustomClass(rand);
                var key = $"UpdatedKey-{i}";
                await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
                {
                    Key = key,
                    Value = customClass
                },
                cancellationToken);

                Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
                i++;
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _producer?.Dispose();
            return Task.CompletedTask;
        }
    }
Joshua Mee
  • You are sending an HTTP request. There are two types: 1) GET, where there is no body, and 2) POST, where there is a body. It looks like you have a body since you are using SetValueSerializer. I'm not familiar with the library. With a body you also need an HTTP header indicating the length. Some libraries automatically calculate the length (the Content-Length header) while others do not. There seems to be an error in the length. – jdweng Jun 26 '23 at 12:52
  • See following : https://stackoverflow.com/questions/60917563/kafka-producer-limits-producer-messages-to-1024-bytes – jdweng Jun 26 '23 at 12:54
  • The error message is saying that the size I'm sending is bigger than the allowed size: 1347375956 bytes is about 1.3 GB and the allowed 104857600 is 100 MB. I don't think my tiny string is bigger than either of those values! – Joshua Mee Jun 26 '23 at 13:17
  • If you leave a value blank, or the number is negative (-1), it might be interpreted as a huge number. Blank may mean -1. – jdweng Jun 26 '23 at 14:30

1 Answer

You've configured your schema registry client to send its HTTP requests to the Kafka broker on port 29092, which is not an HTTP server.
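That also explains the odd size in the broker log. Kafka's wire protocol starts every request with a 4-byte big-endian length, so when the registry client opens a connection to port 29092 and sends an HTTP POST, the broker reads the first four bytes of the request line as that length, and the result is far above the broker's default 104857600-byte (100 MiB) limit for a single request (socket.request.max.bytes). The sketch below uses only the .NET base library and a hypothetical KafkaSizePrefix helper (not part of any Kafka client) to show that 1347375956 is exactly the ASCII bytes P, O, S, T.

```csharp
using System;
using System.Buffers.Binary;
using System.Text;

// Hypothetical helper: illustrates how a Kafka broker interprets the start of an HTTP request.
public static class KafkaSizePrefix
{
    // Kafka reads the first 4 bytes of each request as a big-endian int32 size.
    public static int SizeFromRequest(string request)
    {
        byte[] bytes = Encoding.ASCII.GetBytes(request);
        return BinaryPrimitives.ReadInt32BigEndian(bytes.AsSpan(0, 4));
    }

    // Reverses the process: turns a logged "size" back into the 4 ASCII bytes it came from.
    public static string AsciiFromSize(int size)
    {
        Span<byte> bytes = stackalloc byte[4];
        BinaryPrimitives.WriteInt32BigEndian(bytes, size);
        return Encoding.ASCII.GetString(bytes);
    }
}
```

For example, KafkaSizePrefix.AsciiFromSize(1347375956) gives "POST", and SizeFromRequest on any string beginning with "POST" gives 1347375956, so the "message size" in the log says nothing about your payload.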

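Use port 28081 instead.

However, you've commented out SCHEMA_REGISTRY_LISTENERS, so the registry listens on its default port 8081 inside the container and the 28081:28081 port mapping is incorrect; the container side of the mapping should be the default 8081.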
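A minimal sketch of the registration call once the client points at the registry. It assumes the registry is reachable on http://localhost:28081, either because SCHEMA_REGISTRY_LISTENERS is set to http://0.0.0.0:28081 or because the port is mapped as 28081:8081. Two further changes go beyond the answer and are assumptions on my part: the schema text is wrapped in new Schema(..., SchemaType.Json), because as far as I know the string overload of RegisterSchemaAsync is meant for Avro schemas, and the top-level "type" is "object", since TotallyCoolCustomClass is not a JSON Schema type. The SchemaRegistration class name is hypothetical.

```csharp
using System.Threading.Tasks;
using Confluent.SchemaRegistry;

// Sketch only: assumes the Schema Registry container is reachable on localhost:28081.
public static class SchemaRegistration
{
    public const string RegistryUrl = "http://localhost:28081";

    // JSON Schema for the question's class; "object" is the JSON Schema type for a record-like class.
    public const string TotallyCoolCustomClassJson = @"{
        ""title"": ""TotallyCoolCustomClass"",
        ""type"": ""object"",
        ""properties"": {
            ""FavouriteQuote"": { ""type"": ""string"" },
            ""FavouriteNumber"": { ""type"": ""integer"" }
        }
    }";

    // Tags the schema text as JSON Schema so the registry does not treat it as Avro.
    public static Schema BuildSchema() => new Schema(TotallyCoolCustomClassJson, SchemaType.Json);

    // Registers the schema under the given subject and returns its ID (needs a running registry).
    public static async Task<int> RegisterAsync(string subject)
    {
        var config = new SchemaRegistryConfig { Url = RegistryUrl };
        using var client = new CachedSchemaRegistryClient(config);
        return await client.RegisterSchemaAsync(subject, BuildSchema());
    }
}
```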

> schema isn't registered so the publish will never work

Producers using the Schema Registry serializers register schemas automatically. There's no need to do that manually unless you really need the schema ID somewhere.
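To illustrate, here is a sketch of the question's producer setup with the serializer's AutoRegisterSchemas option written out explicitly; it already defaults to true, so the original producer behaves the same way. With the default subject name strategy, the schema generated from the class is registered under "<topic>-value", i.e. SimpleTest-value for the topic SimpleTest, the first time a message is produced. The TotallyCoolCustomClass below is a hypothetical stand-in because the question doesn't show its definition, and AutoRegisteringProducer is a hypothetical name.

```csharp
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Hypothetical stand-in: the question does not show the real TotallyCoolCustomClass.
public class TotallyCoolCustomClass
{
    public string FavouriteQuote { get; set; } = "";
    public int FavouriteNumber { get; set; }
}

public static class AutoRegisteringProducer
{
    // AutoRegisterSchemas already defaults to true; it is set here only to make the behaviour visible.
    public static JsonSerializerConfig SerializerConfig() => new JsonSerializerConfig
    {
        AutoRegisterSchemas = true
    };

    // Builds a producer whose JSON serializer registers the generated schema on first use.
    public static IProducer<string, TotallyCoolCustomClass> Build(ISchemaRegistryClient registry)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
        return new ProducerBuilder<string, TotallyCoolCustomClass>(config)
            .SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(registry, SerializerConfig()))
            .Build();
    }
}
```

Usage would be to create a CachedSchemaRegistryClient with Url = "http://localhost:28081", pass it to AutoRegisteringProducer.Build, and call ProduceAsync on the topic SimpleTest; afterwards the subject SimpleTest-value should appear in the registry.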

OneCricketeer
  • Thanks - I'd just clocked that myself as I felt it odd that Kafka was the one complaining. Changed it to 28081 but still getting an error with even less helpful information! System.Net.Http.HttpRequestException: '[http://localhost:28081/] HttpRequestException: An error occurred while sending the request.' – Joshua Mee Jun 26 '23 at 14:38
  • And what's the new error? – OneCricketeer Jun 26 '23 at 14:39
  • Also note that Schema Registry should not use Zookeeper for its connection string – OneCricketeer Jun 26 '23 at 14:40
  • Should that be the same as the bootstrap servers? – Joshua Mee Jun 26 '23 at 14:44
  • You only need the bootstrap servers for the schema registry config. The Zookeeper option is deprecated and can be removed. More importantly, you shouldn't use both. Not sure it'll solve the problem you're having, though. – OneCricketeer Jun 26 '23 at 14:48
  • Sorry - to confirm, the schema registry portion should look like this?

            schema-registry:
              image: confluentinc/cp-schema-registry:latest
              depends_on:
                - kafka
              ports:
                - 28081:28081
              environment:
                SCHEMA_REGISTRY_HOST_NAME: schema-registry
                SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'

    – Joshua Mee Jun 26 '23 at 14:54
  • You are missing LISTENERS there for port 28081, and the PLAINTEXT:// prefix isn't necessary - https://github.com/confluentinc/cp-all-in-one/blob/7.4.0-post/cp-all-in-one-community/docker-compose.yml#L46 – OneCricketeer Jun 26 '23 at 16:55