1

I was trying to implement observable waiting for onNext action to complete before proceeding with next operation. Only way I found working is using SemaphoreSlim. Does Reactive have any way to do that without use of SemaphoreSlim? I couldn't find any.

 SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1,1);
  subject.Window(() => subject.Throttle(TimeSpan.FromMilliseconds(500)))
            .SelectMany(c => c.ToList())
            .Subscribe(async x =>
                {
                    await _semaphoreSlim.WaitAsync();
                    try
                    {
                        //await Async code here
                    }
                    finally
                    {
                        _semaphoreSlim.Release();
                    }
                });

Forcing Subscribe to be sync

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
Vincent P
  • 33
  • 6
  • 1
    Take a look at these: [1](https://stackoverflow.com/questions/23006852/how-to-call-back-async-function-from-rx-subscribe), [2](https://stackoverflow.com/questions/24843000/reactive-extensions-subscribe-calling-await), [3](https://stackoverflow.com/questions/37129159/subscribing-to-observable-sequence-with-async-function). The core Rx interfaces are synchronous. An alternative version of Rx with async interfaces has been implemented ([AsyncRx](https://github.com/dotnet/reactive/tree/main/AsyncRx.NET)), but [has not been released yet](https://github.com/dotnet/reactive/issues/1118). – Theodor Zoulias Mar 15 '21 at 09:28

1 Answers1

2

To start with I created a sample Media_Load_Async that simulates what you're doing.

static int max = 5;
static Random random = new Random();
static TimeSpan[] delays = Enumerable.Range(0, max).Select(x => TimeSpan.FromSeconds(random.Next(5) + 2)).ToArray();

async Task<Unit> Media_Load_Async(int index, TimeSpan delay)
{
    Console.WriteLine($"Start {index} - {delay}");
    await Task.Delay(delay);
    Console.WriteLine($"End {index}");
    return Unit.Default;
}

If I run this simple query then I get the results that you're trying to avoid:

Observable
    .Range(0, max)
    .Subscribe(async x => await Media_Load_Async(x, delays[x]));
Start 0 - 00:00:05
Start 1 - 00:00:06
Start 2 - 00:00:02
Start 3 - 00:00:06
Start 4 - 00:00:04
End 2
End 4
End 0
End 3
End 1

If I put in your SemaporeSlim code I get what I think you want:

SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
Observable
    .Range(0, max)
    .Subscribe(async x =>
    {
        await _semaphoreSlim.WaitAsync();
        try
        {
            await Media_Load_Async(x, delays[x]);
        }
        finally
        {
            _semaphoreSlim.Release();
        }
    });
Start 0 - 00:00:04
End 0
Start 1 - 00:00:03
End 1
Start 2 - 00:00:06
End 2
Start 3 - 00:00:04
End 3
Start 4 - 00:00:03
End 4

Next, if I move the call to Media_Load_Async to inside the query itself then I'm back to the original issue:

Observable
    .Range(0, max)
    .SelectMany(x => Observable.FromAsync(() => Media_Load_Async(x, delays[x])))
    .Subscribe();
Start 0 - 00:00:06
Start 1 - 00:00:02
Start 2 - 00:00:06
Start 3 - 00:00:05
Start 4 - 00:00:02
End 4
End 1
End 3
End 0
End 2

But if I change out the SelectMany for a Select/Concat pair then I get what you want without a SemaphoreSlim:

Observable
    .Range(0, max)
    .Select(x => Observable.FromAsync(() => Media_Load_Async(x, delays[x])))
    .Concat()
    .Subscribe();
Start 0 - 00:00:04
End 0
Start 1 - 00:00:05
End 1
Start 2 - 00:00:02
End 2
Start 3 - 00:00:06
End 3
Start 4 - 00:00:06
End 4
Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • I was trying your solution, but unfortunately Observable.FromAsync gives me troubles because of being invoked on another SynchronizationContext. I need to observe on UI Thread. – Vincent P Mar 24 '21 at 14:38
  • So the solution you've pointed worked with a little adjustment. I'll leave code here in case: subject.Window(() => subject.Throttle(TimeSpan.FromMilliseconds(500))) .SelectMany(c => c.ToList()) .Select(c => (reloadMap: c.Any(z => z.reloadMap), c.LastOrDefault(z => z.rowIndex.HasValue).rowIndex)) .Select(x => Observable.FromAsync(() => (Task)this.Invoke(new MethodInvoker(()=> x.rowIndex.HasValue ? Media_Load_Async(x.rowIndex.Value) : Task.CompletedTask)))) .Concat() .Subscribe(); – Vincent P Mar 24 '21 at 15:16