0

Problem: I have a subscription to a never ending messaging service, my code needs to check if any message satisfies the condition, if it is satisfied, then close the subscription before all the messages are processed and return true. If I have processed all the messages and the condition isn't satisfied then I need to close the subscription and return false.

For example, the condition is foo = 5:

message dataset early success :
msg1: foo=1
msg2: foo=2
msg3: foo=5 <= condition satisfied, return true and stop processing
msg4: foo=6

message dataset failure :
msg1: foo=1
msg2: foo=2
msg3: foo=3 
msg4: foo=4 <= no more messages, return false and stop processing

The subscription I use has a synchronous method that I have to pass an async EventHandler. Here is my functioning code that works for both scenarios, lastMessageReceivedDateTime tracks when a message was last received (to identify the end of the messages) and _conditionStatisfied tells me if I've got my data:

private DateTime lastMessageReceivedDateTime;
private bool _conditionSatisfied;

public Task<bool> CheckSubscription(IThirdParyCode connection)
{
     var subscription = connection.Subscribe(async (obj, args) =>
     {
         lastMessageReceivedDateTime = DateTime.Now;
         if(args.Message.foo == 5)
         {
              _conditionSatisfied = true;
         }
     });

     while (lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now  && !_conditionSatisfied)
     {
         Thread.Sleep(500);
     }

     subscription?.Unsubscribe();
     return _activityCheckSatisfied;
}

This works, but I wanted to know if there was a better solution.

Note: I can't simply await the async method, as it never returns/completes until I unsubscribe.

More info: The type of the connection is an IStanConnection (from NATS), and the signature of Subscribe is:

IStanSubscription Subscribe(string subject, StanSubscriptionOptions options,
    EventHandler<StanMsgHandlerArgs> handler);

I had simplified the signature to focus on the code I had issue with.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
Mr Giggles
  • 2,483
  • 3
  • 22
  • 35
  • What is the type of the `connection` object, and what is the signature of the `connection.Subscribe` method? Could you include this info in the question? – Theodor Zoulias Jul 31 '21 at 07:14
  • Does the `async (obj, args) =>...` delegate in the code sample corresponds to the `EventHandler handler` argument of the `Subscribe` method? – Theodor Zoulias Jul 31 '21 at 08:32
  • Don't you get a [CS1998](https://stackoverflow.com/questions/29923215/should-i-worry-about-this-async-method-lacks-await-operators-and-will-run-syn) warning about an async method without an await operator? – Theodor Zoulias Jul 31 '21 at 08:43
  • @zoulias yes the delegate refers to the eventhandler. I don't get an error as there is an await otherstuff() in the delegate that I've omitted for clarity – Mr Giggles Jul 31 '21 at 11:15
  • Mr Giggles honestly I know nothing about the NATS server and so I can't answer your question, but I am trying to make it more clear for others who may be able to help. I would suggest to remove the `async` from the delegate, since it is not relevant to the problem, to avoid creating confusion about why is there. – Theodor Zoulias Jul 31 '21 at 11:23

1 Answers1

0

Based on your code example I can assume that the message stream ends if there were no new messages within a second of the last message. Your solution can be modified to eliminate active waiting loop and replace it with single await call. It would be based on two tasks:

  1. First task would track successful completion (_conditionSatisfied in your example) and is going to be set by TaskCompletionSource.SetResult
  2. Second task would try to signal end of the stream by using combination of CancellationToken task wrapper (example implementation of such wrapper) and CancellationTokenSource.CancelAfter which would try to cancel task after each iteration with deferral. This should replace lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now condition.

Modified code should look like this:

private CancellationTokenSource streamEndCancellation = new CancellationTokenSource();
private TaskCompletionSource<bool> satisfiedCompletionSource = new TaskCompletionSource<bool>();

public async Task<bool> CheckSubscription(IThirdParyCode connection)
{
     // CancellationTokenTaskSource is in third-party library and not part of .NET
     var streamEndSource = new CancellationTokenTaskSource<bool>(streamEndCancellation.Token);

     var subscription = connection.Subscribe(async (obj, args) =>
     {
         lastMessageReceivedDateTime = DateTime.Now;
         if(args.Message.foo == 5)
         {
             satisfiedCompletionSource.SetResult(true);
         }
         streamEndCancellation.CancelAfter(1000);
     });

     Task<bool> actualTask = await Task.WhenAny<bool>(satisfiedCompletionSource.Task, streamEndSource.Task);
          
     subscription?.Unsubscribe();
     return !actualTask.IsCanceled;
}
Petr
  • 349
  • 1
  • 4