1

I have a static collection, say of tasks to call remote rest api:

static ConcurrentBag<Task<HttpResponseMessage>> _collection = new ConcurrentBag<Task<HttpResponseMessage>>();

static void Main(string[] args)
{
    Task.Factory.StartNew(() => Produce());
    Task.Factory.StartNew(() => Consume());

    Console.ReadKey();
}

One thread adds new items into it:

private static void Produce()
{
    while (true)
    {
        var task = HttpClientFactory.Create().GetAsync("http://example.com");
        _collection.Add(task);
        Thread.Sleep(500);
    }
}

And another thread should process those items:

private static void Consume()
{
    _collection.ToObservable()
               .Subscribe(
                   t => Console.WriteLine("++"),
                   ex => Console.WriteLine(ex.Message),
                   () => Console.WriteLine("Done"));
}

But it runs only once and completes prematurely. So output is;

++

Done

Community
  • 1
  • 1
abatishchev
  • 98,240
  • 88
  • 296
  • 433

1 Answers1

4

It would be interesting if it worked like that... but sadly it doesn't. The ToObservable extension method is defined on the IEnumerable<T> interface - so it's getting a point-in-time snap shot of the collection.

You need a collection than can be observed, such as ObservableCollection. With this, you can respond to add events to feed an Rx pipeline (perhaps by wiring the CollectionChanged event up with Observable.FromEventPattern). Bear in mind that this collection doesn't support concurrent adds. Such a technique is one way to "enter the monad" (i.e. obtain an IObservable<T>).

Equivalent is adding your request payloads to a Subject. Either way, you can then project them into asynchronous requests. So say (for arguments sake), your Produce signature looked like this:

private static async Task<HttpResponseMessage> Produce(string requestUrl)

Then you might construct an observable to convert the requestUrls to async web requests using your Produce method like so:

var requests = new Subject<string>();
var responses = requests.SelectMany(
    x => Observable.FromAsync(() => Produce(x)));

responses.Subscribe(
               t => Console.WriteLine("++"),
               ex => Console.WriteLine(ex.Message),
               () => Console.WriteLine("Done"));

And submit each request with something like:

requests.OnNext("http://myurl");

If you need concurrent adds, see Observable.Synchronize.

If you need to control the thread(s) that handle the responses, use ObserveOn which I wrote a lengthy explanation of here.

Community
  • 1
  • 1
James World
  • 29,019
  • 9
  • 86
  • 120
  • Added a note about `ObserveOn` – James World Sep 18 '14 at 07:39
  • Awesome! Thanks for the explanation of a point-in-time snapshot. That's definitely makes sense and seconds what I'm observing. – abatishchev Sep 18 '14 at 17:14
  • Can it use `Task` here? I can't have a string because it may be a GET, POST, etc. request. And would be more generic solution. I think I can make it working myself :) – abatishchev Sep 18 '14 at 17:16
  • The thing to bear in mind is that `Task` is really a different async paradigm - so you can, but it's probably worth using `ToObservable()` to convert to an `IObservable` - in which case your subject will be a `Subject>` (i.e. a stream of streams) which can be flattened with a call to `SelectMany(x => x)` to give a flat stream of `HttpResponseMessage`. – James World Sep 18 '14 at 17:30
  • I'm confused. Produce() is producing new requests. So OnNext() is too. Who is consuming them then? Thanks! – abatishchev Sep 18 '14 at 21:40
  • The subscriber(s) to the stream... in my answer (and your question), it's the code implementing the OnNext handler that is writing to the console. – James World Sep 19 '14 at 07:03