I need to merge two arrays using UniRx to get an Observable that emits the first elements of both arrays, then the second elements, and so on, and finally emits the rest of the longer array.

I tried Zip, but Zip cuts off the tail of the longer array. I tried Merge with Scheduler.DefaultSchedulers.Iteration, but it starts parallel threads, which I don't want.

var x1 = new[] {1, 2, 3}.ToObservable();
var x2 = new[] {4, 5, 6, 7, 8, 9}.ToObservable();
var merge = x1.Merge(x2);
merge.Subscribe(i => print(i));

I expected 1 4 2 5 3 6 7 8 9, but I got 1 2 3 4 5 6 7 8 9.

  • I suspect this is a bug in the UniRx package. Merge is what you want, and when I run your example with the dotnet/reactive package I get the expected answer. – Cameron MacFarland Aug 15 '19 at 12:41
  • That would be an implementation detail. Merge doesn’t make any promises about ordering with cold, on-subscribe items. – Shlomo Aug 15 '19 at 14:04
  • @Ilya Bokovenko The question is unclear. Why do you need Rx for that? Why not use an IEnumerable-based solution? If you need an implementation for IObservable, I suggest you provide a marble diagram of what you actually want to happen, covering all the cases that can occur, e.g. multiple values on x1 before x2 provides values and vice versa. – Felix Keil Aug 16 '19 at 11:10

3 Answers

This works for me as you expect:

var a1 = new int[] { 1, 2, 3 };
var a2 = new int[] { 4, 5, 6, 7, 8, 9 };

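// Pad each sequence with an endless run of nulls so Zip does not stop at the shorter one.
var x1 = a1.Select(x => (int?)x).ToObservable().Concat(Observable.Repeat((int?)null));
var x2 = a2.Select(x => (int?)x).ToObservable().Concat(Observable.Repeat((int?)null));

var query =
    x1
        .Zip(x2, (i1, i2) => new [] { i1, i2 })
        // Stop once both slots are padding, i.e. both sources are exhausted.
        .TakeWhile(xs => !(xs[0] == null && xs[1] == null))
        .SelectMany(xs => xs)
        // Drop the padding and unwrap the remaining values.
        .Where(x => x != null)
        .Select(x => x.Value);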

query

Enigmativity

If you know the lengths of the arrays, you could concatenate the Zip sequence with the other two and skip the number of already zipped elements. This is C#, but it should give you the idea:

var a1 = new int[] { 1, 2, 3 };
var a2 = new int[] { 4, 5, 6, 7, 8, 9 };

var x1 = a1.ToObservable();
var x2 = a2.ToObservable();

int skip = Math.Min(a1.Length, a2.Length);
Observable.Zip(x1, x2).SelectMany(x => x)
    .Concat(x1.Skip(skip))
    .Concat(x2.Skip(skip))
    .Subscribe(i => Console.WriteLine(i));

Thank you, this code works for arrays. But how to implement this operation for generic IObservable<int> in which case Length is not accessible?

You can always count the number of elements in the zipped observable:

var x1 = new int[] { 1, 2, 3 }.ToObservable();
var x2 = new int[] { 4, 5, 6, 7, 8, 9 }.ToObservable();

var zip = Observable.Zip(x1, x2);
int skip = await zip.Count().LastAsync();

zip.SelectMany(x => x)
    .Concat(x1.Skip(skip))
    .Concat(x2.Skip(skip))
    .Subscribe(i => Console.WriteLine(i));
mm8
  • Thank you, this code works for arrays. But how to implement this operation for generic IObservable in which case Length is not accessible? (I must formulate problems more clearly.) – Ilya Bokovenko Aug 15 '19 at 18:15

The solution posted by @mm8 will work with two Observables with a defined end (like arrays masquerading as observables), but will hang forever with infinite observables, which are a common case. For example:

var odds = Observable.Interval(TimeSpan.FromMilliseconds(10))
    .Select(i => i * 2 + 1);
var evens = Observable.Interval(TimeSpan.FromMilliseconds(10))
    .Select(i => i * 2);

var zip = Observable.Zip(evens, odds);
int skip = await zip.Count().LastAsync();

// Hangs forever here.

If you're looking to interleave arrays, the best way to do it is to interleave as arrays. Rx is meant to work on Observables. This isn't an observable problem.
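
As a minimal sketch of interleaving on the array side, assuming plain IEnumerable inputs: the `Interleave` extension method and the `InterleaveExtensions` class are hypothetical names introduced here for illustration, not part of UniRx or System.Reactive. It alternates between the two sequences and then drains whichever one is longer.

```csharp
using System;
using System.Collections.Generic;

public static class InterleaveExtensions
{
    // Hypothetical helper: alternates items from both sequences,
    // then yields whatever remains of the longer one.
    public static IEnumerable<T> Interleave<T>(this IEnumerable<T> first, IEnumerable<T> second)
    {
        using (var e1 = first.GetEnumerator())
        using (var e2 = second.GetEnumerator())
        {
            bool has1 = true, has2 = true;
            while (has1 || has2)
            {
                // The has1/has2 guard keeps an exhausted enumerator from being advanced again.
                if (has1 && (has1 = e1.MoveNext())) yield return e1.Current;
                if (has2 && (has2 = e2.MoveNext())) yield return e2.Current;
            }
        }
    }
}
```

With the arrays from the question, `new[] { 1, 2, 3 }.Interleave(new[] { 4, 5, 6, 7, 8, 9 })` yields 1 4 2 5 3 6 7 8 9. If an Observable is still needed downstream, the interleaved sequence can be passed to `ToObservable()` afterwards.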

Shlomo