
I have a method that can be called by multiple threads to write data to a database. To reduce database traffic, I cache the data and write it in bulk.

Now I want to know: is there a better pattern (for example a lock-free one) to use?

Here is an example of how I do it at the moment:

    public class WriteToDatabase : IWriter, IDisposable
    {
        public WriteToDatabase(PLCProtocolServiceConfig currentConfig)
        {
            writeTimer = new System.Threading.Timer(Writer);
            writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
            this.currentConfig = currentConfig;
        }

        private System.Threading.Timer writeTimer;
        private List<PlcProtocolDTO> writeCache = new List<PlcProtocolDTO>();
        private readonly PLCProtocolServiceConfig currentConfig;
        private bool disposed;

        public void Write(PlcProtocolDTO row)
        {
            lock (this)
            {
                writeCache.Add(row);
            }
        }

        private void Writer(object state)
        {
            // Swap out the cache under the lock, then insert outside of it.
            List<PlcProtocolDTO> oldCache = null;
            lock (this)
            {
                if (writeCache.Count > 0)
                {
                    oldCache = writeCache;
                    writeCache = new List<PlcProtocolDTO>();
                }
            }

            if (oldCache != null)
            {
                using (var s = VisuDL.CreateSession())
                {
                    s.Insert(oldCache);
                }
            }

            // Re-arm the one-shot timer unless we have been disposed.
            if (!this.disposed)
                writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
        }

        public void Dispose()
        {
            this.disposed = true;
            writeTimer.Dispose();
            Writer(null); // final flush
        }
    }
user1237393
  • You may want to take a look at this: [Why is lock(this) {...} bad?](https://stackoverflow.com/questions/251391/why-is-lockthis-bad) – Theodor Zoulias Nov 13 '20 at 12:38

2 Answers


There are a few issues I can see with the timer-based code.

  • Even in the new version of the code there is still a chance to lose writes on restart or shutdown. The Dispose method does not wait for the completion of the last timer callback, which may still be in progress. Since timer callbacks run on thread pool threads, which are background threads, they will be aborted when the main thread exits.
  • There is no limit on the size of the batches, this is going to break when you hit a limit of the underlying storage api (e.g. sql databases have a limit on query length and the number of parameters used).
  • Since you're doing I/O, the implementation should probably be async.
  • This will behave poorly under load. In particular, as the load keeps increasing, the batches will get bigger and therefore slower to execute; a slower batch execution in turn gives the next one additional time to accumulate items, making it even slower, and so on. Ultimately either writing the batch will fail (if you hit a SQL limit or the query times out) or the application will run out of memory. To handle high load you really have only two choices: applying backpressure (i.e. slowing down the producers) or dropping writes.
  • You might want to allow a limited number of concurrent writers if the database can handle it.
  • There's a race condition on the disposed field which might result in an ObjectDisposedException in writeTimer.Change (see the sketch after this list).
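
As an illustration of that last point, here is a minimal sketch of one way to close the race while keeping the timer-based design: guard the flag and the timer with a dedicated lock object, and use the Timer.Dispose(WaitHandle) overload to wait for an in-flight callback (the sync field name is mine, not from the original code):

private readonly object sync = new object();

private void Writer(object state)
{
    // ... flush the cache as before ...

    lock (sync)
    {
        // Only re-arm the timer if Dispose has not run yet.
        if (!disposed)
            writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
    }
}

public void Dispose()
{
    lock (sync)
    {
        disposed = true;
    }

    // This overload signals the wait handle only after any currently
    // executing callback has finished, instead of returning immediately.
    using (var done = new ManualResetEvent(false))
    {
        writeTimer.Dispose(done);
        done.WaitOne();
    }

    Writer(null); // final flush
}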

I think a better pattern that addresses the issues above is the producer-consumer pattern. You can implement it in .NET with a ConcurrentQueue or with the new System.Threading.Channels API.

Also keep in mind that if your application crashes for any reason, you will lose the records that are still buffered.

This is a sample implementation using channels:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public interface IWriter<in T>
{
    ValueTask WriteAsync(IEnumerable<T> items);
}

public sealed record Options(int BatchSize, TimeSpan Interval, int MaxPendingWrites, int Concurrency);

public class BatchWriter<T> : IWriter<T>, IAsyncDisposable
{
    readonly IWriter<T> writer;
    readonly Options options;
    readonly Channel<T> channel;
    readonly Task[] consumers;

    public BatchWriter(IWriter<T> writer, Options options)
    {
        this.writer = writer;
        this.options = options;

        channel = Channel.CreateBounded<T>(new BoundedChannelOptions(options.MaxPendingWrites)
        {
            // Choose between backpressure (Wait) or
            // various ways to drop writes (DropNewest, DropOldest, DropWrite).
            FullMode = BoundedChannelFullMode.Wait,

            SingleWriter = false,
            SingleReader = options.Concurrency == 1
        });

        consumers = Enumerable.Range(start: 0, options.Concurrency)
            .Select(_ => Task.Run(Start))
            .ToArray();
    }

    async Task Start()
    {
        var batch = new List<T>(options.BatchSize);

        // Two pending operations are raced against each other: a periodic
        // flush timer, and a probe that completes when items are readable.
        var timer = Task.Delay(options.Interval);
        var canRead = channel.Reader.WaitToReadAsync().AsTask();
        while (true)
        {
            if (await Task.WhenAny(timer, canRead) == timer)
            {
                timer = Task.Delay(options.Interval);
                await Flush(batch);
            }
            else if (await canRead)
            {
                // Drain everything currently available, flushing whenever
                // a full batch has been collected.
                while (channel.Reader.TryRead(out var item))
                {
                    batch.Add(item);

                    if (batch.Count == options.BatchSize)
                    {
                        await Flush(batch);
                    }
                }

                canRead = channel.Reader.WaitToReadAsync().AsTask();
            }
            else
            {
                // WaitToReadAsync returned false: the channel has been
                // completed, so flush the remainder and exit the consumer.
                await Flush(batch);
                return;
            }
        }

        async Task Flush(ICollection<T> items)
        {
            if (items.Count > 0)
            {
                await writer.WriteAsync(items);
                items.Clear();
            }
        }
    }

    public async ValueTask WriteAsync(IEnumerable<T> items)
    {
        // Writing to the bounded channel applies backpressure (or drops
        // items, depending on FullMode) when the buffer is full.
        foreach (var item in items)
        {
            await channel.Writer.WriteAsync(item);
        }
    }

    public async ValueTask DisposeAsync()
    {
        // Completing the writer lets the consumers drain the remaining
        // items and exit their loops; then wait for them to finish.
        channel.Writer.Complete();
        await Task.WhenAll(consumers);
    }
}
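
For illustration, here is a hypothetical way to wire this up to the code from the question. The DatabaseWriter class and the option values are assumptions made for the example, not part of the implementation above:

class DatabaseWriter : IWriter<PlcProtocolDTO>
{
    public ValueTask WriteAsync(IEnumerable<PlcProtocolDTO> items)
    {
        // Reuses the session API from the question; a truly asynchronous
        // insert would be preferable if the data layer offers one.
        using (var s = VisuDL.CreateSession())
            s.Insert(items.ToList());
        return default;
    }
}

await using var batchWriter = new BatchWriter<PlcProtocolDTO>(
    new DatabaseWriter(),
    new Options(BatchSize: 100, Interval: TimeSpan.FromSeconds(5),
                MaxPendingWrites: 10_000, Concurrency: 1));

// Producers on any thread simply call WriteAsync; batching,
// backpressure and flushing happen inside BatchWriter.
await batchWriter.WriteAsync(new[] { new PlcProtocolDTO() });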
Roald
  • It was only a stripped-down sample, to get the idea :-) We have multiple locations in our code where we collect some changes and handle them in a batch. In reality, I start the timer again when the timer callback is finished, and I also stop the timer with a dispose and write the cache to the database. – user1237393 Nov 17 '20 at 23:36
  • @user1237393 are you also limiting the batch size and throttling producers? – Roald Nov 18 '20 at 10:12

Instead of using a mutable List and protecting it using locks, you could use an ImmutableList, and stop worrying about the possibility of the list being mutated by the wrong thread at the wrong time. With immutable collections it is cheap and easy to pass around snapshots of your data, because you don't need to block the writers (and possibly also the readers) while creating copies of the data. An immutable collection is a snapshot by itself.

Although you don't have to worry about the contents of the collection, you still have to worry about its reference. This is because updating an immutable collection means replacing the reference to the old collection with a reference to a new collection. You don't want multiple threads swapping references in an uncontrollable manner, so you still need some sort of synchronization. You can still use locks, but it is quite easy to avoid locking altogether by using interlocked operations. The example below uses the handy ImmutableInterlocked.Update method, which lets you do an atomic update-and-swap in a single line:

private ImmutableList<PlcProtocolDTO> writeCache
    = ImmutableList<PlcProtocolDTO>.Empty;

public void Write(PlcProtocolDTO row)
{
    ImmutableInterlocked.Update(ref writeCache, x => x.Add(row));
}

private void Writer(object state)
{
    // Atomically grab the current cache and swap in a fresh empty list.
    IList<PlcProtocolDTO> oldCache = Interlocked.Exchange(
        ref writeCache, ImmutableList<PlcProtocolDTO>.Empty);

    using (var s = VisuDL.CreateSession())
        s.Insert(oldCache);
}

private void Dump()
{
    foreach (var row in Volatile.Read(ref writeCache))
        Console.WriteLine(row);
}

Here is the description of the ImmutableInterlocked.Update method:

Mutates a value in-place with optimistic locking transaction semantics via a specified transformation function. The transformation is retried as many times as necessary to win the optimistic locking race.

This method can be used for updating any reference-type variable. Its usage may increase with the advent of the new C# 9 record types, which are immutable by default and are intended to be used as such.
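
To make the retry semantics concrete, here is roughly what such an optimistic update loop looks like (a simplified sketch for illustration, not the actual BCL implementation):

static void Update<T>(ref T location, Func<T, T> transformer) where T : class
{
    T current, updated;
    do
    {
        current = Volatile.Read(ref location);
        updated = transformer(current);
        // If another thread swapped the reference in the meantime,
        // CompareExchange fails and the transformation is retried
        // against the freshly observed reference.
    } while (Interlocked.CompareExchange(ref location, updated, current) != current);
}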

Theodor Zoulias
  • Do you think an ImmutableList is more performant than a lock? I don't know, but with this the list is recreated every time... Maybe I need to do some tests with BenchmarkDotNet – user1237393 Nov 13 '20 at 17:52
  • And is it safe that no write is lost? Why do I then need the ImmutableList? – user1237393 Nov 13 '20 at 18:03
  • @user1237393 creating new immutable lists frequently is affordable because they are implemented internally as binary trees, consisting of mostly reusable nodes. The performance of basic operations on immutable collections is not great, compared to the same operations on normal or concurrent collections (they are usually at least 10 times slower). The payoff comes whenever you need to take snapshots of the data. If you never need snapshots, then there is no performance payoff, and the only advantage that remains is the architectural one (which is subjective of course). – Theodor Zoulias Nov 13 '20 at 18:07
  • @user1237393 if you use interlocked operations or locks correctly, no update is going to be lost. With a (pessimistic) `lock` you need to protect both the adding to the collection and the reference swap. With the (optimistic) `ImmutableInterlocked.Update`, you can be sure that in case of a race with another thread the update operation will be repeated on the version returned previously by the thread that won the race. The price of eliding the lock is the possibility of spinning a little (and creating some garbage for the GC). In any case there is no possibility for an update to escape. – Theodor Zoulias Nov 13 '20 at 18:19