duplexStream
As TJ suggests, you cannot represent a subscription with a single promise. However, you only need one variable and the ability to recreate the promise. I have used a technique similar to the one described in TJ's link, but with notable differences in abstraction. `duplexStream` below gives the caller a `read` function for any stream consumers in your program, and a `write` function to use for stream producers -
/**
 * Create a stream that any number of producers can write to and any number
 * of consumers can read from.
 *
 * Writes form a chain of deferred promises: each write settles the current
 * tail link and appends a fresh pending one. Every reader walks the chain
 * with its own cursor, so values written back-to-back (before a reader has
 * had a chance to resume) are delivered in order instead of being dropped.
 *
 * @returns {[Function, Function]} A `[read, write]` pair.
 *   `read()` returns an async iterator over the values written after the
 *   iterator was first advanced; it throws when an error is written.
 *   `write(err, value)` publishes `err` when it is truthy, otherwise `value`.
 */
function duplexStream () {
  // Build a pending link. The no-op catch marks the promise as handled so
  // that writing an error while no reader is waiting does not surface as an
  // unhandled rejection; readers that await the link still see the error.
  function pending () {
    const link = defer()
    link.deferred.catch(() => {})
    return link
  }

  // The link the next write will settle.
  let tail = pending()

  async function* read () {
    // Each reader keeps a private position in the chain.
    let cursor = tail
    while (true) {
      const { value, next } = await cursor.deferred
      cursor = next
      yield value
    }
  }

  function write (err, value) {
    const current = tail
    tail = pending()
    if (err) current.reject(err)
    // Fulfil with the value plus the following link so readers can advance.
    else current.resolve({ value, next: tail })
  }

  return [read, write]
}
`defer` is a simple abstraction that provides an externally controlled promise -
/**
 * Create a promise together with the functions that settle it, so code
 * outside the promise executor can resolve or reject it later.
 *
 * @returns {{deferred: Promise, resolve: Function, reject: Function}}
 */
function defer () {
  let settle
  let fail
  // The executor runs synchronously, so both handles are captured before
  // the object below is built.
  const deferred = new Promise((onResolve, onReject) => {
    settle = onResolve
    fail = onReject
  })
  return { deferred, resolve: settle, reject: fail }
}
Let's say we have two HTML elements -
<!-- NOTE(review): the scripts shown later address <output name="foo"> and
     <output name="bar"> inside #myform, not these ids; confirm whether this
     fragment is still needed. -->
<p id="foo"></p>
<p id="bar"></p>
channel
`duplexStream` is generic and not tethered to a particular library or class. We can now use it to convert an observable to an async iterator. Note that `read` can be called multiple times and all readers will get the values passed to `write`. `write` can also be given to any number of producers. By providing `read` and `write` as generic functions to the caller, these handlers can be managed during ordinary `publish` and `subscribe` events.
Abstracting on top of `duplexStream`, we create a `channel` with familiar `publish` and `subscribe` methods -
/**
 * Wrap a `[read, write]` stream pair in a publish/subscribe interface.
 *
 * @param {[Function, Function]} [stream] A `[read, write]` pair; a fresh
 *   `duplexStream()` is created when omitted.
 * @returns {{publish: Function, subscribe: Function}}
 */
function channel ([read, write] = duplexStream()) {
  return {
    /**
     * Send `v` to the stream. `Error` instances travel on the error path,
     * everything else on the value path.
     */
    publish(v) {
      const isFailure = v instanceof Error
      if (isFailure) {
        write(v)
        return
      }
      write(null, v)
    },

    /**
     * Feed every streamed value to `onValue`. Anything thrown while reading
     * or handling a value goes to `onError`, after which reading starts over
     * with a fresh reader, so the subscription keeps running.
     */
    async subscribe(onValue, onError = console.error) {
      for (;;) {
        try {
          const incoming = read()
          for await (const item of incoming) {
            onValue(item)
          }
        } catch (failure) {
          onError(failure)
        }
      }
    }
  }
}
Given some `<form>` -
<form id="myform">
<!-- Publishers: the script attaches click listeners to these buttons by name (b1, b2). -->
<button type="button" name="b1">▶️ next value</button>
<button type="button" name="b2">⏺ some error</button>
<!-- Subscribers: the script appends each published value, one per line, to these outputs. -->
<pre><output name="foo"></output></pre>
<pre><output name="bar"></output></pre>
</form>
We create a channel, `A`, and make two `<button>` publishers and two `<output>` subscribers -
const f = document.forms.myform
const A = channel()

// Publishers: b1 publishes the current date as a string, b2 publishes an Error.
const publishTimestamp = () => A.publish(String(new Date()))
const publishFailure = () => A.publish(Error("error"))
f.b1.addEventListener("click", publishTimestamp)
f.b2.addEventListener("click", publishFailure)

// Subscribers: each one appends every published value, followed by a
// newline, to the form control with the given name.
const appendLineTo = name => v => { f[name].value += v + "\n" }
A.subscribe(appendLineTo("foo"))
A.subscribe(appendLineTo("bar"))
Each click of ▶️ will `publish` a value to each `subscribe`r -
render preview |
 |
Clicking ⏺ will `publish` an error, which each subscriber reports to the console, yet subsequent publishes using ▶️ will still work.
demo
Run the snippet below to verify the results in your own browser -
/**
 * Create a stream that any number of producers can write to and any number
 * of consumers can read from.
 *
 * Writes form a chain of deferred promises: each write settles the current
 * tail link and appends a fresh pending one. Every reader walks the chain
 * with its own cursor, so values written back-to-back (before a reader has
 * had a chance to resume) are delivered in order instead of being dropped.
 *
 * @returns {[Function, Function]} A `[read, write]` pair.
 *   `read()` returns an async iterator over the values written after the
 *   iterator was first advanced; it throws when an error is written.
 *   `write(err, value)` publishes `err` when it is truthy, otherwise `value`.
 */
function duplexStream () {
  // Build a pending link. The no-op catch marks the promise as handled so
  // that writing an error while no reader is waiting does not surface as an
  // unhandled rejection; readers that await the link still see the error.
  function pending () {
    const link = defer()
    link.deferred.catch(() => {})
    return link
  }

  // The link the next write will settle.
  let tail = pending()

  async function* read () {
    // Each reader keeps a private position in the chain.
    let cursor = tail
    while (true) {
      const { value, next } = await cursor.deferred
      cursor = next
      yield value
    }
  }

  function write (err, value) {
    const current = tail
    tail = pending()
    if (err) current.reject(err)
    // Fulfil with the value plus the following link so readers can advance.
    else current.resolve({ value, next: tail })
  }

  return [read, write]
}
/**
 * Create a promise together with the functions that settle it, so code
 * outside the promise executor can resolve or reject it later.
 *
 * @returns {{deferred: Promise, resolve: Function, reject: Function}}
 */
function defer () {
  let settle
  let fail
  // The executor runs synchronously, so both handles are captured before
  // the object below is built.
  const deferred = new Promise((onResolve, onReject) => {
    settle = onResolve
    fail = onReject
  })
  return { deferred, resolve: settle, reject: fail }
}
/**
 * Wrap a `[read, write]` stream pair in a publish/subscribe interface.
 *
 * @param {[Function, Function]} [stream] A `[read, write]` pair; a fresh
 *   `duplexStream()` is created when omitted.
 * @returns {{publish: Function, subscribe: Function}}
 */
function channel ([read, write] = duplexStream()) {
  return {
    // `Error` instances travel on the error path, everything else on the
    // value path.
    publish(v) {
      const isFailure = v instanceof Error
      if (isFailure) {
        write(v)
        return
      }
      write(null, v)
    },

    // Feed every streamed value to `onValue`; anything thrown while reading
    // or handling goes to `onError`, then reading starts over with a fresh
    // reader so the subscription keeps running.
    async subscribe(onValue, onError = console.error) {
      for (;;) {
        try {
          const incoming = read()
          for await (const item of incoming) {
            onValue(item)
          }
        } catch (failure) {
          onError(failure)
        }
      }
    }
  }
}
const f = document.forms.myform
const A = channel()

// Publishers: b1 publishes the current date as a string, b2 publishes an Error.
const publishTimestamp = () => A.publish(String(new Date()))
const publishFailure = () => A.publish(Error("error"))
f.b1.addEventListener("click", publishTimestamp)
f.b2.addEventListener("click", publishFailure)

// Subscribers: each one appends every published value, followed by a
// newline, to the form control with the given name.
const appendLineTo = name => v => { f[name].value += v + "\n" }
A.subscribe(appendLineTo("foo"))
A.subscribe(appendLineTo("bar"))
/* Give every <pre> a 0.5rem top/bottom margin and no left/right margin. */
pre { margin: 0.5rem 0; }
<form id="myform">
<!-- Publishers: the script attaches click listeners to these buttons by name (b1, b2). -->
<button type="button" name="b1">▶️ next value</button>
<button type="button" name="b2">⏺ some error</button>
<!-- Subscribers: the script appends each published value, one per line, to these outputs. -->
<pre><output name="foo"></output></pre>
<pre><output name="bar"></output></pre>
</form>