5

I have the following Program.cs

 public class Program
{
    public async static Task Main(string[] args)
    {

        var host = CreateHostBuilder(args).Build();

        using (var scope = host.Services.CreateScope())
        {
            var services = scope.ServiceProvider;

            try
            {
                var context = services.GetRequiredService<ProductDbContext>();

                if (context.Database.IsSqlServer())
                {
                    context.Database.Migrate();
                }
                await ProductDbContextSeed.SeedSampleDataAsync(context);
            }
            catch (Exception ex)
            {
                var logger = scope.ServiceProvider.GetRequiredService<ILogger<Program>>();

                logger.LogError(ex, "An error occurred while migrating or seeding the database.");

                throw;
            }
        }

        await host.RunAsync();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
         .UseServiceProviderFactory(new AutofacServiceProviderFactory())
            .ConfigureWebHostDefaults(webBuilder =>
                webBuilder.UseStartup<Startup>());

}

By adding the following line: (as I already have in the code above):

.UseServiceProviderFactory(new AutofacServiceProviderFactory())

My code runs how I want. However if I remove .UseServiceProviderFactory(new AutofacServiceProviderFactory()) And simply have:

  public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder =>
                webBuilder.UseStartup<Startup>());

I get the following error:

    Cannot resolve 'ProductAPI.IntegrationEvents.EventHandling.OrderCreatedIntegrationEventHandler' 
from root provider because it requires scoped service 
'RetailInMotion.Services.Inventory.ProductAPI.Infrastructure.Persistence.ProductDbContext'.

Here is how I have configured the ProductDbContext:

    public static class DependencyInjection
{
    public static IServiceCollection AddInfrastructure(this IServiceCollection services, IConfiguration configuration)
    {
       
            services.AddDbContext<ProductDbContext>(options =>
                options.UseSqlServer(
                    configuration.GetConnectionString("DefaultConnection"),
                    b => b.MigrationsAssembly(typeof(ProductDbContext).Assembly.FullName)));

        services.AddDbContext<EventLogContext>(options =>
        {
            options.UseSqlServer(configuration.GetConnectionString("DefaultConnection"),
                                 sqlServerOptionsAction: sqlOptions =>
                                 {
                                     sqlOptions.MigrationsAssembly(typeof(ProductDbContext).Assembly.FullName);
                                     sqlOptions.EnableRetryOnFailure(10, TimeSpan.FromSeconds(30), null);
                                 });
        });

        services.AddAuthentication();

        services.AddAuthorization();

        return services;
    }
}

I register my EventBus as follows:

private void RegisterEventBus(IServiceCollection services, IConfiguration configuration)
{
    services.AddSingleton<IEventBus, EventBusRabbitMq>(sp =>
    {
        var subscriptionClientName = configuration["SubscriptionClientName"];
        var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQConnectionManagementService>();
        var logger = sp.GetRequiredService<ILogger<EventBusRabbitMq>>();
        //var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
        var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

        var retryCount = 5;
        if (!string.IsNullOrEmpty(configuration["EventBusRetryCount"]))
        {
            retryCount = int.Parse(configuration["EventBusRetryCount"]);
        }
        //return new EventBusRabbitMq(eventBusSubcriptionsManager, rabbitMQPersistentConnection, iLifetimeScope, logger, retryCount, subscriptionClientName);
        return new EventBusRabbitMq(eventBusSubcriptionsManager, rabbitMQPersistentConnection, sp, logger, retryCount, subscriptionClientName);
    });

    services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
    services.AddTransient<OrderCreatedIntegrationEventHandler>();
    services.AddTransient<RemoveProductStockIntegrationEventHandler>();
}

Here is my EventBusRabbitMq class:

`

public class EventBusRabbitMq : IEventBus, IDisposable
{
    const string BROKER_NAME = "retailInMotion_event_bus";
    const string AUTOFAC_SCOPE_NAME = "retailInMotion_event_bus";

private readonly IEventBusSubscriptionsManager _subsManager;
private readonly IRabbitMQConnectionManagementService _rabbitMQConnectionManagementService;
private readonly IServiceProvider _serviceProvider;
//private readonly ILifetimeScope _autofac;
private readonly ILogger<EventBusRabbitMq> _logger;
private readonly int _retryCount;

private IModel _consumerChannel;
private string _queueName;

public EventBusRabbitMq(
     IEventBusSubscriptionsManager subsManager,
     IRabbitMQConnectionManagementService rabbitMQConnectionManagementService,
     IServiceProvider serviceProvider, 
     //ILifetimeScope autofac,
     ILogger<EventBusRabbitMq> logger, 
     int retryCount = 5, 
     string queueName = null)
{
    _subsManager = subsManager;
    _rabbitMQConnectionManagementService = rabbitMQConnectionManagementService;
    _serviceProvider = serviceProvider;
    //_autofac = autofac;
    _logger = logger;
    _retryCount = retryCount;
    _queueName = queueName;
    _consumerChannel = CreateConsumerChannel();
    _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
}

private void SubsManager_OnEventRemoved(object? sender, string eventName)
{
    if (!_rabbitMQConnectionManagementService.IsConnected)
    {
        _rabbitMQConnectionManagementService.TryConnect();
    }

    using (var channel = _rabbitMQConnectionManagementService.CreateModel())
    {
        channel.QueueUnbind(queue: _queueName,
            exchange: BROKER_NAME,
            routingKey: eventName);

        if (_subsManager.IsEmpty)
        {
            _queueName = string.Empty;
            _consumerChannel.Close();
        }
    }
}

private IModel? CreateConsumerChannel()
{
    if (!_rabbitMQConnectionManagementService.IsConnected)
    {
        _rabbitMQConnectionManagementService.TryConnect();
    }

    _logger.LogTrace("Creating RabbitMQ consumer channel");

    var channel = _rabbitMQConnectionManagementService.CreateModel();

    channel.ExchangeDeclare(exchange: BROKER_NAME,
                            type: "direct");

    channel.QueueDeclare(queue: _queueName,
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    channel.CallbackException += (sender, ea) =>
    {
        _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");

        _consumerChannel.Dispose();
        _consumerChannel = CreateConsumerChannel();
        StartBasicConsume();
    };

    return channel;
}

public void Publish(IntegrationEvent @event)
{
    if (!_rabbitMQConnectionManagementService.IsConnected)
    {
        _rabbitMQConnectionManagementService.TryConnect();
    }

    var policy = RetryPolicy.Handle<BrokerUnreachableException>()
        .Or<SocketException>()
        .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
        {
            _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);
        });

    var eventName = @event.GetType().Name;
    //var jsonMessage = JsonConvert.SerializeObject(@event);

    _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);

    using (var channel = _rabbitMQConnectionManagementService.CreateModel())
    {
        _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);

        channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

        var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions
        {
            WriteIndented = true
        });

        //var body = Encoding.UTF8.GetBytes(jsonMessage);

        policy.Execute(() =>
        {
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2; // persistent

            _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);

            channel.BasicPublish(
                exchange: BROKER_NAME,
                routingKey: eventName,
                mandatory: true,
                basicProperties: properties,
                body: body);
        });
    }
}

//public void Setup()
//{
//    throw new NotImplementedException();
//}

public void Subscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = _subsManager.GetEventKey<T>();
    DoInternalSubscription(eventName);

    _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).Name);

    _subsManager.AddSubscription<T, TH>();
    StartBasicConsume();
}
private void DoInternalSubscription(string eventName)
{
    var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
    if (!containsKey)
    {
        if (!_rabbitMQConnectionManagementService.IsConnected)
        {
            _rabbitMQConnectionManagementService.TryConnect();
        }

        _consumerChannel.QueueBind(queue: _queueName,
                            exchange: BROKER_NAME,
                            routingKey: eventName);
    }
}
public void Unsubscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = _subsManager.GetEventKey<T>();

    _logger.LogInformation("Unsubscribing from event {EventName}", eventName);

    _subsManager.RemoveSubscription<T, TH>();
}

private void StartBasicConsume()
{
    _logger.LogTrace("Starting RabbitMQ basic consume");

    if (_consumerChannel != null)
    {
        var consumer = new AsyncEventingBasicConsumer(_consumerChannel);

        consumer.Received += Consumer_Received;

        _consumerChannel.BasicConsume(
            queue: _queueName,
            autoAck: false,
            consumer: consumer);
    }
    else
    {
        _logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
    }
}

private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
    var eventName = eventArgs.RoutingKey;
    var message = Encoding.UTF8.GetString(eventArgs.Body.Span);

    try
    {
        if (message.ToLowerInvariant().Contains("throw-fake-exception"))
        {
            throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
        }

        await ProcessEvent(eventName, message);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);
    }

    // Even on exception we take the message off the queue.
    // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). 
    // For more information see: https://www.rabbitmq.com/dlx.html
    _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
}

private async Task ProcessEvent(string eventName, string message)
{
    _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);

    if (_subsManager.HasSubscriptionsForEvent(eventName))
    {
        //using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) // can do it this way with autofac or like directly below with built in .net core DI 
        using (var scope = _serviceProvider.CreateScope())
        {
            var subscriptions = _subsManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                var handler = _serviceProvider.GetRequiredService(subscription.HandlerType);  //CHRIS INVESTIGATE!!
                //var handler = scope.ResolveOptional(subscription.HandlerType);
                if (handler == null) continue;
                var eventType = _subsManager.GetEventTypeByName(eventName);
                //var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                //dynamic json = Newtonsoft.Json.JsonConvert.DeserializeObject(message);
                //using dynamic eventData = JsonDocument.Parse(message);

                var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true});   //JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true });
                var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                await (Task)concreteType.GetMethod("HandleAsync").Invoke(handler, new object[] { integrationEvent });
            }
        }
    }
    else
    {
        _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
    }
}

public void Dispose()
{
    if (_consumerChannel != null)
    {
        _consumerChannel.Dispose();
    }
    _subsManager.Clear();
}

} `

Here is the stack trace for the error I am receiving:

at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteValidator.ValidateResolution(Type serviceType, IServiceScope scope, IServiceScope rootScope) at Microsoft.Extensions.DependencyInjection.ServiceProvider.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope) at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType) at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType) at EventBus.EventBusRabbitMq.<ProcessEvent>d__18.MoveNext() in C:\Users\porterc\Documents\My Version of DDD\RetailInMotion-master\EventBus\EventBusRabbitMq.cs:line 260 at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult() at EventBus.EventBusRabbitMq.<Consumer_Received>d__17.MoveNext() in C:\Users\porterc\Documents\My Version of DDD\RetailInMotion-master\EventBus\EventBusRabbitMq.cs:line 235

My question is why? If anyone can point me in the right direction or tell me what does this one line actually does I would be very grateful.

Ports
  • 95
  • 7
  • 1
    DbContext is, by default, added as a scoped service (e.g. `services.AddScoped...`) so you need a new scope to resolve it. – DavidG Jun 15 '22 at 17:33
  • Thanks @DavidG so by adding - UseServiceProviderFactory(new AutofacServiceProviderFactory()) this creates a new scope. Is that correct? – Ports Jun 15 '22 at 18:26
  • Can you update all the code in the Startup.cs file that contains the ProductDbContext in the post? Pls hide sensitive info. – Jason Pan Jun 16 '22 at 01:39
  • Hi @Jason, I have now added the code for the ProductDbContext above. Do you need me to include anymore? – Ports Jun 16 '22 at 08:26
  • [Pls check this answer.](https://stackoverflow.com/a/63935867/7687666) – Jason Pan Jun 16 '22 at 08:37
  • Thanks @Jason, but by setting `.UseDefaultServiceProvider(options => options.ValidateScopes = false)` is that best practice or is it better to use `.UseServiceProviderFactory(new AutofacServiceProviderFactory())` ? – Ports Jun 16 '22 at 11:44
  • These two have a completely different use casse. `options.ValidateScopes = false` will disable the exception when resolving a scoped service without having a scope, and some other errors (like injecting a scoped service into a singleton). `AutofacServiceProviderFactory` will use Autofac as a service provider instead of the default service provider. The fact that the second option will "fix" the exception is that Autofac probably ignores the flag and doesn't ever check whether scopes are valid. – Michael Jun 20 '22 at 07:42
  • What do you use to handle your integration events? – asgerhallas Jun 24 '22 at 10:57
  • Hi, @asgerhallas I am using RabbitMq. Is this what you mean? – Ports Jul 06 '22 at 17:04

2 Answers2

1

As the exception message says, your ProductAPI.IntegrationEvents.EventHandling.OrderCreatedIntegrationEventHandler is not resolved in a scope or is not registered as a scoped service.

Whatever event dispatcher you use ought to begin a scope before resolving the event handler and handling events. And the OrderCreatedIntegrationEventHandler must be registered as a service scoped too.

My guess is that Autofac does not perform any lifetime mismatch checks at startup and that's why it does not throw when you plug in Autofac. This does not mean, that it actually works, only that you hide a potential error. It may come up as an exception the first time you handle an event, or it may be even more severe in that it just uses an the same ProductDbContext over the lifetime of the event handler, which might be singleton.

But note that I havn't used Autofac, only other containers, so I don't know how exactly this library handles such issues.

EDIT 1

This line (260): var handler = _serviceProvider.GetRequiredService(subscription.HandlerType); in the provided source for EventBusRabbitMq needs to be changed to: var handler = scope.ServiceProvicer.GetRequiredService(subscription.HandlerType);.

Inside the scope you should resolve your services using the scope (as opposed to from the root service provider _serviceProvider from which you create your scope).

asgerhallas
  • 16,890
  • 6
  • 50
  • 68
  • Hi @asgerhallas, I have now included how I register my EventBus in my original post above. So do you mean I need to change `services.AddTransient();` and add it as scoped instead? – Ports Jul 06 '22 at 17:13
  • 1
    @Ports I have looked in to what I think is the EventBusRabbitMq you reference, and something confuses me. The version I can find does not have a constructor that takes IServiceProvider, so the newly added registraion code in the question, does not compile - mentally for me anyways :) Can you provide a link to the EventBusRabbitMq you use and a stack trace of the exception you get? – asgerhallas Jul 07 '22 at 10:43
  • 1
    Hi @asgerhallas, thank you for taking your time to look into this. I have included the EventBusRabbitMq class above and the stacktrace of the error. Am I right in think it is because I have registered my OrderCreatedIntegrationEventHandler as transient? – Ports Jul 07 '22 at 17:08
  • @Ports thanks for the source and the stack trace! Check my edits, let's see if that helps :) – asgerhallas Jul 07 '22 at 19:06
  • Brilliant, thank you @asgerhallas that worked. Just one more question. Where is the root service provider created and what is its default setting (i.e I thought it was created as scoped by default?)? – Ports Jul 08 '22 at 11:33
  • 1
    @Ports it's created when you call `Build()` in `var host = CreateHostBuilder(args).Build();`. It will then call your ConfigureServices implementation, if any. This ServiceProvider has no scope (or it is scoped to the lifetime of the application, so to speak). On an incoming http request the default pipeline will create a scope for that particular request. So http requests are "scoped by default", but your EventBusRabbitMq is another entry point into your application and thus is not using the same pipeline, and you have to create the scope yourself. – asgerhallas Jul 08 '22 at 11:46
  • 1
    Great thank you @asgerhallas for all your help and taking the time to explain it to me :-D – Ports Jul 08 '22 at 12:04
0

Try this, I think this might be to do more with the order it is constructed.

Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder => {
                webBuilder.UseStartup<Startup>();
            })
            .UseServiceProviderFactory(new AutofacServiceProviderFactory());
Netferret
  • 604
  • 4
  • 15
  • Also this may be of interest https://stackoverflow.com/questions/57822173/recommended-way-to-configure-autofac-in-asp-net-core-3-0 – Netferret Jun 21 '22 at 10:44