0

Okay, so I'm currently trying to create a file writing queue. Each file can only be written to one at a time. I have a controller that accepts a filename and byte array of data to write out and returns a Task that completes when the write is done.

The problem is that I don't need to write redundantly. I'd like it if each request for a given file would be put into a 'bucket', only writing the most recent request and ignoring the others. I would call it a 'queue', but being able to remove items from the end is counter to the definition of a queue (at least, in my understanding).

Is there a simple way to do this with Observable? Let me try and atomize the steps.

  • Accepts a string and byte array
  • If there is an item being processed for the string, put the Task 'on deck'
  • If there is already an item 'on deck' for the string, update that item with the new data

I'm having trouble with the second and third steps. I tried a simple Switch by filename, but I ran into the issue where it does not wait for a write that is already running to finish before starting the next one. It also returns immediately instead of waiting for the new item to finish.

Any help that you may be able to give would be greatly appreciated!

Nate Diamond
  • 5,525
  • 2
  • 31
  • 57
  • possible duplicate of: http://stackoverflow.com/questions/20081996/is-there-such-a-synchronization-tool-as-single-item-sized-async-task-buffer/20082924#20082924 – Servy Nov 19 '13 at 22:26
  • That may indeed work. Was that seriously just asked less than an hour before I asked? That's incredible. – Nate Diamond Nov 19 '13 at 23:03
  • Also, does this complete the task that was ignored? It seems to just get overwritten. Is it garbage collected? What happens to things that were waiting on it? – Nate Diamond Nov 19 '13 at 23:15

1 Answer

0

I came up with this implementation, which allows for concurrent updates to different files, and leverages an Rx scheduler for easy and fast testability. The UpdateFile routine just sleeps for 5 virtual seconds, but you can change that for the real stuff.

Rather than a plain Task I return a Task<string> where the result is the filename that was updated. I also used a generic type TData for the update payload to support testing.

I'm sure there's room for improvement in efficiency, but this seems to be a correct implementation that is reasonably efficient:

/// <summary>
/// Serialises updates per file name while allowing different files to be
/// updated concurrently. While an update for a file is running, at most one
/// further update is kept "on deck" for that file; queuing again replaces the
/// on-deck data, and every caller that queued during that window shares the
/// same task.
/// </summary>
/// <typeparam name="TData">Type of the payload written to a file.</typeparam>
public class FileUpdateController<TData>
{
    // Guards all three dictionaries below.
    private readonly object _gate = new object();

    // On-deck completion source per file: created while another update for the file is running.
    private readonly Dictionary<string, TaskCompletionSource<string>> _pendingTasks;

    // Completion source of the update currently in flight for each file.
    private readonly Dictionary<string, TaskCompletionSource<string>> _runningTasks;

    // Most recent payload queued for each file that has an on-deck update.
    private readonly Dictionary<string, TData> _dataCache;

    private readonly IScheduler _scheduler;

    /// <summary>
    /// Creates a controller that runs its updates on <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="scheduler">
    /// Scheduler used to start and time updates; when null,
    /// <c>ThreadPoolScheduler.Instance</c> is used.
    /// </param>
    public FileUpdateController(IScheduler scheduler)
    {
        if (scheduler == null)
        {
            scheduler = ThreadPoolScheduler.Instance;
        }

        _scheduler = scheduler;
        _pendingTasks = new Dictionary<string, TaskCompletionSource<string>>();
        _runningTasks = new Dictionary<string, TaskCompletionSource<string>>();
        _dataCache = new Dictionary<string, TData>();
    }

    /// <summary>
    /// Queues <paramref name="data"/> to be written to <paramref name="filename"/>.
    /// If no update is running for the file, one is started immediately;
    /// otherwise the data replaces whatever was on deck for that file.
    /// </summary>
    /// <param name="filename">File to update; used as the bucket key.</param>
    /// <param name="data">Payload for the update.</param>
    /// <returns>
    /// A task that completes with <paramref name="filename"/> once the update
    /// this request was folded into has finished, or faults if that update threw.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="filename"/> is null.</exception>
    public Task<string> QueueFileUpdate(string filename, TData data)
    {
        if (filename == null)
        {
            throw new ArgumentNullException("filename");
        }

        lock (_gate)
        {
            TaskCompletionSource<string> pending;
            if (!_pendingTasks.TryGetValue(filename, out pending))
            {
                pending = new TaskCompletionSource<string>(filename);
                _pendingTasks.Add(filename, pending);
            }

            if (_runningTasks.ContainsKey(filename))
            {
                // An update is in flight: remember only the newest payload.
                _dataCache[filename] = data;
            }
            else
            {
                MoveToRunning(filename, data);
            }

            return pending.Task;
        }
    }

    // Promotes the on-deck entry for filename to running and schedules its update.
    // Must be called with _gate held and with an entry present in _pendingTasks.
    private void MoveToRunning(string filename, TData data)
    {
        _runningTasks.Add(filename, _pendingTasks[filename]);
        _pendingTasks.Remove(filename);
        _scheduler.Schedule(() => UpdateFile(filename, data));
    }

    // Performs one update (simulated by a 5 second timer on the scheduler), then
    // starts the on-deck update for the same file, if any, and completes the task.
    private async void UpdateFile(string filename, TData data)
    {
        // async void cannot surface exceptions to anyone, so a failure is captured
        // here and forwarded to the waiting task instead of being thrown.
        Exception failure = null;
        try
        {
            Console.WriteLine("Updating file " + filename + " with data " + data);
            await Observable.Timer(TimeSpan.FromSeconds(5), _scheduler).ToTask();
        }
        catch (Exception ex)
        {
            failure = ex;
        }

        TaskCompletionSource<string> finished;
        lock (_gate)
        {
            // Always release the running slot, even after a failure, so later
            // updates for this file are not blocked forever.
            finished = _runningTasks[filename];
            _runningTasks.Remove(filename);

            if (_pendingTasks.ContainsKey(filename))
            {
                var cachedData = _dataCache[filename];
                _dataCache.Remove(filename);
                MoveToRunning(filename, cachedData);
            }
        }

        // Complete outside the lock: continuations attached by callers may run
        // synchronously and must not execute while _gate is held.
        if (failure != null)
        {
            finished.SetException(failure);
            return;
        }

        finished.SetResult(filename);
        Console.WriteLine("Updated file " + filename + " with data " + data);
    }
}

And here are some of my tests (use the NuGet packages rx-testing and nunit). They lack the ability to test that the correct data was written, but you can inspect the test console output for that:

/// <summary>
/// Virtual-time tests for <c>FileUpdateController</c>, driven by a
/// <c>TestScheduler</c> so that no real waiting takes place.
/// </summary>
public class FileUpdateControllerTests
{
    // Length of the delay used by the controller for one update, in scheduler ticks.
    private static readonly long UpdateTicks = TimeSpan.FromSeconds(5).Ticks;

    // Builds the system under test on top of the supplied virtual clock.
    private static FileUpdateController<string> CreateController(TestScheduler clock)
    {
        return new FileUpdateController<string>(clock);
    }

    private static void AssertFinished(Task<string> update)
    {
        Assert.AreEqual(TaskStatus.RanToCompletion, update.Status);
    }

    private static void AssertNotFinished(Task<string> update)
    {
        Assert.AreNotEqual(TaskStatus.RanToCompletion, update.Status);
    }

    [Test]
    public void SingleUpdate()
    {
        var clock = new TestScheduler();
        var controller = CreateController(clock);

        var update = controller.QueueFileUpdate("test", "1");

        AssertNotFinished(update);

        clock.Start();

        AssertFinished(update);
        Assert.AreEqual("test", update.Result);
    }

    [Test]
    public void TwoUpdatesToSameFile()
    {
        var clock = new TestScheduler();
        var controller = CreateController(clock);

        var first = controller.QueueFileUpdate("test", "1");
        var second = controller.QueueFileUpdate("test", "2");

        AssertNotFinished(first);

        // One update's worth of time: only the first may have finished.
        clock.AdvanceBy(UpdateTicks + 1);

        AssertNotFinished(second);
        AssertFinished(first);

        // A second update's worth of time lets the on-deck update finish too.
        clock.AdvanceBy(UpdateTicks + 2);
        AssertFinished(second);

        Assert.AreEqual("test", first.Result);
        Assert.AreEqual("test", second.Result);
    }

    [Test]
    public void TwoUpdatesToDifferentFiles()
    {
        var clock = new TestScheduler();
        var controller = CreateController(clock);

        var first = controller.QueueFileUpdate("test1", "1");
        var second = controller.QueueFileUpdate("test2", "2");

        AssertNotFinished(first);

        // Different files do not wait on each other, so both finish together.
        clock.AdvanceBy(UpdateTicks + 1);

        AssertFinished(first);
        AssertFinished(second);

        Assert.AreEqual("test1", first.Result);
        Assert.AreEqual("test2", second.Result);
    }

    [Test]
    public void UpdatingDataOnPendingFileWorks()
    {
        var clock = new TestScheduler();
        var controller = CreateController(clock);

        var first = controller.QueueFileUpdate("test", "1");
        var second = controller.QueueFileUpdate("test", "2");
        var third = controller.QueueFileUpdate("test", "3");

        // Requests made while an update is running share one on-deck task.
        Assert.AreSame(second, third);

        clock.AdvanceBy(UpdateTicks + 1);

        AssertFinished(first);
        AssertNotFinished(second);

        clock.AdvanceBy(UpdateTicks + 1);

        AssertFinished(second);
    }
}
James World
  • 29,019
  • 9
  • 86
  • 120
  • To test the data write, you could do something like pass an `IFileUpdater` to the ctor with a `Task WriteFile(string filename, TData data)` method that you could mock out in the tests, and that you would call from the `UpdateFile` method... I'll leave that as an exercise. :) – James World Nov 20 '13 at 14:49