I think the operator concatMap
does something close to what you are looking for. You can review a former answer here on SO to illustrate a similar use case for concatMap
:
RxJS queueing dependent tasks
It is close but not exactly what you want as there is no waiting for an ACK
signal to release the next value. Instead, concatMap
use the completion signal of the currently 'executed' observable to subscribe to the next one. If your observable contains somewhere the execution of an update on a db then those updates will be executed in order. For instance:
function handler (source$) {
// source$ is your source of events from which you generate the update calls
return source$.concatMap(function (event){
return updateDB(event);
})
}
function updateDB(event) {
return Rx.Observable.create(function(observer){
// do the update in the db
// you probably have a success and error handler
// you plug the observer notification into those handlers
if (success) {
// if you need to pass down some value from the update
observer.onNext(someValue);
// In any case, signal completion to allow concatMap to move to next update
observer.onCompleted();
}
if (error) {observer.onError(error);}
})
}
This is a generic code to specialize to the library you are using. You might be able to use directly the operator fromNodeCallback
, or fromCallback
depending on the API of your database update function.
All the same, be mindful that there might be some buffering involved to hold on to the next observable while the current one is being executed, and that buffer can only be finite, so if you do have significant differences in speed between producer and consumer, or memory limitation, you might want to handle things differently.
Also, if you are using RxJS v5, onError
becomes error
, onComplete
becomes complete
, onNext
becomes next
(cf. new observer interface).
Last comment, the lossy/lossless nature of your stream is a concept different to the hot vs. cold nature of the stream. You can have a look at illustrated subscription and data flows for both type of streams.