
I am preparing for tech job interviews and came across a problem that is now asked frequently. I have found some code snippets on LeetCode and elsewhere, but most of them are in Java. If I am given this problem in an interview, is there a way to design and write the solution in C# (things like PriorityQueue are unavailable here)?

Below are the basic use-cases.

  1. The library maintains multiple queues.

  2. Each queue must support multiple publishers and subscribers.

  3. Each queue has a maximum retention period beyond which a message in the queue should not reside in memory.

  4. Each message inside the queue can have an optional TTL value. A message whose TTL has expired should not be consumed by any subscriber and should not reside in memory either.

  5. Each consumer should read all the messages.

Something similar (multiple producers/multiple consumers) has been posted here before: Multiple Producers/Consumers. However, not much is available online for C#.

Any suggestions on how I can use standard .NET APIs to design a solution for this?

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104

1 Answer


The Reactive Extensions library (Rx) seems like a good candidate for solving this problem. It is based on two interfaces, IObservable<T> (the publisher) and IObserver<T> (the subscriber), and offers many implementations and combinators of these interfaces. For example, the class ReplaySubject<T> implements both of these interfaces, and it has a constructor that accepts a TimeSpan window parameter, which defines the maximum time length of the replay buffer.

Below you can see how to instantiate a ReplaySubject<T> with a 30-second retention period, how to enqueue an item in it, how to subscribe a consumer that will receive notifications for all items currently in the subject as well as all items enqueued in the future, and how to end a subscription to stop receiving notifications:

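using System;
using System.Reactive.Subjects; // From the System.Reactive package
//...
// Items are retained in the replay buffer for at most 30 seconds
var subject = new ReplaySubject<Item>(TimeSpan.FromSeconds(30));
//...
// Publish an item
subject.OnNext(new Item());
//...
// Subscribe a consumer: buffered items are replayed first, then future items arrive
var subscription = subject.Subscribe((Item x) => Console.WriteLine($"Received {x}"));
//...
// End the subscription
subscription.Dispose();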
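To illustrate use case 5 (each consumer reads all the messages), here is a minimal self-contained sketch with two consumers, one subscribing before and one after some messages were published. The ReplayDemo class and its Run method are hypothetical names chosen for illustration, and the sketch assumes the System.Reactive package is referenced:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ReplayDemo
{
    // Returns what each of the two consumers received.
    public static (List<string> Early, List<string> Late) Run()
    {
        var early = new List<string>();
        var late = new List<string>();
        var subject = new ReplaySubject<string>(TimeSpan.FromSeconds(30));

        // The first consumer subscribes before anything is published.
        using var s1 = subject.Subscribe(x => early.Add(x));
        subject.OnNext("a");
        subject.OnNext("b");

        // The second consumer subscribes late; the buffered "a" and "b"
        // are replayed to it because they are still inside the window.
        using var s2 = subject.Subscribe(x => late.Add(x));
        subject.OnNext("c");

        return (early, late);
    }
}
```

Both lists end up containing a, b, c, because the late subscriber gets the buffered items replayed before receiving new ones.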

As for the "expired TTL" functionality, it seems tricky because AFAIK Rx has no built-in component that offers it out of the box. I thought about implementing it as a custom ISubject<T>, based on a structure of nested subjects: ReplaySubject<ReplaySubject<T>>. Each inner subject would contain a single value, and would be instantiated with a window equal to the TTL of that specific item. The consumer would subscribe to this structure after flattening it with the Merge operator. This idea originated from this question: How can I clear the buffer on a ReplaySubject? Here is an implementation:

using System;
using System.Reactive.Linq;     // Merge
using System.Reactive.Subjects; // ISubject<T>, ReplaySubject<T>

/// <summary>
/// Represents an object that is both an observable sequence as well as an observer.
/// Each notification is broadcast to all subscribed and future observers, subject
/// to buffer trimming and notification expiration policies.
/// </summary>
public class ExpirableReplaySubject<T> : ISubject<T>
{
    private readonly TimeSpan _window;
    private readonly Func<T, TimeSpan> _expireAfterSelector;
    private readonly ReplaySubject<ISubject<T>> _replaySubject;
    private readonly IObservable<T> _replaySubjectMerged;

    public ExpirableReplaySubject(TimeSpan window,
        Func<T, TimeSpan> expireAfterSelector)
    {
        _window = window;
        _expireAfterSelector = expireAfterSelector;
        _replaySubject = new ReplaySubject<ISubject<T>>(window);
        _replaySubjectMerged = _replaySubject.Merge();
    }

    public void OnNext(T value)
    {
        var expireAfter = _expireAfterSelector(value);
        if (expireAfter > _window) expireAfter = _window;
        var inner = new ReplaySubject<T>(1, expireAfter); inner.OnNext(value);
        _replaySubject.OnNext(inner);
    }

    public void OnCompleted()
    {
        // All subjects, inner and outer, must be completed
        _replaySubject.OnCompleted();
        _replaySubject.Subscribe(subject => subject.OnCompleted());
    }

    public void OnError(Exception error)
    {
        // Faulting the outer subject is enough
        _replaySubject.OnError(error);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _replaySubjectMerged.Subscribe(observer);
    }
}

The window parameter configures the retention timespan of the subject, and the expireAfterSelector is a selector that runs for each new item received, and returns the expiration of this specific item.

The problem with this approach is that when the inner subjects are evicted from the outer _replaySubject, according to the window policy, they are not completed (their OnCompleted method is not invoked). Without being completed, the inner subjects will never be unsubscribed from the _replaySubjectMerged, resulting in a permanent memory leak. So a bit more work is required. Instead of using ReplaySubject<T>s with a bufferSize equal to 1 as inner subjects, we can implement a lightweight ISubject<T> containing a single expirable value:

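// Nested inside ExpirableReplaySubject<T>. Needs System.Threading (Timer, Timeout)
// and System.Reactive.Disposables (Disposable).
private class ExpirableValueSubject<T> : ISubject<T>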
{
    private T _value;
    private Timer _timer; // Becomes null when the subject is completed

    public ExpirableValueSubject(T value, TimeSpan expireAfter)
    {
        if (expireAfter <= TimeSpan.Zero) return; // Expired upon creation
        _value = value;
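        // Create the timer disabled and start it afterwards, so that the
        // callback cannot run before _timer has been assigned.
        _timer = new Timer(arg => ((ExpirableValueSubject<T>)arg).OnCompleted(),
            this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        _timer.Change(expireAfter, Timeout.InfiniteTimeSpan);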
    }

    public void OnNext(T value) => throw new InvalidOperationException();
    public void OnError(Exception error) => throw new InvalidOperationException();
    public void OnCompleted()
    {
        lock (this) { _timer?.Dispose(); _timer = null; _value = default; }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (this)
        {
            if (_timer != null) observer.OnNext(_value);
            observer.OnCompleted();
        }
        return Disposable.Empty;
    }
}

Upon subscription, this custom subject emits its value (if it has not expired yet), immediately followed by an OnCompleted notification. Its own OnCompleted method has been hijacked for releasing the _value and disposing the _timer.

This class locks on this, which is not advisable in general. In this case the class is intended as an internal component, and references to its instances will not leak to the outside world, so locking on this should be OK.

To complete our solution, all you have to do is replace this line:

var inner = new ReplaySubject<T>(1, expireAfter); inner.OnNext(value);

...with this:

var inner = new ExpirableValueSubject<T>(value, expireAfter);

With this change, the ExpirableReplaySubject<T> class should work as expected, without leaking memory. It is also thread-safe, as all ISubject<T> implementations intended for general use should be.
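To see why completing the inner subjects matters, here is a small sketch of how Merge treats inner sequences: it stays subscribed to each one until that inner sequence completes. The MergeDemo class is a hypothetical name used only for illustration, and plain Subject<T> instances stand in for the subjects of the implementation above:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeDemo
{
    public static (bool InnerObserved, bool CompletedEarly, bool CompletedLate) Run()
    {
        var outer = new Subject<IObservable<int>>();
        var inner = new Subject<int>();
        bool completed = false;

        using var subscription = outer.Merge()
            .Subscribe(_ => { }, () => completed = true);

        // Merge subscribes to the inner sequence as soon as it arrives.
        outer.OnNext(inner);
        bool innerObserved = inner.HasObservers;

        // Completing the outer sequence alone does not complete the merged
        // sequence; the subscription to the inner one is still alive.
        outer.OnCompleted();
        bool completedEarly = completed;

        // Only when the inner sequence completes is everything released.
        inner.OnCompleted();
        bool completedLate = completed;

        return (innerObserved, completedEarly, completedLate);
    }
}
```

An inner subject that never completes therefore keeps its Merge subscription alive indefinitely, which is the leak that the custom inner subject avoids by completing on every subscription.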

Usage example:

var subject = new ExpirableReplaySubject<Item>(TimeSpan.FromSeconds(30),
    item => item.TTL);
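The usage example assumes an Item type that exposes a TTL property. As a sketch only, and assuming the TTL is optional as in use case 4, the type and the selector could look like the following. The Item record and the TtlPolicy helper are hypothetical and not part of Rx; a missing TTL falls back to the retention window:

```csharp
using System;

// Hypothetical message type: a null TTL means "no per-message TTL".
public sealed record Item(string Payload, TimeSpan? TTL = null);

public static class TtlPolicy
{
    // Builds a selector suitable for the expireAfterSelector parameter.
    // Items without a TTL live as long as the retention window allows.
    public static Func<Item, TimeSpan> ForWindow(TimeSpan window) =>
        item => item.TTL ?? window;
}
```

It could then be passed as new ExpirableReplaySubject<Item>(window, TtlPolicy.ForWindow(window)). A TTL longer than the window needs no special handling here, because the OnNext method shown earlier already clamps it to the window.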

Note: The original implementation (Revision 1) used an encapsulated BehaviorSubject<T> as the inner subject. This approach was problematic because a BehaviorSubject<T> releases its internal value only when it is disposed, not when it is completed. And once disposed, it becomes unusable (it throws ObjectDisposedException).
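The unusable-after-disposal behavior can be observed with a short sketch. The BehaviorSubjectDemo class is a hypothetical name for illustration, and reading the Value property is just one of the operations that fail on a disposed BehaviorSubject<T>:

```csharp
using System;
using System.Reactive.Subjects;

public static class BehaviorSubjectDemo
{
    // Returns true if reading Value after Dispose throws ObjectDisposedException.
    public static bool ThrowsAfterDispose()
    {
        var subject = new BehaviorSubject<string>("payload");
        subject.Dispose();
        try
        {
            _ = subject.Value;
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }
}
```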

Theodor Zoulias