
In an application, many actions need to be logged to the database, but this logging must not slow down the request. So the writes should be done through an asynchronous queue of some kind.

This is my big picture:

[diagram: big picture of the logging pipeline]

And this is my example implementation:

Model:

public class ActivityLog
{
    [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
    public string Id { get; set; }
    public IPAddress IPAddress { get; set; }
    public string Action { get; set; }
    public string? Metadata { get; set; }
    public string? UserId { get; set; }
    public DateTime CreationTime { get; set; }
}

Queue:

public class LogQueue
{
    private const int QueueCapacity = 1_000_000; // is one million enough?

    private readonly BlockingCollection<ActivityLog> logs = new(QueueCapacity);

    public bool IsCompleted => logs.IsCompleted;
    public void Add(ActivityLog log) => logs.Add(log);
    public IEnumerable<ActivityLog> GetConsumingEnumerable() => logs.GetConsumingEnumerable();
    public void Complete() => logs.CompleteAdding();
}

Worker (the problem is here):

public class DbLogWorker : IHostedService
{
    private readonly LogQueue queue;
    private readonly IServiceScopeFactory scf;
    private Task jobTask;

    public DbLogWorker(LogQueue queue, IServiceScopeFactory scf)
    {
        this.queue = queue;
        this.scf = scf;

        jobTask = new Task(Job, TaskCreationOptions.LongRunning);
    }
  
    private void Job()
    {
        using var scope = scf.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

        // The following code does not work
    // My intention was to reduce DB trips
        //while(!queue.IsCompleted)
        //{
        //    var items = queue.GetConsumingEnumerable();
        //    dbContext.AddRange(items);
        //    dbContext.SaveChanges();
        //}

        // But this works
        // If I have 10 items available, I'll have 10 DB trips (not good), right?
        foreach (var item in queue.GetConsumingEnumerable())
        {
            dbContext.Add(item);
            dbContext.SaveChanges();
        }
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        jobTask.Start();
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        queue.Complete();
        jobTask.Wait(); // or 'await jobTask' ?
        return Task.CompletedTask; // unnecessary if I use 'await jobTask'
    }
}

Dependency Injection:

builder.Services.AddSingleton<LogQueue>();
builder.Services.AddHostedService<DbLogWorker>();

Controller:

[HttpGet("/")]
public IActionResult Get(string? name = "N/A")
{
    var log = new ActivityLog()
    {
        CreationTime = DateTime.UtcNow,
        Action = "Home page visit",
        IPAddress = HttpContext.Connection.RemoteIpAddress ?? IPAddress.Any,
        Metadata = $"{{ name: {name} }}",
        UserId = User.FindFirstValue(ClaimTypes.NameIdentifier)
    };

    queue.Add(log);

    return Ok("Welcome!");
}

As I explained in the comments, I want to take as many items as are available and save them with one DB trip. My attempt was the following code, but it doesn't work:

while (!queue.IsCompleted)
{
    var items = queue.GetConsumingEnumerable();
    dbContext.AddRange(items);
    dbContext.SaveChanges();
}
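The likely reason this hangs is that `GetConsumingEnumerable()` returns a *blocking* enumerable: it only ends when `CompleteAdding()` has been called and the collection is empty, so `AddRange` blocks while enumerating and `SaveChanges` is never reached. A sketch of batch draining with `TryTake` instead, assuming `LogQueue` is extended to forward `TryTake` from its inner `BlockingCollection<ActivityLog>` (it does not expose it as shown above):

```csharp
// Sketch of a batching Job. Assumes LogQueue forwards TryTake(out ActivityLog)
// and TryTake(out ActivityLog, int millisecondsTimeout) to the inner collection.
private void Job()
{
    using var scope = scf.CreateScope();
    var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

    // Block until at least one item arrives; returns false once the queue
    // has been completed and drained, so the loop ends after Complete().
    while (queue.TryTake(out var first, Timeout.Infinite))
    {
        var buffer = new List<ActivityLog> { first };

        // Non-blocking: grab whatever else is already queued.
        while (queue.TryTake(out var next))
            buffer.Add(next);

        dbContext.AddRange(buffer);
        dbContext.SaveChanges(); // one DB trip per batch
    }
}
```

This saves all currently available items in a single `SaveChanges` call, and falls back to one-item batches when the queue is mostly empty.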

So instead, I'm using this, which does one DB trip per row:

foreach (var item in queue.GetConsumingEnumerable())
{
    dbContext.Add(item);
    dbContext.SaveChanges();
}

I also have two side questions: How can I increase workers? How can I increase workers dynamically based on queue count?
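On the first side question: `BlockingCollection<T>` supports multiple concurrent consumers, with each queued item handed to exactly one of them, so one option is simply to start several consumer tasks over the same queue. A static sketch (the fixed `workerCount` is a hypothetical knob; scaling dynamically on queue length would need extra machinery):

```csharp
// Sketch: N consumers draining the same LogQueue concurrently.
// GetConsumingEnumerable / TryTake may be called from several threads;
// each queued item goes to exactly one consumer.
const int workerCount = 4; // hypothetical fixed worker count
var workers = new Task[workerCount];
for (int i = 0; i < workerCount; i++)
{
    workers[i] = Task.Factory.StartNew(
        Job,                              // the same consuming method as above
        CancellationToken.None,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
}
// On shutdown: queue.Complete(); then Task.WaitAll(workers);
```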

I created an example project for this question.

Parsa99
  • take a look at https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-implement-a-producer-consumer-dataflow-pattern where the consumer will save the processed items to the DB (preferably in chunks) further reading https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-using-batchblock-and-batchedjoinblock-to-improve-efficiency – Mohamed Jun 18 '22 at 10:31
  • Related: [How to consume a BlockingCollection in batches](https://stackoverflow.com/questions/12260166/how-to-consume-a-blockingcollectiont-in-batches) – Theodor Zoulias Jun 18 '22 at 10:37
  • When you mentioned "but it doesn't work", is an exception being thrown? If so, can you indicate what the exception was? Or were no exceptions thrown, and the data is just not saved to the DB? I also noticed that there is no while loop in the working variant; could the issue be the checking of IsCompleted? – Markuzy Jun 18 '22 at 12:16
  • @Markuzy | No exception, but nothing is in the DB; in some scenarios the app does not start at all, which means the IHostedService does not exit StartAsync(). – Parsa99 Jun 18 '22 at 13:14
  • As a side note, the `jobTask.Start();` schedules the `jobTask` on the ambient `TaskScheduler.Current`. To be on the safe side [it is recommended](https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2008 "CA2008: Do not create tasks without passing a TaskScheduler") to schedule your tasks explicitly on well-known schedulers, like the `TaskScheduler.Default`, which executes the tasks on the `ThreadPool`. – Theodor Zoulias Jun 18 '22 at 14:57
  • Isn't `ThreadPool` [the default](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-6.0#the-default-task-scheduler-and-the-thread-pool)? – Parsa99 Jun 18 '22 at 16:01
  • The `TaskScheduler.Default` (`ThreadPool`) is the default scheduler for the `Parallel` class, the PLINQ, the TPL Dataflow and the `Task.Run` method. It's not the default scheduler for the `Task.Factory.StartNew`, `Task.Start`, `Task.RunSynchronously` and `Task.ContinueWith`. Which is part of the reason why using these APIs is generally discouraged. – Theodor Zoulias Jun 19 '22 at 06:16
  • AFAIK there are a lot of loggers which can write to a DB, why do you need to reinvent the wheel? – Guru Stron Jun 19 '22 at 11:54
  • Another side note: I've seen people more frequently inheriting the [`BackgroundService`](https://learn.microsoft.com/en-us/dotnet/api/microsoft.extensions.hosting.backgroundservice) class than implementing the `IHostedService` interface directly. Working with the `BackgroundService` seems more suitable for developers with small/medium familiarity with the technology. – Theodor Zoulias Jun 20 '22 at 18:33
  • And another one: An alternative to the `BlockingCollection` is the [`Channel`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channel-1) class. It has the advantage that it can be consumed without blocking a thread, because it has asynchronous API. Microsoft thinks that using channels [should be encouraged](https://github.com/dotnet/runtime/pull/69631#issuecomment-1152262138 "optimize BlockingCollection.GetConsumingEnumerable iterator") over using the `BlockingCollection` class. – Theodor Zoulias Jun 20 '22 at 18:39
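The `Channel`-based alternative mentioned in the last comment could look roughly like this (a minimal sketch; the class name `ChannelLogQueue` is hypothetical):

```csharp
using System.Threading.Channels;

// Sketch: a Channel-backed replacement for LogQueue. Unlike BlockingCollection,
// the reader side is asynchronous, so no thread is blocked while waiting.
public class ChannelLogQueue
{
    private readonly Channel<ActivityLog> channel =
        Channel.CreateBounded<ActivityLog>(1_000_000);

    // TryWrite only fails when the bounded channel is full or completed.
    public bool Add(ActivityLog log) => channel.Writer.TryWrite(log);

    public void Complete() => channel.Writer.Complete();

    // Consumed with 'await foreach' in e.g. BackgroundService.ExecuteAsync;
    // the enumeration ends after Complete() once the channel is drained.
    public IAsyncEnumerable<ActivityLog> ReadAllAsync(CancellationToken ct = default)
        => channel.Reader.ReadAllAsync(ct);
}
```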

2 Answers


This is what I ended up with in case anyone needs it (thanks to Theodor Zoulias's hint):

public class DbLogWorker : IHostedService
{
    private const int BufferThreshold = 100;
    private readonly TimeSpan BufferTimeLimit = TimeSpan.FromSeconds(20);
    private DateTime lastDbUpdate = DateTime.UtcNow;

    private readonly LogQueue queue;
    private readonly IServiceScopeFactory scf;

    private Task jobTask;

    public DbLogWorker(LogQueue queue, IServiceScopeFactory scf)
    {
        this.queue = queue;
        this.scf = scf;

        jobTask = new Task(Job, TaskCreationOptions.LongRunning);
    }

    private void Job()
    {
        using var scope = scf.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

        ActivityLog nextItem;
        try
        {
            while (queue.TryTake(out nextItem, -1))
            {
                var buffer = new List<ActivityLog>();
                buffer.Add(nextItem);

                while (buffer.Count < BufferThreshold && 
                    DateTime.UtcNow - lastDbUpdate < BufferTimeLimit)
                {
                    if (queue.TryTake(out nextItem))
                        buffer.Add(nextItem);
                }

                dbContext.AddRange(buffer);
                dbContext.SaveChanges();

                lastDbUpdate = DateTime.UtcNow;
            }
        }
        catch (ObjectDisposedException) 
        {
            // queue completed?
        }
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        jobTask.Start(TaskScheduler.Default);
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        queue.Complete();
        jobTask.Wait(); // or 'await jobTask' ?
        return Task.CompletedTask; // unnecessary if I use 'await jobTask'
    }
}
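Note that this code calls `queue.TryTake`, which the `LogQueue` from the question does not expose; presumably it was extended with forwarding wrappers along these lines (a sketch of the assumed additions to `LogQueue`):

```csharp
// Hypothetical additions to LogQueue, forwarding to the inner
// BlockingCollection<ActivityLog> field 'logs':
public bool TryTake(out ActivityLog log) => logs.TryTake(out log);

// A millisecondsTimeout of -1 (Timeout.Infinite) waits until an item
// arrives or the collection is marked complete.
public bool TryTake(out ActivityLog log, int millisecondsTimeout) =>
    logs.TryTake(out log, millisecondsTimeout);
```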
Parsa99
  • I am not sure what's the intention for comparing the current time with the time of the last update. This check makes the inner `while` loop a tight loop. "Tight loop" is called a situation where a CPU is spinning in full speed without doing anything productive (other than converting electricity to heat). – Theodor Zoulias Jun 20 '22 at 17:29
  • My intention is to log every 20 seconds unless we have more than 100 logs (in order to reduce DB trips and also prevent huge DB requests). I can add a timeout for the inner loop TryTake as well. – Parsa99 Jun 20 '22 at 17:59
  • Yes, you can fix this problem with carefully calculated timeouts or timer-based cancellation tokens. And you should definitely do so, because tight loops are bad. But it is questionable whether it is appropriate to solve a problem here that has not been asked in the question. Your answer might be downvoted, and rightfully so, for being off topic. My suggestion is to focus on the question, answer precisely what has been asked, and keep any further development for yourself. Or post a new question with the broader problem, and self-answer it there. – Theodor Zoulias Jun 20 '22 at 18:19
  • Well the question was about batch consumption of a blocking collection in order to reduce DB trips, and since `IsCompleted` cannot be used for that purpose, I don't see this answer as an off topic one, because it exactly tackles that issue. – Parsa99 Jun 21 '22 at 04:23
  • Where is it mentioned in the question the requirement of saving the logs every X seconds, unless there are more than Y pending logs? – Theodor Zoulias Jun 21 '22 at 05:19

I can recommend writing your worker as follows; it looks like it will help you:

public class ActivityWorker : IWorker
{
    // logging
    private static readonly Logger Logger = LogManager.GetLogger(nameof(ActivityWorker));

    // services (if necessary)

    private readonly BlockingCollection<ActivityLog> _tasks = new BlockingCollection<ActivityLog>();
    private Task _processingTask;

    public ActivityWorker(/* your services if required */)
    {
    }

    public void Start()
    {
        _processingTask = Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
    }

    public void Stop()
    {
        _tasks.CompleteAdding();
        _processingTask?.Wait();
    }

    // push your activity log into the collection here
    public void Push(ActivityLog task)
    {
        _tasks.Add(task);
    }

    private void Process()
    {
        try
        {
            while (!_tasks.IsCompleted)
            {
                var task = _tasks.Take();
                ProcessTask(task);
            }
        }
        catch (InvalidOperationException)
        {
            // Take() throws InvalidOperationException when the collection is completed
            Logger.Warn("ActivityLog tasks worker has been stopped");
        }
        catch (Exception ex)
        {
            Logger.Error(ex);
        }
    }

    private void ProcessTask(ActivityLog task)
    {
        // YOUR logic here
    }

    public void Dispose()
    {
        _tasks?.CompleteAdding();
        _processingTask?.Wait();
        _processingTask?.TryDispose(); // TryDispose appears to be a custom extension method
        _tasks?.TryDispose();
    }
}
IKomarovLeonid