3

I want to write an application that evaluates sensor data from two sensors. Both sensors send their data in Package objects which are split into Frame objects. A Package is essentially a Tuple<Timestamp, Data[]>, a Frame is a Tuple<Timestamp, Data>. Then I need to consume always the Frame with the earliest timestamp from both sources.

So basically my object stream is

Package -(1:n)-> Frame \
                        }-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /

Example

Assume each Package contains either 2 or 3 values (reality: 5-7) and integer timestamps that increment by 1 (reality: ~200Hz => ~5ms increment). The "data" is just timestamp * 100 for sake of simplicity.

Packages (timestamp, values[])

Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
 (29, [2700, 2800, 2900]), ...}

Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
 (26, [2400, 2500, 2600]), ...}

After (1:n) steps:

Frames (timestamp, value)

Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
 (22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
 (29, 2900), ...}

Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
 (20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}

After the pair synchronized step:

Merged tuples (timestamp, source1, source2)

{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
 (19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
 (24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}

Note that timestamp 23 is missing since none of both sources sent a value. That's just a side effect. I can put an empty tuple in or not, doesn't matter. It also doesn't matter if the tuple is (27, 2700, 2700) or ((27, 2700), (27, 2700)), i. e. Tuple<Timestamp, Data, Data> or Tuple<Frame, Frame>.


I'm pretty sure the (1:n) part should be a TransformManyBlock<Package, Frame> if I got the documentation right.

But which block do I use for the pair synchronized part? At first, I thought the JoinBlock<Frame, Frame> would be what I was looking for, but it appears it just pairs two elements index-wise. But since it is neither ensured that both pipelines start with the same timestamp nor that both pipelines will always produce a steady stream of continuous timestamps (because sometimes packages with a few frames may be lost in transmission), this is not an option. So what I need is more of a "MergeBlock" with a possibility to decide which element of both input streams to propagate to the output next (if any).

I figured I'd have to write something like this myself. But I'm having trouble to write the code that properly handles two ISourceBlock variables and one ITargetBlock variable. I'm basically stuck as early as can be:

private void MergeSynchronized(
    ISourceBlock<Frame> source1,
    ISourceBlock<Frame> source2,
    ITargetBlock<Tuple<Frame, Frame>> target)
{
  var frame1 = source1.Receive();
  var frame2 = source2.Receive();

  //Loop {
  //  Depending on the timestamp [mis]match,
  //  either pair frame1+frame2 or frame1+null or null+frame2, and
  //  replace whichever frame(s) was/were propagated already
  //  with the next frame from the respective pipeline
  //}
}

I'm not even sure about this draft: Should the method be async so I can use var frame1 = await source1.ReceiveAsnyc();? What is the loop's condition? Where and how to check for completion? How to solve the obvious problem that my code means I have to wait until a gap in a stream is over to realize that there was a gap?

The alternative I thought about is to add an additional block in the pipelines, ensuring that enough "sentinel frames" are put into the pipeline per sensor so that aligning always the first from each pipeline will align the correct two. I guess that would be a kind-of TransformManyBlock which reads a Frame, compares the "expected" timestamp with the actual timestamp, and then inserts sentinel frames for the missing timestamps until the frame's timestamp is correct again.

Or is the pair synchronized part the place to stop with TPL Dataflow objects and start the actual code that already works with the Data part?

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
LWChris
  • 3,320
  • 1
  • 22
  • 39
  • For the records: I have a strong feeling that trying to synchronize two DataFlow pipelines kinda nukes the whole point of DataFlow in the first place. DataFlow seems to be all about enabling you to process data as fast as possible without having to think about threads or loops... So maybe this is rubbish altogether and I should output the data to the result stream as fast as possible and then combine them when the second stream _also_ has a result later on? – LWChris Apr 22 '19 at 11:37
  • _consume always the Frame with the earliest timestamp from both sources_ can you clarify this a bit, it seems that's exactly what you would get with a `JoinBlock`, under what conditions would you want your merge to join a `Frame` with null? – JSteward Apr 23 '19 at 00:04
  • @JSteward From both split pipelines I take the current element from the queue. This will be the earliest timestamp per queue. So I have two elements timestamps. Either, the timestamps align, then I need to pair them, or they don't align, then I need to only consume the earlier one and wait for that pipeline to "catch up" so to say. But even once they are in sync, this doesn't mean they will stay in sync henceforth. – LWChris Apr 23 '19 at 09:47
  • @JSteward I added a detailed example. – LWChris Apr 23 '19 at 15:07

2 Answers2

3

The problem with the TPL DataFlow API is, that everything is internal/private and/or sealed. This gives you not so much possibilities to extend the API.

Anyway for your problem it might be a good idea to implement a new SynchronizedJoinBlock class. The actual business logic is located in the GetMessagesRecursive method:

    public sealed class SynchronizedJoinBlock<T1, T2>
        : IReceivableSourceBlock<Tuple<T1, T2>>
    {
        private readonly object _syncObject = new object();
        private readonly Func<T1, T2, int> _compareFunction;
        private readonly Queue<T1> _target1Messages;
        private readonly Queue<T2> _target2Messages;
        private readonly TransformManyBlock<T1, Tuple<T1, T2>> _target1;
        private readonly TransformManyBlock<T2, Tuple<T1, T2>> _target2;
        private readonly BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>> _batchedJoinBlock;
        private readonly TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>> _transformManyBlock;

        public ITargetBlock<T1> Target1 => _target1;

        public ITargetBlock<T2> Target2 => _target2;

        public Task Completion => _transformManyBlock.Completion;

        public SynchronizedJoinBlock(Func<T1, T2, int> compareFunction)
        {
            _compareFunction = compareFunction
                ?? throw new ArgumentNullException(nameof(compareFunction));
            _batchedJoinBlock = new BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>>(1);
            _target1Messages = new Queue<T1>();
            _target2Messages = new Queue<T2>();

            Func<ICollection<Tuple<T1, T2>>> getMessagesFunction = () =>
            {
                lock (_syncObject)
                {
                    if (_target1Messages.Count > 0 && _target2Messages.Count > 0)
                    {
                        return GetMessagesRecursive(_target1Messages.Peek(), _target2Messages.Peek()).ToArray();
                    }
                    else
                    {
                        return new Tuple<T1, T2>[0];
                    }
                }
            };

            _target1 = new TransformManyBlock<T1, Tuple<T1, T2>>((element) =>
            {
                _target1Messages.Enqueue(element);
                return getMessagesFunction();
            });
            _target1.LinkTo(_batchedJoinBlock.Target1, new DataflowLinkOptions() { PropagateCompletion = true });

            _target2 = new TransformManyBlock<T2, Tuple<T1, T2>>((element) =>
            {
                _target2Messages.Enqueue(element);
                return getMessagesFunction();
            });
            _target2.LinkTo(_batchedJoinBlock.Target2, new DataflowLinkOptions() { PropagateCompletion = true });

            _transformManyBlock = new TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>>(
                element => element.Item1.Concat(element.Item2)
            );
            _batchedJoinBlock.LinkTo(_transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        }

        private IEnumerable<Tuple<T1, T2>> GetMessagesRecursive(T1 value1, T2 value2)
        {
            int result = _compareFunction(value1, value2);
            if (result == 0)
            {
                yield return Tuple.Create(_target1Messages.Dequeue(), _target2Messages.Dequeue());
            }
            else if (result < 0)
            {
                yield return Tuple.Create(_target1Messages.Dequeue(), default(T2));

                if (_target1Messages.Count > 0)
                {
                    foreach (var item in GetMessagesRecursive(_target1Messages.Peek(), value2))
                    {
                        yield return item;
                    }
                }
            }
            else
            {
                yield return Tuple.Create(default(T1), _target2Messages.Dequeue());

                if (_target2Messages.Count > 0)
                {
                    foreach (var item in GetMessagesRecursive(value1, _target2Messages.Peek()))
                    {
                        yield return item;
                    }
                }
            }
        }

        public void Complete()
        {
            _target1.Complete();
            _target2.Complete();
        }

        Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage(
            DataflowMessageHeader messageHeader,
            ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
        {
            return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ConsumeMessage(messageHeader, target, out messageConsumed);
        }

        void IDataflowBlock.Fault(Exception exception)
        {
            ((IDataflowBlock)_transformManyBlock).Fault(exception);
        }

        public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target,
            DataflowLinkOptions linkOptions)
        {
            return _transformManyBlock.LinkTo(target, linkOptions);
        }

        void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
        {
            ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ReleaseReservation(messageHeader, target);
        }

        bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
        {
            return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ReserveMessage(messageHeader, target);
        }

        public bool TryReceive(Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
        {
            return _transformManyBlock.TryReceive(filter, out item);
        }

        public bool TryReceiveAll(out IList<Tuple<T1, T2>> items)
        {
            return _transformManyBlock.TryReceiveAll(out items);
        }
    }
Hardy Hobeck
  • 218
  • 3
  • 6
  • nice. and you're absolutely correct about (what I consider to be) the major limitation of the otherwise very nice TPL Dataflow library. – davidbak Oct 11 '19 at 19:09
1

Here is an implementation of a SynchronizedJoinBlock block, similar with the one presented in Hardy Hobeck's answer. This one takes care of some minor details, like cancellation, handling exceptions, and dealing with propagating the remaining items when the input blocks Target1 and Target2 are marked as completed. Also the merging logic does not involve recursion, which should make it perform better (hopefully, I didn't measure it) and not be susceptible to stack overflow exceptions. Small deviation: the output is a ValueTuple<T1, T2> instead of Tuple<T1, T2> (with the intention of reducing allocations).

public sealed class SynchronizedJoinBlock<T1, T2> : IReceivableSourceBlock<(T1, T2)>
{
    private readonly Func<T1, T2, int> _comparison;
    private readonly Queue<T1> _queue1 = new Queue<T1>();
    private readonly Queue<T2> _queue2 = new Queue<T2>();
    private readonly ActionBlock<T1> _input1;
    private readonly ActionBlock<T2> _input2;
    private readonly BufferBlock<(T1, T2)> _output;
    private readonly object _locker = new object();

    public SynchronizedJoinBlock(Func<T1, T2, int> comparison,
        CancellationToken cancellationToken = default)
    {
        _comparison = comparison ?? throw new ArgumentNullException(nameof(comparison));

        // Create the three internal blocks
        var options = new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken
        };
        _input1 = new ActionBlock<T1>(Add1, options);
        _input2 = new ActionBlock<T2>(Add2, options);
        _output = new BufferBlock<(T1, T2)>(options);

        // Link the input blocks with the output block
        var inputTasks = new Task[] { _input1.Completion, _input2.Completion };
        Task.WhenAny(inputTasks).Unwrap().ContinueWith(t =>
        {
            // If ANY input block fails, then the whole block has failed
            ((IDataflowBlock)_output).Fault(t.Exception.InnerException);
            if (!_input1.Completion.IsCompleted) _input1.Complete();
            if (!_input2.Completion.IsCompleted) _input2.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.OnlyOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
        Task.WhenAll(inputTasks).ContinueWith(t =>
        {
            // If ALL input blocks succeeded, then the whole block has succeeded
            try
            {
                if (!t.IsCanceled) PostRemaining(); // Post what's left
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)_output).Fault(ex);
            }
            _output.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.NotOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
    }

    public ITargetBlock<T1> Target1 => _input1;
    public ITargetBlock<T2> Target2 => _input2;
    public Task Completion => _output.Completion;

    private void Add1(T1 value1)
    {
        lock (_locker)
        {
            _queue1.Enqueue(value1);
            FindAndPostMatched_Unsafe();
        }
    }

    private void Add2(T2 value2)
    {
        lock (_locker)
        {
            _queue2.Enqueue(value2);
            FindAndPostMatched_Unsafe();
        }
    }

    private void FindAndPostMatched_Unsafe()
    {
        while (_queue1.Count > 0 && _queue2.Count > 0)
        {
            var result = _comparison(_queue1.Peek(), _queue2.Peek());
            if (result < 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            else if (result > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
            else // result == 0
            {
                _output.Post((_queue1.Dequeue(), _queue2.Dequeue()));
            }
        }
    }

    private void PostRemaining()
    {
        lock (_locker)
        {
            while (_queue1.Count > 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            while (_queue2.Count > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
        }
    }

    private void ClearQueues()
    {
        lock (_locker)
        {
            _queue1.Clear();
            _queue2.Clear();
        }
    }

    public void Complete() => _output.Complete();

    public void Fault(Exception exception)
        => ((IDataflowBlock)_output).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<(T1, T2)> target,
        DataflowLinkOptions linkOptions)
        => _output.LinkTo(target, linkOptions);

    public bool TryReceive(Predicate<(T1, T2)> filter, out (T1, T2) item)
        => _output.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<(T1, T2)> items)
        => _output.TryReceiveAll(out items);

    (T1, T2) ISourceBlock<(T1, T2)>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target,
        out bool messageConsumed)
        => ((ISourceBlock<(T1, T2)>)_output).ConsumeMessage(
            messageHeader, target, out messageConsumed);

    void ISourceBlock<(T1, T2)>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReleaseReservation(
            messageHeader, target);

    bool ISourceBlock<(T1, T2)>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReserveMessage(
            messageHeader, target);
}

Usage example:

var joinBlock = new SynchronizedJoinBlock<(int, int), (int, int)>(
    (x, y) => Comparer<int>.Default.Compare(x.Item1, y.Item1));

var source1 = new (int, int)[] {(17, 1700), (18, 1800), (19, 1900),
    (20, 2000), (21, 2100), (22, 2200), (25, 2500), (26, 2600),
    (27, 2700), (28, 2800), (29, 2900)};

var source2 = new (int, int)[] {(15, 1500), (16, 1600), (17, 1700),
    (18, 1800), (19, 1900), (20, 2000), (21, 2100), (24, 2400),
    (25, 2500), (26, 2600)};

Array.ForEach(source1, x => joinBlock.Target1.Post(x));
Array.ForEach(source2, x => joinBlock.Target2.Post(x));

joinBlock.Target1.Complete();
joinBlock.Target2.Complete();

while (joinBlock.OutputAvailableAsync().Result)
{
    Console.WriteLine($"> Received: {joinBlock.Receive()}");
}

Output:

Received: ((0, 0), (15, 1500))
Received: ((0, 0), (16, 1600))
Received: ((17, 1700), (17, 1700))
Received: ((18, 1800), (18, 1800))
Received: ((19, 1900), (19, 1900))
Received: ((20, 2000), (20, 2000))
Received: ((21, 2100), (21, 2100))
Received: ((22, 2200), (0, 0))
Received: ((0, 0), (24, 2400))
Received: ((25, 2500), (25, 2500))
Received: ((26, 2600), (26, 2600))
Received: ((27, 2700), (0, 0))
Received: ((28, 2800), (0, 0))
Received: ((29, 2900), (0, 0))

It is assumed that the incoming data are ordered.

This class shares a similar structure with the JoinDependencyBlock class I posted some time ago in a somewhat related question.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Switched from `List`s to `Queue`s for internal storage. – Theodor Zoulias Nov 06 '19 at 06:17
  • This looks amazing. Your assumption the input data was ordered is correct. I will test it and accept this if it works. – LWChris Nov 09 '19 at 16:43
  • Thats good news, because if it wasn't ordered you should have to pass comparers for `T1` and `T2`, in addition to the `comparison` lambda, and also replace the internal `Queue`s with some ordered data structure. And .NET doesn't offer any efficient ordered collection that allows duplicates. – Theodor Zoulias Nov 10 '19 at 12:03
  • I finally understand the clever way how this works. Basically, as long as both queues have at least one element in them, deplete them as required. Break depletion as soon as one queue is empty. Each method filling up one queue re-triggers this depletion mechanism. – LWChris Nov 22 '19 at 13:56
  • Since `FindAndPostMatched_Unsafe` runs within the `lock` context of `Add1` and `Add2`, adding more values to any `Queue` is blocked until one queue is depleted. Assume 2 starts a lot later than 1, it could be that a huge amount of frames needs to be depleted from `_queue1` when the first frame of `_queue2` is added, blocking both `Add#` methods for a long time. Would it be better to use [`ConcurrentQueue`](https://learn.microsoft.com/de-de/dotnet/api/system.collections.concurrent.concurrentqueue-1) and put the `Enqueue` calls in `Add1` and `Add2` out of the `lock` context? – LWChris Nov 22 '19 at 14:07
  • @LWChris I don't expect it to be an issue, because the method calls inside the `lock` are very cheap (with the exception of the `comparison` that is supplied by the caller so it can be anything). I don't think that the two `Queue`s can be substituted safely with `ConcurrentQueue`s in this case, because they are manipulated in tandem. You could consider acquiring-releasing the lock on every loop inside the `FindAndPostMatched`, which would add a general (minuscule) overhead but would prevent a lengthy blocking in the extreme case you mentioned. – Theodor Zoulias Nov 22 '19 at 14:29
  • 1
    Hm, yeah... given in my case that I "only" record at 200 Hz, adding one element every 5ms should be plenty of time to deplete a lot of data from a queue miles ahead the other. The only other concern one can have is that [`lock` doesn't guarantee FIFO](https://stackoverflow.com/a/4228884/1843468), but the code would be self-healing, just pairing a couple too many frames with `null`. – LWChris Nov 22 '19 at 14:52
  • @LWChris I just read the linked question. In your case I think that since there will be only two threads competing for the lock, it will be impossible for a sneaky thread to steal the lock from another thread that requested for it earlier. Some time ago I made some tests to study lock contention, and I observed that two threads can acquire the same lock a million times per second each, with almost no contention. So 200 times per second is nothing to worry about. :-) – Theodor Zoulias Nov 22 '19 at 18:19