I'm trying to use Node.js Transform streams to combine a number of different data sources (essentially gzipped CSVs with a strange structure; they all have identical byte lengths) and write the combined data as MongoDB documents, without loading any of the files completely into memory. A picture is worth a thousand words; it looks like this:
file1   file2   file3   file4   file5   file6   ...
  \       |       |       |       |      /
   --(transform steps: gunzip, remove newlines, etc.)---
  \       |       |       |       |      /
doc: {data1, data2, data3, data4, data5, data6, metadata}   <------ combine step
                         ||
                     (MongoDB)
Each readable stream (file1, file2, etc.) is piped into its own serial pipeline of gunzip and some other transforms. Each file stream is then piped into a single instance of the combiner. Just before it is, I add a streamIndex to each chunk so I can tell the sources apart from inside the combiner transform. Like this:
// inside the last per-file transform: tag each fixed-length slice
// with the index of the stream it came from
let slice = chunk.slice(i, i + length);
slice.streamIndex = this.streamIndex;
this.push(slice);
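For context, a boiled-down version of the per-file wiring looks roughly like this (RecordSlicer, RECORD_LENGTH, files and combiner are simplified stand-ins for my actual code, and the sketch ignores records that straddle chunk boundaries):

const fs = require('fs');
const zlib = require('zlib');
const { Transform } = require('stream');

// Splits the gunzipped data into fixed-length records and tags each record
// with the index of the source stream it came from.
class RecordSlicer extends Transform {
  constructor(streamIndex, options) {
    super(options);
    this.streamIndex = streamIndex;
  }
  _transform(chunk, encoding, callback) {
    for (let i = 0; i + RECORD_LENGTH <= chunk.length; i += RECORD_LENGTH) {
      let slice = chunk.slice(i, i + RECORD_LENGTH);
      slice.streamIndex = this.streamIndex;   // so the combiner knows the source
      this.push(slice);
    }
    callback();
  }
}

// Every file gets its own serial pipeline, and they all end in the one combiner.
files.forEach((file, i) => {
  fs.createReadStream(file)
    .pipe(zlib.createGunzip())
    .pipe(new RecordSlicer(i))
    .pipe(combiner);
});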
I need exactly one piece of data from each stream at the combine step, so when a chunk arrives there I look at its streamIndex and buffer it until there is at least one entry from every source. I then combine the sources into a document, add metadata, and send it to MongoDB via a bulk insert.
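Simplified, the combiner's write side currently does something like this (buildDocument and BATCH_SIZE are stand-ins for my actual helper/constant):

const { Writable } = require('stream');

class Combiner extends Writable {
  constructor(numStreams, collection, options) {
    super(options);
    this.collection = collection;   // MongoDB collection handle
    this.dataBuffers = Array.from({ length: numStreams }, () => []);
    this.batch = [];
  }

  _write(chunk, encoding, callback) {
    // Queue the chunk under the source it came from.
    this.dataBuffers[chunk.streamIndex].push(chunk);

    // As soon as every source has contributed at least one entry,
    // assemble a document from the heads of all the buffers.
    while (this.dataBuffers.every(buffer => buffer.length > 0)) {
      const parts = this.dataBuffers.map(buffer => buffer.shift());
      this.batch.push(buildDocument(parts));   // adds the metadata etc.
    }

    // Flush to MongoDB in bulk every BATCH_SIZE documents.
    if (this.batch.length >= BATCH_SIZE) {
      const docs = this.batch;
      this.batch = [];
      this.collection.insertMany(docs)
        .then(() => callback())
        .catch(callback);
    } else {
      callback();
    }
  }
}

The arrays in dataBuffers are what run away from each other when the sources read at different rates.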
The problem is that the streams read at slightly different rates, and node locks up if it has to buffer too much data. Left unchecked, one stream can easily end up with 1,000,000+ more entries buffered than another, which triggers so much garbage collection that the node process essentially grinds to a halt.
I've tried to add backpressure to the individual upstream pipelines, to slow them down when I know they've delivered too much data, by:
- returning false from the transform / writable function (stepping through in the debugger seemed to suggest this had no effect at all, and it made no difference whether the combiner was a writable stream or a transform)
- corking over-delivering streams via e.g.
file1.cork()
- manually setting the combine step's writable stream to corked via
this._writableState.corked = true
And the most reliable-seeming of all, temporarily unpiping the offending stream:
this.streams.forEach((stream, i) => {
  if (this.dataBuffers[i].length > MAX_BUFFER_LENGTH) {
    stream.unpipe(this);
  }
});
The last step (combined with re-piping when the buffers get low) keeps all the buffers smoothly sitting at exactly MAX_BUFFER_LENGTH. The only problem is that I then get a stream 'end' event when only about 10-15% of the data has been read. I observed data going missing (flowing out of the pipe and into the abyss) when experimenting with different things, so I can only imagine the same thing is happening here.
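For reference, the unpipe/re-pipe logic is roughly this (LOW_WATER_MARK and the unpiped bookkeeping array are simplified stand-ins):

// Called from the combiner every time a chunk arrives.
function throttleSources(combiner) {
  combiner.streams.forEach((stream, i) => {
    const buffered = combiner.dataBuffers[i].length;

    if (buffered > MAX_BUFFER_LENGTH && !combiner.unpiped[i]) {
      stream.unpipe(combiner);        // detach a source that has raced ahead
      combiner.unpiped[i] = true;
    } else if (buffered < LOW_WATER_MARK && combiner.unpiped[i]) {
      stream.pipe(combiner);          // reattach it once we've caught up
      combiner.unpiped[i] = false;
    }
  });
}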
It seems like a fairly trivial issue, and I'm pretty new to streams, so it's possible I'm just doing something stupid. Basically my question is: how do I get each stream to provide a bit of data, wait for the others to do the same, combine, read a bit more from each, and so on, without losing data (and without buffering too much)?
Final details:
- all the connections are pipes, which should theoretically handle backpressure out of the box ('new mode' streaming - stream.pause/resume won't work here, nor would I want it to: from what I've understood that's another situation where data starts going missing). I don't want to apply backpressure to everything, just to the file stream(s) providing too much data.
- the setup works and the database ends up with the correct data when reading just one (or even two) files simultaneously - it's node's greedy "take whatever you can get!" model that seems to make multiple streams compete here. It's quite possible the setup doesn't work with the unpiping model though; I'll try that next. Until now I had everything running without any rate-limiting, which is why I'm stuck.
- I can't just process the streams serially – inserting the millions of documents once, then repeatedly updating each document via a process of find and update, is prohibitively slow (see the sketch after this list).
- The result I'm looking for looks a lot like what the poster of "How to interleave streams (with backpressure)" doesn't want, under the heading "zipAsArray". From what I understand, though, bacon.js doesn't work with node streams.
- this is supposed to be a one-off process on a standalone server, after which I will use MongoDB as usual, so it doesn't matter how much CPU it chews, as long as it completes in reasonable time (with the huge garbage collection it would probably never finish in this lifetime).
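For completeness, the serial alternative I'm ruling out would look something like the sketch below (readRecords and recordIndex are simplified stand-ins): one insert pass for the first file, then one full find-and-update pass per remaining file, over millions of documents each time.

async function serialLoad(collection, files) {
  // Pass 1: one document per record of the first file.
  for await (const record of readRecords(files[0])) {
    await collection.insertOne({ recordIndex: record.index, data1: record.value });
  }

  // Passes 2..n: find each existing document again and add the next field.
  for (let i = 1; i < files.length; i++) {
    for await (const record of readRecords(files[i])) {
      await collection.updateOne(
        { recordIndex: record.index },
        { $set: { [`data${i + 1}`]: record.value } }
      );
    }
  }
}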