2

I have an observable that is pulling events from a server, filtering events for the application type, then subscribing and dispatching the event to one or more handlers to handle.

The handlers then go off and do some async update to the db, and I find that the observable will crank out events so fast that the updates are stepping on each other. Which I should have expected.

So I think I need my handlers to each employ its own observable to act as a queue which will handle one event and wait for an ack.

So my question is, how can I create an observable that receives messages continuously and dispatches one message at a time waiting for an acknowledgment before releasing the next message?

Also the observables need to be Cold I think, as I can not lose messages.

halfer
  • 19,824
  • 17
  • 99
  • 186
Raif
  • 8,641
  • 13
  • 45
  • 56

1 Answers1

2

I think the operator concatMap does something close to what you are looking for. You can review a former answer here on SO to illustrate a similar use case for concatMap : RxJS queueing dependent tasks

It is close but not exactly what you want as there is no waiting for an ACK signal to release the next value. Instead, concatMap use the completion signal of the currently 'executed' observable to subscribe to the next one. If your observable contains somewhere the execution of an update on a db then those updates will be executed in order. For instance:

function handler (source$) {
  // source$ is your source of events from which you generate the update calls
  return source$.concatMap(function (event){
    return updateDB(event);
  })
}

function updateDB(event) {
  return Rx.Observable.create(function(observer){
    // do the update in the db
    // you probably have a success and error handler 
    // you plug the observer notification into those handlers
    if (success) {
      // if you need to pass down some value from the update
      observer.onNext(someValue);
      // In any case, signal completion to allow concatMap to move to next update
      observer.onCompleted();
    }
    if (error) {observer.onError(error);}
  })
}

This is a generic code to specialize to the library you are using. You might be able to use directly the operator fromNodeCallback, or fromCallback depending on the API of your database update function.

All the same, be mindful that there might be some buffering involved to hold on to the next observable while the current one is being executed, and that buffer can only be finite, so if you do have significant differences in speed between producer and consumer, or memory limitation, you might want to handle things differently.

Also, if you are using RxJS v5, onError becomes error, onComplete becomes complete, onNext becomes next (cf. new observer interface).

Last comment, the lossy/lossless nature of your stream is a concept different to the hot vs. cold nature of the stream. You can have a look at illustrated subscription and data flows for both type of streams.

Community
  • 1
  • 1
user3743222
  • 18,345
  • 5
  • 69
  • 75
  • Great stuff, thank you. It's close anyway. So if have an observable I can regulate the flow by calling onNext (Or next as the case may be) to get the next item from the queue. However, the connection between the observables is still a problem, I think. I have a module that processes messages and then sends each one to perhaps 2 or 3 other modules. The producer observable should not wait or be throttled by any one of those handler modules, but internally the handler modules should act sequentially. – Raif Feb 02 '16 at 17:24
  • whow, it's up to v5? Looks like i'm using v2.5.3. Not that that is the problem yet but i'm sure it will be so I"ll upgrade. – Raif Feb 02 '16 at 17:25
  • v5 is in beta, so I would recommend upgrading to v4, unless you know what you are doing – user3743222 Feb 02 '16 at 17:33
  • Well, try it out and see if it works. I believe it should. The producer observable (whichever that is) is not throttled or waiting. The buffering happens at `concatMap` level. But anyways, we could be discussing abstractly forever, best is to try, and if it does not work, you will have to be more specific about your architecture (maybe posting some code). – user3743222 Feb 02 '16 at 17:37
  • cool will do. Have to wait till this evening as it's a personal project, but I'll let you know. thanks, – Raif Feb 02 '16 at 19:27