
Situation:
I've encountered a use case for the rxjs Observable system, where I may need to add piped commands to a Subscription after it has been started.

In my case, the application I'm working on has to passively listen to a push notification system. A number of messages can be pushed out over this system, which my system needs to respond to. However, there's a foreseeable case where a dynamically-loaded view that will be implemented in the future will need to add a listener to the push notification system.

Question:
Given that my app is in a state where my Subscription already exists, can I add an additional pipe after .subscribe(() => {}) has been invoked?

// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { /* commands */ });

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);

...And if I do that, then what happens, if anything?

Solution:
The two answers by @user2216584 and @SerejaBogolubov each covered part of the answer to this question.

My high-level push notification listener service needed to do two things:

  1. Hold onto the subscription, and
  2. Be able to draw from a list of listeners.

The complication is that each listener needs to be listening for a different message. Put differently, if I receive a message on foo_DEV, the app needs to do something different than if the push notification system pushes a message on bar_DEV.

So, here's what I came up with:

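import { Observable, Subject, Subscription } from 'rxjs';

export interface PushNotificationListener {
  name: string,
  onMessageReceived: (msg: PushNotificationMessage) => any,
  // null until the connection is established and listen() wires the listener up
  messageSubject$: Subject<PushNotificationMessage> | null
}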

export class PushNotificationListenerService {
  private connection$: Observable<PushNotificationConnection>;
  private subscription$: Subscription;

  private listeners: PushNotificationListener[] = [];

  constructor(
    private connectionManager: PushNotificationConnectionManager
  ) {
  }

  connect() {
    // Step 1 - Open the socket connection!
    this.connection$ = this.connectionManager.connect(
      // The arguments for setting up the websocket are unimportant here.
      // The underlying implementation is similarly unimportant.
    );
  } 

  setListener(
    name: string,
    onMessageReceived: (msg: PushNotificationMessage) => any
  ) {
    // Step 3...or maybe 2...(shrug)...
    // Set listeners that the subscription to the high-order connection
    // will employ.
    const newListener: PushNotificationListener = {
      name: name,
      onMessageReceived: onMessageReceived,
      messageSubject$: null
    };

    this.listeners.push(newListener);
  }

  listen() {
    // Step 2 - Listen for changes to the high-order connection observable.
    this.subscription$ = this.connection$
      .subscribe((connection: PushNotificationConnection) => {
        console.info('Push notification connection established');

        for (const listener of this.listeners) {
          listener.messageSubject$ = connection.subscribe(listener.name);
          listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
            listener.onMessageReceived(message);
          });
        }
      },
      (error: any) => {
        console.warn('Push notification connection error', error);
      });
  }
}

After careful study of the internal code at the core of my push notification system, I discovered that we've already got a higher-order Observable. The websocket code creates an observable (connectionManager.connect()) that needs to be cached in the service and subscribed to. As that code is specific to where I work, I can say no more about it.

However, caching the listeners is important too! The subscribe call in .listen() iterates through all the attached listeners any time the connection changes state, so I can add listeners on the fly through .setListener(). Because that loop runs inside the subscribe callback and reads from an in-scope list of listeners, I have a system whereby I can dynamically set listeners, even if .connect() is called before any listeners are configured. (A listener added after the connection has already emitted is only attached the next time the connection emits.)
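
The behavior described above can be sketched in a self-contained form. This is only a rough model under stated assumptions: FakeConnection is a hypothetical stand-in for the proprietary connection object, messages are plain strings, and the service is flattened into top-level variables for brevity.

```typescript
import { Subject } from 'rxjs';

// Hypothetical stand-in for the proprietary connection object.
class FakeConnection {
  private channels = new Map<string, Subject<string>>();

  // Returns the same Subject for a given channel name every time.
  subscribe(name: string): Subject<string> {
    let channel = this.channels.get(name);
    if (!channel) {
      channel = new Subject<string>();
      this.channels.set(name, channel);
    }
    return channel;
  }
}

interface Listener {
  name: string;
  onMessageReceived: (msg: string) => void;
}

const connection$ = new Subject<FakeConnection>();
const listeners: Listener[] = [];
const handled: string[] = [];

// Equivalent of listen(): the loop reads the listener list on every emission.
connection$.subscribe((connection) => {
  for (const listener of listeners) {
    connection.subscribe(listener.name).subscribe(listener.onMessageReceived);
  }
});

// Listeners registered after listen(), but before the connection emits.
listeners.push({ name: 'foo_DEV', onMessageReceived: (msg) => handled.push(`foo:${msg}`) });
listeners.push({ name: 'bar_DEV', onMessageReceived: (msg) => handled.push(`bar:${msg}`) });

const connection = new FakeConnection();
connection$.next(connection);
connection.subscribe('bar_DEV').next('hello');
connection.subscribe('foo_DEV').next('world');
```

Each message reaches only the handler registered for its channel name, even though both listeners were added after the subscription to the connection was created.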

This code probably can still benefit from redesign/refactoring, but I have something that works, which is the important first step of any good coding. Thank you all!

Andrew Gray

2 Answers


[I have edited my answer: the previous version was based on the very first code the author shared, which, as mentioned in the comments, the author has since corrected.]

I doubt that the following code will have any effect on the existing subscription, because pipe() returns a new Observable rather than modifying the source:

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);
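
A minimal sketch of why the snippet above has no effect, assuming a plain Subject<number> stands in for this.something (the names source$, received, and doubled are illustrative only):

```typescript
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

// Stand-in for `this.something`; the real source is not shown in the question.
const source$ = new Subject<number>();

const received: number[] = [];
const doubled: number[] = [];

// The existing subscription.
const subscription = source$.subscribe((value) => received.push(value));

// pipe() builds and returns a NEW Observable; discarding the result does nothing.
source$.pipe(map((value) => value * 2));
source$.next(1);

// The piped logic only runs once the returned Observable is itself subscribed to.
const doubled$ = source$.pipe(map((value) => value * 2));
const doubledSubscription = doubled$.subscribe((value) => doubled.push(value));
source$.next(2);

subscription.unsubscribe();
doubledSubscription.unsubscribe();
```

The original subscriber sees the raw values 1 and 2 throughout; only the second, separately subscribed Observable sees a mapped value.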

You could try holding the operator (a higher-order function) in a variable when you first set up your observable and, if that variable is still in scope, reassigning it later. I doubt that this will work either, for the following reasons:

  1. When an Observable is set up, it keeps a reference to the function passed in, which is invoked on subscribe [https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87]. If you later reassign the higher-order function, the Observable still points to the old reference. Reassigning the variable does not change the function reference that was captured when the Observable was set up.

  2. Assume that, for some reason, the reassignment does work. Even then, there is a fair chance that you will have reassigned the higher-order function before the old one executes: if the source observable makes an async call to the backend, the reassignment can run while that call is pending, so when the call returns, the newly assigned function would execute instead. Maybe this code will clarify my point:

let higherOrderFunc = map((x: number) => x * 2);

this.something
  .pipe(
    mergeMap(_ => callBackend()), // callBackend() is a placeholder for the async backend call
    higherOrderFunc,
  )
  .subscribe();
higherOrderFunc = map((x: number) => x * 3); // this will execute before the async call completes
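
Point 1 can be checked with a small synchronous sketch. It assumes a Subject<number> as the source and uses illustrative names (source$, operator, results); the async backend call is left out because the captured reference is what matters here.

```typescript
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

const source$ = new Subject<number>();
const results: number[] = [];

// The operator currently held by the variable is handed to pipe() right here.
let operator = map((x: number) => x * 2);
source$.pipe(operator).subscribe((value) => results.push(value));

// Reassigning the variable does not touch the pipeline that was already built.
operator = map((x: number) => x * 3);

source$.next(5);
```

The subscriber receives 10, not 15: the pipeline keeps the operator it was given when pipe() was called.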
user2216584
  • I revised my question's code snippet. I oversimplified my code, which led to me transcribing the code for my question wrongly. – Andrew Gray Jun 07 '19 at 19:47
  • @AndrewGray I have edited my previous answer [after you updated the code]. Have a look and let me know if it makes sense. – user2216584 Jun 07 '19 at 21:22
  • It makes perfect sense. After some experimentation and working through this independently, I arrived at a similar conclusion (before reading this answer). Between your and Sereja Bogolubov's answers, I think I've got this figured out! Thank y'all. – Andrew Gray Jun 10 '19 at 14:29

Well, you could do it quite easily. Say you want a runtime-deferred map. Then you do something like map(x => this.myMapper(x)), where myMapper is a private field visible within the appropriate scope. (Passing this.myMapper directly would capture whichever function the field holds when the pipe is built.) By reassigning that private field you can add or remove behaviors at runtime. For example, setting it to x => x would mean no mapping at all.
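
A minimal sketch of this idea, assuming a Subject<number> source and a hypothetical DeferredMapper class with a setMapper() method. The call is wrapped in an arrow function so that the field is re-read on every emission; handing the field's current value straight to map() would fix the function at the moment the pipe is built.

```typescript
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

class DeferredMapper {
  // Identity mapping means "no extra behavior yet".
  private myMapper: (x: number) => number = (x) => x;
  readonly output: number[] = [];

  constructor(source$: Subject<number>) {
    source$
      // The arrow function looks up this.myMapper on every emission.
      .pipe(map((x) => this.myMapper(x)))
      .subscribe((value) => this.output.push(value));
  }

  // Swap the behavior at runtime without touching the subscription.
  setMapper(mapper: (x: number) => number): void {
    this.myMapper = mapper;
  }
}

const source$ = new Subject<number>();
const demo = new DeferredMapper(source$);

source$.next(1);               // mapped by the identity function
demo.setMapper((x) => x * 10); // change behavior after subscribing
source$.next(2);               // mapped by the new function
```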

However, it seems to me that you're abusing rxjs. Most probably, what you really need is the right higher-order observable (an observable that emits observables, a "stream of streams"). That would be a much more rxjsic and cleaner solution. So think twice.
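
One possible shape for such a "stream of streams", sketched under assumptions: the outer Subject channels$, the inner subjects foo$ and bar$, and the use of mergeAll() to flatten them are illustrative choices, not something taken from the question's code.

```typescript
import { Observable, Subject } from 'rxjs';
import { mergeAll } from 'rxjs/operators';

// Outer stream: each emission is itself a stream of messages.
const channels$ = new Subject<Observable<string>>();
const log: string[] = [];

// mergeAll() subscribes to every inner stream as it arrives.
channels$.pipe(mergeAll()).subscribe((message) => log.push(message));

const foo$ = new Subject<string>();
const bar$ = new Subject<string>();

channels$.next(foo$);
foo$.next('foo: first');

// A channel added after subscribe() has been called is picked up too.
channels$.next(bar$);
bar$.next('bar: first');
foo$.next('foo: second');
```

Here new message sources are added by emitting them on the outer stream, so the single existing subscription never has to be modified.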

Zazaeil
  • You provided two interesting possibilities. I'll go ahead and redesign my code. Thanks for the insights. – Andrew Gray Jun 07 '19 at 20:45
  • Sereja - Between your answer and user2216584's, I've got working code that does what I want it to. I'll edit my question with the final implementation. – Andrew Gray Jun 10 '19 at 14:31