
I have a thread that reads data in small chunks and puts it into an ImmutableList<T>. I need to dump this list into a file. Originally I was dumping the list on every update, but over time the list grew and the file size is now close to 200 MB, so writing the file takes too much time to keep it synchronous. I've made the file writing asynchronous, and the code I currently have is:

public ImmutableList<T> Items { get; private set; }

public void Insert(IEnumerable<T> items)
{
    lock (_syncObj)
    {
        Items = Items.AddRange(items).Sort(_sortOrder);
        QueueSaving();
    }
}

void SavingThread()
{
    for (; ; )
    {
        var snapshot = Items;
        SaveItems(snapshot);

        lock (_syncObj)
        {
            if (snapshot == Items)
                Monitor.Wait(_syncObj);
        }
    }
}

void QueueSaving()
{
    lock (_syncObj)
        Monitor.Pulse(_syncObj);
}

I.e. if there were many updates since the last write, the writer just saves the latest version. Obviously, I now have to have a thread dedicated to saving, which sleeps when there are no updates.

What would be the cleanest code to avoid having a dedicated writer thread? I.e. run QueueSaving until the file writer has caught up with the list changes and then finish, and start it again on the next change?

Konstantin Spirin
  • Are you writing the whole file each time? If so, could you write only the new items on the list since the last write, if it is taking too long to write? – DavidC Nov 30 '13 at 12:34
  • Thanks, I also considered a similar approach using sharding. Data is serialized into a JSON array, so there's a closing bracket and I can't simply append the records as it would invalidate the JSON format. I'd like to keep the file-writing logic as simple as possible and find a solution that uses an asynchronous writer. – Konstantin Spirin Nov 30 '13 at 12:39
  • An ImmutableList is a convenient abstraction, stops you from having to think about thread-safety. But stops working well when you mutate the heck out of it with "small chunks". There's little evidence that your program isn't wasting lots of time on saving stale data. – Hans Passant Nov 30 '13 at 14:02
  • Have a look at [TPL Dataflow](http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx). – noseratio Nov 30 '13 at 14:36
  • @svick yes, I can. Does it have anything that might help? – Konstantin Spirin Nov 30 '13 at 14:56
  • @Noseratio how would it help me? – Konstantin Spirin Nov 30 '13 at 14:58
  • @KonstantinSpirin, it may help you to rethink your data model and workflow. Currently your file is 200 MB, but what if it has grown to 2 GB? – noseratio Nov 30 '13 at 15:30
  • @Noseratio I'm on a 64-bit machine with 16 GB of RAM, so loading 2 GB of data into memory is not a problem. Besides, it'll take roughly a year to grow to 2 GB at the current data rates. – Konstantin Spirin Nov 30 '13 at 15:37
  • I'll switch to a proper database when the time comes. Right now I want to know what's the best implementation of an asynchronous writer. – Konstantin Spirin Nov 30 '13 at 15:38
  • @HansPassant there's not too much waste since the writer only writes the latest version of the data and afterwards checks whether the data changed while writing. I could add "cool down" delays to only start writing after a certain period of inactivity, but I don't want to complicate things. – Konstantin Spirin Nov 30 '13 at 15:40
  • That's certainly not the case; writing stale data is almost inevitable. Your Monitor.Pulse() is completely ignored when it is busy writing. It won't catch up either. You'd only ever get a match between the file data and the actual list by coincidence. A pretty low one if you mutate the list quicker than it can be written. – Hans Passant Nov 30 '13 at 15:46
  • @HansPassant The loop in `SavingThread` blocks on the monitor after the writer has caught up with the reader. `QueueSaving` resumes the loop. I can't see any bugs here; what do you think needs fixing? – Konstantin Spirin Nov 30 '13 at 15:53
  • You assume that SavingThread entered the lock. It didn't: SaveItems is busy saving the previous list and no lock is held. So QueueSaving has no trouble acquiring the lock, the Pulse has no effect at all, and the modified list is never saved. That's a bug. – Hans Passant Nov 30 '13 at 16:03
  • `SavingThread` only enters the lock to check if there were changes to the list since saving started. If there weren't then it starts waiting for changes. `Pulse` won't have effect if saving is in progress and that's fine because after saving finishes the loop will see that `Items != snapshot` and will start saving again. See second part of http://www.yoda.arachsys.com/csharp/threads/deadlocks.shtml – Konstantin Spirin Nov 30 '13 at 20:44

4 Answers


As I understand from the comments, you're only looking to convert your logic into asynchronous code. Below is how this might be done without explicit separate threads (besides using Task.Run for the whole process).

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    public class Worker
    {
        class Item 
        {
            public string Data { get; set; }
        }

        const int SAVE_AFTER = 2;

        string _fileName;
        List<Item> _items;
        int _savedItemsCount = 0;

        CancellationToken _token;
        Task _processTask;

        Task _pendingSaveTask = null;

        // get next item
        async Task<Item> GetNextItemAsync()
        {
            await Task.Delay(500); // delay for testing
            return new Item { Data = "Item from " + DateTime.Now.ToString() };
        }

        // write
        async Task SaveItemsAsync(Item[] items)
        {
            if (_pendingSaveTask != null)
                await _pendingSaveTask; // await the previous save

            var text = items.Aggregate(String.Empty, (a, b) => a + b.Data + Environment.NewLine);

            using (var writer = new System.IO.StreamWriter(_fileName, append: false))
            {
                await writer.WriteAsync(text);
            }
        }

        // main process
        async Task ProcessAsync()
        {
            while (true)
            {
                _token.ThrowIfCancellationRequested();

                // start getting the next item
                var getNextItemTask = GetNextItemAsync();

                // save the snapshot if needed
                if (_items.Count >= _savedItemsCount + SAVE_AFTER)
                {
                    var snapshot = _items.ToArray();
                    _savedItemsCount = snapshot.Length;
                    _pendingSaveTask = SaveItemsAsync(snapshot);
                }

                // await the next item
                var item = await getNextItemTask;
                _items.Add(item);
            }
        }

        // start
        public void Start(CancellationToken token)
        {
            _token = token;
            _fileName = System.IO.Path.GetTempFileName();
            _items = new List<Item>();

            _processTask = Task.Run(new Func<Task>(ProcessAsync), _token);
        }

        // stop
        public void Stop()
        {
            if (_pendingSaveTask != null)
                _pendingSaveTask.Wait();

            try
            {
                _processTask.Wait(); // wait for the task to complete
            }
            catch (Exception ex)
            {
                // rethrow if anything but OperationCanceledException
                if (!(ex is OperationCanceledException))
                {
                    var aggEx = ex as AggregateException;
                    if (aggEx == null || !(aggEx.InnerException is OperationCanceledException))
                        throw;
                }
            }
        }
    }

    class Program
    {
        public static void Main()
        {
            var cts = new CancellationTokenSource();
            var worker = new Worker();

            Console.WriteLine("Start process");
            worker.Start(cts.Token);

            Thread.Sleep(10000);

            Console.WriteLine("Stop process");
            cts.Cancel();
            worker.Stop();

            Console.WriteLine("Press Enter to exit...");
            Console.ReadLine();
        }
    }
}

Note that if new items (GetNextItemAsync) arrive faster than SaveItemsAsync can finish saving the last snapshot, this implementation may end up with a growing chain of pending SaveItemsAsync calls. If this is a problem, you could deal with it by limiting SaveItemsAsync to a single pending instance and using a BlockingCollection to queue new snapshots.
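
For illustration only (this sketch is not part of the original answer), the BlockingCollection idea could look roughly like the hypothetical SnapshotWriter<T> helper below: a single long-running consumer drains the queue, drops every snapshot except the newest one available, and writes that, so at most one save is ever in flight.

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper, a sketch only: one consumer, newest snapshot wins.
public class SnapshotWriter<T>
{
    private readonly BlockingCollection<T[]> _queue = new BlockingCollection<T[]>();
    private readonly Task _consumer;

    public SnapshotWriter(string fileName)
    {
        _consumer = Task.Factory.StartNew(() =>
        {
            foreach (var snapshot in _queue.GetConsumingEnumerable())
            {
                // skip stale snapshots: keep only the newest one currently queued
                var latest = snapshot;
                T[] newer;
                while (_queue.TryTake(out newer))
                    latest = newer;

                // the actual serialization format is up to the caller;
                // one line per item is just for the sketch
                File.WriteAllLines(fileName, latest.Select(x => x.ToString()));
            }
        }, TaskCreationOptions.LongRunning);
    }

    // called by the producer whenever a new snapshot should be persisted
    public void Enqueue(T[] snapshot) { _queue.Add(snapshot); }

    // call on shutdown to flush the last queued snapshot and stop the consumer
    public void Complete()
    {
        _queue.CompleteAdding();
        _consumer.Wait();
    }
}

The producer would then pass snapshot arrays to Enqueue from inside its existing lock; because the consumer always skips ahead to the newest queued snapshot, redundant writes are collapsed naturally.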

[UPDATE] Here is a slightly improved version which eliminates redundant writes if updates are coming in faster than they can be saved. It doesn't use BlockingCollection, but adds some extra cancellation logic to SaveItemsAsync instead. It's a console app; feel free to try it out to see what's going on. Try calling _saveTask = SaveItemsAsync(snapshot) a few times in a row.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    public class Worker
    {
        class Item 
        {
            public string Data { get; set; }
        }

        const int SAVE_AFTER = 2;

        string _fileName;
        List<Item> _items;
        int _savedItemsCount = 0;

        CancellationToken _token;
        Task _processTask;

        Task _saveTask;
        CancellationTokenSource _saveTaskCts;

        // get next item
        async Task<Item> GetNextItemAsync()
        {
            Console.WriteLine("Enter GetNextItemAsync, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

            await Task.Delay(500); // delay for testing
            return new Item { Data = "Item from " + DateTime.Now.ToString() };
        }

        // save items
        async Task SaveItemsAsync(Item[] items)
        {
            // avoid multiple pending SaveItemsAsync tasks
            Console.WriteLine("Enter SaveItemsAsync, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

            var oldSaveTaskCts = _saveTaskCts;
            var oldSaveTask = _saveTask;

            var thisSaveTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_token);

            _saveTaskCts = thisSaveTaskCts;
            _saveTask = null;

            // cancel the previous pending SaveItemsAsync, if any
            if (oldSaveTaskCts != null) 
            {
                oldSaveTaskCts.Cancel();
                if (oldSaveTask != null)
                    await oldSaveTask.WaitObservingCancellationAsync();
            }

            // another SaveItemsAsync call should lead to cancelling this one
            thisSaveTaskCts.Token.ThrowIfCancellationRequested();

            // execute the save logic on a pool thread, 
            // Task.Run automatically unwraps the nested Task<Task>
            await Task.Run(async () => 
            {
                // do the CPU-bound work: create textual representation of data
                var text = items.Aggregate(String.Empty, (agg, item) => agg + item.Data + Environment.NewLine);

                // write asynchronously
                Console.WriteLine("Write, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

                // StreamWriter doesn't support cancellation, so do it in two stages with MemoryStream
                using (var memoryStream = new MemoryStream())
                {
                    // write to a memory stream first
                    using (var writer = new StreamWriter(
                        memoryStream,
                        encoding: System.Text.Encoding.UTF8,
                        bufferSize: Environment.SystemPageSize,
                        leaveOpen: true))
                    {
                        await writer.WriteAsync(text);
                    }

                    thisSaveTaskCts.Token.ThrowIfCancellationRequested();

                    // copy the memory stream to the file
                    using (var fileStream = new FileStream(_fileName, FileMode.Create, FileAccess.Write))
                    {
                        // copy with possible cancellation
                        memoryStream.Seek(0, SeekOrigin.Begin);
                        await memoryStream.CopyToAsync(fileStream, Environment.SystemPageSize, thisSaveTaskCts.Token);
                    }
                }
            }, thisSaveTaskCts.Token);
        }

        // main process
        async Task ProcessAsync()
        {
            while (true)
            {
                // handle cancellation
                if (_token.IsCancellationRequested)
                {
                    // await the pending save if any, before throwing
                    if (_saveTask != null)
                        await _saveTask.WaitObservingCancellationAsync();
                    _token.ThrowIfCancellationRequested();
                }

                // handle last save errors if any
                if (_saveTask != null && _saveTask.IsFaulted)
                    await _saveTask.WaitObservingCancellationAsync();

                // start getting the next item
                var getNextItemTask = GetNextItemAsync();

                // save the snapshot if needed
                if (_items.Count >= _savedItemsCount + SAVE_AFTER)
                {
                    var snapshot = _items.ToArray();
                    _savedItemsCount = snapshot.Length;
                    _saveTask = SaveItemsAsync(snapshot);
                }

                // await the next item
                var item = await getNextItemTask;
                _items.Add(item);
            }
        }

        // start
        public void Start(CancellationToken token)
        {
            _token = token;
            _fileName = System.IO.Path.GetTempFileName();
            _items = new List<Item>();

            _processTask = Task.Run(new Func<Task>(ProcessAsync), _token);
        }

        // stop
        public void Stop()
        {
            _processTask.WaitObservingCancellation();
        }
    }

    // Main
    class Program
    {
        public static void Main()
        {
            var cts = new CancellationTokenSource();
            var worker = new Worker();

            Console.WriteLine("Start process");
            worker.Start(cts.Token);

            Thread.Sleep(10000);

            Console.WriteLine("Stop process");
            cts.Cancel();
            worker.Stop();

            Console.WriteLine("Press Enter to exit...");
            Console.ReadLine();
        }
    }

    // Useful extensions
    public static class Extras
    {
        // check if exception is OperationCanceledException
        public static bool IsOperationCanceledException(this Exception ex)
        {
            if (ex is OperationCanceledException)
                return true;

            var aggEx = ex as AggregateException;
            return aggEx != null && aggEx.InnerException is OperationCanceledException;
        }

        public static async Task WaitObservingCancellationAsync(this Task task)
        {
            try
            {
                await task; // await the task to complete
            }
            catch (Exception ex)
            {
                // rethrow if anything but OperationCanceledException
                if (!ex.IsOperationCanceledException())
                    throw;
            }
        }

        // a helper to wait for the task to complete and observe exceptions
        public static void WaitObservingCancellation(this Task task)
        {
            try
            {
                task.Wait(); // wait for the task to complete
            }
            catch (Exception ex)
            {
                // rethrow if anything but OperationCanceledException
                if (!ex.IsOperationCanceledException())
                    throw;
            }
        }
    }
}
noseratio
  • `List.ToArray` would require a lock: http://stackoverflow.com/questions/14946044/can-toarray-throw-an-exception – Konstantin Spirin Nov 30 '13 at 20:56
  • Do we really need to have a chain of pending `SaveItemsAsync` tasks? It seems like if updates are coming in faster than the saving task finishes, then your code will actually try to write the file from multiple threads and there will be an IO exception at best. – Konstantin Spirin Nov 30 '13 at 21:00
  • @KonstantinSpirin, I'm not familiar with the rest of your application, but in this implementation `_items.ToArray()` would require a lock **only** if `_items` is accessed outside the `Worker` object, e.g. in `Main`. – noseratio Dec 01 '13 at 00:21
  • @KonstantinSpirin, try this code. If updates are coming faster than saving, any new pending `SaveItemsAsync` calls will be serialized (because of `await _pendingSaveTask`). Note, `SaveItemsAsync` does **not** create a new thread just because it is `async`. Asynchrony doesn't assume multithreading. Inside `writer.WriteAsync(text)`, the Framework may implicitly use another thread from the thread pool, and the code after `await` may continue executing on that thread. However, that doesn't mean I have to add locks everywhere, because the code flow is still mostly linear. – noseratio Dec 01 '13 at 00:30
  • I've also posted an updated version to deal with the case when updates are coming faster than they get saved. – noseratio Dec 01 '13 at 05:24
  • Saving is now synchronous, `ProcessAsync` blocks until saving is complete before reading the next data. – Konstantin Spirin Dec 01 '13 at 08:33
  • @KonstantinSpirin, *Saving is now synchronous* - absolutely not. `SaveItemsAsync` is a background, asynchronous task, working in parallel with `GetNextItemAsync`. Have you tried the code? – noseratio Dec 01 '13 at 10:59
  • @KonstantinSpirin, I've replaced `Task.Yield` with explicit `Task.Run` inside `SaveItemsAsync`, if that was your concern. The latter is independent of synchronization context, and is more readable, IMO. This way, `ProcessAsync` can even be started on the UI thread, depending on how often your updates arrive. – noseratio Dec 02 '13 at 01:27
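
As an aside on the Task.Yield versus Task.Run point in the last comment above, the difference is roughly the following (an illustrative fragment only; _fileName and the method names are made up):

// With Task.Yield, the remainder of the method resumes on the captured
// synchronization context (or a pool thread if there is none), so when
// called from a UI thread the write below would still run on the UI thread:
async Task SaveWithYieldAsync(string text)
{
    await Task.Yield();
    System.IO.File.WriteAllText(_fileName, text);
}

// With Task.Run, the work is explicitly queued to the thread pool,
// independent of the caller's synchronization context:
Task SaveWithRunAsync(string text)
{
    return Task.Run(() => System.IO.File.WriteAllText(_fileName, text));
}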

Start a writer Task when the count of unsaved items in the list crosses a threshold. You can put that logic into Insert under the existing lock. That way a writer thread/Task is only in existence when there is work to do.

I implemented a similar thing recently where I used a Timer to start the persistence work at a certain schedule.
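
A rough sketch of how the threshold idea could sit on top of the question's Insert follows (this is not usr's actual code; SaveThreshold, _unsavedCount and _writerRunning are assumed names, while _syncObj, Items, _sortOrder and SaveItems come from the question):

// assumed extra fields on the question's class
const int SaveThreshold = 1000;   // tune to taste
int _unsavedCount;
bool _writerRunning;

public void Insert(IEnumerable<T> items)
{
    lock (_syncObj)
    {
        var before = Items.Count;
        Items = Items.AddRange(items).Sort(_sortOrder);
        _unsavedCount += Items.Count - before;

        // start a writer Task only when enough unsaved items have piled up
        // and no writer is currently running
        if (_unsavedCount >= SaveThreshold && !_writerRunning)
        {
            _writerRunning = true;
            _unsavedCount = 0;
            var snapshot = Items;
            Task.Run(() =>
            {
                // exceptions from SaveItems should be observed in real code
                try { SaveItems(snapshot); }
                finally { lock (_syncObj) _writerRunning = false; }
            });
        }
    }
}

Because the flag is set and cleared under the same lock, at most one writer Task exists at any time, which is the guarantee discussed in the comments below.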

usr
  • What's the nicest way to guarantee that you only have max one writer running at any time? Depending on which `Timer` you used you might have gotten this guarantee out of the box but Tasks are executed on a thread pool so I might end up running multiple at the same time. – Konstantin Spirin Nov 30 '13 at 20:47
  • Yeah, I'd use a bool instance variable accessed under a lock. If it is set, a task is running and you just exit. This can also be done with Interlocked but don't be clever in low-throughput cases like this one. – usr Nov 30 '13 at 20:48
  • I had pretty much the same idea, thanks for sharing. – Konstantin Spirin Nov 30 '13 at 21:02

Here's what I ended up writing:

public ImmutableList<T> Items { get; private set; }

public void Insert(IEnumerable<T> items)
{
    lock (_syncObj)
    {
        Items = Items.AddRange(items).Sort(_sortOrder);
        StartSaving();
    }
}

Task _activeSavingTask;

void SavingThread()
{
    for (;;)
    {
        var snapshot = Items;
        SaveItems(snapshot);

        lock (_syncObj)
        {
            if (snapshot == Items)
            {
                _activeSavingTask = null;
                return;
            }
        }
    }
}

void StartSaving()
{
    lock (_syncObj)
        if (_activeSavingTask == null)
            _activeSavingTask = Task.Factory
                .StartNew(SavingThread, TaskCreationOptions.LongRunning);
}
Konstantin Spirin
  • Accessing the "Items" property from two threads without a proper lock could cause issues – andrey.tsykunov Dec 03 '13 at 02:49
  • What kind of issues could it cause? `Items` is publicly read-only, all writes are done in a critical section inside `Insert`, reference type assignment is guaranteed to be atomic according to http://stackoverflow.com/questions/2192124/reference-assignment-is-atomic-so-why-is-interlocked-exchangeref-object-object so I can't think of any potential issues myself. – Konstantin Spirin Dec 03 '13 at 05:21
  • Both read and write should be protected by a lock (or an explicit memory barrier). Otherwise the reading thread could get a stale value from the CPU cache, or instructions could be reordered and cause results which are very hard to predict. – andrey.tsykunov Dec 06 '13 at 03:10
  • Agreed. Nice article on the topic: http://www.albahari.com/threading/part4.aspx#%5FNonBlockingSynch, especially impressive "Do We Really Need Locks and Barriers?" section. – Konstantin Spirin Dec 06 '13 at 04:59

using System;
using System.Threading.Tasks;

class AsyncSaver<T> where T : class
{
    private readonly object _lock = new object();
    private readonly Func<T, Task> _save;
    private T _item;        // newest item queued while a save is in progress
    private bool _running;  // true while a save is in flight

    public AsyncSaver(Func<T, Task> save)
    {
        _save = save;
    }

    public void Enqueue(T item)
    {
        lock (_lock)
        {
            if (_running)
            {
                // a save is already running: remember only the newest item
                _item = item;
            }
            else
            {
                _running = true;
                Save(item);
            }
        }
    }

    private async void Save(T item)
    {
        await _save(item);

        lock (_lock)
        {
            if (_item != null)
            {
                // something newer arrived while saving: save it next
                var nextItem = _item;
                _item = null;
                Save(nextItem);
            }
            else
            {
                _running = false;
            }
        }
    }
}
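
This answer was posted without commentary. Roughly: Enqueue starts a save immediately when none is running; otherwise it only remembers the newest item, and once the in-flight save completes, just that newest item is saved. Below is a hypothetical way to wire it into the question's class (the constructor name, _fileName field and SerializeToJson helper are assumptions):

readonly AsyncSaver<ImmutableList<T>> _saver;

public Repository()  // hypothetical constructor
{
    _saver = new AsyncSaver<ImmutableList<T>>(async snapshot =>
    {
        // SerializeToJson and _fileName are assumed to exist elsewhere in the class
        using (var writer = new System.IO.StreamWriter(_fileName, append: false))
            await writer.WriteAsync(SerializeToJson(snapshot));
    });
}

public void Insert(IEnumerable<T> items)
{
    lock (_syncObj)
    {
        Items = Items.AddRange(items).Sort(_sortOrder);
        _saver.Enqueue(Items); // while a save runs, only the newest snapshot is remembered
    }
}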