I have a function that breaks a message into multiple chunks. The chunks must be posted to my post function in order, but I do not want the Observable to block other incoming posts. I think the solution is some combination of the concat operator inside a mergeMap, but I cannot get it to work properly.

I am not sure I can make a diagram but here is my attempt:

    -1-2------3|->
    --4--5--6|->

Desired output:

    [4,5,6]
    [1,2,3]

I need request 1 to execute before 2, and 2 before 3; likewise 4 before 5, and 5 before 6. In other words, I think I have an observable of observables: each inner stream should run its requests in order, and the results of each stream should be collected into a plain array. I am just not sure how to do this. I have been experimenting with the code for a long time trying to express that idea, and here is my best attempt:

    interface SendInfo {
        message: discord.Message
        content: string
        options?: discord.MessageOptions
    }
    export const sendMessage$: Subject<SendInfo> = new Subject();

    const regex = /[\s\S]{1,1980}(?:\n|$)/g;
    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | discord.Message[] | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const superObservable: Observable<Observable<discord.Message | discord.Message[] | null>> = concat(chunks.map(
                    (chunk: string):
                    Observable<discord.Message | discord.Message[] | null> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        );
                    }
                ));

                return superObservable.pipe(
                    mergeMap(e => e),
                    toArray(),
                );
            }
        ),
        tap((e): void => Utils.logger.fatal(e)),
        share(),
    );

My output:

    [2019-10-21T17:24:15.322] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg1' } ]
    [2019-10-21T17:24:15.324] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg2' } ]
    [2019-10-21T17:24:15.325] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg3' } ]

I feel like I am close to a solution, but I cannot figure out exactly how to merge these results into a single array. I also do not know whether the approach is functionally correct.

kmm3

2 Answers


I have tackled preserving order before, in this question: RxJS: MergeMap with Preserving Input order

Using my parallelExecute, you could then reduce the values to an array:

    parallelExecute(...yourObservables).pipe(
      reduce((results, item) => [...results, item], [])
    );

Here is the parallelExecute function:

    const { BehaviorSubject, Subject } = rxjs;
    const { filter } = rxjs.operators;

    const parallelExecute = (...obs$) => {
      // Subscribe to every source immediately so they all run in parallel;
      // each BehaviorSubject holds its source's latest value until it is read.
      const subjects = obs$.map(o$ => {
        const subject$ = new BehaviorSubject();
        const sub = o$.subscribe(o => { subject$.next(o); });
        // The filter skips the initial undefined (and any other falsy value).
        return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
      });
      const subject$ = new Subject();
      sub(0);
      // Forward one value from the source at `index`, then move to the next source.
      function sub(index) {
        const current = subjects[index];
        current.obs$.subscribe(c => {
          subject$.next(c);
          current.obs$.complete();
          current.sub.unsubscribe();
          if (index < subjects.length - 1) {
            sub(index + 1);
          } else {
            subject$.complete();
          }
        });
      }
      return subject$;
    };
Adrian Brand
  • I am having trouble with the reduction. It times out my test bench (2000 ms). I can run flatmap on it and it will work ``map((x) => [x].flatMap(x=>x))`` but it doesn't give me the full array. – kmm3 Oct 22 '19 at 14:20
  • Ok I got it working. There were errors when converting to Typescript so I must have introduced a bug while converting it. – kmm3 Oct 23 '19 at 01:06
  • Question: Does this fire the events in sequence? That is what I need. – kmm3 Oct 23 '19 at 02:39
  • That is the point, to emit the responses in the order they were fired in, not the order they finished in. – Adrian Brand Oct 23 '19 at 03:05
  • I want them to _fire and complete_ in order and return an array of the results – kmm3 Oct 23 '19 at 07:54
  • Read my answer and there is a reduce statement to turn the results into an array. – Adrian Brand Oct 23 '19 at 09:44
  • I think there is a misunderstanding here, by me or by you. I want the requests to be chained serially, not in parallel, so that each one completes before the next starts. But I also want to run several of these serial chains in parallel. I believe I have the solution for that now anyway; I'll post it. – kmm3 Oct 23 '19 at 09:50
  • If you want them in serial just switchMap recursively. – Adrian Brand Oct 23 '19 at 21:43
  • I did something like that with promises in my answer. – kmm3 Oct 24 '19 at 05:42

I figured out that the operator I was looking for after the concat call was combineAll(), not toArray(). I also have another implementation that uses promises. I believe both should work, but I will post the one I am more sure of first, which is the promise version.

Implementation one using promises:

    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
                    (chunk: string):
                    Observable<(discord.Message | null)[]> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        // eslint-disable-next-line max-len
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        ).pipe(
                            // eslint-disable-next-line comma-dangle
                            map((x): (discord.Message | null)[] => [x].flatMap(
                                (t): (discord.Message | discord.Message[] | null) => t
                            ))
                        );
                    }
                );

                const promises = observables
                    .map(
                        (obs: Observable<(discord.Message | null)[]>):
                        Promise<(discord.Message | null)[]> => obs.toPromise()
                    );

                const reduced = promises
                    .reduce(async (promiseChain, currentTask):
                    Promise<(discord.Message | null)[]> => [
                        ...await promiseChain,
                        ...await currentTask,
                    ].flatMap((x): (discord.Message | null) => x));

                return from(reduced);
            }
        ),
        share(),
    );
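
One caveat that may be worth checking in the promise version: `toPromise()` subscribes as soon as it is called, so mapping every observable to a promise up front can start all of the requests at once, and the `reduce` then only orders the awaiting of the results. A minimal sketch of a chain that starts each task only after the previous one has finished is shown below. It uses a plain loop instead of `reduce`, and `Task`, `fakeSend`, `runSerially` and `events` are hypothetical names for illustration, not part of the code above.

```typescript
// A task is a function that starts its work only when it is called.
type Task<T> = () => Promise<T>;

// Records when each hypothetical request starts and ends.
const events: string[] = [];

// Hypothetical stand-in for a network send that takes `ms` milliseconds.
const fakeSend = (chunk: string, ms: number): Task<string> => (): Promise<string> => {
    events.push(`start ${chunk}`);
    return new Promise<string>((resolve): void => {
        setTimeout((): void => {
            events.push(`end ${chunk}`);
            resolve(chunk);
        }, ms);
    });
};

// Runs the tasks one at a time and collects their results in order.
const runSerially = async <T>(tasks: Task<T>[]): Promise<T[]> => {
    const results: T[] = [];
    for (const task of tasks) {
        // The next task is not called until this await has finished.
        results.push(await task());
    }
    return results;
};
```

Two chains started with `Promise.all([runSerially(a), runSerially(b)])` run concurrently with each other, while the tasks inside each chain stay in order.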

Implementation two, pure RxJS:

    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
                    (chunk: string):
                    Observable<(discord.Message | null)[]> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        // eslint-disable-next-line max-len
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        ).pipe(
                            // eslint-disable-next-line comma-dangle
                            map((x): (discord.Message | null)[] => [x].flatMap(
                                (t): (discord.Message | discord.Message[] | null) => t
                            ))
                        );
                    }
                );

                return concat(observables).pipe(
                    combineAll(),
                    map(x => x.flatMap(t => t)),
                );
            }
        ),
        share(),
    );

I believe the promise version will work because it is reduced into an explicit chain. I am not sure whether I am missing some nuance of the concat operator, so I am not 100% sure the RxJS version works. The test bench I wrote is actually not working properly, due to my misunderstanding of how promises execute with the defer operator in RxJS, but I am getting the expected order according to it. I believe that misunderstanding was the reason I did not come up with these solutions easily.

    [2019-10-23T06:09:13.948] [FATAL] messageWrapper.ts:109 - [
      { channel: { send: [Function] }, content: 'msg7' },
      { channel: { send: [Function] }, content: 'msg8' },
      { channel: { send: [Function] }, content: 'msg9' }
    ]
    [2019-10-23T06:09:14.243] [FATAL] messageWrapper.ts:109 - [
      { channel: { send: [Function] }, content: 'msg4' },
      { channel: { send: [Function] }, content: 'msg5' },
      { channel: { send: [Function] }, content: 'msg6' }
    ]
    [2019-10-23T06:09:14.640] [FATAL] messageWrapper.ts:109 - [
      { channel: { send: [Function] }, content: 'msg1' },
      { channel: { send: [Function] }, content: 'msg2' },
      { channel: { send: [Function] }, content: 'msg3' }
    ]
          ✓ should execute concurrently. (753ms)
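
A possible nuance with `concat`: when it is given a single array argument, it appears to treat the array as one input and emit the inner observables themselves (which matches the `Observable<Observable<...>>` type in the question), so a following `mergeMap` or `combineAll` may subscribe to all of them at once. Spreading the array and collecting with `toArray()` is one way to get the serial-inside, concurrent-outside behaviour described in the question. The sketch below assumes cold request observables; `fakeSend`, `Job`, `log` and `runDemo` are hypothetical stand-ins, not the real Discord or `Network` calls.

```typescript
import { Observable, Subject, concat, defer, timer } from "rxjs";
import { map, mergeMap, toArray } from "rxjs/operators";

// Records when each hypothetical request starts and ends.
const log: string[] = [];

// Hypothetical stand-in for channel.send: "sends" a chunk after `ms` milliseconds.
// defer() keeps the request cold, so nothing starts until concat subscribes to it.
const fakeSend = (chunk: string, ms: number): Observable<string> =>
    defer((): Observable<string> => {
        log.push(`start ${chunk}`);
        return timer(ms).pipe(
            map((): string => {
                log.push(`end ${chunk}`);
                return chunk;
            }),
        );
    });

interface Job {
    chunks: string[];
    ms: number;
}

// Feeds two jobs through the pipeline and resolves with the emitted arrays.
const runDemo = (): Promise<string[][]> =>
    new Promise<string[][]>((resolve, reject): void => {
        const jobs$ = new Subject<Job>();
        const out: string[][] = [];

        jobs$.pipe(
            // mergeMap: separate jobs run concurrently with each other.
            mergeMap((job: Job): Observable<string[]> =>
                // concat with a spread: chunks of one job run one after another.
                concat(...job.chunks.map((c) => fakeSend(c, job.ms))).pipe(
                    toArray(), // one array per job, emitted after its last chunk
                )),
        ).subscribe({
            next: (r: string[]): void => { out.push(r); },
            error: reject,
            complete: (): void => resolve(out),
        });

        jobs$.next({ chunks: ["1", "2", "3"], ms: 50 }); // slow job
        jobs$.next({ chunks: ["4", "5", "6"], ms: 5 }); // fast job
        jobs$.complete();
    });
```

With these delays the fast job should finish first, so `runDemo()` is expected to resolve with the `4, 5, 6` array ahead of the `1, 2, 3` array, while `log` shows each chunk ending before the next chunk of the same job starts.
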
kmm3