
I have a highland stream in which each element is a promise for a get request:

const stream = _([promise1, promise2, promise3, ...]);

so of course when I run:

stream.each(console.log)

I only see:

Promise { <pending> }
Promise { <pending> }
Promise { <pending> }

Yet I want to create a pipeline and work on the actual result of the promise. (I don't want to use then or await). I want to resolve these promises via a stream pipeline.

I imagine there should be a way to make a highland stream map only resolved promises into a new stream, so I expect to be able to flatten a stream of promises into a stream of their actual values.

Digging around, I suppose that either flatMap or flatten should do what I am looking for, yet I have no clue how to proceed, and all my trial and error has failed.

I tried:

 stream.flatMap((id: number) => {
     return myAsyncGetRequest(id);
 }).each(console.log)

How do I resolve promises within a stream?

k0pernikus
  • This seems related: http://stackoverflow.com/questions/25638527/using-highland-js-to-perform-async-tasks-in-series-with-references-to-original-s – k0pernikus Jan 06 '17 at 17:38

2 Answers


I stumbled over a GitHub issue, and quarterto's answer there enlightened me:

One has to wrap the promise inside Highland for flatMap to work as expected:

 stream.flatMap((id: number) => {
     return Highland(myAsyncGetRequest(id));
 }).each(console.log)

The thread also sheds light on why promises are not awaited.

quarterto phrased it as:

Highland isn't a Promise library. It can consume promises, like it can consume arrays, callbacks and event emitters, but its API is focused around transforming streamed values, not promises. It's like asking why Array.prototype.map doesn't wait for promises, or why Bluebird doesn't wait for Streams.

Or as said by vqvu:

Highland streams already represent a future array of values, so it doesn't really make sense to treat promises as something special.
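
quarterto's comparison with Array.prototype.map can be sketched in plain JavaScript. This is only an illustration of the analogy, not Highland code: fakeGetRequest is a hypothetical stand-in for myAsyncGetRequest, and Promise.all is used here merely to show that something has to consume the promises explicitly, just as wrapping them in Highland does inside flatMap.

```javascript
// Hypothetical stand-in for myAsyncGetRequest: resolves with a fake response.
function fakeGetRequest(id) {
  return new Promise((resolve) => {
    setTimeout(() => resolve({ id, body: `response ${id}` }), 10);
  });
}

const ids = [1, 2, 3];

// map is synchronous: it hands back the promises themselves, not their results.
const pending = ids.map(fakeGetRequest);
const allArePromises = pending.every((p) => p instanceof Promise);
console.log(allArePromises); // true

// The promises only turn into values once something consumes them explicitly.
const resolved = Promise.all(pending);
resolved.then((responses) => {
  console.log(responses.map((r) => r.id)); // [ 1, 2, 3 ]
});
```

The same reasoning applies to a Highland stream: it passes promises along as ordinary values until a step such as flatMap turns each one into a stream.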

k0pernikus

Highland isn't a great fit for your needs, although I do think it's possible.

Take a look at scramjet. Like Highland, it's a framework built on top of standard Node streams, but it's based on Promises, so asynchronous cases work exactly the same way as synchronous ones.

Here's how you'd work on your promises in Scramjet:

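const fs = require("fs");
const scramjet = require("scramjet");

const stream = scramjet.DataStream.fromArray([d1, d2, d3])
    // the promise returned here is resolved before the value moves downstream
    .map((data) => aCallThatReturnsAPromise(data))
    // serialize each resolved value as one JSON line
    .stringify((resolved) => JSON.stringify(resolved) + "\n")
    .pipe(fs.createWriteStream('./yourfile.jslines'));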

Since both Highland and Scramjet work on top of Node streams, you can easily pipe to your previous pipeline if you like. And it has no dependencies, so you won't make your software twice as heavy. :)

Michał Karpacki