I'm working on a Worker Service in .NET Core and have added DDD to it. In the application layer, I created a factory that picks a specific message handler for each Kafka topic. This approach works, but I'd like to know whether it is the right way to implement it. In particular, I'm unsure about injecting the handlers' constructor parameters into MessageFactory.

public interface IMessageFactory
{
    IMessageHandler CreateMessage(string topicName);
}

Implementation of Factory Method

public class MessageFactory : IMessageFactory
{
    private readonly ILogger<MessageFactory> _logger;
    private readonly IOneRepository _oneRepository;
    private readonly ITwoRepository _twoRepository;

    public MessageFactory(ILogger<MessageFactory> logger,
        IOneRepository oneRepository,
        ITwoRepository twoRepository)
    {
        _logger = logger;
        _oneRepository = oneRepository;
        _twoRepository = twoRepository;
    }

    public IMessageHandler CreateMessage(string topicName)
    {
        // Matching on ToUpper() assumes the KafkaConstants values are upper-case.
        switch (topicName.ToUpper())
        {
            case KafkaConstants.OneMaster:
                return new OneMessageHandler(_logger, _oneRepository);
            case KafkaConstants.TwoMaster:
                return new TwoMessageHandler(_logger, _twoRepository);
            default:
                // Unknown topic: callers must check for null.
                return null;
        }
    }
}

Individual Handlers

public class OneMessageHandler : IMessageHandler
{
    private readonly ILogger<MessageFactory> _logger;
    private readonly IOneRepository _oneRepository;

    public OneMessageHandler(ILogger<MessageFactory> logger,
        IOneRepository oneRepository)
    {
        _logger = logger;
        _oneRepository = oneRepository;
    }

    public async Task<bool> ProcessMessage(string message)
    {
        ..........
    }
}

public class TwoMessageHandler : IMessageHandler
{
    private readonly ILogger<MessageFactory> _logger;
    private readonly ITwoRepository _twoRepository;

    public TwoMessageHandler(ILogger<MessageFactory> logger,
        ITwoRepository twoRepository)
    {
        _logger = logger;
        _twoRepository = twoRepository;
    }

    public async Task<bool> ProcessMessage(string message)
    {
        ..........
    }
}

Worker Service

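public class Worker : BackgroundService
{
    private readonly IMessageFactory _messageFactory;

    public Worker(IMessageFactory messageFactory)
    {
        _messageFactory = messageFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // message_from_kafka_response is a placeholder for the payload read from Kafka.
        IMessageHandler messageHandler = _messageFactory.CreateMessage("OneMaster");
        await messageHandler.ProcessMessage(message_from_kafka_response);
    }
}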

Dependency Injection

services.AddTransient<IMessageFactory, MessageFactory>();
Unknown Coder
  • Possible duplicate of [Factory method with DI and IoC](https://stackoverflow.com/a/31971691). – NightOwl888 May 17 '22 at 17:29
  • `return new OneMessageHandler(_logger, _oneRepository );` does this even work? I would expect it to throw because an `ILogger<MessageFactory>` is not an `ILogger<OneMessageHandler>`? – Fildor May 17 '22 at 17:29
  • Presumably, you've also registered an implementation for `IOneRepository` and `ITwoRepository` with your DI container. We can't tell you if this is the "right" way to do it without knowing how you define "right" (without resorting to tautologies like "correct" or "best practice"). – Robert Harvey May 17 '22 at 17:29
  • @Fildor Sorry, that's a typo while adding a snippet here. – Unknown Coder May 17 '22 at 17:33
  • For me it looks good enough. But I would have split processing from consuming (i.e. you have something that produces messages until it reaches the end, and something that processes them and can be wrapped in loggers/metrics, and you can wrap the whole thing in retrying background workers of various types). In other words, at runtime you have a set of background workers, and creating them is the responsibility of the IoC container. – eocron May 17 '22 at 17:33
  • @eocron can you give some more pointers on it, please. – Unknown Coder May 17 '22 at 17:36

1 Answer

I would make everything injectable, so that it can be registered like this (the class names are self-explanatory) using only the container, without a factory:

container.RegisterSingleton<IHostedService>(x =>
    new RepeatBackgroundWorker(
        repeatInterval,
        new MessageHandler<TKey, TPayload>( //this can also be just something like void IWorker.RunAsync(cancellationToken)
            new KafkaMessageProducer<TKey, TPayload>(/*settings like name, consumer group and connection settings*/),
            x.GetByKey<IMessageProcessor<TKey, TPayload>>(/*processor name*/))));


//RepeatBackgroundWorker - repeatedly runs the wrapped background worker
//MessageHandler - consumes a message, processes it, and sets the new offset, until the message producer's enumerable reaches the end (end of queue)
//KafkaMessageProducer - returns something like IAsyncEnumerable<Message<TKey, TPayload>>
//IMessageProcessor - your processor
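
To make the pieces described in those comments more concrete, here is a minimal sketch of how they might fit together. Everything in it is an assumption made for illustration: the answer does not show these types, so the member names (`ReadAsync`, `ProcessAsync`, `RunAsync`), the `Message` record, the `IMessageProducer` interface, and the `InMemoryMessageProducer` that stands in for `KafkaMessageProducer` are all hypothetical, and offset handling is only marked with a comment.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical envelope; the answer only mentions Message<TKey, TPayload> by name.
public record Message<TKey, TPayload>(TKey Key, TPayload Payload);

// Hypothetical processor contract: the business logic for one topic.
public interface IMessageProcessor<TKey, TPayload>
{
    Task ProcessAsync(Message<TKey, TPayload> message, CancellationToken ct);
}

// Hypothetical producer contract: any source of messages.
public interface IMessageProducer<TKey, TPayload>
{
    IAsyncEnumerable<Message<TKey, TPayload>> ReadAsync(CancellationToken ct);
}

// In-memory stand-in for KafkaMessageProducer so the sketch runs without a broker.
public class InMemoryMessageProducer<TKey, TPayload> : IMessageProducer<TKey, TPayload>
{
    private readonly IReadOnlyList<Message<TKey, TPayload>> _messages;

    public InMemoryMessageProducer(IReadOnlyList<Message<TKey, TPayload>> messages)
    {
        _messages = messages;
    }

    public async IAsyncEnumerable<Message<TKey, TPayload>> ReadAsync(
        [EnumeratorCancellation] CancellationToken ct)
    {
        foreach (var message in _messages)
        {
            ct.ThrowIfCancellationRequested();
            await Task.Yield(); // simulate an asynchronous read
            yield return message;
        }
    }
}

// Drains the producer and hands every message to the processor.
public class MessageHandler<TKey, TPayload>
{
    private readonly IMessageProducer<TKey, TPayload> _producer;
    private readonly IMessageProcessor<TKey, TPayload> _processor;

    public MessageHandler(IMessageProducer<TKey, TPayload> producer,
        IMessageProcessor<TKey, TPayload> processor)
    {
        _producer = producer;
        _processor = processor;
    }

    public async Task RunAsync(CancellationToken ct)
    {
        await foreach (var message in _producer.ReadAsync(ct))
        {
            await _processor.ProcessAsync(message, ct);
            // A Kafka-backed producer would commit the offset at this point.
        }
    }
}

// Hypothetical processor that just records the payloads it receives.
public class RecordingProcessor : IMessageProcessor<string, string>
{
    public List<string> Seen { get; } = new();

    public Task ProcessAsync(Message<string, string> message, CancellationToken ct)
    {
        Seen.Add(message.Payload);
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var processor = new RecordingProcessor();
        var producer = new InMemoryMessageProducer<string, string>(new[]
        {
            new Message<string, string>("OneMaster", "first"),
            new Message<string, string>("OneMaster", "second"),
        });

        await new MessageHandler<string, string>(producer, processor)
            .RunAsync(CancellationToken.None);

        Console.WriteLine(string.Join(", ", processor.Seen));
    }
}
```

Because the handler depends only on the two interfaces, swapping the in-memory producer for a Kafka-backed one should not require touching the processor or the handler.
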

As you can see, this gives you multiple benefits:

  • You can wrap any part in a general logging or metrics layer, such as MessageProcessorLogger, MessageProducerLogger, etc.
  • You can apply general filtering to the producer (for example, grouping messages by key or by a payload field such as UserId and keeping only the last action performed, or wrapping it in a batching wrapper).
  • You can control handler behavior (how and when it restarts, what to do on errors, etc.).
  • You can swap sources (for example, replacing Kafka with RabbitMQ).
  • You can put the entire factory into the container or into the runtime.
  • It is easy to use in tests: swap the producer for a custom implementation, or mock the processor/consumer.
  • You basically only need to implement a processor/consumer and specify the queue name (plus the consumer group for Kafka), and you automatically get well-named metrics and logs.
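
The first benefit, wrapping a part in logging, can be sketched as a plain decorator. This is only an illustration under stated assumptions: the `IMessageProcessor` signature, the `Message` record, and the `UpperCaseProcessor` are hypothetical, `MessageProcessorLogger` is named in the answer but its shape here is guessed, and an `Action<string>` stands in for `ILogger` so the sketch runs without extra packages.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical envelope and processor contract, assumed for this sketch.
public record Message<TKey, TPayload>(TKey Key, TPayload Payload);

public interface IMessageProcessor<TKey, TPayload>
{
    Task ProcessAsync(Message<TKey, TPayload> message, CancellationToken ct);
}

// Decorator: adds logging and timing around any processor without changing it.
public class MessageProcessorLogger<TKey, TPayload> : IMessageProcessor<TKey, TPayload>
{
    private readonly IMessageProcessor<TKey, TPayload> _inner;
    private readonly Action<string> _log; // stand-in for ILogger (assumption)

    public MessageProcessorLogger(IMessageProcessor<TKey, TPayload> inner, Action<string> log)
    {
        _inner = inner;
        _log = log;
    }

    public async Task ProcessAsync(Message<TKey, TPayload> message, CancellationToken ct)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await _inner.ProcessAsync(message, ct);
            _log($"Processed {message.Key} in {stopwatch.ElapsedMilliseconds} ms");
        }
        catch (Exception ex)
        {
            // Log the failure, then let the caller decide how to retry.
            _log($"Failed {message.Key}: {ex.Message}");
            throw;
        }
    }
}

// Hypothetical processor used only to exercise the decorator.
public class UpperCaseProcessor : IMessageProcessor<string, string>
{
    public List<string> Seen { get; } = new();

    public Task ProcessAsync(Message<string, string> message, CancellationToken ct)
    {
        Seen.Add(message.Payload.ToUpperInvariant());
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var logs = new List<string>();
        var inner = new UpperCaseProcessor();
        IMessageProcessor<string, string> processor =
            new MessageProcessorLogger<string, string>(inner, logs.Add);

        await processor.ProcessAsync(
            new Message<string, string>("user-1", "hello"), CancellationToken.None);

        Console.WriteLine(string.Join(Environment.NewLine, logs));
    }
}
```

The same wrapping idea would apply to the producer side (the answer's MessageProducerLogger), and in a container registration the decorator would simply be constructed around the resolved processor.
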

PS: I didn't implement all the classes, because that depends on the business requirements.

PS2: This is basically what I use in production to squeeze performance out of Kafka and to get all logs into Kibana and all metrics into Prometheus + Grafana from all sources out of the box (the message timestamp is particularly useful when you have spikes and want to rebalance up or down).

eocron