I have started to look at ReactiveX and can't figure out whether it would be a good fit for a problem I'm trying to solve, because either I don't know enough about ReactiveX or it doesn't have what I need.

Let's say that I'm constantly receiving messages that could be of 20 different types. All messages should first be saved to a database. Then I need some further analysis. I am interested in types A, B, C and D arriving in that order (not necessarily one right after another). When message A arrives, that should be considered the start of a process that I need to trigger. Then I should wait for message B (any other message type can arrive while waiting) and execute a step in the process. After message B, I wait for message C and execute another step. Then I wait for message D, which marks the end of the process. Then I need to start over and wait for the next message A, which starts a new process.

I am using .NET, but code from any platform would probably be ok to figure out how (or if) this can be done.

UPDATE: Giving some more context

Using @Enigmativity's sample code, I'll try to expand this question a little. Messages are produced by devices. So let's assume that in the "A1,B2,B1,C1, F3,...." stream the first letter is the message type and the number is the ID of a device. Messages A, B, C and D need to come from the same device to be considered a match. The server always gets all the messages because a device will repeat them until it gets confirmation. This is what a single device can produce (the stream can contain messages from all devices mixed together):

A1,B1,H1,F1,A1 - here the device restarted before completing whatever it was doing, so the first A1,B1 should be ignored and we start over, waiting for A, B, C and D.

A1,B1,C1,B1 - this cannot happen. A1 will always come before B, C or D. The device may sometimes not get to D, but then it will start over.
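
To pin down the expected behaviour, the per-device rules above can be sketched as a small state machine in plain JavaScript. This is not Rx code and not a proposed solution, just a statement of the rules; `createTracker` and `onStep` are hypothetical names, and saving each message to the database is left out.

```javascript
// Hypothetical helper: tracks the A -> B -> C -> D sequence per device.
// A message looks like "A1": first character is the type, the rest is the device ID.
const ORDER = ['A', 'B', 'C', 'D'];

function createTracker(onStep) {
  const progress = new Map(); // device ID -> index in ORDER of the last matched step

  return function handle(message) {
    const type = message[0];
    const device = message.slice(1);

    if (type === 'A') {
      // A always (re)starts the process, discarding any unfinished run.
      progress.set(device, 0);
      onStep(device, 'A');
      return;
    }

    if (!progress.has(device)) return; // no process running for this device

    const next = ORDER[progress.get(device) + 1];
    if (type !== next) return; // unrelated type, or not the step being waited for

    onStep(device, type);
    if (type === 'D') {
      progress.delete(device); // finished; wait for the next A
    } else {
      progress.set(device, progress.get(device) + 1);
    }
  };
}

// Usage: device 1 restarts after A1,B1; B2 is ignored because device 2 never sent A.
const steps = [];
const handle = createTracker((device, type) => steps.push(type + device));
'A1,B1,H1,F1,A1,B2,B1,C1,D1,D1'.split(',').forEach(handle);
console.log(steps.join(' ')); // A1 B1 A1 B1 C1 D1
```

Whether a restarted A should also undo the steps already executed for the abandoned run is not covered by this sketch.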

2 Answers

To the extent that Rx.NET's API tracks with RxJS, this is straightforward. Assuming we already have an observable of all messages:

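import { concat } from 'rxjs';
import { filter, take, concatMap, exhaustMap } from 'rxjs/operators';

// messages$ is assumed to be an existing observable of objects with a `type` property
const ofType = theType => filter(({type}) => type === theType);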

const a$ = messages$.pipe(ofType('a'));
const b$ = messages$.pipe(ofType('b'));
const c$ = messages$.pipe(ofType('c'));
const d$ = messages$.pipe(ofType('d'));

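// doStepB and doStepC are placeholders for your own step logic; each should
// return an observable (or promise) that completes when the step is done
const handleB$ = b$.pipe(take(1), concatMap(bMsg => doStepB(bMsg)));
const handleC$ = c$.pipe(take(1), concatMap(cMsg => doStepC(cMsg)));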
const waitForD$ = d$.pipe(take(1));

const process$ = a$.pipe(
  // while we are handling this "a" message, ignore other "a" messages
  exhaustMap((aMsg) => {
    // these will execute sequentially. once complete, we go back to
    // listening for "a" messages
    return concat(handleB$, handleC$, waitForD$);
  })
);

Note that process$ is an observable that will emit the results of the "b" and "c" step, as well as the "d" message. This output can be ignored or suppressed if you prefer.


I see that Rx.NET might be missing an exhaustMap implementation. Here's an SO question addressing this.

backtick

I'm not sure, from your description, if there is any guarantee that you'll always get each of the message types A, B, C, and D, without getting another set or overlapping values. I've got two approaches in case there are issues with restarting if a second A comes before the final D, for example.

Here's my basic code setup:

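using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var subject = new Subject<string>();

// placeholder: substitute one of the two queries shown further down
IObservable<(string a, string b, string c, string d)> query = ...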

query.Subscribe(x => Console.WriteLine($"{x.a} {x.b} {x.c} {x.d}"));

"A1,B1,A2,C1,F1,D1,A3,A4,B2,B3,A5,C2,B4,F2,D2,D3,C3,D3"
    .Split(',')
    .ToObservable()
    .Subscribe(subject);

Here's the case when everything is in sequence and perfectly matching (albeit with other types of messages interspersed):

IObservable<(string a, string b, string c, string d)> query =
    subject
        .Do(x => { /* Save here */ })
        .Publish(ss =>
        {
            var ssa = ss.Where(s => s[0] == 'A');
            var ssb = ss.Where(s => s[0] == 'B');
            var ssc = ss.Where(s => s[0] == 'C');
            var ssd = ss.Where(s => s[0] == 'D');
            return Observable.When(
                ssa
                    .And(ssb)
                    .And(ssc)
                    .And(ssd)
                    .Then((a, b, c, d) => (a: a, b: b, c: c, d: d)));
        });

This query uses the very powerful, but rarely used pattern/plan queries (aka joins) in Rx.

If you do have issues with needing to reset when the messages come out of order and you need the latest ones, then I think this works:

IObservable<(string a, string b, string c, string d)> query =
    subject
        .Do(x => { /* Save here */ })
        .Publish(ss =>
                ss
                    .Where(s => s[0] == 'A')
                    .Select(sa => ss.Where(s => s[0] == 'B').Select(sb => (a: sa, b: sb)))
                    .Switch()
                    .Select(sab => ss.Where(s => s[0] == 'C').Select(sc => (a: sab.a, b: sab.b, c: sc)))
                    .Switch()
                    .Select(sabc => ss.Where(s => s[0] == 'D').Select(sd => (a: sabc.a, b: sabc.b, c: sabc.c, d: sd)))
                    .Switch());

The first query gives this:

A1 B1 C1 D1
A2 B2 C2 D2
A3 B3 C3 D3

All nice and paired up.
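
A rough way to read this result: the And/Then plan behaves much like a zip over the four filtered streams, holding each type in a queue until one value of every type is available and then taking the oldest from each. The plain JavaScript below only simulates that pairing under this assumption; it is not Rx.NET code, and `pairInOrder` is a made-up name.

```javascript
// Simulates the queue-per-type pairing; not Rx code.
function pairInOrder(messages) {
  const types = ['A', 'B', 'C', 'D'];
  const queues = { A: [], B: [], C: [], D: [] };
  const results = [];

  for (const message of messages) {
    const queue = queues[message[0]];
    if (!queue) continue; // types other than A-D are ignored
    queue.push(message);

    // Once every queue holds at least one value, take the oldest from each.
    if (types.every(t => queues[t].length > 0)) {
      results.push(types.map(t => queues[t].shift()).join(' '));
    }
  }
  return results;
}

const sample = 'A1,B1,A2,C1,F1,D1,A3,A4,B2,B3,A5,C2,B4,F2,D2,D3,C3,D3'.split(',');
console.log(pairInOrder(sample));
// [ 'A1 B1 C1 D1', 'A2 B2 C2 D2', 'A3 B3 C3 D3' ]
```

Note that the matching digits in this output come from arrival order within each type, not from any comparison of the digits themselves.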

The second one gives this:

A1 B1 C1 D1
A4 B3 C2 D2
A4 B3 C2 D3
A5 B4 C3 D3
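
The Switch chain can be read as keeping only the latest A, the latest (A, B) pair and the latest (A, B, C) triple, and emitting on every D. The plain JavaScript below is a simulation under that reading, not Rx.NET code; `latestChain` is a made-up name.

```javascript
// Simulates the "latest value at each stage" behaviour; not Rx code.
function latestChain(messages) {
  let lastA = null;   // most recent A
  let lastAB = null;  // most recent (A, B) pair
  let lastABC = null; // most recent (A, B, C) triple
  const results = [];

  for (const m of messages) {
    switch (m[0]) {
      case 'A':
        lastA = m; // a new A does not clear pairs or triples already formed
        break;
      case 'B':
        if (lastA) lastAB = [lastA, m];
        break;
      case 'C':
        if (lastAB) lastABC = [...lastAB, m];
        break;
      case 'D':
        if (lastABC) results.push([...lastABC, m].join(' ')); // D does not reset anything
        break;
    }
  }
  return results;
}

const stream = 'A1,B1,A2,C1,F1,D1,A3,A4,B2,B3,A5,C2,B4,F2,D2,D3,C3,D3'.split(',');
console.log(latestChain(stream));
// [ 'A1 B1 C1 D1', 'A4 B3 C2 D2', 'A4 B3 C2 D3', 'A5 B4 C3 D3' ]
```

This also shows why A4 B3 C2 appears twice: nothing is reset after a D, so a second D reuses the same triple.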
Enigmativity
  • Thank you for your answer. I have learned a lot! Since you addressed the question of overlapping values I'll update my question to give a little bit more context so maybe then you can help me some more. – user3084198 Apr 30 '21 at 07:31