0

I have a subject that gets an int every 100 milliseconds. This number increments on each iteration. I subscribe to this subject, get chunks of 50 back and wait a few seconds. Example:

SequenceGenerator gen = new SequenceGenerator();
gen.subj.Buffer(50).Subscribe(
    async x =>
    {
        Console.WriteLine($"1: {x[x.Count - 1]}------");
        await Task.Delay(10000);
        Console.WriteLine($"1:------");
    },
    x => Console.WriteLine("EX"),
    () => Console.WriteLine("1 Done"));

The SequenceGenerator just does the following:

int i = 0;
while (true)
{
    ++i;
    subj.OnNext(i.ToString());
    await Task.Delay(1);
}

Every time the buffer is filled the "onnext" will be called. Even though the last "onnext" is not done yet. Is there a way to adjust this? I want the next "onnext" only do be called, if the last "onnext" is done. Actual behaviour:

1: 50------
(wait ~0.5s)
1: 100------
...

Expected behaviour:

1: 50------
(wait 10 seconds)
1: 100------
...

Is there a way of doing this? Thank you guys in edvance! Best regards.

Paulo Morgado
  • 14,111
  • 3
  • 31
  • 59
Zuendi
  • 139
  • 1
  • 8

1 Answers1

0

Ok I think I have a solution. I found a tip here. Zip helps me alot with this approach. I need another Subject that is ziped to my stream. At least this works:

SequenceGenerator gen = new SequenceGenerator();
Subject<bool> lockReleaser = new Subject<bool>();

Stopwatch watch = new Stopwatch();
watch.Start();
var test = gen.messageQueue.Buffer(TimeSpan.FromSeconds(4),5)
        .Zip(lockReleaser, (res, _) => res)
    .Subscribe(async x =>
    {
        if (x.Count > 0)
        {
            Console.WriteLine($"{watch.Elapsed}: {x[x.Count - 1]}------");
            await Task.Delay(10000);
        }
        lockReleaser .OnNext(true);
        Console.WriteLine($"1:------");
    },
    x => Console.WriteLine("EX"),
    () => Console.WriteLine("1 Done"));
lockReleaser.OnNext(true);
Zuendi
  • 139
  • 1
  • 8