I'm working on a program where I receive data from SignalR, perform processing, and then send a SignalR message back to the client once the processing has finished. I've found a couple of resources for how to do this, but I can't quite figure out how to implement it in my project.

Here's what my code looks like:

Bootstrapping


    public static void Main(string[] args)
    {
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        List<ISystem> systems = new List<ISystem>
        {
            new FirstProcessingSystem(),
            new SecondProcessingSystem(),
        };
        Processor processor = new Processor(
            cancellationToken: cancellationTokenSource.Token,
            systems: systems);
        processor.Start();
        CreateHostBuilder(args).Build().Run();
        cancellationTokenSource.Cancel();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder.UseStartup<Startup>();
            });

    public class Startup
    {
        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSignalR();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapHub<TestHub>("/testHub");
            });
        }
    }

TestHub.cs


    public class TestHub : Hub
    {
        public Task DoStuff(Work work)
        {
            // Queue the work for the background systems; there is nothing to await here.
            FirstProcessingSystem.ItemsToProcess.Add(work);
            return Task.CompletedTask;
        }
    }

Work.cs


    public class Work
    {
        public readonly string ConnectionId;
        public readonly string Data;

        public Work(string connectionId, string data)
        {
            ConnectionId = connectionId;
            Data = data;
        }
    }

Processor.cs

    public class Processor
    {
        readonly CancellationToken CancellationToken;
        readonly List<ISystem> Systems;

        public Processor(
            CancellationToken cancellationToken,
            List<ISystem> systems)
        {
            CancellationToken = cancellationToken;
            Systems = systems;
        }

        public void Start()
        {
            Task.Run(() =>
            {
                while (!CancellationToken.IsCancellationRequested)
                {
                    foreach (var s in Systems)
                        s.Process();
                }
            });
        }
    }

Systems


    public interface ISystem
    {
        void Process();
    }

    public class FirstProcessingSystem : ISystem
    {
        public static ConcurrentBag<Work> ItemsToProcess = new ConcurrentBag<Work>();

        public void Process()
        {
            while (!ItemsToProcess.IsEmpty)
            {
                Work work;
                if (ItemsToProcess.TryTake(out work))
                {
                    // Do things...
                    SecondProcessingSystem.ItemsToProcess.Add(work);
                }
            }
        }
    }

    public class SecondProcessingSystem : ISystem
    {
        public static ConcurrentBag<Work> ItemsToProcess = new ConcurrentBag<Work>();

        public void Process()
        {
            while (!ItemsToProcess.IsEmpty)
            {
                Work work;
                if (ItemsToProcess.TryTake(out work))
                {
                    // Do more things...
                    // Hub.Send(work.ConnectionId, "Finished");
                }
            }
        }
    }

I know that I can perform the processing in the Hub and then send back the "Finished" call, but I'd like to decouple my processing from my inbound messaging so that I can add more ISystems when needed.

Can someone please help with this? (Also, if someone has a better way to structure my program, I'd appreciate the feedback.)

Divyang Desai
c6039534

1 Answer

ASP.NET Core has a very powerful dependency injection system, so why not use it? If you create your worker services without dependency injection, you'll have a hard time using anything ASP.NET Core provides.

Since your "processing systems" seem to be long running services, you'd typically have them implement IHostedService, then create a generic service starter (taken from here):

    public class BackgroundServiceStarter<T> : IHostedService where T : IHostedService
    {
        readonly T _backgroundService;

        public BackgroundServiceStarter(T backgroundService)
        {
            _backgroundService = backgroundService;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            return _backgroundService.StartAsync(cancellationToken);
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return _backgroundService.StopAsync(cancellationToken);
        }
    }

then register them to the DI container in ConfigureServices:

    // make the classes injectable
    services.AddSingleton<FirstProcessingSystem>();
    services.AddSingleton<SecondProcessingSystem>();

    // start them up
    services.AddHostedService<BackgroundServiceStarter<FirstProcessingSystem>>();
    services.AddHostedService<BackgroundServiceStarter<SecondProcessingSystem>>();

Now that you have all of that set up correctly, you can inject a reference to your SignalR hub by adding an IHubContext<TestHub> context parameter to the constructor of whatever class needs it (as described in some of the links you posted).
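As a rough illustration of that last step, here is a minimal sketch of what SecondProcessingSystem might look like with the hub context injected. It reuses Work, TestHub and ISystem from the question. It assumes "Finished" (taken from the commented-out line in the question) is the name of a method the client listens for, and it keeps the question's synchronous Process() signature, so the send is waited on synchronously. Treat it as one possible shape, not the only way to do it.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Hosting;

public class SecondProcessingSystem : ISystem, IHostedService
{
    public static readonly ConcurrentBag<Work> ItemsToProcess = new ConcurrentBag<Work>();

    // Supplied by the DI container because the class is registered with AddSingleton.
    private readonly IHubContext<TestHub> _hubContext;

    public SecondProcessingSystem(IHubContext<TestHub> hubContext)
    {
        _hubContext = hubContext;
    }

    // Nothing to do on startup or shutdown in this sketch.
    public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public void Process()
    {
        while (ItemsToProcess.TryTake(out Work work))
        {
            // Do more things...

            // Notify only the connection that submitted this item.
            // "Finished" is an assumed client-side method name.
            _hubContext.Clients
                .Client(work.ConnectionId)
                .SendAsync("Finished")
                .GetAwaiter()
                .GetResult();
        }
    }
}
```

If you are free to change ISystem, a Task-returning Process method would let you await SendAsync instead of blocking on it.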

Markus Dresch
  • Thank you very much for the response! I'm a bit confused by how this would work. Is it important that my StartAsync() and StopAsync() are implemented? Or is it only used for the DI? Using the code you provided I was able to get it working, but I'm not familiar enough with .NET to understand what's happening behind the scenes. For the moment I have both StartAsync() and StopAsync() returning Task.CompletedTask. – c6039534 Oct 24 '19 at 14:22
  • `StartAsync` and `StopAsync` have to be there to satisfy the interface, but you only need to implement them if there is work to do when starting or stopping; you should be fine with `Task.CompletedTask`. What's happening is that the DI container creates a single instance of each of your classes and injects the SignalR hub context behind the scenes. `AddHostedService` then spins up those two classes as long-running services, so they keep running as long as your WebAPI is alive. – Markus Dresch Oct 24 '19 at 14:45
  • Is that to say that if I create my ISystems and DI them into my Processor while also adding them as IHostedServices there will effectively be two instances of my ISystems? Would that also mean that if my SecondProcessingSystem had the IHubContext DI and set as a `public static IHubContext TestHubContext;` that if I use it in my Process() it could potentially lead to a race condition? (i.e., the singleton instance sets SecondProcessingSystem.TestHubContext while Process() tries to use it?) – c6039534 Oct 24 '19 at 14:54
  • If you DI them into your Processor, there should only be one instance of each. You probably shouldn't inject them via the ISystem interface, though, unless you also register that List you had originally as a singleton, built from the instances in the service collection. – Markus Dresch Oct 24 '19 at 16:14
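
One way to get a single shared instance per system, as discussed in the comments above, is to forward the ISystem registrations to the concrete singletons and let the container hand the Processor an IEnumerable<ISystem>. The sketch below assumes the Processor is rewritten as a BackgroundService, that the systems no longer need to be hosted services themselves, and that a short pause between passes is acceptable. AddProcessing and the 50 ms delay are hypothetical names and values chosen for illustration.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class Processor : BackgroundService
{
    private readonly IEnumerable<ISystem> _systems;

    // The container supplies every ISystem registration, in registration order.
    public Processor(IEnumerable<ISystem> systems)
    {
        _systems = systems;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            foreach (var system in _systems)
                system.Process();

            // Assumed pause so the loop does not spin at full speed while idle.
            await Task.Delay(50, stoppingToken);
        }
    }
}

public static class ProcessingRegistration
{
    // Hypothetical helper; call services.AddProcessing() from ConfigureServices.
    public static IServiceCollection AddProcessing(this IServiceCollection services)
    {
        // One instance of each concrete system.
        services.AddSingleton<FirstProcessingSystem>();
        services.AddSingleton<SecondProcessingSystem>();

        // Expose the same instances through ISystem instead of creating new ones.
        services.AddSingleton<ISystem>(sp => sp.GetRequiredService<FirstProcessingSystem>());
        services.AddSingleton<ISystem>(sp => sp.GetRequiredService<SecondProcessingSystem>());

        // The host starts and stops the Processor and provides the cancellation token.
        services.AddHostedService<Processor>();
        return services;
    }
}
```

With this arrangement the manual Processor and CancellationTokenSource setup in Main is no longer needed, because the host manages the Processor's lifetime.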