I am working on a client/server application. The server sends messages to the client, but the order cannot be guaranteed. I am using TCP... I don't want to get into why the order cannot be guaranteed (it is to do with threads on the server).

Anyway, on the client, I am processing messages like this:

private Queue<byte[]> rawMessagesIn = new Queue<byte[]>();
public ConcurrentBag<ServerToClient> messages = new ConcurrentBag<ServerToClient>();

public void Start()
{
    var processTask = Task.Factory.StartNew(() =>
    {
        while (run)
        {
            process();
        }
    });
}

void process()
{
    if (rawMessagesIn.Count > 0)
    {
        var raw_message = rawMessagesIn.Dequeue();
        var message = (ServerToClient)Utils.Deserialize(raw_message);
        messages.Add(message);
    }
}

private void OnDataReceived(object sender, byte[] data)
{
    rawMessagesIn.Enqueue(data);
}

Now, it is important that when I call messages.TryTake() or messages.TryPeek(), the message I get back is the next one in the sequence. Every message carries an integer representing its order, for example message.number = 1.

I need to use TryPeek because the message at index 0 might be the correct message, or it might be a wrong message, in which case it is removed from the bag. However, it might also be a message that is required later, in which case it should not be removed.

I have tried using messages.OrderBy(x => x.number).ToList(); but I cannot see how it will work. If I use OrderBy to get a sorted list SL and the item at index 0 is the correct one, I cannot simply remove it from (or modify it in) messages, because I do not know its position in the ConcurrentBag!

Does anyone have a suggestion for me?

Theodor Zoulias
pookie
  • What sets the value for `message.number`? Are those guaranteed to be unique? If so, could you use a [`ConcurrentDictionary`](https://msdn.microsoft.com/en-us/library/dd287191(v=vs.110).aspx) with `message.number` as the `Key`? – Rufus L May 02 '18 at 20:04
  • @RufusL Those are set on the server. No, they are not guaranteed to be unique. However, the last to arrive is always correct. I thought about `ConcurrentDictionary` and if the key exists, just overwrite the value. Not sure that is suitable, though. – pookie May 02 '18 at 20:07
  • I would re-evaluate how this is setup. But, if I was stuck in this scenario, I would probably end up with a custom class where I managed a List myself using `lock`. Not ideal, but I don't see any of the thread-safe collections being viable in this scenario. Hopefully I am wrong and would love to see a proper answer. – TyCobb May 02 '18 at 20:07
  • Just saw your comment about `last to arrive is always correct`. Perhaps `ConcurrentStack`? It's LIFO. – TyCobb May 02 '18 at 20:12
  • "No, they are not guaranteed to be unique. However, the last to arrive is always correct. " Doesn't this mean you shouldn't process _any_ messages until they've all arrived? Or am I misunderstanding something? – gnud May 02 '18 at 20:13
  • @gnud I must check if the message with the required `number` has arrived, so I need to check each message. However, the message might be invalid and so it can be thrown away as the valid one will arrive later. – pookie May 02 '18 at 20:16
  • OK. If the message is invalid, will the valid one that you receive later have the same number as the invalid one? Or is this a sequence number that always rises as the server creates new messages, even though you might receive them in any order? – gnud May 02 '18 at 20:20
  • @gnud "If the message is invalid, will the valid one that you receive later have the same number as the invalid one?". Yup, exactly. The latest message is essentially an `updated` one. – pookie May 02 '18 at 20:22
  • See - I thought that when you received message "1" you only cared about message "2" and up. But you always need to store all messages, at most one for each number. So why not just use a `ConcurrentDictionary`? – gnud May 02 '18 at 20:23
  • Thanks @all. In the end, the `ConcurrentDictionary` turned out to be suitable. Nevertheless, it would be nice if there was a thread-safe sorted collection in .Net. I'm lucky that the key `number` can simply be overwritten, but that may not always be the case. – pookie May 03 '18 at 08:32
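
For readers following the comments, the `ConcurrentDictionary` approach could look roughly like the sketch below. It is only an illustration under stated assumptions: `Message` is a hypothetical stand-in for `ServerToClient`, `MessageStore` and its method names are invented here, and it relies on the rule from the comments that the last message to arrive for a given number is the correct one.

```csharp
using System.Collections.Concurrent;

// Hypothetical stand-in for the question's ServerToClient type.
public sealed record Message(int Number, string Payload);

// Hypothetical helper: keeps at most one message per sequence number.
public sealed class MessageStore
{
    private readonly ConcurrentDictionary<int, Message> _byNumber = new();

    // Last to arrive wins: a later message with the same number
    // overwrites the earlier one.
    public void Add(Message message) => _byNumber[message.Number] = message;

    // Looks up the message with the expected number without removing it.
    public bool TryPeek(int expectedNumber, out Message? message) =>
        _byNumber.TryGetValue(expectedNumber, out message);

    // Removes and returns the message with the expected number,
    // if it has already arrived.
    public bool TryTake(int expectedNumber, out Message? message) =>
        _byNumber.TryRemove(expectedNumber, out message);
}
```

The consumer keeps track of the next expected number and calls `TryTake(expected, out var message)`; messages with higher numbers simply stay in the dictionary until their turn comes, so no sorting is needed.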

1 Answer

My suggestion is to switch from manually managing queues to a TransformBlock<TInput,TOutput> from the TPL Dataflow library. This component combines an input queue, an output queue, and a processor that transforms each TInput into a TOutput. The EnsureOrdered behavior is built in and enabled by default, so outputs are emitted in the same order in which the inputs were posted. Example:

private readonly TransformBlock<byte[], ServerToClient> _transformer;

public Client() // Constructor
{
    _transformer = new((byte[] raw_message) =>
    {
        ServerToClient message = (ServerToClient)Utils.Deserialize(raw_message);
        return message;
    }, new ExecutionDataflowBlockOptions()
    {
        EnsureOrdered = true, // Just for clarity. true is the default.
        MaxDegreeOfParallelism = 1, // the default is 1
    });
}

private void OnDataReceived(object sender, byte[] data)
{
    bool accepted = _transformer.Post(data);
    // accepted is false if the block declined the data,
    // for example because the _transformer has completed or faulted.
}

public bool TryReceiveAll(out IList<ServerToClient> messages)
{
    return _transformer.TryReceiveAll(out messages);
}

There are many ways to consume the ServerToClient messages stored in the output queue of the block. The example above demonstrates the TryReceiveAll method. There are also TryReceive, Receive, ReceiveAsync and ReceiveAllAsync (some of them are extension methods). You can also use the lower-level OutputAvailableAsync method. Linking the block to another dataflow block is also an option.
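
As an illustration of the OutputAvailableAsync option, here is a minimal sketch of a consuming loop. The `DataflowConsumer` class and `DrainAsync` method are hypothetical names made up for this example, and it assumes the System.Threading.Tasks.Dataflow library is referenced by the project.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the TPL Dataflow library.
public static class DataflowConsumer
{
    // Collects everything a block emits, in output order,
    // until the block has completed.
    public static async Task<List<T>> DrainAsync<T>(IReceivableSourceBlock<T> source)
    {
        var received = new List<T>();

        // OutputAvailableAsync yields false once the block has finished
        // and no further output will ever become available.
        while (await source.OutputAvailableAsync())
        {
            // Take everything that is currently buffered.
            while (source.TryReceive(out var item))
            {
                received.Add(item);
            }
        }

        // Surfaces any exception that caused the block to fail.
        await source.Completion;
        return received;
    }
}
```

In the client above this could be called as `await DataflowConsumer.DrainAsync(_transformer)`. The call only returns after `_transformer.Complete()` has been invoked, so a long-running client would more likely handle each message inside the inner loop instead of collecting them into a list.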

Theodor Zoulias