25

What is the best way to create a readable stream from an array and pipe its values to a writable stream? I have seen substack's example using setInterval, and I can implement it successfully with an interval of 0, but I am iterating over a lot of data and triggering GC every time is slowing things down.

// Working with the setInterval wrapper (classic pre-v0.10 stream style)
var Stream = require('stream');
var arr = [1, 5, 3, 6, 8, 9];

function createStream () {
    var t = new Stream();
    t.readable = true;
    var times = 0;
    var iv = setInterval(function () {
        t.emit('data', arr[times]);
        if (++times === arr.length) {
            t.emit('end');
            clearInterval(iv);
        }
    }, 0);
    return t;
}

// Create the writable stream s
// ....

createStream().pipe(s);

What I would like to do is emit values without the setInterval, perhaps using the async module like this:

async.forEachSeries(arr, function (item, cb) {
    t.emit('data', item);
    cb();
}, function (err) {
    if (err) {
        console.log(err);
    }
    t.emit('end');
});

In this case I iterate over the array and emit data, but no values are ever piped. I have already seen shinout's ArrayStream, but I think it was created before v0.10 and it carries a bit more overhead than I am looking for.

TankofVines
  • I don't think you will get much less overhead than ArrayStream (110 sloc). async is going to be similar to substack's example in its use of setImmediate. I don't think you need setImmediate/setInterval for every data event since you're not doing I/O, but you will need to handle pause/resume, which ArrayStream does for you. Curious to see what answers you get. – Peter Lyons May 31 '13 at 02:59
  • Thanks for the input. I guess my biggest concern with ArrayStream was that it had not been updated since the changes in the Stream API in v0.10, but those worries could be unfounded. I was surprised that it had so few downloads which makes me believe that others are doing this differently. – TankofVines May 31 '13 at 12:51
  • Please consider unaccepting the accepted answer and accepting the one recommending the standard `Readable.from` – Benjamin Gruenbaum Jan 16 '22 at 12:19

5 Answers

44

You can solve this problem by creating a readable stream and pushing values into it.

Streams are a pain, but it's often easier to work with them directly than to use libraries.

Array of strings or buffers to stream

If you're working with an array of strings or buffers, this will work:

'use strict'
const Stream = require('stream')
// no-op read(): all data is pushed manually below
const readable = new Stream.Readable({ read() {} })

readable.pipe(process.stdout)

const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))

// no more data
readable.push(null)

Notes:

  • readable.pipe(process.stdout) does two things: puts the stream into "flowing" mode and sets up the process.stdout writable stream to receive data from readable
  • the Readable#push method is for the creator of the readable stream, not the stream consumer.
  • You have to do Readable#push(null) to signal that there is no more data.
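The snippet above pushes every item up front, so the whole array sits in the readable's internal buffer and the return value of `push()` is ignored. A minimal sketch of a pull-based alternative, assuming a recent Node.js: `arrayToReadable` is a hypothetical helper, not a library function. It pushes only from `read()` and stops as soon as `push()` returns `false`, which is how a readable signals that its buffer has reached `highWaterMark`.

```javascript
'use strict'
const { Readable } = require('stream')

// Hypothetical helper (not part of the stream module): wraps an array in a
// Readable that only pushes when the consumer asks for data via read().
function arrayToReadable (items, options = {}) {
  let index = 0
  return new Readable({
    ...options,
    read () {
      // Keep pushing until the array is exhausted or push() returns false,
      // meaning the internal buffer is full; read() will be called again later.
      while (index < items.length) {
        if (!this.push(items[index++])) return
      }
      // No more data
      this.push(null)
    }
  })
}

arrayToReadable(['a', 'b', 'c']).pipe(process.stdout)
```

For arrays of non-strings, pass `{ objectMode: true }` as the second argument, as in the object-mode example.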

Array of non-strings to stream

To make a stream from an array of things that are neither strings nor buffers, you need both the readable stream and the writable stream to be in "Object Mode". In the example below, I made the following changes:

  • Initialize the readable stream with {objectMode: true}
  • Instead of piping to process.stdout, pipe to a simple writable stream that is in object mode.

'use strict'
const Stream = require('stream')

// no-op read(): all data is pushed manually below
const readable = new Stream.Readable({objectMode: true, read() {}})

const writable = new Stream.Writable({objectMode: true})
writable._write = (object, encoding, done) => {
  console.log(object)

  // ready to process the next chunk
  done()
}

readable.pipe(writable)

const items = [1, 2, 3]
items.forEach(item => readable.push(item))

// end the stream
readable.push(null)

Performance Note

Where is the data coming from? If it's a streaming data source, it's better to manipulate the stream using a transform stream than to convert to/from an array.
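A minimal sketch of the transform-stream approach, assuming Node.js 12.3 or later (for `Readable.from`, used here only as a convenient source). The doubling step is a hypothetical stand-in for whatever per-item processing you need; each chunk is handled as it flows through, and the collecting writable exists only to make the result visible.

```javascript
'use strict'
const { Readable, Transform, Writable, pipeline } = require('stream')

// Hypothetical transform: doubles each numeric chunk as it passes through
const double = new Transform({
  objectMode: true,
  transform (chunk, encoding, done) {
    done(null, chunk * 2)
  }
})

// Collects results so they can be inspected once the pipeline finishes
const results = []
const collect = new Writable({
  objectMode: true,
  write (chunk, encoding, done) {
    results.push(chunk)
    done()
  }
})

// pipeline() wires the streams together and reports the first error, if any
pipeline(Readable.from([1, 2, 3]), double, collect, err => {
  if (err) console.error('pipeline failed', err)
  else console.log(results)
})
```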

Max Heiber
  • How can we chain the writable from your last example? – Gura Apr 04 '16 at 09:39
  • @Gura, what are you trying to do? `readable.pipe(writable).pipe(somethingElse)`? If so, `writable` will have to be a Transform stream. I can add info about that if it's what you're looking for. – Max Heiber Apr 04 '16 at 15:00
  • I used Transform, it works very well ! thanks @mheiber – Gura Apr 04 '16 at 16:19
  • Fantastic answer, and especially the first resource you linked to. I found it to be incredibly thorough and useful. – Dave Voyles Aug 12 '17 at 20:38
  • @DaveVoyles-MSFT re the first resource: it's the go-to that everyone recommends, but I found the Node docs themselves invaluable for figuring out everything I needed. In particular, I tend to need transform streams and the handbook doesn't cover those. – Max Heiber Sep 22 '17 at 16:22
  • The Readable stream might be missing an implementation of `_read()`. See here for a similar answer https://stackoverflow.com/a/22085851/2012945 (strings to streams). – edmundo096 Feb 20 '18 at 16:56
  • I'm pretty sure the `readable.push()` call needs to check whether the stream's internal buffer is full, no? Otherwise, there's a risk that some elements in the array don't make it into the stream. – Ethan Sep 07 '19 at 20:51
  • Is it important to look at the return value of `push()`, and to stop `push()`ing if there is backpressure? – bennlich Dec 03 '19 at 07:28
  • @bennlich that sounds important. Would you be up for editing the answer if you find a good way to do this? – Max Heiber Dec 06 '19 at 13:33
25

As of Node 12.3, you can use stream.Readable.from(iterable, [options]) instead.

const { Readable } = require("stream");

const arr = [1, 5, 3, 6, 8, 9];
const readableStream = Readable.from(arr);

readableStream.on("data", (row) => console.log(row));

It also works with arrays of objects:

const arr = [
  { index: 1, hello: { foo: "bar" } },
  { index: 2, hello: { foo: "bar" } },
  { index: 3, hello: { foo: "bar" } },
];
const readableStream = Readable.from(arr);
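`Readable.from` creates the stream in object mode by default, which is why arrays of objects work without extra options. A short sketch of consuming such a stream with `for await...of`, assuming Node.js 12.3 or later (the version that introduced `Readable.from`); `collect` is a hypothetical helper, not part of the `stream` module.

```javascript
'use strict'
const { Readable } = require('stream')

const arr = [
  { index: 1, hello: { foo: 'bar' } },
  { index: 2, hello: { foo: 'bar' } },
  { index: 3, hello: { foo: 'bar' } }
]

// Hypothetical helper: drains a readable stream into an array via async iteration
async function collect (readable) {
  const rows = []
  for await (const row of readable) {
    rows.push(row)
  }
  return rows
}

collect(Readable.from(arr)).then(rows => {
  // Each object arrives as its own chunk, unchanged
  console.log(rows.map(row => row.index))
})
```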
VC.One
austin_ce
  • It should work for an `Iterable` -- do you have an example of it not working for arrays of objects? – austin_ce May 27 '20 at 22:02
  • Oh, you're right. I had tried it in something I was working on and it didn't work. I assumed it was because the Readable wasn't in `objectMode`. But I just created a simple example and it worked. I'll delete my comment. – Cully May 28 '20 at 01:36
15

tl;dr

This is a LIFO solution. Array.prototype.pop() has similar behavior to shift but applied to the last element in an array.

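const { Readable } = require('stream')

const items = [1, 2, 3]
const stream = new Readable({
  objectMode: true,
  read() {
    // Check the length, not the popped value, so falsy items such as 0
    // are not mistaken for the end of the array
    if (items.length === 0) {
      this.push(null);
      return;
    }
    // pop() takes from the end, so items come out last-in, first-out
    this.push(items.pop())
  },
})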
Lajos
  • Should we not use `shift()` instead of `pop()`, to send the array in order? – FRD Dec 01 '19 at 00:13
  • This is a LIFO solution. Array.prototype.pop() has similar behavior to shift but applied to the last element in an array. – Lajos Dec 04 '19 at 12:10
  • Would you edit the answer to clarify that? Because all the other answers, and indeed my own expectation coming into this question, are around FIFO. – FRD Dec 16 '19 at 17:03
2

I wound up using ArrayStream for this, and it did resolve the issue of GC being triggered too often. I was getting warnings from Node about recursive process.nextTick calls, so I changed the nextTick callbacks in ArrayStream to setImmediate; that fixed the warnings and it seems to be working well.

TankofVines
2

It's an old question, but if anyone stumbles on this, node-stream-array is a much simpler and more elegant implementation for Node.js >= v0.10.

var streamify = require('stream-array'),
  os = require('os');

streamify(['1', '2', '3', os.EOL]).pipe(process.stdout);
Rudolf Meijering
  • Unfortunately, node-stream-array does not return a true Readable, and that may cause problems with other parts of the code (e.g. a missing destroy function when used with promisepipe). – Ludovic Pollet Nov 23 '18 at 15:58