As some of the other posters have mentioned this can be implemented with Rx. With Rx the function will return an IObservable<Data>
which can be subscribed to and it pushes data to the subscriber as it becomes available. IObservable
also supports LINQ and adds some extension methods of its own.
Update
I added a couple of generic helper methods to make the usage of the reader reusable as well as support for cancellation.
public static class ObservableEx
{
public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc)
{
return CreateFromSqlCommand(connectionString, command, readDataFunc, CancellationToken.None);
}
public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc, CancellationToken cancellationToken)
{
return Observable.Create<T>(
async o =>
{
SqlDataReader reader = null;
try
{
using (var conn = new SqlConnection(connectionString))
using (var cmd = new SqlCommand(command, conn))
{
await conn.OpenAsync(cancellationToken);
reader = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection, cancellationToken);
while (await reader.ReadAsync(cancellationToken))
{
var data = await readDataFunc(reader);
o.OnNext(data);
}
o.OnCompleted();
}
}
catch (Exception ex)
{
o.OnError(ex);
}
return reader;
});
}
}
The implementation of ReadData
is now greatly simplified.
private static IObservable<Data> ReadData()
{
return ObservableEx.CreateFromSqlCommand(connectionString, "select * from Data", async r =>
{
return await Task.FromResult(new Data()); // sample code to read from reader.
});
}
Usage
You can subscribe to the Observable by giving it an IObserver
but there is also overloads that take lambdas. As data becomes available the OnNext
callback gets called. If there is an exception, the OnError
callback gets called. Finally, if there is no more data the OnCompleted
callback gets called.
If you want to cancel the observable, simply dispose of the subscription.
void Main()
{
// This is an asyncrhonous call, it returns straight away
var subscription = ReadData()
.Skip(5) // Skip first 5 entries, supports LINQ
.Delay(TimeSpan.FromSeconds(1)) // Rx operator to delay sequence 1 second
.Subscribe(x =>
{
// Callback when a new Data is read
// do something with x of type Data
},
e =>
{
// Optional callback for when an error occurs
},
() =>
{
//Optional callback for when the sequenc is complete
}
);
// Dispose subscription when finished
subscription.Dispose();
Console.ReadKey();
}