
The task

Suppose we implement an Angular service and need to publish an Observable<number[]> to the world:

numbers: Observable<number[]>;

We want subscribers to:

  1. receive the last value upon subscription
  2. receive the whole modified array every time we change and publish it

Because of #1, the Observable<number[]> should internally be "at least" a BehaviorSubject<number[]>.

But what about #2? Let's suppose we need to implement a method publishNumbersChange() which is called whenever we need to change and publish the changed array:

private publishNumbersChange() {
    // Get current numbers array
    ...        

    changeArray();

    // Now publish changed numbers array
    ...
}

The question

What is the RxJS 5 pattern for publishing a modified array that is based on its previous items?

Since I'm asking mainly because I'm currently doing Angular work, here is the second part of the question:
What code do Angular (and similar frameworks based on RxJS) use when they expose an Observable whose type parameter is an array and later need to publish an updated array?
Do they just keep a copy of the currently published array separately?

Some thoughts

It seems that storing the underlying array separately, so that we always have access to it, is the simplest thing. But at the same time it does not look like the RxJS way (it requires keeping state outside the RxJS stream).

On the other hand, we could do something like the following:

private publishNumbersChange() {
    // To get the latest value from the stream, we have to subscribe
    const subscription: Subscription = this.numbers.subscribe((numbers: number[]) => {
        // We got the last value in stream in numbers argument. Now make changes to the array
        changeArray();

        // And push it back as a new value to the stream
        this.numbers.next(numbers);
    });

    // Also we have to unsubscribe
    subscription.unsubscribe();
}

I see at least one issue here (not counting complexity and reusability): a "race condition" between executing the subscription callback and unsubscribing. Looking at that code, you can't tell for sure whether the callback will actually be executed. So it doesn't look like a proper way of doing this either.

Alexander Abakumov
  • Are you modifying the array in place? Then your observable won't be updated. If you are replacing the array in the observable, then it should emit updates – Peter Sep 22 '17 at 16:22
  • @Peter: Yes, we apparently have to do `BehaviorSubject.next(numbers)` with a new array. The question is: what is the pattern for getting the old array, modifying it, and publishing it again? Is there a ready-to-use operator in RxJS for that? – Alexander Abakumov Sep 22 '17 at 16:26

1 Answer

It sounds like the operator you may be looking for is scan.

// A plain Subject is used here: a BehaviorSubject's initial [] would itself be appended as a value
let arraySubject = new Subject();
let array$ = arraySubject.scan((fullArray, newValue) => fullArray.concat([newValue]), []);

scan accumulates values over time in an observable stream. For each item in the stream, it calls a function with the last accumulated value and the current value as parameters, then emits the result. The example above takes each new value and appends it to the full array; the second parameter initializes the accumulator to an empty array.

This is clearly restricting, though, since it only does ONE thing, which may not be flexible enough. In that case you need to get clever:

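// The initial value is a no-op action, so the first emission is also a valid {modifier, payload} object
let arraySubject = new BehaviorSubject<any>({modifier: full => full});
let array$ = arraySubject.scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), []);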

Now you're passing in an "action" with two parts: a modifier function, which defines how you want to modify the full array, and a payload carrying any additional data the modifier needs alongside the full array.

So you might do:

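// Remove the first occurrence of item without mutating the accumulated array
// (splice would mutate it and return the removed elements instead)
let modifier = (full, item) => full.filter((v, i) => i !== full.indexOf(item));
arraySubject.next({modifier, payload: itemToRemove});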

which removes the item you sent through. You can extend this pattern to any array modification.
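One caveat worth checking before relying on a splice-based modifier: Array.prototype.splice mutates the array in place and returns the removed elements, not the remaining ones. Below is a minimal sketch of the difference. The removeFirst helper is a hypothetical name introduced for illustration, not something from RxJS:

```typescript
// Hypothetical helper (an assumption for illustration): returns a new array
// without the first occurrence of `item`, leaving `full` untouched.
function removeFirst(full: number[], item: number): number[] {
    const index = full.indexOf(item);
    if (index === -1) {
        return full; // nothing to remove
    }
    return full.slice(0, index).concat(full.slice(index + 1));
}

const original: number[] = [1, 2, 3, 2];

// splice returns the removed elements and mutates its receiver,
// so it is applied to a copy here to keep `original` intact.
const scratch = original.slice();
const spliceResult = scratch.splice(scratch.indexOf(2), 1);

const removed = removeFirst(original, 2);

console.log(spliceResult); // [2]          (the removed elements)
console.log(scratch);      // [1, 3, 2]    (mutated in place)
console.log(removed);      // [1, 3, 2]    (a new array)
console.log(original);     // [1, 2, 3, 2] (unchanged)
```

Since scan feeds the returned value into the next call as the accumulator, a modifier should return the whole remaining array.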

A "gotcha" with scan though is that subscribers only get the accumulated value from the TIME THEY SUBSCRIBED. So, this will happen:

let arraySubject = new BehaviorSubject<any>({modifier: full => full}); // initial no-op action
let array$ = arraySubject.scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), []);
let subscriber1 = array$.subscribe();
//subscriber1 gets []
let modifier = (full, val) => full.concat([val]);
arraySubject.next({modifier, payload:1});
//subscriber1 gets [1]
arraySubject.next({modifier, payload:2});
//subscriber1 gets [1,2]
let subscriber2 = array$.subscribe();
//subscriber2 gets [2]
arraySubject.next({modifier, payload:3});
//subscriber1 gets [1,2,3]
//subscriber2 gets [2,3]

See what happened there? The only thing stored in the BehaviorSubject was the second action, not the full array. scan is what stores the full array, and it keeps a separate accumulator per subscription, so the second subscriber only sees the second action applied to the seed, because it wasn't subscribed during the first action. So you need a persistent subscriber pattern:

let arraySubject = new BehaviorSubject([]);
let arrayModifierSubject = new Subject();
arrayModifierSubject.scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), []).subscribe(arraySubject);

and you modify by calling next on arrayModifierSubject:

let modifier = (full, val) => full.concat([val]);
arrayModifierSubject.next({modifier, payload: 1});

and your subscribers get the array from the array source:

let subscriber1 = arraySubject.subscribe();

In this setup, all array modifications go through the modifier subject, which in turn broadcasts the result to the BehaviorSubject, which stores the full array for future subscribers and broadcasts it to current subscribers. The BehaviorSubject (the store subject) is persistently subscribed to the modifier subject (the action subject) and is the ONLY subscriber to it, so the full array is never lost: the accumulated result of the entire history of actions is always maintained.
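To make the data flow concrete without pulling in RxJS, here is a dependency-free sketch of the same idea. The TinyStore class and its method names are hypothetical stand-ins for the Subject/BehaviorSubject pair, not RxJS APIs. It keeps the accumulated state in one place, applies each modifier to it, and replays the latest state to late subscribers:

```typescript
type ArrayModifier<P> = (full: number[], payload: P) => number[];
type Listener = (state: number[]) => void;

// Hypothetical stand-in for the two-subject setup (not an RxJS class).
class TinyStore {
    private state: number[];
    private listeners: Listener[] = [];

    constructor(initial: number[]) {
        this.state = initial;
    }

    // Plays the role of arrayModifierSubject.next({modifier, payload}).
    publish<P>(modifier: ArrayModifier<P>, payload: P): void {
        this.state = modifier(this.state, payload);
        this.listeners.forEach(listener => listener(this.state));
    }

    // Plays the role of arraySubject.subscribe(): the new listener
    // immediately receives the latest full array.
    subscribe(listener: Listener): void {
        this.listeners.push(listener);
        listener(this.state);
    }
}

const store = new TinyStore([]);
const seenByFirst: number[][] = [];
const seenBySecond: number[][] = [];
const append: ArrayModifier<number> = (full, val) => full.concat([val]);

store.subscribe(state => seenByFirst.push(state));
store.publish(append, 1);
store.publish(append, 2);
store.subscribe(state => seenBySecond.push(state)); // late subscriber
store.publish(append, 3);

console.log(JSON.stringify(seenByFirst));  // [[],[1],[1,2],[1,2,3]]
console.log(JSON.stringify(seenBySecond)); // [[1,2],[1,2,3]]
```

Unlike the earlier trace, the late subscriber starts from [1,2] rather than [2], because the accumulation happens once, in the store, instead of once per subscription.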

Some sample usages (with the arrayModifierSubject setup above):

// insert 1 at end
const append = (full, value) => full.concat([value]);
arrayModifierSubject.next({modifier: append, payload: 1});

// insert 1 at start
const prepend = (full, value) => [value].concat(full);
arrayModifierSubject.next({modifier: prepend, payload: 1});

// remove the first 1 (filter returns a new array; splice would mutate and return the removed items)
const removeFirst = (full, value) => full.filter((v, i) => i !== full.indexOf(value));
arrayModifierSubject.next({modifier: removeFirst, payload: 1});

// change all instances of 1 to 2
const replaceAll = (full, value) => full.map(v => (v === value.target) ? value.newValue : v);
arrayModifierSubject.next({modifier: replaceAll, payload: {target: 1, newValue: 2}});

You can wrap any of these in a "publishNumbersChange" function. Exactly how you implement this depends on your needs. You can make functions like:

function insertNumber(numberToInsert: number) {
   let modifier = (full, val) => full.concat([val]);
   publishNumbersChange(modifier, numberToInsert);
}

function publishNumbersChange(modifier, payload) {
   arrayModifierSubject.next({modifier, payload});
}

or you can declare an interface and make classes and use that:

function publishNumbersChange({modifier, payload}: NumberArrayModifier) {
   arrayModifierSubject.next({modifier, payload});
}

interface NumberArrayModifier {
    modifier: (full: number[], payload:any) => number[];
    payload: any;
}

class InsertNumber implements NumberArrayModifier {
    modifier = (full: number[], payload: number): number[] => full.concat([payload]);
    payload: number;
    constructor(numberToInsert:number) {
        this.payload = numberToInsert;
    }
}

publishNumbersChange(new InsertNumber(1));

You can extend the same approach to any array modification. One last pro tip: lodash is a huge help with defining your modifiers in this type of system.
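As a sketch of how the class-based style could extend to other operations, the following defines two further action classes against the same NumberArrayModifier shape. RemoveNumber and ReplaceNumber are hypothetical names assumed for illustration. The actions are replayed with Array.prototype.reduce, which stands in for scan here so the example runs without RxJS:

```typescript
interface NumberArrayModifier {
    modifier: (full: number[], payload: any) => number[];
    payload: any;
}

class InsertNumber implements NumberArrayModifier {
    modifier = (full: number[], payload: number): number[] => full.concat([payload]);
    constructor(public payload: number) {}
}

// Hypothetical: removes every occurrence of the payload without mutating `full`.
class RemoveNumber implements NumberArrayModifier {
    modifier = (full: number[], payload: number): number[] => full.filter(v => v !== payload);
    constructor(public payload: number) {}
}

// Hypothetical: replaces every `target` with `newValue`.
class ReplaceNumber implements NumberArrayModifier {
    modifier = (full: number[], payload: { target: number; newValue: number }): number[] =>
        full.map(v => (v === payload.target ? payload.newValue : v));
    constructor(public payload: { target: number; newValue: number }) {}
}

const actions: NumberArrayModifier[] = [
    new InsertNumber(1),
    new InsertNumber(2),
    new InsertNumber(1),
    new ReplaceNumber({ target: 1, newValue: 5 }),
    new RemoveNumber(2)
];

// reduce applies the actions in order, as scan would for emitted values.
const finalState = actions.reduce(
    (full, action) => action.modifier(full, action.payload),
    [] as number[]
);

console.log(finalState); // [5, 5]
```

Each class pairs one payload type with the modifier that understands it, so a caller cannot send a payload with the wrong modifier.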

So, how might this look in an Angular service context?

This is a very simple implementation that isn't highly reusable, but other implementations could be:

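import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/scan';

const INIT_STATE: number[] = [];
@Injectable()
export class NumberArrayService {
    private numberArraySource = new BehaviorSubject<number[]>(INIT_STATE);
    private numberArrayModifierSource = new Subject<any>();
    numberArray$ = this.numberArraySource.asObservable();

    constructor() {
        // Fold each {modifier, payload} action into the accumulated array and store the result
        this.numberArrayModifierSource.scan((fullArray, {modifier, payload}) => modifier(fullArray, payload), INIT_STATE).subscribe(this.numberArraySource);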
    }

    private publishNumberChange(modifier, payload?) {
        this.numberArrayModifierSource.next({modifier, payload});
    }

    insertNumber(numberToInsert) {
        let modifier = (full, val) => full.concat([val]);
        this.publishNumberChange(modifier, numberToInsert);
    }

    removeNumber(numberToRemove) {
        let modifier = (full, val) => full.filter((v, i) => i !== full.indexOf(val)); // non-mutating; splice would return the removed items
        this.publishNumberChange(modifier, numberToRemove);
    }

    sort() {
        let modifier = (full, val) => full.slice().sort((a, b) => a - b); // copy first, then sort numerically
        this.publishNumberChange(modifier);
    }

    reset() {
        let modifier = (full, val) => INIT_STATE;
        this.publishNumberChange(modifier);
    }
}

Usage here is simple: subscribers just subscribe to numberArray$ and modify the array by calling the service's methods. You can use this simple pattern to extend functionality however you like. It controls access to your number array and makes sure that it is only ever modified in ways defined by the API, and that your state and your subject are always one and the same.

OK but how is this made generic/reusable?

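import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/scan';

export interface Modifier<T> {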
    modifier: (state: T, payload:any) => T;
    payload?: any;
}

export class StoreSubject<T> {
    private storeSource: BehaviorSubject<T>;
    private modifierSource: Subject<Modifier<T>>;
    store$: Observable<T>;

    publish(modifier: Modifier<T>): void {
        this.modifierSource.next(modifier);
    }

    constructor(init_state:T) {
        this.storeSource = new BehaviorSubject<T>(init_state);
        this.modifierSource = new Subject<Modifier<T>>();
        this.modifierSource.scan((acc:T, modifier:Modifier<T>) => modifier.modifier(acc, modifier.payload), init_state).subscribe(this.storeSource);
        this.store$ = this.storeSource.asObservable();
    }
}

and your service becomes:

const INIT_STATE: number[] = [];
@Injectable()
export class NumberArrayService {
    private numberArraySource = new StoreSubject<number[]>(INIT_STATE);
    numberArray$ = this.numberArraySource.store$;

    constructor() {
    }

    insertNumber(numberToInsert: number) {
        let modifier = (full, val) => full.concat([val]);
        this.numberArraySource.publish({modifier, payload: numberToInsert});
    }

    removeNumber(numberToRemove: number) {
        let modifier = (full, val) => full.filter((v, i) => i !== full.indexOf(val)); // non-mutating; splice would return the removed items
        this.numberArraySource.publish({modifier, payload: numberToRemove});
    }

    sort() {
        let modifier = (full, val) => full.slice().sort((a, b) => a - b); // copy first, then sort numerically
        this.numberArraySource.publish({modifier});
    }

    reset() {
        let modifier = (full, val) => INIT_STATE;
        this.numberArraySource.publish({modifier});
    }
}
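Two details of Array.prototype.sort are worth keeping in mind when writing a sort modifier: without a comparator it compares elements as strings, and it sorts its receiver in place. The sketch below contrasts that with a non-mutating numeric sort. The sortAscending helper is a hypothetical name assumed for illustration:

```typescript
// Hypothetical non-mutating numeric sort modifier.
const sortAscending = (full: number[]): number[] =>
    full.slice().sort((a, b) => a - b);

const published: number[] = [10, 9, 1];

// Default sort, applied to a copy: elements are compared as strings.
const defaultSorted = published.slice().sort();

const numericSorted = sortAscending(published);

console.log(defaultSorted); // [1, 10, 9]
console.log(numericSorted); // [1, 9, 10]
console.log(published);     // [10, 9, 1] (unchanged)
```

Returning a fresh array from every modifier also means that an array already handed to subscribers is never changed underneath them.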
bryan60
  • I'm sorry if I'm too newbie-ish in RxJS, but I still have no idea how to implement the `publishNumbersChange()` method body from my question. Could you post your full implementation of it that supports item insertions, modifications and deletions, please? – Alexander Abakumov Sep 22 '17 at 17:44
  • the implementation above does support all array modifications, you pass the function you want to use to modify the array as "modifier" and any data required for the modification (other than the full array) as "payload" into the next() call. You can pass in any array modification function you like with this pattern. I added some sample usages of how to manipulate your array – bryan60 Sep 22 '17 at 17:55
  • Though you've definitely got my +1 for your heavy work on this so far, it looks way more complicated than just keeping a copy of the published array separately. Could you answer the second part of my updated question (What pattern does Angular use for the same task)? – Alexander Abakumov Sep 22 '17 at 19:27
  • Angular doesn't have a pattern for this in it. This is an application architecture specific thing. This is very similar to the pattern used by ngrx though which is an application framework maintained by members of the angular team. What I'm suggesting may feel strange to you, and simply "storing state" may seem simpler, but the application in that format will become harder to reason about and maintain over time than this system. This system is not very different from simply storing the state, modifying it and rebroadcasting, but it is a different way of approaching things. – bryan60 Sep 22 '17 at 19:32
  • Further, the system I'm recommending is extremely reusable. The pattern can easily be encapsulated into a typed class with all methods needed. – bryan60 Sep 22 '17 at 19:33
  • `Angular doesn't have a pattern for this in it.`: Ok, let me paraphrase the question. When some Angular's class contains an `Observable` in its API, what is the code they use to publish updated arrays? – Alexander Abakumov Sep 22 '17 at 19:36
  • they just use the next() method of the underlying subject, this is how you publish new things with an observable. The expectation is that the next element is entirely new. If you need to accumulate values over time (as in continuously accumulating items in an array), the operator to use is scan as recommended (or reduce in certain situations). My system is the reactive way of accomplishing what you want to accomplish. – bryan60 Sep 22 '17 at 19:39
  • It's the key: I don't need to accumulate something. Let's look at a fairly generic example. You're implementing an Angular `Service` which provides some array to its users. Depending on some logic, it needs to update its users with the new array from time to time. That's it, no assumptions about accumulations or whatever the logic is behind the array changes. Just one array now, and completely different array next time. How to implement it with RxJS? – Alexander Abakumov Sep 22 '17 at 19:45
  • If you need the last value of the array, then you are accumulating over time, whether you are inserting or removing or mapping, you are still using past values to create future values, and this is what it means to accumulate. what I've written IS how you do this with rxjs. It is the reactive style of programming, it seems weird at first but if you embrace it your life will get easier. If you just wanted to broadcast an entirely new array each time with no regards to the past value, you would simply call next() with the new array and you would've never asked this question – bryan60 Sep 22 '17 at 19:48
  • this system is not very complex if you're comfortable with reactive programming concepts and patterns. There are simply 2 subjects, one that stores the state of the array and broadcasts to subscribers, and one that accumulates modifications of the array over time and broadcasts the end result of all those modifications to the store subject. That is really all there is to it. There is an unfamiliar disconnect where you're feeding the function for modifying the array into the action subject instead of just modifying the array and publishing, but this unfamiliarity is part of reactive programming – bryan60 Sep 22 '17 at 19:56
  • You need to write the code for modifying the array no matter what. The only question is where and how. This system is more uniform, more reusable and easier to maintain as your application grows in complexity. This is likely overkill for a simple app but if your app grows beyond a simple couple of screens, this style becomes essential. I've made a few enterprise level Angular apps, this is a very easy-to-maintain style of coding. You should check out the "weirdness" of things like ngrx or redux which use similar patterns. – bryan60 Sep 22 '17 at 20:04
  • `this system is not very complex if you're comfortable with reactive programming concepts and patterns. There are simply 2 subjects`: What is the benefit of this two-subject system compared to the {one vanilla array for preserving state + 1 subject for publishing updates to subscribers} system? – Alexander Abakumov Sep 22 '17 at 20:09
  • Ease of maintenance, future growth and predictability in how your code will behave. Maintaining statefulness and mutating the state directly is a common source of bugs, as it becomes possible for your state and your subject to get out of sync with one another. In this system that is simply impossible, as they are one and the same. This is the heart of reactive and functional programming. Once you get comfortable, things just work easier and more predictably. – bryan60 Sep 22 '17 at 20:12
  • It seems like for overcoming out-of-sync issue (at least in this specific case), instead of keeping the state separately (vanilla array), we can simply do `BehaviorSubject.getValue()` to get the last published array. (Disclaimer: I'm aware of [this](https://stackoverflow.com/a/45227115/3345644) :) – Alexander Abakumov Sep 22 '17 at 20:17
  • BehaviorSubject.getValue() works for getting the last value of the array and is definitely better than independently storing the array, but you are still exposed to a lot of the same issues. The issue is the same: you've moved modification of the array outside of the observable stream, which could lead to strange and unintended behavior. As the author of the post you referenced said, it opens the door to unintended side effects. You are increasing your error surface to avoid becoming more comfortable with the reactive style – bryan60 Sep 22 '17 at 20:20
  • I added an example of what a very simple angular service could look like to hopefully clear this style up for you somewhat. – bryan60 Sep 22 '17 at 20:36
  • Suppose 2 consumers try to modify the array simultaneously, do you know what happens if you're storing local state or using getValue? I don't, and you can't, because it depends on the precise timing and the runtime of the functions. You're likely to lose one of the modifications. If you do it my way, you're guaranteed that both modifications happen in the order they were executed. – bryan60 Sep 22 '17 at 22:37