I'm trying to read JSON from a file, filter out the entries I don't want, possibly map and flatten the JSON elements, and then, for each filtered JSON line in the file, call a function that returns a promise (basically tail -f on a bunyan-produced file). Additional requirement: I want the pacing to be driven by how fast those functions resolve their promises (output pacing, one element at a time, like Promise.each).

Sounds like a great use for RXJS and promises, right? Well, except that all the examples out there are for promises that produce observable streams, not streams that have subscribers who call functions that return promises.

None of the available RxJS schedulers seems to offer output-based pacing with a concurrency of one, equivalent to Promise.each (I tried flatMapWithMaxConcurrent).

In this code, update returns a promise. The promises do resolve now that I have fixed a library bug, but the console output is out of order and the promises appear to resolve in a highly concurrent manner. Each promise takes 5 seconds to resolve, so I would expect to see a console message about every 5 seconds. Instead there is a single 5-second delay followed by a burst of messages, which suggests that many calls ran concurrently.

The library I created uses Bluebird promises from the ground up; I can't rewrite it without removing promises from all my code. I'm getting tempted, though: this is my second all-nighter hunting down unresolved promises. I know how I'd write this in C...

function tailMyFile(filename, ocu, oce) {
    var handle = new Tail(filename);

    handle.bunyan().filter(function (x) {
        // Keep only records that carry every field used below.
        return "descr" in x && "units" in x && "value" in x && "time" in x;
    })
    .subscribe (
        function(x) {
            if (x.descr == "enter") {
                MyPromisifiedLib.update(oce, x.value, "").done(function() {
                    console.log("updated enter value");
                })
            } else if ("push" in x) {
                MyPromisifiedLib.update(ocu, x.value, "push").done(function() {
                    console.log("updated with push");
                })
            } else {
                MyPromisifiedLib.update(ocu, x.value, "").done(function() {
                    console.log("updated value");
                })
            }
        },
        function(err) {
            console.error("err leaked through to subscriber");
            log.error("err leaked through to subscriber");
        },
        function() { log.info("Completed") }
    );
}   
Paul S
  • If the promise returned by `update` does not resolve, you need to look into the `update` code first. Have you properly tested that function? Once that is done, you can check the input of the `update` function (`x`, `ocu`, `oce`, put some `console.log` out there) to see if they have the expected values. – user3743222 Jan 26 '16 at 11:41
  • Yep, update does resolve in another test setup. Note that the function above doesn't resolve; I changed it to a flatMap that ends in toPromise() and returns that promise, and got a little farther in promise resolution. Perhaps what's needed here is a HOWTO on debugging "failure to resolve promises". Yes, I've googled that quite a bit, but it's mostly "don't make this mistake in the first place", not "how do you find where you made the mistake"... – Paul S Jan 26 '16 at 17:51
  • Well, debugging 101: add .timeout and see which promise is timing out. So there is a bug in my library somewhere that only shows up from this code for some reason. I would accept an answer of "Here's debugging 101 for promises that aren't resolving"... – Paul S Jan 26 '16 at 19:02
  • On the subject of debugging promises, there are a few pointers on SO. For instance, http://stackoverflow.com/questions/25827234/how-to-debug-javascript-promises. Google devtools also can help : http://www.html5rocks.com/en/tutorials/developertools/async-call-stack/?redirect_from_locale=es – user3743222 Jan 26 '16 at 23:23
  • Once you solved your particular problem, you can post an answer here for other users who stumble on this thread through a google search. That will help them for sure. – user3743222 Jan 26 '16 at 23:25
  • So I found the library problem. Third party alpha software. Adding timeouts helped locate the issue. I added that to the "how to debug javascript promises". – Paul S Jan 27 '16 at 06:44
  • The entire pattern above doesn't work. Promises get executed out of order, asynchronously. When you have an observable stream, you strongly prefer stream order to be preserved. The above pattern is entirely wrong. I think what I need help with is a complete rewrite of the idea given the original requirement: you have a library that uses promises, you have an observable stream, and for each element of that stream you need to call the library. You want one outstanding library call at a time (i.e. Promise.each() behavior). How do you code this? Is there an RxJS scheduler for this? – Paul S Jan 27 '16 at 08:42
  • http://stackoverflow.com/questions/33720112/how-to-start-second-observable-only-after-first-is-completely-done-in-rxjs/33724197#33724197, http://stackoverflow.com/questions/34093370/rx-js-wait-for-callback-to-complete?lq=1. No need for a scheduler, `concat` all your function calls (and think whether you have to use `defer`, see links above) – user3743222 Jan 27 '16 at 11:33
  • The observable doesn't end; I'm doing tail -f on a file, so I'm not sure how I would use concat. Here's what did work: use a controlled observable, and in the .then function issue a handle.request(1). That successfully forces one outstanding library call at a time, and the pacing is based on however fast the library resolves the promise. Since my interlocutor here appears to be an RxJS expert, I'll post that as the solution if you bless it. – Paul S Jan 27 '16 at 19:59
  • fair enough, I misunderstood the `tail -f` part. Interesting way to use a controlled observable, if it works then all is fine. – user3743222 Jan 27 '16 at 21:04
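
For readers who want to see the "one outstanding call at a time" requirement from the comments in isolation, here is a minimal sketch. It is not the controlled-observable solution described above and does not use RxJS at all: it simply chains each incoming element onto the tail of the previous promise, which gives Promise.each-style ordering for a stream that never ends. The names createSerialQueue, enqueue and fakeUpdate are hypothetical; fakeUpdate stands in for MyPromisifiedLib.update, and native promises are assumed in place of Bluebird.

```javascript
// Hypothetical helper: returns an enqueue(item) function that runs
// worker(item) for one item at a time, in arrival order.
function createSerialQueue(worker) {
    var tail = Promise.resolve();
    return function enqueue(item) {
        // Start this item's work only after the previous item has settled.
        var result = tail.then(function () {
            return worker(item);
        });
        // Swallow failures on the internal chain so one rejected call
        // does not block every later item; callers still see it on `result`.
        tail = result.catch(function () {});
        return result;
    };
}

// Stand-in for MyPromisifiedLib.update (an assumption): resolves after 20 ms.
function fakeUpdate(value) {
    return new Promise(function (resolve) {
        setTimeout(function () { resolve(value); }, 20);
    });
}

var completed = [];  // values in the order their promises resolved
var active = 0;      // calls currently in flight
var maxActive = 0;   // highest concurrency observed

var enqueue = createSerialQueue(function (x) {
    active += 1;
    maxActive = Math.max(maxActive, active);
    return fakeUpdate(x).then(function (v) {
        active -= 1;
        completed.push(v);
        return v;
    });
});

// Simulate three stream elements arriving at once.
var demo = Promise.all([1, 2, 3].map(enqueue));
```

In the subscriber, each branch would call enqueue(x) rather than calling update directly. One design caveat: this queue grows without bound if lines arrive faster than the promises resolve, whereas the controlled observable with request(1) applies backpressure at the source.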
