4

Is it possible to create a single TransformStream out of several other TransformStreams using whatwg streams (the web Streams API)?

For example, if I have two TransformStreams which run in sequence, like transformer1 and transformer2:

readable.pipeThrough(transformer1).pipeThrough(transformer2).pipeTo(writable)

Ultimately, I'd like to be able to convert that to

readable.pipeThrough(allTransformers).pipeTo(writable)

Where allTransformers is the TransformStream combining transformer1 and transformer2.

Below is not real functional code, but I'd think there would be a way to do something like this:

const allTransformers = transformer1.pipeThrough(transformer2)

This is clearly a simplified example, but you can imagine there being many transform streams and I'd like to refactor to a single, reusable transform pipeline.

Shruggie
  • 869
  • 6
  • 20

1 Answers1

8

I had the same issue, here are my solutions.

In the example below, UpperCaseTransformStream pipes the stream through TextDecoderStream, UpperCaseTextStream and TextEncoderStream.

class UpperCaseTextStream extends TransformStream {
  constructor() {
    super({
      transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
      }
    });
  }
}

class UpperCaseTransformStream {
  constructor(...strategies) {
    const { writable, readable } = new TransformStream({}, ...strategies);
    this.writable = writable;
    this.readable = readable
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new UpperCaseTextStream())
      .pipeThrough(new TextEncoderStream());
  }
}

const TEXT_CHUNK =
 "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
 "nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
const readableInput = new Blob([TEXT_CONTENT]).stream();
const readableOuput = readableInput.pipeThrough(new UpperCaseTransformStream());  
(async () => {
  const text = await new Response(readableOuput).text();
  console.log("Test OK?", text == TEXT_CONTENT.toUpperCase());
})().catch(console.error);

Here is a more generic implementation using the class PipelineStream which extends TransformStream and accepts an array of TransformStream instances as first parameter.

class UpperCaseTextStream extends TransformStream {
  constructor() {
    super({
      transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
      }
    });
  }
}

class PipelineStream extends TransformStream {
  constructor(transformStreams, ...strategies) {
    super({}, ...strategies);

    const readable = [super.readable, ...transformStreams]
      .reduce((readable, transform) => readable.pipeThrough(transform));

    Object.defineProperty(this, "readable", {
      get() {
        return readable;
      }
    });
  }
}

const TEXT_CHUNK =
 "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
 "nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
const readableInput = new Blob([TEXT_CONTENT]).stream();
const readableOuput = readableInput.pipeThrough(new PipelineStream([
  new TextDecoderStream(),
  new UpperCaseTextStream(),
  new TextEncoderStream()]));
(async () => {
  const text = await new Response(readableOuput).text();
  console.log("Test OK?", text == TEXT_CONTENT.toUpperCase());
})().catch(console.error);
check_ca
  • 1,716
  • 1
  • 12
  • 17
  • Do you know if this PipelineStream is able to accept the next chunk before the last TransformStream inside has enqueued the current chunk? I was playing with a slightly different approach and found the PipelineStream waits for each internal TransformStream to finish before accepting the next chunk and calling `transform()` again, which made it a bit less optimized than explicitly chaining multiple `.pipeThrough`s. – Shruggie Jul 08 '22 at 16:40
  • I don't think this is possible. You can still try to ping the spec authors, that's what I did before implementing the solution. This helped me to validate it (I found the solution before the answer), see https://twitter.com/check_ca/status/1544817673560784896 and https://twitter.com/jaffathecake/status/1545387422414180353. – check_ca Jul 08 '22 at 17:48
  • BTW, you might mark this question as answered ;) – check_ca Jul 08 '22 at 19:31
  • Thanks for clarifying, and yes, this is the best possible answer. Appreciate it! – Shruggie Jul 08 '22 at 21:59