0

Let say we have a service called A and it has the function subscribe(callback). The subscription from the service is an open connection that can receive data at any time, and the data can be accessed through the callback. Can we convert this callback to promise? if so, how?

Sample

A.subscribe((error, data) => {
    // do something with data
});
Mehdi
  • 62
  • 6

2 Answers2

3

Can we convert this callback to promise?

Not just one promise, no, because a promise can only be settled once, with a single fulfillment value (on success), but you have a series of values (often called an "observable"). So you can't convert that to return a promise, unless of course you want the promise to only fulfill with just one of the values (the first, for instance).

You could convert it to an asynchronous iterator (perhaps using an async generator function). As the name implies, an asynchronous iterator provides a series of values asynchronously, by returning a series of promises. I can't say whether an asynchronous iterator is applicable to your use case, but it's the closest thing to a promise-based observable that comes to mind.

Here's an observable-to-async-iterator implementation designed for Angular's observables, but which could be adapted as necessary. (Sadly, there's no license indicated, so I can't copy it into the answer.)

T.J. Crowder
  • 1,031,962
  • 187
  • 1,923
  • 1,875
  • I wonder what the Github EULA says for Gist licensing. I provided a similar technique and full functioning demo in a separate answer in this thread. – Mulan Jul 14 '21 at 16:23
1

duplexStream

As TJ suggests, you cannot represent a subscription with a single promise. However, you only need one variable and the ability to recreate the promise. I have used a technique similar to the one described in TJ's link but with notable differences in abstraction. duplexStream below gives the caller a read function for any stream consumers in your program, and a write function to use for stream producers -

function duplexStream () {
  let t = defer()
  async function* read () {
    while (true) yield await t.deferred
  }
  function write (err, value) {
    if (err) t.reject(err)
    else t.resolve(value)
    t = defer()
  }
  return [read, write]
}

defer is a simple abstraction to provide an externally controlled promise -

function defer () {
  let resolve, reject
  return { deferred: new Promise((res, rej) => (resolve = res, reject = rej)), resolve, reject }
}

Let's say we have two HTML elements -

<p id="foo"></p>
<p id="bar"></p>

channel

duplexStream is generic and not tethered to a particular library or class. We can now use it to convert an observable to an async iterator. Note read can be called multiple times and all readers will get values passed to write. write can also be given to any number of producers. By providing read and write as generic functions to the caller, these handlers can be managed during ordinary publish and subscribe events.

Abstracting on top of duplexStream, we create a channel with familiar publish and subscribe methods -

function channel ([read, write] = duplexStream()) {
  return {
    publish(v) {
      if (v instanceof Error)
        write(v)
      else
        write(null, v)
    },
    async subscribe(onValue, onError = console.error) {
      while (true)
        try {
          for await (const v of read())
            onValue(v)
        }
        catch (err) {
          onError(err)
        }
    }
  }
}

Given some <form> -

<form id="myform">
  <button type="button" name="b1">▶️ next value</button>
  <button type="button" name="b2">⏺ some error</button>
  <pre><output name="foo"></output></pre>
  <pre><output name="bar"></output></pre>
</form>

We create a channel, A, and make two <button> publishers and two <output> subscribers -

const f = document.forms.myform
const A = channel()

// publishers
f.b1.addEventListener("click", event => A.publish(String(new Date)))
f.b2.addEventListener("click", event => A.publish(Error("error")))

// subscribers
A.subscribe(v => f.foo.value += v + "\n")
A.subscribe(v => f.bar.value += v + "\n")

Each click of ▶️ will publish a value to each subscriber -

render preview
render-preview

Clicking ⏺ will publish an error to the console, yet subsequent publishes using ▶️ will still work.

demo

Run the snippet below to verify the results in your own browser -

function duplexStream () {
  let t = defer()
  async function* read () { while (true) yield await t.deferred }
  function write (err, value) { if (err) t.reject(err); else t.resolve(value); t = defer() }
  return [read, write]
}

function defer () {
  let resolve, reject
  return { deferred: new Promise((res, rej) => (resolve = res, reject = rej)), resolve, reject }
}

function channel ([read, write] = duplexStream()) {
  return {
    publish(v) { if (v instanceof Error) write(v); else write(null, v) },
    async subscribe(onValue, onError = console.error) {
      while (true) try { for await (const v of read()) onValue(v) } catch (err) { onError(err) }
    }
  }
}

const f = document.forms.myform
const A = channel()
f.b1.addEventListener("click", event => A.publish(String(new Date)))
f.b2.addEventListener("click", event => A.publish(Error("error")))
A.subscribe(v => f.foo.value += v + "\n")
A.subscribe(v => f.bar.value += v + "\n")
pre { margin: 0.5rem 0; }
<form id="myform">
  <button type="button" name="b1">▶️ next value</button>
  <button type="button" name="b2">⏺ some error</button>
  <pre><output name="foo"></output></pre>
  <pre><output name="bar"></output></pre>
</form>
Mulan
  • 129,518
  • 31
  • 228
  • 259