
I'm trying to use node.js Transform streams to combine a bunch of different data sources (essentially gzipped CSVs, but with a strange structure; they all have identical byte lengths) and write the combined data as MongoDB documents, without loading any of the files completely into memory. A picture tells a thousand words. It looks like this:

file1       file2      file3      file4     file5      file6 ...
   \          |          |          |         |          /
    --(transform steps: gunzip, remove newlines, etc.)---
     \        |          |          |         |        /
doc: {data1, data2, data3, data4, data5, data6, metadata} <------ combine step
                              ||
                           (MongoDB)

Each readable stream (file1, file2, etc.) is piped into its own serial pipeline of gunzip and some other transforms. Each file stream is then piped into one shared instance of the combiner. Just before that happens, I add a streamIndex to each chunk so I can tell the sources apart from inside the combiner transform. Like this:

let slice = chunk.slice(i, i + length);
slice.streamIndex = this.streamIndex;
this.push(slice);
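
To give a clearer picture of the wiring, here's a simplified sketch of how each file gets its own pipeline and how everything converges on the combiner. The names (Slicer, RECORD_LENGTH, the file names) are placeholders rather than my actual code:

    const fs = require('fs');
    const zlib = require('zlib');
    const { Transform } = require('stream');

    // Cuts the gunzipped byte stream into fixed-length records and tags each
    // record with the index of the source stream it came from.
    class Slicer extends Transform {
        constructor(streamIndex, recordLength) {
            super();
            this.streamIndex = streamIndex;
            this.recordLength = recordLength;
            this.leftover = Buffer.alloc(0);
        }

        _transform(chunk, encoding, callback) {
            const data = Buffer.concat([this.leftover, chunk]);
            let offset = 0;
            while (offset + this.recordLength <= data.length) {
                const slice = data.slice(offset, offset + this.recordLength);
                slice.streamIndex = this.streamIndex; // so the combiner can tell sources apart
                this.push(slice);
                offset += this.recordLength;
            }
            this.leftover = data.slice(offset); // keep any partial record for the next chunk
            callback();
        }
    }

    const RECORD_LENGTH = 64; // placeholder record size
    const files = ['file1.gz', 'file2.gz', 'file3.gz'];

    // One serial pipeline per file, all converging on a single combiner instance.
    const pipelines = files.map((file, i) =>
        fs.createReadStream(file)
            .pipe(zlib.createGunzip())
            .pipe(new Slicer(i, RECORD_LENGTH))
    );
    // Each pipeline is then piped into the single combiner instance (the merge
    // stage sketched below), i.e. pipelines.forEach(p => p.pipe(combiner)).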

I need exactly one piece of data from each stream at the combine step, so when a chunk arrives there I look at its streamIndex and buffer it until there is at least one entry from each source. I then combine the sources into a document, add metadata, and send the documents to MongoDB via a bulk insert.
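
In simplified form, the merge stage and the final database writable look something like this (the real code is more involved; the field names, the metadata, and the batch size are placeholders):

    const { Transform, Writable } = require('stream');

    // Merge stage: buffer slices per source stream and push one combined
    // document whenever every source has contributed at least one slice.
    class DatapointsToDocuments extends Transform {
        constructor(streamCount) {
            super({ readableObjectMode: true }); // Buffers in, document objects out
            this.dataBuffers = Array.from({ length: streamCount }, () => []);
        }

        _transform(chunk, encoding, callback) {
            this.dataBuffers[chunk.streamIndex].push(chunk);
            if (this.dataBuffers.every(buffer => buffer.length > 0)) {
                const doc = { metadata: { createdAt: new Date() } };
                this.dataBuffers.forEach((buffer, i) => {
                    doc['data' + (i + 1)] = buffer.shift();
                });
                this.push(doc);
            }
            callback();
        }
    }

    // Final stage: batch the combined documents and bulk-insert them.
    // (A real version would also flush the remaining partial batch at the end.)
    class DocumentsToDatabase extends Writable {
        constructor(collection, batchSize) {
            super({ objectMode: true });
            this.collection = collection; // a MongoDB collection handle
            this.batchSize = batchSize;
            this.batch = [];
        }

        _write(doc, encoding, callback) {
            this.batch.push(doc);
            if (this.batch.length < this.batchSize) return callback();
            const docs = this.batch;
            this.batch = [];
            this.collection.insertMany(docs, err => callback(err)); // bulk insert
        }
    }

It's the dataBuffers arrays in the merge stage that grow out of control when the sources deliver at different rates.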


The problem is that the streams read at slightly different rates, and node locks up if it has to buffer too much data. Left unchecked, one of the streams can easily end up with 1,000,000+ more entries buffered than another, which causes so much garbage collection that the node process essentially grinds to a halt.

I've tried to add backpressure to the individual upstreams to slow them down when I know they've provided too much data by:

  • returning false from the transform / writable function (stepping through in the debugger seemed to suggest this had no effect at all, and whether the combiner was a writable stream or a transform didn't make a difference)
  • corking over-delivering streams via e.g. file1.cork()
  • manually setting the combine step's writable stream to corked via this._writableState.corked = true
  • and, seemingly the most reliable option, temporarily unpiping the offending stream:

    this.streams.forEach((stream, i) => {
        if (this.dataBuffers[i].length > MAX_BUFFER_LENGTH) {
            stream.unpipe(this);
        }
    });
    

The last step (combined with re-piping when the buffers get low) keeps all the buffers smoothly sitting at exactly MAX_BUFFER_LENGTH. The trouble is that I then get a stream 'end' event when only about 10-15% of the data has been read. I observed data going missing (flowing out of the pipe and into the abyss) when experimenting with other approaches, so I can only imagine the same thing is happening here.
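
For reference, the re-piping side of that experiment looks roughly like this (MIN_BUFFER_LENGTH, the paused bookkeeping, and the end: false option are simplifications rather than my exact code):

    // Once an unpiped source's buffer drains below a low-water mark, pipe it
    // back in. this.paused is placeholder bookkeeping for which streams are
    // currently unpiped.
    this.streams.forEach((stream, i) => {
        if (this.paused[i] && this.dataBuffers[i].length < MIN_BUFFER_LENGTH) {
            this.paused[i] = false;
            stream.pipe(this, { end: false }); // end: false so one source ending doesn't end the combiner
        }
    });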

It seems like a really trivial issue, and I'm pretty new to streams, so it's possible I'm just doing something stupid. Basically my question is: how do I get each stream to provide a bit of data, wait for the others to do the same, combine, read a bit more from each, and so on, without losing data (and without buffering too much)?


Final details:

  • all the connections are pipes, which should theoretically deal with backpressure out of the box ('new mode' streaming – stream.pause/resume won't work, nor would I want it to: from what I've understood that's another situation where data starts going missing). I don't want to apply backpressure to everything, just to the file stream(s) providing too much data.
  • the setup works and the database ends up with the correct data when reading just one (or even two) files simultaneously – it's node's greedy model of "take whatever you can get!" across multiple streams that seems to make things compete here. It's quite possible the setup doesn't work with the unpiping model, though; I'll try that next. Until now I just had everything running without any rate limiting, which is why I'm stuck.
  • I can't just process the streams serially – inserting the millions of documents once, then repeatedly updating each document via a process of find and update is prohibitively slow.
  • The result I'm looking for looks a lot like what the poster of "How to interleave streams (with backpressure)" doesn't want, under the heading "zipAsArray". From what I understand though, bacon.js doesn't work with node streams.
  • this is supposed to be a one-off process on a standalone server, after which I will use MongoDB as usual, so it doesn't matter how much CPU it chews, as long as it completes in a reasonable time (with the huge garbage collection it would probably never finish in this lifetime).

Comments:

  • The answer to this is a multi-part blog post. I recommend slimming down the question a bit. Check out [highland.js](http://highlandjs.org), maybe the zipAll function. As a last resort, you may be able to record the position where it begins to fail, and then re-run with that length as an offset (depending on the type of data being read). – chrisbajorin Mar 01 '16 at 02:47
  • Thanks for the suggestion. I've tried to cut it down here: http://stackoverflow.com/questions/35738816/unpipe-without-losting-data-abridged-version. It's still quite long, but I want to avoid answers from people saying "just use a pipe/transform", because that's not possible here – ephemer Mar 02 '16 at 05:00
  • @cdbajorin I'm not sure how I can figure out when it begins to fail – it seems to just run correctly and then suddenly one or more of the input streams end. Unfortunately I don't have a good way of verifying correctness of the data at the point where it ends (only at the start). – ephemer Mar 02 '16 at 05:01
  • Is there some metadata on each stream event that links them together, or is it simply the order of items as they come in? What is the data structure of each emit? What is the source of the readable stream? Is the Readable instance your own implementation? Not all streams are equal, which is why I made the comment about a blog post. – chrisbajorin Mar 02 '16 at 19:35
  • It's a standard readable stream from `request`. The others are passing Buffers of varying lengths, other than documentsToDatabase (writable stream at the end), which receives objects from datapointsToDocuments (the merge stage) – ephemer Mar 02 '16 at 20:34
  • There's no metadata linking them together; it's just so that the merger knows which stream a chunk has come from. It's just an index: the streams are created via map into an array, and streamIndex is the array index associated with one particular stream. But I've given up on a self-rolled solution for now and am using highland.js instead. It's slower (it waits to receive one chunk from each stream before concatenating), but it works – ephemer Mar 02 '16 at 21:39
  • It's possible this was all a false alarm and that the server has just been cutting off my requests before completion. There were only 2-10 simultaneous requests, but it seems that was enough. I've downloaded the gzip files to the local machine (not ideal, but doable) and am running through them that way now. It's about 25% through with 10 simultaneous streams so far, which is further than before. – ephemer Mar 02 '16 at 21:41
  • Update: yes it was just the connections being dropped. Pretty annoying that this isn't thrown as an error in `request`, would have saved me literally days of work on this. – ephemer Mar 03 '16 at 00:35

0 Answers