1

My class has a field:

IList<Tuple<Object,DateTime>> items = new List<Tuple<Object,DateTime>>();

And an API like:

//Add 'o' to the list. At 'expiry' 'o' shoudl be removed from the list and some action perfromed
public void Add(Object o,DateTime expiry, Callback callback)
{
 items.Add(new Tuple<Object,DateTime>(o,DateTime.Now);
 ???
}

At the specified time the item is removed and some action is performed. I do not want to use a polling loop.

I had considered something like:

public void Add(Object o,DateTime expiry, Callback callback)
{
 items.Add(new Tuple<Object,DateTime>(o,DateTime.Now);
 Task.Delay(expiry - DateTime.Now).ContinueWith(() => {items.Remove(o); callback.notify(o);});
}

Obviously this isn't thread safe but it also seems like it might be a bad idea to have so many tasks.

What is a good way to accomplish this goal neatly? Added details:

  • items may be removed from the list some other way in the interim
  • I probably want a way to clear the list and stop all these tasks
Mr. Boy
  • 60,845
  • 93
  • 320
  • 589
  • Add code that prunes the collection before use. You don't want the items to disappear from the list in the middle of a foreach, for instance, so make it controlled. – Lasse V. Karlsen Jul 30 '20 at 11:08
  • @LasseV.Karlsen apologies I had missed the detail that some entity needs to _know_ about the expiry. Otherwise your simple lazy approach would make a lot more sense. – Mr. Boy Jul 30 '20 at 11:13
  • 2
    Are you talking about some kind of [cache with expiration](https://stackoverflow.com/q/32414054/1997232) ? – Sinatr Jul 30 '20 at 11:21
  • [MemoryCache](https://learn.microsoft.com/en-us/dotnet/api/system.runtime.caching.memorycache) is what you are looking for? – Alexander Petrov Jul 30 '20 at 11:39

2 Answers2

2

after a bit of thinking I came with the following idea:

  • have 1 task run indefenitly, which checks the collection and removes expired after a certain time items, invokes the callbacks for expired
  • have a field which returns not expired items

I assume that you add to the field and multiple thread read from it, so use BlockingCollection for thread safe adding, which can be found in System.Collections.Concurrent which implements the consumer producer pattern.

You could use a field which returns only not expired item from your storage. The storage holds the items until the indefenitly running task starts the clean up.

Here is my example:

 private static readonly BlockingCollection<(object, DateTime, Callback)> _items = new BlockingCollection<(object, DateTime, Callback)>();
    private static bool _running;
public void Start()
    {
        _running = true;
        Task.Factory.StartNew(async () =>
        {
            while (_running)
            {
                await Task.Delay(1000);
                //block adding until we are finished cleaning up
                //we dont need to block when we invoke expired
                var expired = new List<(object, DateTime, Callback)>();
                lock (_items)
                {
                    var items = _items.ToArray();
                    var now = DateTime.Now;
                    var notExpired = items.Where(item => item.Item2 > now);
                    expired.AddRange(items.Where(item => item.Item2 <= now));
                    while(_items.Count > 0)
                    {
                        _items.Take();
                    }

                    foreach (var item in notExpired)
                    {
                        _items.Add(item);
                    }
                }

                foreach (var item in expired)
                {
                    var (o, _, callback) = item;
                    callback?.notify(o);
                }
            }
        });
    }
public IEnumerable<object> Items
    {
        get
        {
            var now = DateTime.Now;
            foreach (var item in _items.ToArray())
            {
                var (text, expiry, _) = item;
                if(expiry > now)
                    yield return text;
            }
        }
    }
    public void Add(object o, DateTime expiry, Callback callback)
    {
        _items.Add((new Random().Next(int.MaxValue).ToString(), DateTime.Now + TimeSpan.FromSeconds(10)), callback);
    }
Maheshvara
  • 71
  • 4
0

A couple of months ago we had been tasked to POC somewhat similar. In our case we needed Memoization, which caches the results of an operation for each input for a specific period of time. In other words if you issue the same operation (call the same method with the same input) and you are in the predefined time range then the response is served from a cache otherwise it performs the original request.

First we have introduced the following helper class:

class ValueWithTTL<T>
{
    public Lazy<T> Result { get; set; }
    public DateTime ExpiresAt { get; set; }
}

The ExpiresAt represents a time in a future when the Result becomes stale.

We have used a ConcurrentDictionary to store the cached results.

Here is the simplified version of the Memoizer helper class:

public static class Memoizer
{
    public static Func<K, V> Memoize<K, V>(this Func<K, V> toBeMemoized, int ttlInMs = 5*1000)
        where K : IComparable
    {
        var memoizedValues = new ConcurrentDictionary<K, ValueWithTTL<V>>();
        var ttl = TimeSpan.FromMilliseconds(ttlInMs);

        return (input) => 
        {
            if (memoizedValues.TryGetValue(input, out var valueWithTtl))
            {
                if (DateTime.UtcNow >= valueWithTtl.ExpiresAt)
                {
                    memoizedValues.TryRemove(input, out _);
                    valueWithTtl = null;
                    Console.WriteLine($"!!!'{input}' has expired");
                }
            }

            if (valueWithTtl != null)
                return valueWithTtl.Result.Value;

            var toBeCached = new Lazy<V>(() => toBeMemoized(input));
            var toBeExpired = DateTime.UtcNow.AddMilliseconds(ttlInMs);
            var toBeCachedWithTimestamp = new ValueWithTTL<V> { Result = toBeCached, ExpiresAt = toBeExpired};
            memoizedValues.TryAdd(input, toBeCachedWithTimestamp);
            return toBeCachedWithTimestamp.Result.Value;
        };
    }
}
  • It receives a Func which will be executed only if the given input is not present inside the memoizedValues or if it does present but the ExpiresAt is smaller than now.
  • The Memoize returns a Func with the same signature as the toBeMemoized, so it will nicely work as a decorator or a wrapper.

Here is the sync probe:

private static readonly WebClient wclient = new WebClient();
private static string[] uris = { "http://google.com", "http://9gag.com", "http://stackoverflow.com", "http://gamepod.hu", "http://google.com", "http://google.com", "http://stackoverflow.com" };

static void SyncProbe(IEnumerable<string> uris)
{
    var getAndCacheContent = Memoizer.Memoize<string, string>(wclient.DownloadString);
    var rand = new Random();
    foreach (var uri in uris)
    {
        var sleepDuration = rand.Next() % 1500;
        Thread.Sleep(sleepDuration);
        Console.WriteLine($"Slept: {sleepDuration}ms");

        var sp = Stopwatch.StartNew();
        _ = getAndCacheContent(uri);
        sp.Stop();
        Console.WriteLine($"'{uri}' request took {sp.ElapsedMilliseconds}ms");
    }
}

And here is the async probe:

private static readonly HttpClient client = new HttpClient();
private static string[] uris = { "http://google.com", "http://9gag.com", "http://stackoverflow.com", "http://gamepod.hu", "http://google.com", "http://google.com", "http://stackoverflow.com" };

static async Task AsyncProbe(IEnumerable<string> uris)
{
    var getAndCacheContent = Memoizer.Memoize<string, Task<string>>(client.GetStringAsync);
    var downloadTasks = new List<Task>();
    var rand = new Random();
    foreach (var uri in uris)
    {
        var sleepDuration = rand.Next() % 1500;
        await Task.Delay(sleepDuration);
        Console.WriteLine($"Slept: {sleepDuration}ms");

        downloadTasks.Add(Task.Run(async () =>
        {
            var sp = Stopwatch.StartNew();
            _ = await getAndCacheContent(uri);
            sp.Stop();
            Console.WriteLine($"'{uri}' request took {sp.ElapsedMilliseconds}ms");
        }));
    }

    await Task.WhenAll(downloadTasks.ToArray());
}

Finally a sample output

Slept: 983ms
'http://google.com' request took 416ms
Slept: 965ms
'http://9gag.com' request took 601ms
Slept: 442ms
'http://stackoverflow.com' request took 803ms
Slept: 1047ms
'http://gamepod.hu' request took 267ms
Slept: 844ms
!!!'http://google.com' has expired
'http://google.com' request took 201ms
Slept: 372ms
'http://google.com' request took 0ms
Slept: 302ms
'http://stackoverflow.com' request took 0ms
  • As you can see we have issued 3 requests against google and 2 of them has been executed normally and 1 of them served from the cache. The second attempt could not be served from the cache because it became stale.
  • We have issued 2 requests against stackoverflow and the first one executed normally and the second one served from the cache.

This was just a POC so there are plenty room for improvement:

  • Make the memoizedValues bounded and use some eviction policy
  • Make use of WeakReference (1)
  • Make use of MemoryCache despite its known limitation
  • etc.
Peter Csala
  • 17,736
  • 16
  • 35
  • 75