
I need to implement a method that returns a list of devices. The goal is to return each device as soon as it is found, without waiting for the search to finish.

To do this, I think I should use IAsyncEnumerable.

The Discover method of my implementation has an Action that takes care of adding an item to the list as soon as one is found.

How could I modify the Action and make sure that the new device is not added to a list but returned via "yield return"?

Below is the code I used:

public async IAsyncEnumerable<Device> ScanAsync(
    CancellationToken cancellationToken = default(CancellationToken))
{
    var devices = new List<DiscoveryDevice>();

    await Discover(TimeoutSeconds, d => devices.Add(d), cancellationToken);

    yield return new Device("", ""); // ERROR: this only runs last, after Discover has finished
}

private async Task Discover(int timeout, Action<DiscoveryDevice> onDeviceDiscovered,
    CancellationToken cancellationToken = default(CancellationToken))
{
    IEnumerable<IOnvifUdpClient> foreachInterface = clientFactory
        .CreateClientForeachInterface();

    if (!foreachInterface.Any())
        throw new Exception(
            "Missing valid NetworkInterfaces, UdpClients could not be created");

    await Task.WhenAll(foreachInterface.Select(
        client => Discover(timeout, client, onDeviceDiscovered,
            cancellationToken)).ToArray());
}

private async Task Discover(int timeout, IOnvifUdpClient client,
    Action<DiscoveryDevice> onDeviceDiscovered,
    CancellationToken cancellationToken = default(CancellationToken))
{
    var messageId = Guid.NewGuid();
    var responses = new List<UdpReceiveResult>();
    var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeout));

    try
    {
        await SendProbe(client, messageId);

        while (true)
        {
            if (cts.IsCancellationRequested ||
                cancellationToken.IsCancellationRequested)
                break;

            try
            {
                var response = await client.ReceiveAsync()
                                .WithCancellation(cancellationToken)
                                .WithCancellation(cts.Token);

                if (IsAlreadyDiscovered(response, responses)) continue;

                responses.Add(response);
                var discoveredDevice = ProcessResponse(response, messageId);

                if (discoveredDevice != null)
                {
                    Task.Run(() => onDeviceDiscovered(discoveredDevice));
                }
            }
            catch (Exception)
            { }
        }
    }
    finally
    {
        client.Close();
    }
}
Theodor Zoulias
  • How are the `DiscoveryDevice` and `Device` types related? – Theodor Zoulias Jul 25 '22 at 21:38
  • A Device object is created with two properties of DiscoveryDevice – Antonio Argentieri Jul 25 '22 at 21:40
  • The code would be cleaner and easier to handle if the innermost `Discover` returned an `IAsyncEnumerable` instead of calling a callback. To monitor multiple devices you could call the new `Discover` once per device then merge the output streams. Instead of `Task.Run(() => onDeviceDiscovered(discoveredDevice));` you could have `yield return discoveredDevice` – Panagiotis Kanavos Jul 27 '22 at 15:21

1 Answer


You need a buffer to store the discovered devices, because the work of discovering the devices is not driven by the enumeration of the resulting IAsyncEnumerable<Device> sequence. In other words, you have a push system hidden behind a pull interface. When this happens you need a buffer, and a suitable buffer for your case is the Channel<T> class. You can find a synopsis of the features of this class here. Usage example:

public IAsyncEnumerable<Device> ScanAsync(
    CancellationToken cancellationToken = default)
{
    Channel<Device> devices = Channel.CreateUnbounded<Device>();

    Task task = Discover(TimeoutSeconds,
        dd => devices.Writer.TryWrite(new Device(dd.X, dd.Y)),
        cancellationToken);

    _ = task.ContinueWith(t => devices.Writer.TryComplete(t.Exception), default,
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    return devices.Reader.ReadAllAsync(cancellationToken);
}
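
To see the pattern end to end, here is a minimal runnable sketch. The `Device` record, the `FakeDiscover` method and the sample names and addresses are hypothetical stand-ins for the question's types, which are not shown in full; only the `Channel<T>` wiring mirrors the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Device type.
public record Device(string Name, string Address);

public static class ScanDemo
{
    // Hypothetical stand-in for Discover: pushes two devices through the
    // callback, one at a time, with a short delay between them.
    private static async Task FakeDiscover(
        Action<Device> onDeviceDiscovered, CancellationToken cancellationToken)
    {
        for (int i = 1; i <= 2; i++)
        {
            await Task.Delay(100, cancellationToken); // simulated network wait
            onDeviceDiscovered(new Device($"cam{i}", $"192.168.0.{i}"));
        }
    }

    public static IAsyncEnumerable<Device> ScanAsync(
        CancellationToken cancellationToken = default)
    {
        Channel<Device> devices = Channel.CreateUnbounded<Device>();

        // The callback writes into the channel instead of adding to a list.
        Task task = FakeDiscover(d => devices.Writer.TryWrite(d),
            cancellationToken);

        // Complete the channel when discovery ends, forwarding any error.
        _ = task.ContinueWith(t => devices.Writer.TryComplete(t.Exception),
            default, TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        return devices.Reader.ReadAllAsync(cancellationToken);
    }

    public static async Task Main()
    {
        // Each device is handed to the loop body as soon as it is written.
        await foreach (Device device in ScanAsync())
            Console.WriteLine($"Found {device.Name} at {device.Address}");
    }
}
```

The consumer only ever sees an `IAsyncEnumerable<Device>`; the channel stays an implementation detail of `ScanAsync`.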

There are a couple of rough edges in the above code:

  1. There is a fire-and-forget ContinueWith continuation.
  2. The cancellationToken does not cancel gracefully. The resulting sequence might be canceled before the completion of the Discover asynchronous method. That's a second case of fire-and-forget.

It might be possible to improve the code by ditching the Action<DiscoveryDevice> parameter and passing devices.Writer as an argument instead (a parameter of type ChannelWriter<Device>).
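
A rough sketch of that idea follows. It is not a drop-in replacement: `Device`, `DiscoverAsync` and the sample data are hypothetical stand-ins, and wrapping the reader in an async iterator with a linked `CancellationTokenSource` is just one possible way to await the producer instead of discarding its task.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Device type.
public record Device(string Name, string Address);

public static class WriterScanDemo
{
    // Hypothetical stand-in for Discover: it receives the writer directly
    // instead of an Action<DiscoveryDevice> callback.
    private static async Task DiscoverAsync(
        ChannelWriter<Device> writer, CancellationToken cancellationToken)
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Delay(50, cancellationToken); // simulated network wait
            await writer.WriteAsync(
                new Device($"cam{i}", $"192.168.0.{i}"), cancellationToken);
        }
    }

    public static async IAsyncEnumerable<Device> ScanAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Channel<Device> devices = Channel.CreateUnbounded<Device>();
        using var cts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);

        // Runs the producer and always completes the writer, so the
        // task returned by this local function never faults.
        async Task ProduceAsync()
        {
            try
            {
                await DiscoverAsync(devices.Writer, cts.Token);
                devices.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                devices.Writer.TryComplete(ex); // surfaces in the reader
            }
        }

        Task producer = ProduceAsync();
        try
        {
            await foreach (Device device in devices.Reader.ReadAllAsync())
                yield return device;
        }
        finally
        {
            cts.Cancel();   // stop discovery if the consumer leaves early
            await producer; // the producer is awaited, not forgotten
        }
    }

    public static async Task Main()
    {
        await foreach (Device device in ScanAsync())
            Console.WriteLine($"Found {device.Name} at {device.Address}");
    }
}
```

With this shape, cancelling the token stops the producer first; the reader then drains whatever was already buffered and ends with an `OperationCanceledException`, and the enumeration does not finish before the producer has completed.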

Theodor Zoulias
  • Those are problems of this particular code only, and not really fire-and-forget. Especially the first, where the continuation runs in the producer's thread. If you don't want to discard partial results, don't pass the cancellation token to `ReadAllAsync`. Even if you do, I doubt anything will be discarded as [the token is only used to cancel `WaitToRead`](https://github.com/dotnet/runtime/blob/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.netcoreapp.cs#L18). No pending items are lost – Panagiotis Kanavos Jul 27 '22 at 15:08
  • @PanagiotisKanavos C# is not Go. In Go fire-and-forget is how they do things, and they have built-in deadlock detectors that cause [`panic`](https://stackoverflow.com/questions/45871203/understanding-go-channel-deadlocks) in case they find one. C# has no such mechanism in place, and fire-and-forget is highly discouraged by the C# experts. If the `ContinueWith` fire-and-forget fails, the program will just deadlock, without any error message or stack trace. This makes for a frustrating debugging experience, and damaged PR with customers. Please don't promote fire-and-forget in the C# community. – Theodor Zoulias Jul 27 '22 at 16:01
  • I didn't say it is nor do I understand what this comment has to do with anything. Or where you found any references to Go. The code you posted doesn't have any fire-and-forget, in fact that `ContinueWith` runs on the publisher's thread. There's nothing to fail due to the call to `TryComplete` – Panagiotis Kanavos Jul 27 '22 at 16:15
  • @PanagiotisKanavos here is the fire-and-forget task: `_ = task.ContinueWith(...`. It is the discard `_`. This is a `Task` that is neither awaited, nor observed in any other way. It doesn't matter how unlikely it is to fail. It is indeed extremely unlikely to fail. According to the documentation it's impossible for it to fail. Nevertheless it's still fire-and-forget by definition. One way to prevent it from being fire-and-forget is to `await` it on the thread pool: `ThreadPool.QueueUserWorkItem(async _ => await task);`. This way if the `task` ever fails, the application will crash instead of hang. – Theodor Zoulias Jul 27 '22 at 23:32