I'm new to Rx and have been working on some networking code using Reactive Extensions in .NET. My problem is that the observable of TcpClients I create with an async function does not complete as I expect when I trigger cancellation through a token I supply. Here is a simplified version of the code I have a problem with:
using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ListenerExtensions
{
    public static IObservable<TcpClient> ToListenerObservable(
        this IPEndPoint endpoint,
        int backlog)
    {
        return new TcpListener(endpoint).ToListenerObservable(backlog);
    }

    public static IObservable<TcpClient> ToListenerObservable(
        this TcpListener listener,
        int backlog)
    {
        return Observable.Create<TcpClient>(async (observer, token) =>
        {
            listener.Start(backlog);
            try
            {
                while (!token.IsCancellationRequested)
                {
                    observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token));
                }
                // This never prints and OnCompleted is never called.
                Console.WriteLine("Completing..");
                observer.OnCompleted();
            }
            catch (System.Exception error)
            {
                observer.OnError(error);
            }
            finally
            {
                // This is never executed and my program exits without closing the listener.
                Console.WriteLine("Stopping listener...");
                listener.Stop();
            }
        });
    }
}

class Program
{
    static void Main(string[] args)
    {
        var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323);
        var cancellation = new CancellationTokenSource();
        home.ToListenerObservable(10)
            .Subscribe(
                onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"),
                onError: e => Console.WriteLine($"Error: {e.Message}"),
                onCompleted: () => Console.WriteLine("Complete"), // Never happens
                token: cancellation.Token);
        Console.WriteLine("Any key to cancel");
        Console.ReadKey();
        cancellation.Cancel();
        Thread.Sleep(1000);
    }
}
If I run this and connect to localhost:2323, I can see that I get a sequence of connected TcpClients. However, if I trigger cancellation through the CancellationToken, the program exits without closing the listener or emitting the OnCompleted notification as I expect. What am I doing wrong?
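For anyone reproducing this, a minimal client sketch along these lines should be enough to produce a connection. It is only an illustration: the `ReproClient` class name is made up and is not part of the code above, the address and port are taken from `Main`, and the `async Main` entry point assumes C# 7.1 or later.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical helper for reproducing the behaviour; not part of the original code.
class ReproClient
{
    static async Task Main()
    {
        using (var client = new TcpClient())
        {
            // Same address and port that the listener binds to in Program.Main.
            await client.ConnectAsync("127.0.0.1", 2323);
            Console.WriteLine($"Connected from {client.Client.LocalEndPoint}");

            // Keep the connection open until a key is pressed.
            Console.WriteLine("Any key to disconnect");
            Console.ReadKey();
        }
    }
}
```

Running the listener first, then one or more instances of this client, and finally pressing a key in the listener console should show the behaviour described above.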