1

I've following interfaces and Observable<Machine[]>, what I want to achive is group by Machine symbol property in Observable<Machine[]> and return mapped observable Observable<Order[]>.

export interface Machine {
    symbol: string;
    price: number;
    amount: number;
    id: number;
}

export interface Order {
    symbol: string;
    machines: OrderMachine[];
}

export interface OrderMachine {
    price: number;
    amount: number;
    id: number;
}

I've tried to use RxJS gropBy operator but it seems it return grouped array one by one.

machines: Machine[] = [
        { amount: 1,  id: 1, symbol: "A", price: 1 },
        { amount: 1,  id: 2, symbol: "A", price: 2 }
    ];


of(machines).pipe(
        takeUntil(this.unsubscribe),
        mergeMap(res => res),
        groupBy(m => m.symbol),
        mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
        map(x => { // here I have probably wrong model [string, Machine[]]
            const orderMachines = x[1].map(y => { return <OrderMachine>{price: y.price, amount: y.amount, id: y.id }})
            return <Order>{ symbol: x[0], machines: orderMachines }  })
        );

as in result I have Observable<Order> istead ofObservable<Order[]>.

expected result model:

orders: Order[] = [
        {   
            symbol: "A", 
            machines: [
                { amount: 1, price: 1, id: 1 },
                { amount: 1, price: 2, id: 2 }
            ]
        }
    ];
Mateusz S.
  • 21
  • 5

2 Answers2

3

Here a possible solution based on your approach but with a few changes:

const machines = [
  { amount: 1, id: 1, symbol: "A", price: 1 },
  { amount: 1, id: 2, symbol: "A", price: 2 },
  { amount: 1, id: 3, symbol: "B", price: 3 }
];

from(machines) // (1)
  .pipe(
    // (2)
    groupBy((m) => m.symbol),
    mergeMap((group) => group.pipe(toArray())),
    map((arr) => ({
      symbol: arr[0].symbol, // every group has at least one element
      machines: arr.map(({ price, amount, id }) => ({
        price,
        amount,
        id
      }))
    })),
    toArray(), // (3)
  )
  .subscribe(console.log);

(1) I changed of(machines) to from(machines) in order to emit the objects from machines one by one into the stream. Before that change the whole array was emitted at once and thus the stream was broken.

(2) I removed takeUntil(this.unsubscribe) and mergeMap(res => res) from the pipe since there is no reason to have them in your example. takeUntil wouldn't have any effect since the stream is finite and synchronous. An identity function (res => res) applied with mergeMap would make sense in a stream of streams which is not the case in your example. Or do you actually need these operators for your project because you have an infinite stream of observables?

(3) toArray() is what transforms Observable<Order> to Observable<Order[]>. It waits until the stream ends and emits all streamed values at once as an array.

edit:

The op has mentioned that he rather needs a solution that is compatible with an infinite stream but because toArray only works with finite streams the provided answer above would never emit anything in such scenario.

To solve this I would avoid using groupBy from rxjs. It cvan be a very powerful tool in other cases where you need to split one stream into several groups of streams but in your case you simply want to group an array and there are easier methods for that that.

this.store.pipe(
    select(fromOrder.getMachines)
    map((arr) =>
        // (*) group by symbol
        arr.reduce((acc, { symbol, price, amount, id }) => {
            acc[symbol] = {
                symbol,
                machines: (acc[symbol] ? acc[symbol].machines : [])
                    .concat({ price, amount, id })
            };
            return acc;
        }, {})
    ),
)
.subscribe((result) => 
    // (**)
    console.log(Object.values(result))
);

(*) you could use a vanilla groupBy implementation that returns an object of the shape {[symbol: string]: Order}.

(**) result is an object here but you can convert it to an array easily but applying Object.values(result)

kruschid
  • 759
  • 4
  • 10
0

@kruschid Thank you very much for your reply, it works properly but unfortynetelly, it doesn't work when I want to use it with my store (ngrx), type is ok but it stops to show log after mergeMap method:

   this.store.pipe(select(fromOrder.getMachines),
                     mergeMap(res => res), // Machine[]
                     groupBy((m) => m.symbol),
                     tap(x => console.log(x)), //this shows object GroupedObservable {_isScalar: false, key: "A", groupSubject: Subject, refCountSubscription: GroupBySubscriber} 
                     mergeMap((group) => group.pipe(toArray())),
                     tap(x => console.log(x)), // this is not printed in console
                     map((arr) => <Order>({
                        symbol: arr[0].symbol,
                        machines: arr.map(({ price, amount, id }) => ({
                            price,
                            amount,
                            id
                        }))
                        })),
                     toArray())) // (3)
Mateusz S.
  • 21
  • 5
  • I've never used ngrx before but my guess is: `mergeMap(res => res)` breaks the stream. I had to look up the signature of `select` and found out that the return type would be `Obsevable` in your specific case, right? That would mean that you can remove `mergeMap(res => res)`. Using `mergeMap` at that point would only make sense if `select` returns `Observable>`. Only then is a `mergeMap` with an identity function `x => x` (or even shorter a `mergeAll`) is usually needed: to flatten a stream of streams (`Observable> -> Observable`). – kruschid Mar 16 '21 at 14:27
  • yes, you're right `select` return `Observable`, but if I remove `mergeMap(res => res)` then `groupBy` `m` parameter is `Machine[]` not `Machine` – Mateusz S. Mar 16 '21 at 18:50
  • you are right. I was not aware that `mergeMap` can also be used to flatten arrays. Good point. Your code doesn't work as expected because most likely `this.store` is an infinite stream but `toArray()` (inside `mergeMap`) waits for for a `complete` signal that never arrives. Try to add `take(1)` before the first `mergeMap` to send a `complete` signal after picking one sample of your latest state. – kruschid Mar 16 '21 at 19:11
  • I am afraid that I cannot use `take(1)` because it must be working with infinite stream and after this changes it return data only once (in my case is empty array) and stop 'listening' for the changes, is there any other option? – Mateusz S. Mar 16 '21 at 19:29