
I have two streams:

  • getPost is a fromPromise stream that streams a single post. sample: author done
  • getPostsStream streams a list of posts for a given author. sample: a1 done, a2 done

I have 3 authors, it's also a stream, sample: Bacon.fromArray(["a", "b", "c"]).

I want to get the posts for all authors by combining these streams.

I have two conditions:

  • order: one post from each author, then next post for each author. sample: a1--b1--c1--a2--b2--c2
  • concurrency: getPost streams must not run in parallel (that would result in many simultaneous HTTP requests). sample: a1--b1--c1 should happen sequentially, not in a burst.

Here's the code:

 var getPost = function(author) {
   return Bacon.fromPromise(new RSVP.Promise(function(resolve) {
     setTimeout(function() { resolve(author + " done"); }, 1000);
   }));
 };

 var getPostsStream = function(author) {
   return Bacon.fromArray([1, 2, 3, 4]).flatMapConcat(function(v) {
     return getPost(author + v);
   });
 };

 var combinedFlatMap = Bacon.fromArray(["a", "b", "c"]).flatMap(function(v) {
   return getPostsStream(v);
 });

 var combinedFlatMapConcat = Bacon.fromArray(["a", "b", "c"]).flatMapConcat(function(v) {
   return getPostsStream(v);
 });

 //combinedFlatMap.log();
 combinedFlatMapConcat.log();

When I flatMap over the author stream and return the posts stream for each author, I get:

a1 done
b1 done
c1 done
a2 done
b2 done
c2 done

But a1--b1--c1 comes in bursts so there is no concurrency limit of 1.

With flatMapConcat I get:

a1 done
a2 done
a3 done
a4 done
b1 done
b2 done
b3 done
b4 done

This works sequentially as I want, but this time it's out of order.

Edit:

I've played with streams and know how the various combinators work. What I end up with is two streams, getPostsStream(author) and the author stream, and I can't combine them in the correct order, which is a1 b1 a2 b2: one post from each getPostsStream(author), then back to the first author. Your example is wrong; try using my streams:

  • author stream: Bacon.fromArray(["a", "b", "c"]).
  • postsStream : getPostsStream(author) from the above example.

What I need is, using these two streams produce the output:

a1 done `wait 1 sec`
b1 done `wait 1 sec`
c1 done `wait 1 sec`
a2 done `wait 1 sec`
b2 done `wait 1 sec`
c2 done `wait 1 sec`

Note that, letters a b c represent authors and numbers 1 2 3 represent the posts.

How can I achieve the correct order with a concurrency limit of 1?

user3995789

1 Answer


Have you tried observable.flatMapFirst(f)? It is like flatMap, but it only spawns a new stream if the previously spawned stream has ended.

EDIT:

Start simple. If we want the order a1-a2-b1-b2, we first make a stream of events in the right order. After that we apply flatMapWithConcurrencyLimit(1, getPost) or flatMapConcat(getPost) to it, using getPost (or some other Promise-returning function).

Bacon.fromArray(["a1", "a2", "b1", "b2"]).flatMapConcat(function (x) { 
  return Bacon.fromPromise(new Promise(function (resolve) {
    console.log("start  " + x + " " + new Date().getTime());
    setTimeout(function () {
      resolve(x + " " + new Date().getTime());
    }, 2000);
  }));
}).log("output");

This results in:

start  a1 1419796654587
output a1 1419796656589
start  a2 1419796656589
output a2 1419796658591
start  b1 1419796658592
output b1 1419796660593
start  b2 1419796660594
output b2 1419796662595
output <end>

ANOTHER EDIT

If you start with an author stream (a-b-c-d), it's very complicated to get a posts stream with the right ordering (a1-b1-c1-d1-a2-b2...).

  • If the author stream is asynchronous and infinite (a..b....c..d..), you can't know when it is safe to fetch the second post of the first author.
  • It's possible if the author stream is asynchronous and finite, but then you probably don't want to wait until the end of the author stream (which may take a long time).
  • If the stream is synchronous and finite (fromArray), it's much simpler to stay in the array world for as long as possible. In this case you can use underscore or lodash to arrange the fetches in the right order, and only then go into the asynchronous Bacon world.

var authors = ['a', 'b', 'c', 'd'];
var postIdsByAuthor = _.map(authors, function (a) {
  return _.map([1, 2, 3, 4], function (i) {
    return a + i;
  });
});
var postIds = _.flatten(_.zip.apply(null, postIdsByAuthor));
// ["a1", "b1", "c1", "d1", "a2", "b2", "c2", "d2", "a3", "b3", "c3", "d3", "a4", "b4", "c4", "d4"]

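This uses the transpose trick: _.zip.apply(null, postIdsByAuthor) turns the per-author lists into per-round lists ([a1, b1, c1, d1], [a2, b2, ...]), which _.flatten then joins into one list.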
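To tie the reordered ids back to the concurrency limit of 1, here is a minimal sketch in plain JavaScript, without Bacon or lodash. The names interleave, fetchSequentially and fakeGetPost are hypothetical helpers introduced only for illustration; fakeGetPost stands in for the question's getPost. With Bacon, the last step would instead be Bacon.fromArray(postIds).flatMapConcat(getPost), as in the earlier example.

```javascript
// Interleave several lists round-robin: [[a1,a2],[b1,b2]] -> [a1,b1,a2,b2].
// Lists that run out early are simply skipped in later rounds.
function interleave(lists) {
  var lengths = lists.map(function (l) { return l.length; });
  var longest = Math.max.apply(null, lengths.concat(0));
  var out = [];
  for (var i = 0; i < longest; i++) {
    lists.forEach(function (list) {
      if (i < list.length) out.push(list[i]);
    });
  }
  return out;
}

// Call fetch(id) for each id strictly one after another (concurrency 1).
// Returns a promise that resolves once every id has been processed.
function fetchSequentially(ids, fetch, onResult) {
  return ids.reduce(function (chain, id) {
    return chain.then(function () { return fetch(id); }).then(onResult);
  }, Promise.resolve());
}

// Stand-in for getPost (assumption: resolves after a short delay).
function fakeGetPost(id) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(id + " done"); }, 10);
  });
}

var postIds = interleave([["a1", "a2"], ["b1", "b2"], ["c1", "c2"]]);
// postIds is ["a1", "b1", "c1", "a2", "b2", "c2"]
fetchSequentially(postIds, fakeGetPost, function (post) { console.log(post); });
```

Because each fetch is chained onto the previous one, the next request starts only after the previous result has been delivered, so the order of postIds is also the order of the output.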

The asynchronous case can be solved, though not with a single built-in combinator. It's unclear from your question whether that is what you need, but if so, reasonable constraints would be to:

  • fetch posts by author in round-robin fashion
  • keep the getPost "worker" always busy

So one scenario would be (time on the horizontal axis):

authors: a....b........c....................
getPost: a1.a2.b1.a3.b2.c1.a4.b3.c2.b4.c3.c4
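The round-robin, always-busy scenario above can be sketched with a small scheduler. This is only an illustration in plain JavaScript with Promises, not a Bacon combinator: createRoundRobin, addAuthor and fakeGetPost are hypothetical names, and error handling is left out (a rejected fetch would stall the worker).

```javascript
// Hypothetical round-robin scheduler with a single worker (no Bacon).
function createRoundRobin(fetch, onResult) {
  var queues = [];   // one queue of pending post ids per author, in arrival order
  var next = 0;      // rotation cursor; reduced modulo queues.length on lookup
  var busy = false;  // true while one fetch is in flight (concurrency limit 1)

  function pump() {
    if (busy) return;
    for (var i = 0; i < queues.length; i++) {
      var idx = (next + i) % queues.length;
      if (queues[idx].length === 0) continue;  // nothing pending for this author
      var id = queues[idx].shift();
      next = idx + 1;                          // resume after this author
      busy = true;
      fetch(id).then(function (post) {
        busy = false;
        onResult(post);
        pump();                                // keep the worker busy
      });
      return;
    }
  }

  return {
    // Authors may be added at any time; they join the rotation immediately.
    addAuthor: function (postIds) {
      queues.push(postIds.slice());
      pump();
    }
  };
}

// Stand-in for getPost (assumption: resolves after a short delay).
function fakeGetPost(id) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(id + " done"); }, 10);
  });
}

var scheduler = createRoundRobin(fakeGetPost, function (post) { console.log(post); });
scheduler.addAuthor(["a1", "a2"]);
scheduler.addAuthor(["b1", "b2"]);
scheduler.addAuthor(["c1", "c2"]);
// logs a1 done, b1 done, c1 done, a2 done, b2 done, c2 done (one at a time)
```

When authors arrive later instead of all up front, the worker keeps serving the authors it already knows, and a new author takes its turn at its place in the rotation. The exact interleaving then depends on timing.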

P.S. If you or someone else downvotes an answer, please comment why.

phadej
  • I've tried every combination of `flatMap`, `map` etc without luck. I guess it needs a new version of `flatMap`. I have no idea how to solve this problem, I started learning some functional programming to understand how baconjs works. – user3995789 Dec 28 '14 at 19:26
  • This example is simple and not what I want, try using my streams, see my edit please. – user3995789 Dec 28 '14 at 21:29
  • I amended a post once more. But I'm not sure if I (or anyone else) understand your question right. – phadej Dec 30 '14 at 13:23
  • appreciate your effort, I have 10 authors, each with 1000 posts. I want to treat the `posts` as infinite stream. So how can I combine 10 infinite streams in `round robin fashion`. Note that `flatMap` does combine in `round-robin`, but it's concurrent. I need the `round-robin` as well as concurrent limit of 1. Finally when I try `flatMapConcat` it loses the `round-robin` order, and streams like `a1 a2 a3` ignoring others. – user3995789 Dec 30 '14 at 13:40
  • I asked http://stackoverflow.com/questions/27707317/how-to-interleave-streams-with-backpressure Hopefully it will answer your question as well. I'm sorry, but currently I'm too busy to think about the solution properly. – phadej Dec 30 '14 at 14:51