I'm trying to make a stream / lazy list in Node.js (v12.20.1) using Promises. The basic idea is that a Stream could simply be a Promise that resolves to a value and another Stream. With that in mind, I came up with this recursive definition:

class Stream extends Promise {
    constructor(executor) {
        super((resolve, reject) =>
            executor(
                (value) =>
                    resolve({
                        head: value,
                        tail: new Stream(executor),
                    }),
                reject
            )
        );
    }

    map(f) {
        return this.then((cell) => {
            return {
                head: f(cell.head),
                tail: cell.tail.map(f),
            };
        });
    }
}

Now, this actually does everything I expect it to, but it prints a couple of warning messages whenever a value is resolved. For example, using an event emitter as the source for the stream:

 > const Stream = require('./stream')
undefined
 > const { EventEmitter } = require('events')
undefined
 > const emitter = new EventEmitter();
undefined
 > const stream = new Stream((yieldValue, reject) => emitter.once('foo', yieldValue));
undefined
 > stream.map(console.log);
Stream [Promise] { <pending> }
 > emitter.emit('foo', 1);
true
 > 1
(node:22271) UnhandledPromiseRejectionWarning: TypeError: Promise executor has already been invoked with non-undefined arguments
    at /wd/stream.js:4:13
    at new Promise (<anonymous>)
    at new Stream (/wd/stream.js:3:9)
    at /wd/stream.js:8:31
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
(node:22271) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1)
(node:22271) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
 > emitter.emit('foo', 2);
true
 > 2
(node:22271) UnhandledPromiseRejectionWarning: TypeError: Promise executor has already been invoked with non-undefined arguments
    at /wd/stream.js:4:13
    at new Promise (<anonymous>)
    at new Stream (/wd/stream.js:3:9)
    at /wd/stream.js:8:31
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
(node:22271) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 2)

I really don't understand what is going wrong here. I don't see what the problem is with invoking the executor multiple times, since it's just a function. I also don't see what's causing the second warning since, as far as I can tell, none of the Promises are being rejected here.

Thanks in advance for any help!

cdo
  • Is it possible you're trying to design something that already exists? Node has a Stream class already. It isn't a subclass of promise, but it does have a promise api in which the async methods return promises. – danh Jan 23 '21 at 02:03
  • @danh Yeah I'm making this more for learnings than anything else :) – cdo Jan 23 '21 at 07:56

1 Answer

The problem is that this.then() constructs a new promise, and following the contract of the Promise constructor it does so through the class you have subclassed it with. It calls your Stream constructor with an internal executor, which stores the resolver functions it is passed so that the new promise can later be resolved with the result of the then callback. This executor expects to be called only once. Your Stream, however, calls it multiple times: once immediately, and then again during the resolve call, when tail: new Stream(executor) is evaluated.
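To make that mechanism visible, here is a minimal sketch. The class name Tracked and the seenExecutors array are made up for illustration and are not part of the question's code. It records every executor handed to the subclass constructor, then calls the one supplied by then() a second time:

```javascript
// Records every executor that is handed to the subclass constructor.
const seenExecutors = [];

// Hypothetical subclass, used only to observe constructor calls.
class Tracked extends Promise {
    constructor(executor) {
        seenExecutors.push(executor);
        super(executor);
    }
}

// One explicit construction with our own executor...
const ownExecutor = (resolve) => resolve(1);
const p = new Tracked(ownExecutor);

// ...and one implicit construction: then() builds its result through the
// subclass constructor and passes in an internal executor of its own.
const derived = p.then((x) => x + 1);

console.log(derived instanceof Tracked);        // true
console.log(seenExecutors.length);              // 2
console.log(seenExecutors[1] === ownExecutor);  // false

// Calling that internal executor again with resolver functions throws.
let secondCallError = null;
try {
    seenExecutors[1](() => {}, () => {});
} catch (err) {
    secondCallError = err;
}
console.log(secondCallError instanceof TypeError); // true
```

In the Stream class, that second call happens inside the executor of the tail's super(...) call, so the TypeError does not propagate to the caller. It rejects the newly built tail instead, which would explain the unhandled rejection warnings.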

Your stream.map(console.log) call constructs several promises that are never handled anywhere, and some of them appear to get rejected, rightfully if you ask me.

A Stream may consist of recursively chained promises [1], but a stream is not a Promise. Do not use subclassing!
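As a rough sketch of what that could look like without subclassing, the cells can be built from plain promises and ordinary functions. The helper names makeStream and mapStream are hypothetical, and the executor shape is borrowed from the question:

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: a stream is a plain Promise of { head, tail },
// where tail is another such Promise made by running the executor again.
function makeStream(executor) {
    return new Promise((resolve, reject) =>
        executor(
            (value) => resolve({ head: value, tail: makeStream(executor) }),
            reject
        )
    );
}

// Hypothetical helper: maps f over the heads. then() creates ordinary
// promises here, so the source executor is never re-invoked by accident.
function mapStream(stream, f) {
    return stream.then((cell) => ({
        head: f(cell.head),
        tail: mapStream(cell.tail, f),
    }));
}

const emitter = new EventEmitter();
const source = makeStream((yieldValue) => emitter.once('foo', yieldValue));
const doubled = mapStream(source, (x) => x * 2);

emitter.emit('foo', 1);
emitter.emit('foo', 2);

// Walk the first two cells of the mapped stream.
const heads = doubled.then((first) =>
    first.tail.then((second) => [first.head, second.head])
);
heads.then((values) => console.log(values)); // [ 2, 4 ]
```

Because then() only ever produces built-in promises here, its internal executor is called exactly once, and only makeStream re-runs the user-supplied executor.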

[1]: See this or that (also here) for examples. Note that this linked-list-like concept has now been superseded by asynchronous iterators (blog post), which have a stateful next() method. You will also want to have a look at "What is the difference between async generators and Observables?".
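For comparison, a minimal sketch of the asynchronous-iterator approach. The helpers mapAsync and take are hypothetical names, and the example assumes a Node.js version that provides events.on (it was added during the 12.x line, as far as I know):

```javascript
const { EventEmitter, on } = require('events');

// Hypothetical helper: lazily applies f to each value of an async iterable.
async function* mapAsync(source, f) {
    for await (const value of source) {
        yield f(value);
    }
}

// Hypothetical helper: collects the first n values, then stops iterating.
async function take(source, n) {
    const out = [];
    if (n <= 0) return out;
    for await (const value of source) {
        out.push(value);
        if (out.length >= n) break;
    }
    return out;
}

const emitter = new EventEmitter();

// events.on() yields the argument array of each emitted event,
// so the mapping function destructures the first argument.
const doubled = mapAsync(on(emitter, 'foo'), ([x]) => x * 2);

const result = take(doubled, 2);
emitter.emit('foo', 1);
emitter.emit('foo', 2);

result.then((values) => console.log(values)); // [ 2, 4 ]
```

Here the state lives in the iterator's next() method rather than in a chain of cells, so there is no tail to hold on to and nothing to subclass.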

Bergi