Situation:
I've encountered a use case for the rxjs Observable
system, where I may need to add pipe
d commands to a Subscription
after it has been started.
In my case, the application I'm working on has to passively listen to a push notification system. A number of messages can be pushed out over this system, which my system needs to respond to. However, there's a foreseeable case where a dynamically-loaded view that will be implemented in the future will need to add a listener to the push notification system.
Question:
Given that my app is in a state where my Subscription
already exists, can I add an additional pipe after .subscribe(() => {})
has been invoked?
// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { // commands });
this.something.pipe(
map((something) => {
// ...Commands that I want to add to the subscription...
})
);
...And if I do that, then what happens, if anything?
Solution:
The two answers by @user2216584 and @SerejaBogolubov both had an aspect of the answer to this question.
My high-level push notification listener service needed to to do two things:
- Hold onto the subscription, and
- Be able to draw from a list of listeners.
The complication is that each listener needs to be listening for a different message. Put differently, if I receive a message on foo_DEV
, the app needs to do something different than if the push notification system pushes a message on bar_DEV
.
So, here's what I came up with:
export interface PushNotificationListener {
name: string,
onMessageReceived: (msg: PushNotificationMessage) => any,
messageSubject$: Subject<PushNotificationMessage>
}
export class PushNotificationListenerService {
private connection$: Observable<PushNotificationConnection>;
private subscription$: Subscription;
private listeners: PushNotificationListener[] = [];
constructor(
private connectionManager: PushNotificationConnectionManager
) {
}
connect() {
// Step 1 - Open the socket connection!
this.connection$ = this.connectionManager.connect(
// The arguments for setting up the websocket are unimportant here.
// The underlying implementation is similarly unimportant.
);
}
setListener(
name: string,
onMessageReceived: (msg: PushNotificationMessage) => any
) {
// Step 3...or maybe 2...(shrug)...
// Set listeners that the subscription to the high-order connection
// will employ.
const newListener: PushNotificationListener = {
name: name,
onMessageReceived: onMessageReceived,
messageSubject$: null
};
this.listeners.push(newListener);
}
listen() {
// Step 2 - Listen for changes to the high-order connection observable.
this.subscription$ = this.connection$
.subscribe((connection: PushNotificationConnection) => {
console.info('Push notification connection established');
for (let listener of this.listeners) {
listener.messageSubject$ = connection.subscribe(listener.name);
listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
listener.onMessageReceived(message);
}
}
},
(error: any) => {
console.warn('Push notification connection error', error);
}
}
}
I discovered through careful study of the internal code that comprises the core of my push notification system, that we've already got a higher-order Observable
. The websocket code creates an observable (connectionManager.connect()
), that needs to be cached in the service, and subscribed to. As that code is specific to where I work, I can say no more about it.
However, caching the listeners is important too! The subscribe
call in .listen()
just iterates through all the attached listeners any time that the connection changes state, so I can extemporaneously add listeners through .addListener()
, and because of how rxjs' Observable
system inherently works, AND the fact that I'm working from an in-scope list of listeners, I have a system whereby I can dynamically set listeners, even if .connect()
is called before any listeners are configured.
This code probably can still benefit from redesign/refactoring, but I have something that works, which is the important first step of any good coding. Thank you all!