Can I do this or not? It seems not.

List<Int32> Seq = new List<Int32>() { 5, 6, 7, 8 };
IObservable<Int32> obs_seq = Seq.ToObservable();

obs_seq.Subscribe(
    onNext:
    async (_seqMember) =>
    {
        Stopwatch DelayWatc = Stopwatch.StartNew();
        await Task.Delay(delay: TimeSpan.FromSeconds(3));
        Debug.WriteLine($"{nameof(DelayWatc.Elapsed.TotalMilliseconds)} {DelayWatc.Elapsed.TotalMilliseconds:N1}.");                        
    });

If I put the await after the Write, it prints all of them immediately.

If I put the await before the Write, it never prints anything.

I think nobody is awaiting the Task produced by the async lambda that I pass to Subscribe.

Somewhere around ten thousand ticks (1 ms) it decides it can't wait anymore and just throws that lambda into the data hole. Or maybe at 5 ten-thousandths of a second (0.5 ms).

await Task.Delay(delay: TimeSpan.FromSeconds( (Double) 1E-4 )); will produce 4 stopwatches (.1 millis, and 3 at .0009 millis).

await Task.Delay(delay: TimeSpan.FromSeconds( (Double) 5E-4 )); never produces any output

Strange at best. What are the semantics of this? What's the protocol? SHOULD THIS EVEN COMPILE IF SUBSCRIBE IS NOT GOING TO AWAIT THE ASYNC LAMBDA?

UPDATE

The longer await DOES produce the output, but only after a Thread.Sleep has finished. So again, what's the protocol? Should Thread.Sleep block the continuation while the UI message-pump thread does not?

HOPEFUL FINAL UPDATE

IObservable<Int64> obs_interval = Observable.Interval(period: TimeSpan.FromSeconds(3));
obs_interval.Publish().Connect();

Task T = Task.Run(() =>
{
    obs_interval.Latest().First();
});

while (!T.IsCompleted)
{
    T.Wait(timeout: TimeSpan.FromMilliseconds(1E2));
    Thread.Sleep(timeout: TimeSpan.FromMilliseconds(1E2));
    Thread.Yield();
}

This blocks each sequence until the interval period elapses (± 1E2 millis), but there seems to be no way to yield the idle time back, so as soon as the Subscribe method call returns, all threads are blocked.

I feel like Zip is a good solution, but can someone explain to me how the observation of the resulting sequence can stall for three seconds without a blocking Thread.Sleep in my code? It must be hardcoded with elevated scheduling, because my subscribe lambda is not honored.

What I want to do is put synchronous logic in an async lambda that waits on a task for a nominal network latency and then, if the task has not completed, just sends another CONGESTION-PRODUCING ping on the next clock tick.

Andyz Smith
  • Because you've marked the observer as async, the observer for each integer in the sequence is executed immediately without waiting for the previous to finish. I'm not sure that it's possible to have .Subscribe() wait for each one to asynchronously complete. – Klaycon Nov 12 '19 at 15:57
  • I took your example and ran it. It looks like it runs as you expected: 4 lines after 3 sec. I can see only one case where it would not return the result to you: when you stop your application before the tasks finish (3 sec). – Che Nov 12 '19 at 16:30
  • If I await 3 seconds before Write, each subscribe completes spontaneously after 10k ticks and then 2.9999 seconds is left over and never executes and never hits the Write. – Andyz Smith Nov 12 '19 at 17:27
  • @Che see above... – Andyz Smith Nov 12 '19 at 21:23
  • Related: What if I have two clock ticks that I want to honor, but my code doesn't know which one is longer because it doesn't have that information? Do I just do Zip.Zip.Zip et cetera? Furthermore, why does Zip have a magic limit of 16? – Andyz Smith Nov 14 '19 at 16:44

2 Answers

This seems to be the answer, utilizing

Select...FromAsync...Concat...Subscribe

Howto call back async function from rx subscribe?

Although the clarity of the construct seems poor. Does Concat always behave the same way with delayed Observables? Will Concat always honor the async nature of its FromAsync source?

Will this operate the same way on multiple platforms (Java/Windows)?
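
For reference, here is a minimal sketch of how I understand the Select...FromAsync...Concat...Subscribe chain, applied to the 5, 6, 7, 8 list from the question. It assumes the System.Reactive package and a console app with no UI SynchronizationContext. The names SequentialAsyncDemo and Run are made up for illustration, and the blocking wait at the end is only there so the demo does not exit early. As far as I can tell, FromAsync does not start its task until something subscribes, and Concat subscribes to one inner observable at a time, so each delay should start only after the previous one has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;   // from the System.Reactive NuGet package
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; the name and signature are illustrative only.
public static class SequentialAsyncDemo
{
    // Runs one async delay per item, strictly one after another, and
    // returns each item together with the elapsed time at which it arrived.
    public static List<(int Item, double ElapsedMs)> Run(IEnumerable<int> items, TimeSpan delay)
    {
        var log = new List<(int Item, double ElapsedMs)>();
        var watch = Stopwatch.StartNew();
        Exception error = null;

        using (var done = new ManualResetEventSlim())
        using (items.ToObservable()
            // Wrap the async work for each item in a deferred observable.
            .Select(n => Observable.FromAsync(async () =>
            {
                // ConfigureAwait(false) keeps the continuation off any captured context.
                await Task.Delay(delay).ConfigureAwait(false);
                return n;
            }))
            // Concat subscribes to the next inner observable only after
            // the previous one has completed.
            .Concat()
            .Subscribe(
                onNext: n => log.Add((n, watch.Elapsed.TotalMilliseconds)),
                onError: ex => { error = ex; done.Set(); },
                onCompleted: () => done.Set()))
        {
            // Demo only: block the caller until the whole sequence is finished.
            done.Wait();
        }

        if (error != null) throw error;
        return log;
    }
}
```

Calling SequentialAsyncDemo.Run(new[] { 5, 6, 7, 8 }, TimeSpan.FromSeconds(3)) from Main should take roughly twelve seconds in total, since the four delays run back to back. On a UI thread the blocking done.Wait() would be a bad idea; there I would drop it and let the onNext callback do the work.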

Andyz Smith

This is what I'm looking for generally:

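IObservable<Int32> obs_sync = Observable.Create<Int32>(subscribeAsync:
(async (obsrvr) =>
{
    // Spin until the wall-clock second is even.
    Task _B = Task.Run(() =>
    {
        while (DateTime.Now.Second % 2 != 0);
    });

    // Wait() subscribes and blocks until both 500 ms ticks arrive; without it
    // the observable is never subscribed and this task finishes at once.
    Task _AB = Task.Run(() =>
    {
        Observable.Interval(period: TimeSpan.FromMilliseconds(5E2)).Take(2).Wait();
    });

    // Poll every 150 ms until both tasks have completed.
    while (!(_B.IsCompleted && _AB.IsCompleted))
    {
        await Task.Delay(TimeSpan.FromMilliseconds(15E1));
    }

    obsrvr.OnNext(0);
}));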

Stopwatch BUSWATCH = Stopwatch.StartNew();
obs_sync.Publish().Connect();
Task.Run(() => obs_sync.Repeat().Take(5).Wait()).Wait();
//Debug.WriteLine($"{nameof(DateTime.Now.Millisecond)} : {DateTime.Now.Millisecond:N2}.");
Debug.WriteLine($"{nameof(BUSWATCH.Elapsed.TotalMilliseconds)} : {BUSWATCH.Elapsed.TotalMilliseconds:N2}.");
Andyz Smith