I have to parse a large (100+ MB) JSON file with the following format:

{
 "metadata": {
   "account_id": 1234
   // etc.
 },
 "transactions": [
   {
     "transaction_id": 1234,
     "amount": 2
   },
    // etc. for (potentially) thousands more transactions
 ]
}

The output of this parsing should be a JSON array with the account_id added to each of the transactions:

[
 {
   "account_id": 1234,
   "transaction_id": 1234,
   "amount": 2
 },
 // etc.
]
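
Conceptually, the per-transaction work is just a merge of the account_id into each transaction object, something like this (a sketch, using the field names from the example above):

const outputRecord = { account_id: metadata.account_id, ...transaction };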

I'm using the stream-json library to avoid loading the whole file into memory at once. stream-json lets me pick individual properties and then stream them one at a time, depending on whether they're an array or an object.

I'm also trying to avoid reading the JSON file twice, so I pipe a single read of the file into two separate streams, which is possible in Node.js.
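
Roughly what I mean by that (a sketch; the pipeline names are placeholders for the chains defined further down):

const fs = require('fs');
const source = fs.createReadStream('myJSON.json');

// A single Readable can be piped to more than one destination;
// every destination receives the same data.
source.pipe(metadataPipeline);   // extracts metadata.account_id
source.pipe(generatorPipeline);  // builds the output array of transactions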

I'm using a Transform stream to generate the output, storing the account_id as a property on the Transform stream object.

Pseudocode (with an obvious race condition) below:

const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamArray } = require('stream-json/streamers/StreamArray');
const { streamObject } = require('stream-json/streamers/StreamObject');
const Chain = require('stream-chain');
const { Transform } = require('stream');
const fs = require('fs');

let createOutputObject = new Transform({
  writableObjectMode: true,
  readableObjectMode: true,
  transform(chunk, enc, next) {
    if (createOutputObject.account_id !== null) {
      // generate the output object
    } else {
      // Somehow store the chunk until we get the account_id...
    }
  }
});
createOutputObject.account_id = null;

let jsonRead = fs.createReadStream('myJSON.json');
let metadataPipeline = new Chain([
  jsonRead,
  parser(),
  pick({filter: 'metadata'}),
  streamObject(),
]);

metadataPipeline.on('data', data => {
 if (data.key === 'account_id') {
  createOutputObject.account_id = data.value;
 }
});

let generatorPipeline = new Chain([
  jsonRead, // Note: same Readable stream as above
  parser(),
  pick({filter: 'transactions'}),
  streamArray(),
  createOutputObject,
  transformToJSONArray(), // placeholder: serializes the object stream as JSON array text
  fs.createWriteStream('myOutput.json')
]);

To resolve this race condition (i.e. converting to JSON array before account_id is set), I've tried:

  • Using createOutputObject.cork() to hold up data until account_id is set.
    • The data just passes through to transformToJSONArray().
  • Keeping the chunks in an array in createOutputObject until account_id is set (roughly sketched after this list).
    • Can't figure out how to re-add the stored chunks after account_id is set.
  • Using setImmediate() and process.nextTick() to call createOutputObject.transform later, hoping that account_id would be set by then.
    • This overloaded the stack so that nothing got done.
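
Here is roughly what I mean by the buffering attempt (a sketch, not working code; I'm not sure pushing the held chunks from outside the transform callback like this is even safe):

let createOutputObject = new Transform({
  writableObjectMode: true,
  readableObjectMode: true,
  transform(chunk, enc, next) {
    if (this.account_id !== null) {
      // streamArray() emits {key, value} pairs, so the transaction is chunk.value
      this.push({ account_id: this.account_id, ...chunk.value });
    } else {
      // hold the chunk until account_id arrives
      this.pending.push(chunk);
    }
    next();
  }
});
createOutputObject.account_id = null;
createOutputObject.pending = [];

// In the metadata handler, once account_id shows up:
//   createOutputObject.account_id = data.value;
//   createOutputObject.pending.forEach(chunk =>
//     createOutputObject.push({ account_id: data.value, ...chunk.value }));
//   createOutputObject.pending = [];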

I've considered using stream-json's streamValues streamer, which would let me pick both metadata and transactions in one pass. But the documentation leads me to believe that the whole transactions array would be loaded into memory, which is exactly what I'm trying to avoid:

As every streamer, it assumes that individual objects can fit in memory, but the whole file, or any other source, should be streamed.
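
For reference, this is roughly what that approach would look like as I understand it (a sketch; the regex filter for picking both top-level keys is my assumption). Each emitted value is fully assembled, so the entire transactions array would end up in memory at once:

const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamValues } = require('stream-json/streamers/StreamValues');
const { chain } = require('stream-chain');
const fs = require('fs');

const pipeline = chain([
  fs.createReadStream('myJSON.json'),
  parser(),
  pick({ filter: /^(metadata|transactions)$/ }),
  streamValues(),
]);

pipeline.on('data', ({ value }) => {
  // value is either the whole metadata object or the *whole* transactions array
});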

Is there something else that can resolve this race condition? Is there any way I can avoid parsing this large JSON stream twice?

  • Wild guess... on the input side, instead of piping the JSON.read(), you may be able to explicitly divvy it up by reading into a buffer, doing an arrayCopy on the buffer, converting each of the 2 buffers to separate instances of "readables", then running 2 separate inputStreams from the 2 readables, i.e. the arrayCopy provides encapsulation of the forked memory/objects. – Robert Rowntree Feb 22 '19 at 18:03
  • Correct me if that's not your case - you want `stream1` to run until `account_id` is found, but not write to `stream2` until you have it. I could suggest using [rereadable-stream](https://www.npmjs.com/package/rereadable-stream) I wrote and rewind the stream as soon as you found your account id. – Michał Karpacki Feb 27 '19 at 14:11
