
I have an efficient C# application that receives 80-byte records at a rate of 5k to 10k records per second on a multi-threaded CPU.

I now need to set up an in-memory cache to detect and filter duplicate records so I can suppress them from travelling further in the pipeline.

Cache specs (maximum thresholds)

  • 80 bytes of data
  • 10,000 records / second
  • 60 seconds of cache = key quantity = 600,000
  • (subtotal: 48,000,000 bytes = 48 MB)
  • Ideal cache size = 5 minutes (or 240 MB)
  • Acceptable runtime cache size bloat = 1 GB

Question

What is the best way to set up an in-memory cache (dictionary, hashtable, array, etc.) that allows the most efficient lookups, purges old cache data, and prevents expiration of data that is still being hit?

I looked at the ASP.NET Cache and System.Runtime.Caching.MemoryCache, but I think I need something more lightweight and customized to achieve the required throughput. I'm also looking at System.Collections.Concurrent as an alternative, along with this related whitepaper.

Does anyone have suggestions on what the best approach would be?

makerofthings7
    as always with "throughput questions": it depends heavily on your runtime environment (HW, OS etc.). I can only highly recommend to profile alternative options and see how they perform, otherwise this might be a case of "premature optimization"... – Yahia May 12 '12 at 13:34
  • Take a look at [redis](http://redis.io/) – oleksii May 12 '12 at 14:10
  • That's where you might need the "unsafe" keyword and work with pointers directly. – frenchie May 12 '12 at 16:53
  • @frenchie Once I use pointers, what logic should I use with them? – makerofthings7 May 12 '12 at 16:56
  • @oleksii I would avoid Redis unless you needed it on a separate machine (it's all network access). Kyoto Cabinet etc are better for something embedded, but again only if you need *persistence*. If not, it's a roll your own in memory thing. – yamen May 12 '12 at 22:27
  • How does 60 seconds translate to 60.000 keys with 10.000 keys per second? – Lasse V. Karlsen May 12 '12 at 22:49
  • @yamen - I'm at 75% now – makerofthings7 May 13 '12 at 02:33

3 Answers


Remember, don't prematurely optimise!

There may be a reasonably concise way of doing this without resorting to unmanaged code, pointers and the like.

A quick test on my old, ordinary laptop shows that you can add 1,000,000 entries to a HashSet while removing 100,000 entries in ~100ms. You can then repeat that with the same 1,000,000 values in ~60ms. This is for working with just longs - 80-byte data structures are obviously larger, but a simple benchmark is in order.
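
For reference, a rough sketch of that kind of micro-benchmark (the class name and loop shape are just illustrations of the test described above, not the exact code I ran; numbers will of course vary with hardware):

using System;
using System.Collections.Generic;
using System.Diagnostics;

class HashSetBenchmark
{
    static void Main()
    {
        var set = new HashSet<long>();
        var sw = Stopwatch.StartNew();

        // First pass: add 1,000,000 longs, removing an older entry
        // once every ten adds (~100,000 removals in total).
        for (long i = 0; i < 1000000; i++)
        {
            set.Add(i);
            if (i % 10 == 0)
                set.Remove(i / 2);
        }
        Console.WriteLine("First pass:  " + sw.ElapsedMilliseconds + " ms, " + set.Count + " entries");

        // Second pass: add the same 1,000,000 values again - most are
        // already present, so this mostly measures the duplicate-hit path.
        sw.Restart();
        for (long i = 0; i < 1000000; i++)
        {
            set.Add(i);
        }
        Console.WriteLine("Second pass: " + sw.ElapsedMilliseconds + " ms, " + set.Count + " entries");
    }
}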

My recommendations:

  • Implement the 'lookup' and 'duplicate detection' as a HashSet, which is extremely fast for inserting, removing and finding.

  • Implement the actual buffer (that receives new events and expires old ones) as a suitably large circular/ring buffer. This will avoid memory allocations and deallocations, and can add entries to the front and remove them from the back. Here are some helpful links including one (the second one) that describes algorithms for expiring items in the cache:

Circular Buffer for .NET

Fast calculation of min, max, and average of incoming numbers

Generic C# RingBuffer

How would you code an efficient Circular Buffer in Java or C#

  • Note that the circular buffer is even better if you want your cache to be bounded by number of elements (say 100,000) rather than time of events (say the last 5 minutes).

  • When items are removed from the buffer (which searches from the end first), they can be removed from the HashSet as well. There is no need to keep both data structures identical; a minimal sketch combining the two follows this list.

  • Avoid multithreading until you need it! You have a naturally 'serial' workload. Unless you know one of your CPU threads can't handle the speed, keep it in a single thread. This avoids contention, locks, CPU cache misses and other multithreading headaches that tend to slow things down for workloads that are not embarrassingly parallel. My main caveat here is that you may want to offload the 'receiving' of the events to a different thread from the processing of them.

  • The above recommendation is the main idea behind Staged event-driven architecture (SEDA) that is used as the basis for high-performance and stable-behaviour event-driven systems (such as messaging queues).
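
Putting the two structures together might look roughly like this. It is a minimal sketch, bounded by element count rather than time, and it assumes you reduce each 80-byte record to a 64-bit hash; the class name, method name, and capacity parameter are all made up for illustration, and hash collisions between distinct records are ignored:

using System;
using System.Collections.Generic;

// Sketch: count-bounded duplicate filter combining a ring buffer (for
// expiry order) with a HashSet (for fast membership tests).
public class DuplicateFilter
{
    private readonly long[] ring;        // circular buffer of record hashes, oldest overwritten first
    private readonly HashSet<long> seen; // fast lookup / duplicate detection
    private int next;                    // index of the slot the next entry will occupy
    private int count;

    public DuplicateFilter(int capacity)
    {
        ring = new long[capacity];
        seen = new HashSet<long>();
    }

    // Returns true if the record hash was already in the cache (i.e. suppress it).
    public bool IsDuplicate(long recordHash)
    {
        if (seen.Contains(recordHash))
            return true;                 // hit - suppress downstream processing

        if (count == ring.Length)
        {
            // Buffer full: the slot about to be overwritten holds the
            // oldest entry, so expire it from the HashSet as well.
            seen.Remove(ring[next]);
        }
        else
        {
            count++;
        }

        ring[next] = recordHash;
        seen.Add(recordHash);
        next = (next + 1) % ring.Length;
        return false;
    }
}

With the figures from the question, something like new DuplicateFilter(600000) would roughly correspond to the 60-second window, and ~3,000,000 slots to 5 minutes. Note that this sketch expires strictly by insertion order; keeping frequently hit entries alive longer would need extra bookkeeping on top of it.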

The above design can be wrapped cleanly, and attempts to achieve the raw performance required with a minimum of complexity. This only provides a decent baseline from which efficiency can now be extracted and measured.

(Note: If you need persistence for the cache, look at Kyoto Cabinet. If you need the cache to be visible to other users or distributed, look at Redis.)

yamen
  • Got it. All my input data is coming in from many threads. After this advice, I'm using ConcurrentQueue to FIFO the data to one consolidated thread. The code for the single thread is pasted as an answer below. – makerofthings7 May 13 '12 at 19:42

I don't have anything to back this up, but I do like some weekend practice :)

To solve the purging, you could use a circular cache where the latest values overwrite the oldest (you won't have an exact n-minute cache that way, of course), so you only need to remember the offset where your last record went. You could initialize the cache by filling it with copies of the first record, to prevent an all-zero record from matching the cache's uninitialized data.

You could then simply start matching from the first byte; if a record doesn't match, skip that record's remaining bytes and attempt to match the next, until the end of the cache.

If the records contain a header followed by data, you may want to match backwards to increase the speed at which you find non-matching data.
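
A rough sketch of that idea, assuming fixed 80-byte records and a simple linear scan over every slot (the class name, method names, and slot count are placeholders, not tested code):

using System;

// Sketch of the circular raw-byte cache described above.
public class CircularByteCache
{
    private const int RecordSize = 80;
    private readonly byte[] cache;
    private readonly int slots;
    private int nextSlot;              // offset (in records) where the next record will be written

    public CircularByteCache(int slots)
    {
        this.slots = slots;
        cache = new byte[slots * RecordSize];
    }

    // Pre-fill with copies of the first record so uninitialized slots
    // can't accidentally match an all-zero record.
    public void Initialize(byte[] firstRecord)
    {
        for (int s = 0; s < slots; s++)
            Buffer.BlockCopy(firstRecord, 0, cache, s * RecordSize, RecordSize);
    }

    // Returns true if the record already exists in the cache; otherwise
    // stores it, overwriting the oldest slot.
    public bool CheckAndAdd(byte[] record)
    {
        for (int s = 0; s < slots; s++)
        {
            int offset = s * RecordSize;
            int i = 0;
            while (i < RecordSize && cache[offset + i] == record[i])
                i++;
            if (i == RecordSize)
                return true;           // full match - duplicate
            // mismatch: skip the rest of this slot and try the next one
        }

        Buffer.BlockCopy(record, 0, cache, nextSlot * RecordSize, RecordSize);
        nextSlot = (nextSlot + 1) % slots;
        return false;
    }
}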

C.Evenhuis
  • I started going down this path, but thought that is the same as doing Dict. So I then considered rounding the time into minute intervals with a distinct dictionary for each minute. I then considered multiple threads, and linked lists to track interval and use a hash to track the data. Then my head exploded. – makerofthings7 May 12 '12 at 15:35
  • Maybe I could use a concurrent collection to track the data (keyed by the hash), with a `QueryCount` and `LastQuery` property. Then to handle purging of old data I could have an array of ConcurrentCollections (one per minute) that tracks what Hash codes are potential purge candidate. An independent purge thread could read the purge candidate collection and lazily read the object in the Master Collection and see if LastQuery is below threshold. Purge if the data is stale, do nothing if the data is active. – makerofthings7 May 12 '12 at 15:42
  • That last comment is assuming that multithreading is the only way to go for this... and that a suitable threading abstraction is required. – makerofthings7 May 12 '12 at 15:43
  • The circular cache I had in mind would simply be one huge allocation of memory, in unmanaged memory or pinned. As soon as hashing would be more efficient, my answer could be deleted. – C.Evenhuis May 12 '12 at 16:11

Here is a sample that will work with a single thread. The code uses two dictionaries to track the data: one (hashDuplicateTracker) keeps a running count of each record hash, and a second (hashesByDate) groups hashes by timestamp so old values can be aged out.

Weak spot: CheckDataFreshness still leans on the Linq ElementAt() operator, which is slow on these dictionaries... I'm working through this.

Some improvements I should make

  • Replace the Linq operator ElementAt(x) with something else
  • Make sure CheckDataFreshness runs no more frequently than once per interval

To make this multithreaded

  • Replace Dictionary with ConcurrentDictionary for FrequencyOfMatchedHash and HashDuplicateTracker (a rough sketch follows this list)
  • Get a sorted version of ConcurrentDictionary or use locks for HashesByDate
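
As an illustration of the first bullet, the counting logic might look roughly like this over ConcurrentDictionary, using composition instead of inheritance; the class name is made up for this sketch and the remove path is not fully race-free:

using System.Collections.Concurrent;

// Illustrative only: per-interval hash counting backed by ConcurrentDictionary.
public class ConcurrentFrequencyOfMatchedHash
{
    private readonly ConcurrentDictionary<int, int> counts = new ConcurrentDictionary<int, int>();

    public void AddRecordHash(int hashCode)
    {
        // Atomically insert the hash with a count of 1, or increment it.
        counts.AddOrUpdate(hashCode, 1, (key, existing) => existing + 1);
    }

    public void DecrementRecordHash(int hashCode)
    {
        int existing;
        if (counts.TryGetValue(hashCode, out existing))
        {
            if (existing <= 1)
            {
                // Not fully race-free: another thread could bump the count
                // between the read and the remove.
                int removed;
                counts.TryRemove(hashCode, out removed);
            }
            else
            {
                counts.TryUpdate(hashCode, existing - 1, existing);
            }
        }
    }

    public int Count
    {
        get { return counts.Count; }
    }
}

The single-threaded sample follows: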

using System;
using System.Collections.Generic;
using System.Linq;

public class FrequencyOfMatchedHash : Dictionary<int, int>
{
    // Tracks how many times each record hash was seen within one interval.
    public void AddRecordHash(int hashCode)
    {
        if (this.ContainsKey(hashCode))
        {
            this[hashCode]++;
        }
        else
        {
            this.Add(hashCode, 1);
        }
    }

    public void DecrementRecordHash(int hashCode)
    {
        if (this.ContainsKey(hashCode))
        {
            var val = this[hashCode];
            if (val <= 1)
            {
                this.Remove(hashCode);
            }
            else
            {
                this[hashCode] = val - 1;
            }
        }
    }

    public override string ToString()
    {
        return this.Count + " records";
    }
}

public class HashDuplicateTracker : Dictionary<int, int>
{
    // Overall count of each record hash currently inside the cache window.
    internal void AddRecord(int recordHash)
    {
        if (this.ContainsKey(recordHash))
        {
            this[recordHash]++;
        }
        else
        {
            this.Add(recordHash, 1);
        }
    }
}


public class HashesByDate : SortedDictionary<DateTime, FrequencyOfMatchedHash>
{
    // Groups record hashes by the timestamp they arrived with, so whole
    // intervals can be expired at once.
    internal void AddRecord(DateTime dt, int recordHash)
    {
        if (this.ContainsKey(dt))
        {
            this[dt].AddRecordHash(recordHash);
        }
        else
        {
            var tmp = new FrequencyOfMatchedHash();
            tmp.AddRecordHash(recordHash);
            this.Add(dt, tmp);
        }
    }
}
public class DuplicateTracker
{
    // overall count of each hash currently inside the cache window
    HashDuplicateTracker hashDuplicateTracker = new HashDuplicateTracker();

    // track all the hashes by date
    HashesByDate hashesByDate = new HashesByDate();


    private TimeSpan maxRange;
    private int average;

    public DuplicateTracker(TimeSpan range)
    {
        this.maxRange = range;
    }

    public void AddRecordHash(DateTime dt, int recordHash)
    {
        if (hashesByDate.Count == 0)
        {
            hashDuplicateTracker.AddRecord(recordHash);
            hashesByDate.AddRecord(dt, recordHash);

            return;
        }

        // Work out the oldest timestamp still allowed, measured from the
        // newest timestamp seen so far.
        DateTime maxDate = hashesByDate.ElementAt(hashesByDate.Count - 1).Key;
        DateTime oldestPermittedEntry = maxDate - maxRange;

        if (dt >= oldestPermittedEntry)
        {
            // Both AddRecord methods increment existing entries themselves,
            // so no duplicate-key exception handling is needed here.
            hashDuplicateTracker.AddRecord(recordHash);
            hashesByDate.AddRecord(dt, recordHash);

            CheckDataFreshness(oldestPermittedEntry);
        }
    }


    /// <summary>
    /// This should be called any time data is added to the collection.
    /// 
    /// If threading issues are addressed, a more optimal solution would be to run this on an independent thread.
    /// </summary>
    /// <param name="oldestEntry"></param>
    private void CheckDataFreshness(DateTime oldestEntry)
    {
        while (hashesByDate.Count > 0)
        {
            // SortedDictionary keeps keys ordered, so the first entry is the oldest interval.
            DateTime currentDate = hashesByDate.ElementAt(0).Key;

            if (currentDate >= oldestEntry)
                break;

            var hashesToDecrement = hashesByDate.ElementAt(0).Value;

            foreach (var pair in hashesToDecrement)
            {
                int currentHash = pair.Key;
                int currentValue = pair.Value;

                // decrement the overall counter for this hash
                int tmpResult = hashDuplicateTracker[currentHash] - currentValue;
                if (tmpResult <= 0)
                {
                    // prevent endless memory growth.
                    // For performance this might be deferred.
                    hashDuplicateTracker.Remove(currentHash);
                }
                else
                {
                    hashDuplicateTracker[currentHash] = tmpResult;
                }
            }

            // the whole interval has been accounted for; drop it
            hashesByDate.Remove(currentDate);
        }
    }

 }
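
For context, here is a hypothetical way to feed this from the single consolidated thread mentioned in the comments. The class name, the placeholder hash function, and the one-second timestamp bucketing are assumptions for illustration, not part of the design above:

using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical glue code: many producer threads enqueue raw 80-byte records,
// a single consumer thread hashes them and feeds DuplicateTracker.
public static class DuplicateTrackerConsumer
{
    public static void Run(ConcurrentQueue<byte[]> incoming, DuplicateTracker tracker, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            byte[] record;
            if (!incoming.TryDequeue(out record))
            {
                Thread.Sleep(1);   // back off briefly when the queue is empty
                continue;
            }

            // Bucket timestamps to whole seconds so HashesByDate gets one
            // entry per interval rather than one per record (an assumption).
            DateTime now = DateTime.UtcNow;
            DateTime bucket = now.AddTicks(-(now.Ticks % TimeSpan.TicksPerSecond));

            tracker.AddRecordHash(bucket, ComputeHash(record));
        }
    }

    // Placeholder 32-bit FNV-1a style hash over the record bytes (illustrative only).
    private static int ComputeHash(byte[] record)
    {
        uint hash = 2166136261;
        foreach (byte b in record)
            hash = (hash ^ b) * 16777619;
        return (int)hash;
    }
}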
makerofthings7
  • Good to see an attempt. I think this is on the wrong side of efficiency though, for a number of reasons. 1) You want the data structure that is being used to add and remove objects to be naturally FIFO of some sort, since you always add at one end and remove on the other. It will be optimised for that. 2) Unless you need to track the number of duplicates all the time, a `HashSet` will be far more efficient than a Dictionary, especially since it's always just one operation to add or one operation to check. – yamen May 13 '12 at 20:36
  • @yamen - I'm tracking the number of duplicates because it is part of the customer's spec. In addition, I think it's possible to use the count of expiring hashes to update the Average and Rate properties without having to search for and re-sum the Average and Rate properties for every interval. I can't think of a natural FIFO solution that allows this, or... – makerofthings7 May 13 '12 at 20:52
  • @yamen I guess I'm creating hoppingWindow with this code http://msdn.microsoft.com/en-us/library/ff518448.aspx ... which might be the most appropriate. – makerofthings7 May 13 '12 at 20:55
  • Number of duplicates total is simple with `HashSet` (`Add` returns true if it did add). You can then keep only those around that were duplicates, but still get the far more efficient structure. You can also keep a running `average` but still use a FIFO data structure - just keep a running sum and count, increment it as you add, decrement as you subtract. You started this question asking for efficiency, and the current approach is very inefficient. – yamen May 13 '12 at 20:56
  • @yamen Thanks for the advice, I'm trying to implement it. Will be experimenting with HashSet – makerofthings7 May 13 '12 at 21:51