I'm new to Rx and have been working on some networking code using Reactive Extensions in .NET. My problem is that the observable of TcpClients I create with an async function does not complete as I expect when I trigger cancellation through a token I supply. Here is a simplified version of the code I have a problem with:

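using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ListenerExtensions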
{
    public static IObservable<TcpClient> ToListenerObservable(
        this IPEndPoint endpoint,
        int backlog)
    {
        return new TcpListener(endpoint).ToListenerObservable(backlog);
    }

    public static IObservable<TcpClient> ToListenerObservable(
        this TcpListener listener,
        int backlog)
    {
        return Observable.Create<TcpClient>(async (observer, token) => 
        {
            listener.Start(backlog);

            try
            {
                while (!token.IsCancellationRequested)
                    observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token));
                //This never prints and onCompleted is never called.
                Console.WriteLine("Completing..");
                observer.OnCompleted();
            }
            catch (System.Exception error)
            {
                observer.OnError(error);   
            }
            finally
            {
                // This is never executed and my program exits without closing the listener.
                Console.WriteLine("Stopping listener...");
                listener.Stop();
            }
        });
    }
}
class Program
{
    static void Main(string[] args)
    {
        var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323);
        var cancellation = new CancellationTokenSource();

        home.ToListenerObservable(10)
            .Subscribe(
                onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"),
                onError: e => Console.WriteLine($"Error: {e.Message}"),
                onCompleted: () => Console.WriteLine("Complete"), // Never happens
                token: cancellation.Token);

        Console.WriteLine("Any key to cancel");
        Console.ReadKey();
        cancellation.Cancel();
        Thread.Sleep(1000);
    }
}

If I run this and connect to localhost:2323, I can see that I get a sequence of connected TcpClients. However, if I trigger cancellation through the token, the program exits without closing the listener and without emitting the OnCompleted notification I expect. What am I doing wrong?

svick
Erik Göök
  • That code will never run because the cancellation will cause an `OperationCanceledException` to be thrown. – Paulo Morgado Jun 18 '17 at 19:22
  • https://stackoverflow.com/questions/14524209/what-is-the-correct-way-to-cancel-an-async-operation-that-doesnt-accept-a-cance – Shane Neuville Jun 18 '17 at 20:14
  • And, if you look at this [post from Stephen Cleary](http://blog.stephencleary.com/2015/03/a-tour-of-task-part-9-delegate-tasks.html "A Tour of Task, Part 9: Delegate Tasks"), you'll see that passing a cancellation token to `Task.Run` doesn't do exactly what you might think it does. – Paulo Morgado Jun 18 '17 at 20:31
  • Yeah, I understand that now. I just looked at the method signature and jumped to conclusions. – Erik Göök Jun 18 '17 at 21:12
  • @ErikGöök - Let me know if my answer works as you expect it to. – Enigmativity Jun 20 '17 at 01:05
  • Related: [How to cancel an observable sequence](https://stackoverflow.com/questions/6759833/how-to-cancel-an-observable-sequence) – Theodor Zoulias Jan 05 '21 at 06:27
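
The point about `Task.Run` made in the comments above can be checked with a small sketch. It uses only the base class library; `TaskRunTokenDemo` and `CompletesAfterCancelAsync` are made-up names, and the `TaskCompletionSource` merely stands in for an accept call that never finishes. The token passed to `Task.Run` is only consulted before the delegate starts, so cancelling afterwards leaves the returned task pending:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskRunTokenDemo
{
    // Hypothetical demo: reports whether the task returned by Task.Run
    // finishes shortly after its token is cancelled.
    public static async Task<bool> CompletesAfterCancelAsync()
    {
        var cts = new CancellationTokenSource();
        var started = new TaskCompletionSource<bool>();
        var never = new TaskCompletionSource<int>(); // stands in for a pending accept

        // The token only prevents the delegate from starting;
        // it does not cancel the task that the delegate returns.
        Task<int> wrapped = Task.Run(() =>
        {
            started.TrySetResult(true);
            return never.Task;
        }, cts.Token);

        await started.Task; // the delegate is now running
        cts.Cancel();

        // Wait briefly to see whether cancellation had any effect.
        Task winner = await Task.WhenAny(wrapped, Task.Delay(500));
        return winner == wrapped;
    }
}
```

Awaiting `CompletesAfterCancelAsync()` should yield `false`. That is the same situation as the `while` loop in the question: as long as no client connects, the `await` never returns, so neither `OnCompleted` nor the `finally` block is reached before the program exits.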

2 Answers

It's always good to avoid writing too much try/catch code and mucking around with cancellation tokens. Here's a way of doing what you're doing without moving away from standard Rx operators. Please note that I couldn't fully test this code, so it might still require a little tweaking.

Try this:

var query = Observable.Create<TcpClient>(o =>
{
    var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323);
    var listener = new TcpListener(home);
    listener.Start();
    return
        Observable
            .Defer(() => Observable.FromAsync(() => listener.AcceptTcpClientAsync()))
            .Repeat()
            .Subscribe(o);
});

var completer = new Subject<Unit>();
var subscription =
    query
        .TakeUntil(completer)
        .Subscribe(
            onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"),
            onError: e => Console.WriteLine($"Error: {e.Message}"),
            onCompleted: () => Console.WriteLine("Complete"));

Console.WriteLine("Enter to cancel");
Console.ReadLine();
completer.OnNext(Unit.Default);
Thread.Sleep(1000);

The key thing here is the completer, which acts like a cancellation token. It finishes the subscription naturally, unlike subscription.Dispose(), which ends it without the OnCompleted call.

Enigmativity
  • That's really cool. I made a disposable like this: `var stop = Disposable.Create(() => listener.Stop());` and then returned a `CompositeDisposable`. Now I'm not sure how important it is to call `Stop()` before exiting the program. At first I thought I had to, in order to be able to start a listener on the same socket right afterwards. It turns out this doesn't guarantee it, if I understand correctly: the operating system might keep connections in a waiting state for up to four minutes after my program terminates. I think this gives me an "address in use" error if I run the listener again right after I end it. – Erik Göök Jun 23 '17 at 21:49
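
The disposable described in this comment might be combined with the answer's query roughly as follows. This is a sketch that assumes the System.Reactive package; `ListenerObservable` and `Listen` are hypothetical names that do not appear in the answer, and the redundant `Defer` has been dropped.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ListenerObservable
{
    // Hypothetical helper: emits accepted clients and stops the listener
    // when the subscription is disposed.
    public static IObservable<TcpClient> Listen(IPEndPoint endpoint)
    {
        return Observable.Create<TcpClient>(observer =>
        {
            var listener = new TcpListener(endpoint);
            listener.Start();

            // Accept clients one after another until unsubscribed.
            IDisposable accepting = Observable
                .FromAsync(() => listener.AcceptTcpClientAsync())
                .Repeat()
                .Subscribe(observer);

            // Runs after 'accepting' is disposed, releasing the socket.
            IDisposable stop = Disposable.Create(() => listener.Stop());

            return new CompositeDisposable(accepting, stop);
        });
    }
}
```

Ending the subscription, whether by `Dispose()` or through `TakeUntil`, unsubscribes from the accept loop first and stops the listener second. An accept that is still pending at that point fails after the observer has been detached, so the error should not surface in `onError`.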

So it turns out I was confused about a few things here. This article helped me get on track. In fact, I ended up copying the code from there.

public static class TaskCancellations
{
    public static async Task<T> WithCancellation<T>(
        this Task<T> task,
        CancellationToken token)
    {
        var cancellation = new TaskCompletionSource<bool>();
        using (token.Register(s =>
             (s as TaskCompletionSource<bool>).TrySetResult(true), cancellation))
        {
            if (task != await Task.WhenAny(task, cancellation.Task))
                throw new OperationCanceledException(token);
            return await task;
        }
    }
}

And use it with the TcpListener like this:

public static IObservable<TcpClient> ToListenerObservable(
    this TcpListener listener,
    int backlog)
{
    return Observable.Create<TcpClient>(async (observer, token) =>
    {
        listener.Start(backlog);
        try
        {
            while (!token.IsCancellationRequested)
            {
                observer.OnNext(await listener.AcceptTcpClientAsync()
                    .WithCancellation(token));
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Completing...");
            observer.OnCompleted();
        }
        catch (System.Exception error)
        {
            observer.OnError(error);
        }
        finally
        {
            Console.WriteLine("Stopping listener...");
            listener.Stop();
        }
    });
}

Everything works as expected now.

Erik Göök
  • I'm not familiar with the `TcpListener` API, but I think you should use something like `using (token.Register(() => listener.Stop()))`. – Paulo Morgado Jun 19 '17 at 08:57
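
The `token.Register` suggestion in this comment could look something like the sketch below. `AcceptLoop.RunAsync` is a hypothetical helper, not code from the thread; it takes a plain `IObserver<TcpClient>` so that it can be called from inside `Observable.Create(async (observer, token) => ...)`. It relies on the assumption that stopping the listener makes a pending accept fail. The exception type raised in that case is not assumed, so the token decides whether a failure counts as a normal shutdown.

```csharp
using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public static class AcceptLoop
{
    // Hypothetical helper: accepts clients until the token is cancelled.
    public static async Task RunAsync(
        TcpListener listener,
        int backlog,
        IObserver<TcpClient> observer,
        CancellationToken token)
    {
        listener.Start(backlog);

        // Cancelling the token stops the listener, which is assumed to
        // make the pending accept below fail and so unblock the loop.
        using (token.Register(() => listener.Stop()))
        {
            try
            {
                while (true)
                    observer.OnNext(await listener.AcceptTcpClientAsync());
            }
            catch (Exception) when (token.IsCancellationRequested)
            {
                // Failure caused by our own Stop(): treat as completion.
                observer.OnCompleted();
            }
            catch (Exception error)
            {
                observer.OnError(error);
            }
            finally
            {
                // Harmless if the listener was already stopped.
                listener.Stop();
            }
        }
    }
}
```

Compared with `WithCancellation`, this approach does not leave an abandoned accept task behind, at the cost of depending on how the listener reports being stopped.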