
Reading the first n bytes of a byte stream (in the form of an AsyncIterable) feels cumbersome and error-prone.

Is there a better way to implement this?

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);

  let offset = 0;

  const iterator = stream[Symbol.asyncIterator]();

  while (true) {
    const { done, value } = await iterator.next();

    if (done) {
      throw new Error("Buffer underflow");
    } else {
      const chunk = value;
      if (chunk.length < length - offset) {
        prefix.set(chunk, offset);
        offset += chunk.length;
      } else {
        const slice = chunk.slice(0, length - offset);
        prefix.set(slice, offset);

        return [prefix, prepend(chunk.slice(slice.length), stream)];
      }
    }
  }
}

async function* prepend(
  prefix: Uint8Array,
  stream: AsyncIterable<Uint8Array>
) {
  yield prefix;
  yield* stream;
}
Max Fichtelmann
  • If you're seeking code review, there's a dedicated SE site for that: https://codereview.stackexchange.com/ Otherwise, "_Is there a better way...?_" questions solicit opinion answers, so they're generally not [on-topic](https://stackoverflow.com/help/on-topic) for SO. – jsejcksn May 04 '23 at 12:19
  • Before posting on Code Review please read [A guide to Code Review for Stack Overflow users](https://codereview.meta.stackexchange.com/questions/5777/a-guide-to-code-review-for-stack-overflow-users/5778#5778) and [How do I ask a good question?](https://codereview.stackexchange.com/help/how-to-ask). – pacmaninbw May 04 '23 at 13:17
  • Please don't cross post on Code Review. There is no reason to post there when you have 2 valid answers (and one accepted answer) here on Stack Overflow. – pacmaninbw May 05 '23 at 13:14

2 Answers


stream primitives

We'll start by defining stream primitives -

flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T>
take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
toArray<T>(t: AsyncIterable<T>): Promise<Array<T>>
async function *flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
  for await (const a of t) {
    yield *a
  }
}

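async function *take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  if (n <= 0) return
  for await (const v of t) {
    yield v
    if (--n <= 0) return // stop right after the n-th value, without pulling another
  }
  throw Error("buffer underflow") // the source ended before n values
}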

async function *skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- > 0) continue
    yield v
  }
  if (n > 0) throw Error("buffer underflow")
}

async function toArray<T>(t: AsyncIterable<T>): Promise<Array<T>> {
  const r: Array<T> = []
  for await (const v of t) r.push(v)
  return r
}
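To see how the primitives compose on chunked bytes, here is a small self-contained sketch. It uses a variant of take that returns immediately after yielding the n-th value, so it never pulls an extra element from the source; chunks is a hypothetical stand-in for a real byte stream.

```typescript
// Self-contained copies of flatten and toArray, a take variant that stops
// right after the n-th value, and a hypothetical one-shot chunked source.
async function* flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncGenerator<T> {
  for await (const a of t) yield* a; // spread each chunk into single values
}

async function* take<T>(t: AsyncIterable<T>, n: number): AsyncGenerator<T> {
  if (n <= 0) return;
  for await (const v of t) {
    yield v;
    if (--n <= 0) return; // done: leave the loop without pulling another value
  }
  throw new Error("buffer underflow"); // source ended before n values
}

async function toArray<T>(t: AsyncIterable<T>): Promise<T[]> {
  const r: T[] = [];
  for await (const v of t) r.push(v);
  return r;
}

// Hypothetical source: four chunks holding the bytes 0..9.
async function* chunks(): AsyncGenerator<Uint8Array> {
  for (const c of [[0, 1, 2], [3, 4], [5, 6, 7, 8], [9]]) yield new Uint8Array(c);
}

// Takes 5 bytes across chunk boundaries: logs the bytes 0, 1, 2, 3, 4
toArray(take(flatten(chunks()), 5)).then(xs => console.log(xs));
```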

shift

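Using these stream primitives, we can write shift comfortably. Note that shift calls flatten(stream) twice, so it reads the source twice and only works when stream can be iterated more than once, as the mock buffer below can -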

shift(stream: AsyncIterable<Uint8Array>, count: number): Promise<[Uint8Array, AsyncIterable<number>]>
async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
  return [
    new Uint8Array(await toArray(take(flatten(stream), count))),
    skip(flatten(stream), count)
  ] as const
}

Let's create a mock buffer and test it -

const buffer: AsyncIterable<Uint8Array> = {
  async *[Symbol.asyncIterator]() {
    for (const v of [[0,1,2],[3,4],[5,6,7,8],[9]]) {
      yield new Uint8Array(v)
      await new Promise(r => setTimeout(r, 100))
    }
  }
}

async function main() {
  const [first, rest] = await shift(buffer, 4)
  console.log({
    first: Array.from(first),
    rest: await toArray(rest)
  })
}

main().catch(console.error)
{
  first: [0, 1, 2, 3],
  rest: [4, 5, 6, 7, 8, 9]
}

demo

Run and verify the result on the TypeScript playground.
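Because shift calls flatten(stream) twice, it iterates the source twice. With a one-shot source such as an async generator object, the first pass closes the generator when take stops early, so the second pass finds nothing and skip reports a buffer underflow. A minimal single-pass sketch, assuming it is acceptable to read the prefix one byte at a time, shares one flattened iterator between the prefix and the rest; shiftOnce and oneShot are hypothetical names.

```typescript
// Sketch: a single-pass variant of shift (hypothetical name shiftOnce).
// Both the prefix and the rest come from ONE flattened iterator.
async function* flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncGenerator<T> {
  for await (const a of t) yield* a;
}

async function shiftOnce(
  stream: AsyncIterable<Uint8Array>,
  count: number
): Promise<[Uint8Array, AsyncIterable<number>]> {
  const bytes = flatten(stream); // created once, shared below
  const prefix = new Uint8Array(count);
  for (let i = 0; i < count; i++) {
    const r = await bytes.next(); // pull exactly one byte, never more
    if (r.done) throw new Error("buffer underflow");
    prefix[i] = r.value;
  }
  // An async generator object returns itself from [Symbol.asyncIterator](),
  // so iterating `bytes` later resumes right after the prefix.
  return [prefix, bytes];
}

// Hypothetical one-shot source: an async generator object is consumed once.
async function* oneShot(): AsyncGenerator<Uint8Array> {
  for (const c of [[0, 1, 2], [3, 4], [5, 6, 7, 8], [9]]) {
    yield new Uint8Array(c);
  }
}

async function main() {
  const [first, rest] = await shiftOnce(oneShot(), 4);
  const tail: number[] = [];
  for await (const b of rest) tail.push(b);
  console.log({ first: Array.from(first), rest: tail });
}

main().catch(console.error);
```

Pulling one byte per next() call keeps the logic short but is slower than working on whole chunks, which is the trade-off the chunk-level approach avoids.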

Mulan
  • Instead of defining these yourself, you may as well use the [iterator helper methods](https://github.com/tc39/proposal-iterator-helpers) – Bergi May 04 '23 at 19:50
  • I don't think your `shift` method works as the OP expects: it actually iterates the `stream` twice – Bergi May 04 '23 at 19:51
  • thanks @Bergi I was unaware of the helper methods. – Mulan May 04 '23 at 22:02

I think the iterator logic itself can be simplified by using a notClosing helper and normal iteration:

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);
  const iter = stream[Symbol.asyncIterator]();
  let offset = 0;
  for await (const chunk of notClosing(iter)) {
    if (chunk.length < length - offset) {
      prefix.set(chunk, offset);
      offset += chunk.length;
    } else {
      const slice = chunk.slice(0, length - offset);
      prefix.set(slice, offset);
      return [prefix, prepend(chunk.slice(slice.length), iter)];
    }
  }
  throw new Error("Buffer underflow");
}

Unless you want to convert the stream from an iterator of chunks into a much less efficient iterator of individual bytes, there's nothing you can further simplify about the offset logic.
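The offset bookkeeping has exactly two cases: the chunk fits entirely into the remaining space, or it completes the prefix and leaves a tail. A synchronous sketch over an in-memory array of chunks makes those cases easy to check; splitPrefix is a hypothetical helper, and it uses subarray (a view) rather than slice (a copy) for the pieces.

```typescript
// Hypothetical helper: the same two-case offset logic as shift, but over an
// in-memory array of chunks so it can be checked synchronously.
function splitPrefix(
  chunks: Uint8Array[],
  length: number
): { prefix: Uint8Array; tail: Uint8Array; nextIndex: number } {
  const prefix = new Uint8Array(length);
  let offset = 0;
  for (const [i, chunk] of chunks.entries()) {
    const need = length - offset; // bytes still missing from the prefix
    if (chunk.length < need) {
      // Case 1: the whole chunk fits; copy it and advance the offset.
      prefix.set(chunk, offset);
      offset += chunk.length;
    } else {
      // Case 2: this chunk completes the prefix; the remainder is the tail.
      prefix.set(chunk.subarray(0, need), offset);
      return { prefix, tail: chunk.subarray(need), nextIndex: i + 1 };
    }
  }
  throw new Error("Buffer underflow");
}

const demo = splitPrefix(
  [[0, 1, 2], [3, 4], [5, 6, 7, 8], [9]].map(c => new Uint8Array(c)),
  4
);
// prefix: [0, 1, 2, 3], tail: [4], nextIndex: 2
console.log(Array.from(demo.prefix), Array.from(demo.tail), demo.nextIndex);
```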

const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype)) as AsyncIterator<any>;
function prepend<T>(val: T, iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    first: true,
    async next() { // async, so every call returns a Promise as AsyncIterator requires
      if (this.first) {
        const res = {done: false, value: val};
        val = undefined!; // GC
        this.first = false;
        return res;
      }
      return iter.next();
    },
    return: iter.return ? () => iter.return!() : undefined,
  });
}
function notClosing<T>(iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    next: iter.next.bind(iter),
  });
}
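A self-contained run of the code above against a one-shot async generator shows that the rest resumes exactly after the prefix: notClosing exposes only next, so the early return from the for await loop does not call return() on the shared iterator. The helpers are copied from above with next made async; chunks and collect are hypothetical test helpers.

```typescript
// Copies of the helpers above (next made async), plus a hypothetical demo.
const AsyncIteratorPrototype = Object.getPrototypeOf(
  Object.getPrototypeOf(async function* () {}.prototype)
) as AsyncIterator<any>;

// Yields `val` once, then delegates to `iter`; forwards return() if iter has one.
function prepend<T>(val: T, iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    first: true,
    async next(): Promise<IteratorResult<T>> {
      if (this.first) {
        const res: IteratorResult<T> = { done: false, value: val };
        val = undefined!; // drop the reference so it can be garbage-collected
        this.first = false;
        return res;
      }
      return iter.next();
    },
    return: iter.return ? () => iter.return!() : undefined,
  });
}

// Exposes only next, so a for await loop that exits early cannot close iter.
function notClosing<T>(iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    next: iter.next.bind(iter),
  });
}

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);
  const iter = stream[Symbol.asyncIterator]();
  let offset = 0;
  for await (const chunk of notClosing(iter)) {
    if (chunk.length < length - offset) {
      prefix.set(chunk, offset);
      offset += chunk.length;
    } else {
      const slice = chunk.slice(0, length - offset);
      prefix.set(slice, offset);
      return [prefix, prepend(chunk.slice(slice.length), iter)];
    }
  }
  throw new Error("Buffer underflow");
}

// Hypothetical one-shot source and a helper that gathers all bytes.
async function* chunks(): AsyncGenerator<Uint8Array> {
  for (const c of [[0, 1, 2], [3, 4], [5, 6, 7, 8], [9]]) yield new Uint8Array(c);
}

async function collect(it: AsyncIterable<Uint8Array>): Promise<number[]> {
  const out: number[] = [];
  for await (const chunk of it) for (const b of chunk) out.push(b);
  return out;
}

shift(4, chunks()).then(async ([prefix, rest]) => {
  // prefix: [0, 1, 2, 3], rest: [4, 5, 6, 7, 8, 9]
  console.log(Array.from(prefix), await collect(rest));
});
```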
Bergi