Good afternoon, I have only recently started working with Kafka and have a question about the producer in connection with the schema.
Initially I tried to build a simple producer without a schema in C#. This works so far; a shortened version of the code is given below.
Code of producer without schema:
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    BrokerAddressFamily = BrokerAddressFamily.V4,
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    for (int i = 0; i < 3; i++)
    {
        // Produce only queues the message; delivery happens in the background
        producer.Produce("topic", new Message<Null, string> { Value = "Value: " + i + "..." });
    }
    // Block until all queued messages have been delivered
    producer.Flush();
}
But the schema causes me problems (see next section).
Suppose I am given a consumer, say in Python, that uses the following schema to receive integers:
{"type": "record",
"name": "Sample",
"fields": [{"name": "Value", "type": ["int"]}]
}
I now want to create a C# producer that uses this schema and sends messages to the Python consumer. According to the schema, each message should contain only a number.
I tried to build a producer that uses the schema, but in many tutorials a "schema registry url" is required to run the producer. This is what I have so far, and I have not found a way to avoid the "schema registry url"...
Code of producer that uses the schema:
using Avro;
using Avro.Generic;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;
using Confluent.SchemaRegistry;
using System;
using System.Threading;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.IO;
namespace producer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            //This is not further specified, nothing is set up here
            string schemaRegistryUrl = "localhost:8081";
            var schema = (RecordSchema)RecordSchema.Parse(
                @"{
                    ""type"": ""record"",
                    ""name"": ""Sample"",
                    ""fields"": [
                        {""name"": ""Value"", ""type"": [""int""]}
                    ]
                }"
            );
            var config = new Confluent.Kafka.ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                // Fully qualified, because Confluent.Kafka is not imported above
                BrokerAddressFamily = Confluent.Kafka.BrokerAddressFamily.V4,
            };
            using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
            using (var producer = new Confluent.Kafka.ProducerBuilder<Confluent.Kafka.Null, GenericRecord>(config)
                .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
                .Build())
            {
                for (int i = 0; i < 3; i++)
                {
                    var record = new GenericRecord(schema);
                    record.Add("Value", i);
                    // Await the delivery so that serialization or broker errors surface here
                    await producer.ProduceAsync("topic", new Confluent.Kafka.Message<Confluent.Kafka.Null, GenericRecord> { Value = record });
                }
                producer.Flush();
            }
        }
    }
}
I have two questions. First, how can one build a producer without the "schema registry url"? I have already found something like this (but unfortunately in Java). What would it look like in C#? I am looking for a producer that sends numbers to the Python consumer using the schema from above, preferably without the "schema registry url". Second, I would also be interested in how to get the "schema registry url" to work.
Just as a hint: if the producer sends a message without a schema, the Python consumer does register that a message arrived. However, it cannot display the number sent, because the simple producer does not use the schema. I am referring to the code of the very first producer!
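To make the hint concrete, here is a minimal sketch of the bytes I believe the consumer expects for the schema above. It assumes the consumer decodes raw Avro binary with no Schema Registry framing, which I have not confirmed. It follows the Avro specification's binary encoding: a union is written as the zero-based branch index followed by the value, and ints are zigzag varints. The helper names zigzag_varint and encode_sample are hypothetical and not taken from any library.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a signed integer the way Avro encodes int and long values."""
    # Zigzag maps small magnitudes (positive or negative) to small unsigned values.
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F          # take the lowest 7 bits
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow: set the continuation bit
        else:
            out.append(byte)
            return bytes(out)


def encode_sample(value: int) -> bytes:
    """Avro binary for record Sample with the single field Value of type ["int"]."""
    branch = zigzag_varint(0)    # index of "int" inside the union ["int"]
    return branch + zigzag_varint(value)


if __name__ == "__main__":
    for i in range(3):
        print(i, encode_sample(i).hex())
    # prints:
    # 0 0000
    # 1 0002
    # 2 0004
```

Under that assumption, the first producer's payload (the UTF-8 bytes of a string such as "Value: 1...") is not a valid encoding of the record, which would explain why the consumer sees a message but cannot show the number.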
I hope my question(s) are understandable so far. I look forward to your answers!