
Many times in UI development I handle events in such a way that when an event first arrives, I start processing it immediately, but if a processing operation is already in progress, I wait for it to complete before processing another event. If more than one event occurs before the operation completes, I process only the most recent one.

The way I typically do that is: my process method has a loop, and in my event handler I check a field that indicates whether I am currently processing something. If I am, I put the current event arguments in another field that is basically a one-item buffer. When the current processing pass completes, I check whether there is another event to process, and I loop until I am done.
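
To make the pattern concrete, here is a rough sketch of it (in Python/asyncio for brevity rather than my actual UI code; the names `LatestEventProcessor` and `process` are illustrative):

```python
import asyncio

class LatestEventProcessor:
    """Processes one event at a time; while busy, buffers only the newest event."""

    def __init__(self, process):
        self._process = process   # coroutine function handling one event
        self._pending = None      # one-item buffer for the most recent event
        self._running = False

    async def handle(self, event):
        if self._running:
            # Overwrite any older buffered event with the newest one.
            self._pending = event
            return
        self._running = True
        try:
            await self._process(event)
            # Drain events that arrived while we were processing.
            while self._pending is not None:
                next_event, self._pending = self._pending, None
                await self._process(next_event)
        finally:
            self._running = False
```

Events that arrive while a pass is in flight just replace the buffered one, so at most one extra pass runs after the current one finishes.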

Now this seems a bit too repetitive and possibly not the most elegant way to do it, though it otherwise seems to work fine for me. I have two questions, then:

  1. Does what I need to do have a name?
  2. Is there some reusable synchronization type out there that could do that for me?

I'm thinking of adding something to the set of async coordination primitives by Stephen Toub that I included in my toolkit.

Filip Skakun
  • `If more than one event occurs before the operation completes - I only process the most recent one.` My gut reaction to this is "Reactive Extensions". – Stephen Cleary Nov 19 '13 at 23:40
  • @StephenCleary [Here are several options on how to do that with Rx.](http://stackoverflow.com/q/11010602/41071) – svick Nov 20 '13 at 15:37

3 Answers


So first, we'll handle the case you described, in which the method is always used from the UI thread or some other synchronization context. The Run method can itself be async, handling all of the marshaling through the synchronization context for us.

If we're running we just set the next stored action. If we're not, then we indicate that we're now running, await the action, and then continue to await the next action until there is no next action. We ensure that whenever we're done we indicate that we're done running:

public class EventThrottler
{
    private Func<Task> next = null;
    private bool isRunning = false;

    public async void Run(Func<Task> action)
    {
        if (isRunning)
            next = action;
        else
        {
            isRunning = true;
            try
            {
                await action();
                while (next != null)
                {
                    var nextCopy = next;
                    next = null;
                    await nextCopy();
                }
            }
            finally
            {
                isRunning = false;
            }
        }
    }

    private static Lazy<EventThrottler> defaultInstance =
        new Lazy<EventThrottler>(() => new EventThrottler());
    public static EventThrottler Default
    {
        get { return defaultInstance.Value; }
    }
}

Because the class will generally be used exclusively from the UI thread, there will typically need to be only one instance, so I added a convenience property for a default instance. But since it may still make sense to have more than one in a program, I didn't make it a singleton.

Run accepts a Func<Task> with the idea that it would generally be an async lambda. It might look like:

public class Foo
{
    public void SomeEventHandler(object sender, EventArgs args)
    {
        EventThrottler.Default.Run(async () =>
        {
            await Task.Delay(1000);
            //do other stuff
        });
    }
}

Okay, so, just to be verbose, here is a version that handles the case where the event handlers are called from different threads. I know you said you assume they're all called from the UI thread, but I generalized it a bit.

This means locking over all access to the instance fields of the type in a lock block, but not actually executing the function inside a lock block. That last part is important, not just for performance and to ensure we're not blocking callers that only need to set the next field, but also to avoid problems when the action itself calls Run, so that we don't need to deal with re-entrancy issues or potential deadlocks.

This pattern, of doing work in a lock block and then responding based on conditions determined inside the lock, means setting local variables to indicate what should be done after the lock ends.

public class EventThrottlerMultiThreaded
{
    private object key = new object();
    private Func<Task> next = null;
    private bool isRunning = false;

    public void Run(Func<Task> action)
    {
        bool shouldStartRunning = false;
        lock (key)
        {
            if (isRunning)
                next = action;
            else
            {
                isRunning = true;
                shouldStartRunning = true;
            }
        }

        Action<Task> continuation = null;
        continuation = task =>
        {
            Func<Task> nextCopy = null;
            lock (key)
            {
                if (next != null)
                {
                    nextCopy = next;
                    next = null;
                }
                else
                {
                    isRunning = false;
                }
            }
            if (nextCopy != null)
                nextCopy().ContinueWith(continuation);
        };
        if (shouldStartRunning)
            action().ContinueWith(continuation);
    }
}
Servy
  • Here's take two. I think this should be more in line with what you were expecting. – Servy Nov 19 '13 at 21:57
  • Does this complete the task that was ignored? It seems to just get overwritten (at `next = action`). Is it garbage collected? What happens to things that were waiting on it? – Nate Diamond Nov 19 '13 at 23:16
  • @NateDiamond This doesn't even *start* the task, as the function to create it isn't called. Given that it never exists in the first place, it obviously is never completed nor is it GC-ed. That is, unless the function is just referring to an already started task, but that would more or less defeat the purpose of the whole model if you've already started the tasks. Also note that you don't really need to worry about garbage collection, as a general rule. Items go away when they're not referenced, so it pretty much just takes care of itself. – Servy Nov 19 '13 at 23:41
  • Thanks, the single-threaded solution you have is pretty much an object-wrapped reusable version of what I usually do, and the thread-safe version looks pretty well thought through. I'll need to test it when I get a chance and include it in my library if you are OK with that (it's MIT licensed). Other than that, I'd still like to figure out a better name for it than just throttler, since it is different from the Throttle method from Rx and it doesn't even deal with events per se. – Filip Skakun Nov 20 '13 at 08:29
  • Throttling is what it does though, so maybe something like `SingleTaskThrottleAsync`? – Filip Skakun Nov 20 '13 at 08:30
  • @FilipSkakun Yeah, I agree the name doesn't seem the best, but I simply haven't thought of anything better. – Servy Nov 20 '13 at 15:10
  • Thanks, I included that in the latest version of WinRT XAML Toolkit [here](https://winrtxamltoolkit.codeplex.com/SourceControl/latest#WinRTXamlToolkit/Tools/EventThrottlerMultiThreaded.cs). I just made one change to the multithreaded version where I call `ContinueWith(continuation, TaskScheduler.FromCurrentSynchronizationContext())` to ensure that it continues on the same (UI) thread, which seems to work for now, but I am not sure it is really correct. Perhaps I should add overloads of the method to specify the synchronization context for continuations, or simply a `Dispatcher` version of the class. – Filip Skakun Dec 10 '13 at 20:43

Does what I need to do have a name?

What you're describing sounds a bit like a trampoline combined with a collapsing queue. A trampoline is basically a loop that iteratively invokes thunk-returning functions. An example is the CurrentThreadScheduler in the Reactive Extensions. When an item is scheduled on a CurrentThreadScheduler, the work item is added to the scheduler's thread-local queue, after which one of the following things will happen:

  1. If the trampoline is already running (i.e., the current thread is already processing the thread-local queue), then the Schedule() call returns immediately.
  2. If the trampoline is not running (i.e., no work items are queued/running on the current thread), then the current thread begins processing the items in the thread-local queue until it is empty, at which point the call to Schedule() returns.
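
A minimal sketch of that trampoline behavior might look like this (illustrative names, in Python for brevity; this is not Rx's actual implementation):

```python
from collections import deque
import threading

_local = threading.local()

def schedule(work):
    """Queue a work item; run the trampoline only if it isn't already running."""
    queue = getattr(_local, "queue", None)
    if queue is not None:
        # Trampoline already running on this thread: just enqueue and return.
        queue.append(work)
        return
    # Not running: become the trampoline and drain the thread-local queue.
    _local.queue = queue = deque([work])
    try:
        while queue:
            queue.popleft()()   # a work item may call schedule() again
    finally:
        del _local.queue
```

Work scheduled from inside a running work item is deferred until the current item returns, rather than being invoked recursively.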

A collapsing queue accumulates items to be processed, with the added twist that if an equivalent item is already in the queue, then that item is simply replaced with the newer item (resulting in only the most recent of the equivalent items remaining in the queue, as opposed to both). The idea is to avoid processing stale/obsolete events. Consider a consumer of market data (e.g., stock ticks). If you receive several updates for a frequently traded security, then each update renders the earlier updates obsolete. There is likely no point in processing earlier ticks for the same security if a more recent tick has already arrived. Thus, a collapsing queue is appropriate.
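
A collapsing queue along these lines might be sketched as follows (keying by symbol captures the "equivalence" described above; the names are illustrative):

```python
from collections import OrderedDict

class CollapsingQueue:
    """FIFO queue in which a new item replaces any queued item with the same key."""

    def __init__(self, key):
        self._key = key            # maps an item to its equivalence key
        self._items = OrderedDict()

    def enqueue(self, item):
        # Assigning to an existing key keeps its queue position
        # but replaces the stale payload with the newer one.
        self._items[self._key(item)] = item

    def dequeue(self):
        _, item = self._items.popitem(last=False)
        return item

    def __len__(self):
        return len(self._items)
```

In the market-data example, the key would be the security symbol, so a newer tick for MSFT silently replaces an older, not-yet-processed MSFT tick.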

In your scenario, you essentially have a trampoline processing a collapsing queue for which all incoming events are considered equivalent. This results in an effective maximum queue size of 1, as every item added to a non-empty queue will result in the existing item being evicted.

Is there some reusable synchronization type out there that could do that for me?

I do not know of an existing solution that would serve your needs, but you could certainly create a generalized trampoline or event loop capable of supporting pluggable scheduling strategies. The default strategy could use a standard queue, while other strategies might use a priority queue or a collapsing queue.

Mike Strobel
  • It seems like this would be ideally processed via Reactive Extensions, though I don't know enough to be able to do it myself unfortunately. – Nate Diamond Nov 19 '13 at 23:28
  • I had the same thought, but I couldn't come up with a combination of standard Rx operators that would provide the required functionality. I'm sure @PaulBetts could come up with one. – Mike Strobel Nov 19 '13 at 23:36
  • 1
    I always meant to give RX a try and never got round to it until last week when I spent 4 days trying to do some simple 2D UI gesture tracking with it and ended up doing it with non-RX code in 2 hours after wasting the first 4 days trying to learn RX. I'm either too stupid for it or it is too complicated/not really well suited for what I was doing. I can ask Bart De Smet for help though... :) – Filip Skakun Nov 20 '13 at 07:44

What you're describing sounds very similar to how TPL Dataflow's BroadcastBlock behaves: it always remembers only the last item that you sent to it. If you combine it with an ActionBlock that executes your action and has capacity only for the item currently being processed, you get what you want (the method needs a better name):

// returns a send delegate; requires the TPL Dataflow library
// (System.Threading.Tasks.Dataflow namespace)
private static Action<T> CreateProcessor<T>(Action<T> executedAction)
{
    var broadcastBlock = new BroadcastBlock<T>(null);
    var actionBlock = new ActionBlock<T>(
      executedAction, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    broadcastBlock.LinkTo(actionBlock);

    return item => broadcastBlock.Post(item);
}

Usage could be something like this:

var processor = CreateProcessor<int>(
    i =>
    {
        Console.WriteLine(i);
        Thread.Sleep(i);
    });

processor(100);
processor(1);
processor(2);

Output:

100
2
svick
  • Looks like a cool and simple solution. I'd rather use @Servy's answer if only to avoid the dependency on a component you need to install. Though it does seem interesting enough to maybe get me too look into that Dataflow library... – Filip Skakun Dec 06 '13 at 18:55
  • Could this work with Tasks instead of Actions too, so that I could await a task instead of running the action? – Filip Skakun Dec 06 '13 at 18:58
  • @FilipSkakun Yeah, Dataflow supports `async` delegates. You can do something like `new ActionBlock<T>(async x => await someAsyncMethod(x), …)`. If you want to do that with my code above, change the type of `executedAction` to `Func<T, Task>`. – svick Dec 06 '13 at 19:09