
I am sending events to Event Grid as shown below. I can see them arriving in the Azure dashboard.

            NomeEmailChange yay = new NomeEmailChange
            {
                Nome = "cesco",
                Email = "cesco"
            };

            var primaryTopicKey = _config["EventGridConfig:AcessKey"];
            var primaryTopic = _config["EventGridConfig:Endpoint"];

            var primaryTopicHostname = new Uri(primaryTopic).Host;

            var topicCredentials = new TopicCredentials(primaryTopicKey);
            var client = new EventGridClient(topicCredentials);

            var id = Guid.NewGuid().ToString();
            var hey = new List<EventGridEvent>
            {
                new EventGridEvent()
                {
                    Id = id,
                    EventType = "cesco-cesco",
                    Data = (yay),
                    EventTime = DateTime.UtcNow, // event time is expected in UTC
                    Subject = "MS_Clientes",
                    DataVersion = "1.0",
                }
            };
            // Await the call (from an async method) so that publish failures are observed.
            await client.PublishEventsAsync(primaryTopicHostname, hey);

Then I created an Event Grid subscription. I can confirm that the messages are arriving at the Event Grid subscription.

In another project I subscribe to the Service Bus as shown below. It works fine for consuming messages sent directly to the bus.

        public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
            IHostingEnvironment env)
        {
            services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
            services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
            {
                var keyName = "RootManageSharedAccessKey";
                var busName = configuration["ServiceBus:Name"];
                var secret = configuration["ServiceBus:Secret"];
                var host = cfg.Host(
                    "Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
                    "SharedAccessKeyName=" + keyName + ";" +
                    "SharedAccessKey=" + secret,
                    z =>
                    {
                        TokenProvider
                            .CreateSharedAccessSignatureTokenProvider(keyName, secret);
                    });
                cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
                cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
                    e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
            }));
            services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
            services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
            services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
            services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
            services.AddSingleton<IHostedService, BusService>();
            return services;
        }

Everything should work at this point, but in this other project I get the following error when the message arrives:

fail: MassTransit.Messages[0]
      R-FAULT sb://sbacompanharreldev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
   at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
CESCO
  • https://stackoverflow.com/a/59195982/11228967 - feedback would be nice, to know whether it works for you or not, as a decent amount of time was invested in exploring this problem. Thanks :) – kgalic Dec 06 '19 at 22:07
  • @kgalic the deserializer you sent me does not compile on this line: return new JsonConsumeContext(JsonSerializer.CreateDefault(), receiveContext, messageEnv); – CESCO Dec 07 '19 at 01:35
  • it says it needs 4 arguments – CESCO Dec 07 '19 at 01:35
  • Did you try to download the sample from the repository I provided and try to run it? – kgalic Dec 07 '19 at 01:36
  • Also, are you using .net core or .net framework? – kgalic Dec 07 '19 at 01:37
  • @kgalic .net core – CESCO Dec 07 '19 at 02:02
  • I can try to run it once more but if you downloaded the repository it should work as it is based on the dotnet core. Can you please verify if you have the same package versions installed as in my example and repo? – kgalic Dec 07 '19 at 02:04
  • MassTransit is an open-source project. Please have a look here, and you can see that JsonConsumeContext has a constructor with only 3 params: https://github.com/MassTransit/MassTransit/blob/6e75ca3ed77af800d216dec0edc8124f1c2b4589/src/MassTransit/Serialization/JsonConsumeContext.cs – kgalic Dec 07 '19 at 03:35
  • Here is another example of how to use it (also 3 params), similar to my deserializer: https://github.com/MassTransit/MassTransit/blob/3b98c2819fdddc519d4ad88e3873255013aa137d/src/MassTransit/Serialization/BsonMessageDeserializer.cs I also tried to run the code, and as in my screenshot (from the answer), I am receiving the messages through my deserializer. Could you please check if you can build and run my example, with your queue/topic connection string and queue/topic name? – kgalic Dec 07 '19 at 03:38

2 Answers


As suggested, you have to write a custom deserializer. In my implementation, I changed a couple of things about how the MassTransit package is used.

First, you have to register deserializers for the different content types:

  1. If your message comes from a MassTransit publisher, it will have the default message content type (in C#): JsonMessageSerializer.JsonContentType
  2. If your message comes from a Service Bus queue/topic, it is usually "application/json"

The following code shows how to register deserializers and how to set up the receive endpoint. One difference from your approach is that I used a connection string, so there is no need for the SAS token part.

class Program
    {
        static string ContentTypeJson = "application/json";
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
            {
                var queueName = "Your SB Queue Name";
                var connectionString = "Connection String with RootManage policy";
                var host = cfg.Host(connectionString, h =>
                {
                    h.OperationTimeout = TimeSpan.FromSeconds(60);
                });

                cfg.ReceiveEndpoint(queueName,
                    e =>
                    {
                        e.AddMessageDeserializer(contentType: new ContentType(ContentTypeJson), () =>
                        {
                            return new EventGridMessgeDeserializer(ContentTypeJson);
                        });
                        e.Consumer(() => new EventGridMessageConsumer());

                        // Uncomment if you also need a deserializer for local messages (MassTransit as publisher, or direct messages from SB)
                        //e.AddMessageDeserializer(contentType: JsonMessageSerializer.JsonContentType, () =>
                        //{
                        //    return new CustomMessageDeserializer(JsonMessageSerializer.JsonContentType.ToString());
                        //});
                        //e.Consumer(() => new MessageConsumer());
                    });

            });
            bus.Start();

            Console.WriteLine("Press any key to exit");
            // for testing purposes of local messages - mass transit as publisher
            // await bus.Publish<CustomMessage>(new {  Hello = "Hello, World." });
            await Task.Run(() => Console.ReadKey());

            await bus.StopAsync();

        }
    }

I used my message simulator to send messages to Event Grid, which forwards them to the SB queue. The outcome of running this code is shown in the screenshot:

[Screenshot: the sample receiving the messages through the custom deserializer]

You can find the deserializer for Event Grid messages, as well as the full code, on GitHub: https://github.com/kgalic/MassTransitSample and also in my answer to your other question.
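
For orientation, here is a minimal sketch of the core step such a deserializer has to perform: pulling the `data` payload out of the Event Grid event that lands in the queue. It is not the code from the repository and does not touch MassTransit at all; it only uses System.Text.Json. The class name `EventGridBody`, the placeholder `id`, and the assumption that the body follows the Event Grid event schema (`id`, `eventType`, `subject`, `eventTime`, `data`, `dataVersion`) and may arrive either as a single event or as an array are mine, so verify them against a real message from your queue.

```csharp
using System;
using System.Text.Json;

// Mirrors the payload type from the question.
public class NomeEmailChange
{
    public string Nome { get; set; }
    public string Email { get; set; }
}

// Hypothetical helper, not part of MassTransit or the Event Grid SDK.
public static class EventGridBody
{
    private static readonly JsonSerializerOptions Options =
        new JsonSerializerOptions { PropertyNameCaseInsensitive = true };

    // Returns the "data" payload of the first event in the body.
    // Accepts either a single event object or an array of events.
    public static NomeEmailChange ReadData(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;
        var evt = root.ValueKind == JsonValueKind.Array ? root[0] : root;
        var data = evt.GetProperty("data");
        return JsonSerializer.Deserialize<NomeEmailChange>(data.GetRawText(), Options);
    }
}

public static class Program
{
    public static void Main()
    {
        // Sample body built from the values in the question; the id is a placeholder.
        var body = @"{
            ""id"": ""00000000-0000-0000-0000-000000000001"",
            ""eventType"": ""cesco-cesco"",
            ""subject"": ""MS_Clientes"",
            ""dataVersion"": ""1.0"",
            ""data"": { ""Nome"": ""cesco"", ""Email"": ""cesco"" }
        }";

        var message = EventGridBody.ReadData(body);
        Console.WriteLine($"{message.Nome} / {message.Email}"); // prints "cesco / cesco"
    }
}
```

Inside a real MassTransit deserializer, the extracted payload would then be handed to the consume context, as done in the linked repository.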

kgalic

MassTransit wraps every message in an envelope and uses that envelope to deserialize the content. Messages forwarded by Event Grid do not have this envelope, so you will need to create and register a custom deserializer to allow MassTransit to ingest and process them. For more information, see the Interoperability documentation. Once the custom deserializer is created, it can be registered using the configuration API.
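
To make the difference concrete, here is a rough sketch that checks whether a JSON body looks like a MassTransit envelope. As far as I understand the Interoperability documentation, the JSON envelope carries a `messageType` array and a `message` object next to the routing metadata, while an Event Grid event carries `eventType` and `data` instead. The helper name `EnvelopeCheck`, the heuristic itself, and the `urn:message:Hypothetical.Contracts:NomeEmailChange` value are assumptions for illustration, not MassTransit API.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper, not part of MassTransit.
public static class EnvelopeCheck
{
    // Heuristic: treat the body as a MassTransit envelope when it is an object
    // with a "messageType" array and a "message" object.
    public static bool LooksLikeMassTransitEnvelope(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;
        return root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("messageType", out var types)
            && types.ValueKind == JsonValueKind.Array
            && root.TryGetProperty("message", out var message)
            && message.ValueKind == JsonValueKind.Object;
    }
}

public static class Program
{
    public static void Main()
    {
        // Trimmed-down envelope; a real one has more fields (messageId, addresses, headers, ...).
        var envelope = @"{
            ""messageType"": [ ""urn:message:Hypothetical.Contracts:NomeEmailChange"" ],
            ""message"": { ""Nome"": ""cesco"", ""Email"": ""cesco"" }
        }";

        // Event Grid style body, using the values from the question.
        var eventGridEvent = @"{
            ""eventType"": ""cesco-cesco"",
            ""subject"": ""MS_Clientes"",
            ""data"": { ""Nome"": ""cesco"", ""Email"": ""cesco"" }
        }";

        Console.WriteLine(EnvelopeCheck.LooksLikeMassTransitEnvelope(envelope));       // True
        Console.WriteLine(EnvelopeCheck.LooksLikeMassTransitEnvelope(eventGridEvent)); // False
    }
}
```

The second body is the kind that triggers the "No deserializer was registered" error above, because it arrives as plain application/json without the envelope.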

Sean Feldman