
I'm using the JavaScript fetch streams API to consume chunked JSON asynchronously like in this answer.

My application may be receiving up to 25 small JSON objects per second (one for each frame in a video) over the span of an hour.

When the incoming chunks are large (1000+ JSON objects per chunk), my code works well: it is fast, uses minimal memory, and can easily receive 1,000,000 JSON objects reliably.

When the incoming chunks are small (5 JSON objects per chunk), my code works poorly: it is slow and consumes a lot of memory, and the browser dies at about 50,000 JSON objects.

After doing a lot of debugging in the Developer tools, it appears the problem lies in the recursive nature of the code.

I tried to remove the recursion, but it seems to be required because the API relies on my code returning a promise to chain?!

How do I remove this recursion, or should I use something other than fetch?


Code with recursion (works)

String.prototype.replaceAll = function(search, replacement) {
    var target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
};

results = []

fetch('http://localhost:9999/').then(response => {
    const reader = response.body.getReader();
    td = new TextDecoder("utf-8");
    buffer = "";

    reader.read().then(function processText({ done, value }) {
        if (done) {
          console.log("Stream done.");
          return;
        }

        try {
            decoded = td.decode(value);
            buffer += decoded;
            if (decoded.length != 65536){
                toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
                result = JSON.parse(toParse);
                results.push(...result);
                console.log("Received " + results.length.toString() + " objects")
                buffer = "";
            }
        }
        catch(e){
            // Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
            //console.log("EXCEPTION:"+e);
        }

        return reader.read().then(processText);
    })
});

Code without recursion (doesn't work)

String.prototype.replaceAll = function(search, replacement) {
    var target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
};

results = []
finished = false

fetch('http://localhost:9999/').then(response => {
    const reader = response.body.getReader();
    td = new TextDecoder("utf-8");
    buffer = "";
    lastResultSize = -1

    while (!finished)
        if (lastResultSize < results.length)
        {
            lastResultSize = results.length;
            reader.read().then(function processText({ done, value }) {

                if (done) {
                  console.log("Stream done.");
                  finished = true;
                  return;
                }
                else
                    try {
                        decoded = td.decode(value);
                        //console.log("Received chunk " + decoded.length.toString() + " in length");
                        buffer += decoded;
                        if (decoded.length != 65536){
                            toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
                            result = JSON.parse(toParse);
                            results.push(...result);
                            console.log("Received " + results.length.toString() + " objects")
                            buffer = "";
                            //console.log("Parsed chunk " + toParse.length.toString() + " in length");
                        }
                    }
                    catch(e) {
                        // Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
                        //console.log("EXCEPTION:"+e);
                    }
            })
        }
});

For completeness, here is the Python code I'm using on the test server. Note the line containing sleep, which changes the chunking behavior:

import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from time import sleep


class TestServer(BaseHTTPRequestHandler):

    def do_GET(self):
        args = urllib.parse.parse_qs(self.path[2:])
        args = {i:args[i][0] for i in args}
        response = ''

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Transfer-Encoding', 'chunked')
        self.end_headers()

        for i in range(1000000):
            self.wfile.write(bytes(f'{{"x":{i}, "text":"fred!"}}\n','utf-8'))
            sleep(0.001)  # Comment this out for bigger chunks sent to the client!

def main(server_port:"Port to serve on."=9999,server_address:"Local server name."=''):
    httpd = HTTPServer((server_address, server_port), TestServer)
    print(f'Serving on http://{httpd.server_name}:{httpd.server_port} ...')
    httpd.serve_forever()


if __name__ == '__main__':
    main()
  • Have you tried this in other browsers and do you observe the same behavior in other browsers? – sideshowbarker Oct 12 '18 at 20:03
  • I've tried this in both Firefox and Chrome. They both behave similarly, albeit with different performance profiles, e.g. Firefox runs longer but slower, Chrome runs quicker but receives fewer JavaScript objects. – QA Collective Oct 12 '18 at 22:27
  • It might be worth filing browser bugs to get insights from them about their implementations and whether the nature of the requirements somehow constrains them from better optimizing for this scenario. Worst case, they reply to say they're aware this can be an issue for developers but they don't plan to change their implementations. Best case, they agree it's a scenario that should be possible and they change their implementations so that the browser doesn't end up running out of memory and crashing in that scenario. – sideshowbarker Oct 12 '18 at 22:50
  • 1
    I'm not sure its the browser itself that is the source of the problem - recursion of conservative depth is still okay in a browser. My main concern is that I can't see how to implement an iterative version of this algorithm and it really seems that the Fetch Streams API has been written supposing recursion as the best case, when in my case, it's not. Since I'm not getting much activity here, I might try the Streams API working group (https://github.com/whatwg/streams) and point them here. Thanks for your QA and ideas :) – QA Collective Oct 15 '18 at 01:55

1 Answer


The part you're missing is that the function passed to .then() is always called asynchronously, i.e. with an empty stack, so there is no actual recursion here. This is also why your 'without recursion' version doesn't work: the busy while loop never yields control back to the event loop, so the callbacks passed to .then() never get a chance to run.

The simple solution to this is to use an async function and the await keyword. If you call read() like this:

const {value, done} = await reader.read();

...then you can call it in a loop and it will work the way you expect.
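
To make that concrete, here is a minimal sketch of what the await-based loop could look like, assuming the same newline-delimited JSON feed and test URL as in the question; the line-buffering logic is my own addition for illustration, not something the fetch API does for you:

'use strict';

async function readAll(url) {
    const results = [];
    const response = await fetch(url);
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '';

    while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        // { stream: true } holds back a partial multi-byte character until the next chunk
        buffer += decoder.decode(value, { stream: true });
        // Only parse complete lines; keep any trailing partial line in the buffer
        const lines = buffer.split('\n');
        buffer = lines.pop();
        for (const line of lines) {
            if (line.trim() !== '') results.push(JSON.parse(line));
        }
    }
    console.log('Stream done. Received ' + results.length + ' objects');
    return results;
}

readAll('http://localhost:9999/');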

I don't know specifically where your memory leak is, but your use of global variables looks like a problem. I recommend you always put 'use strict'; at the top of your code so the compiler will catch these problems for you. Then use let or const whenever you declare a variable.

I recommend you use TextDecoderStream to avoid problems when a character is split between multiple chunks. You will also have issues when a JSON object is split between multiple chunks.

See Append child writable stream demo for how to do this safely (but note that you need TextDecoderStream where that demo has "TextDecoder").
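
As a rough illustration (not the demo's exact code), the read loop above could be adapted to TextDecoderStream like this; the newline buffering is again an assumption based on the question's data format:

// Inside an async function:
const response = await fetch('http://localhost:9999/');
// TextDecoderStream handles multi-byte characters that straddle chunk boundaries
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();

let buffer = '';
while (true) {
    const { value, done } = await reader.read();   // value is now a string
    if (done) break;
    buffer += value;
    const lines = buffer.split('\n');
    buffer = lines.pop();                          // keep the partial trailing line
    for (const line of lines) {
        if (line.trim() !== '') console.log(JSON.parse(line));
    }
}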

Note also the use of a WritableStream in that demo. Firefox doesn't support it yet AFAIK, but WritableStream provides much easier syntax to consume chunks without having to explicitly loop or recurse. You can find the web streams polyfill here.
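
For comparison, here is a hedged sketch of the same pipeline with a WritableStream sink, again assuming the question's test URL and newline-delimited format:

fetch('http://localhost:9999/').then(response => {
    let buffer = '';
    return response.body
        .pipeThrough(new TextDecoderStream())
        .pipeTo(new WritableStream({
            write(chunk) {
                // chunk is already a decoded string; buffer until full lines arrive
                buffer += chunk;
                const lines = buffer.split('\n');
                buffer = lines.pop();
                for (const line of lines) {
                    if (line.trim() !== '') console.log(JSON.parse(line));
                }
            },
            close() {
                console.log('Stream done.');
            }
        }));
});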

  • Thanks for a thorough answer. In the end, the culprit was actually the Chrome/Firefox debuggers themselves!! I changed my `Console.Log` to `document.write` for debug output, closed the debugger and the first code sample worked fine with both large and small chunks. My code did already handle split characters/JSON, although not elegantly. So I did a re-write as suggested: https://pastebin.com/hWGtQZvA but it wasn't as performant as my original code (surprisingly). So the fastest and 100% native code ended up as https://pastebin.com/Dr1CQVa2 which I'll now further refine. Thanks! – QA Collective Oct 16 '18 at 05:26
  • pipeThrough() is not optimised yet, so read() still gives the best performance, as you found. – lunchrhyme Oct 17 '18 at 06:55
  • Is there a way to identify when the data was split across different chunks? (In his case you suggested TextDecoderStream, but in my case I am receiving an amount of bytes that sometimes gets split across different chunks...) – rsc Oct 07 '20 at 20:21
  • To identify when a character was split across multiple chunks, I think you'd need to decode the characters yourself. If the input is UTF-8, this is not too difficult, but it would of course be very slow. – lunchrhyme Oct 12 '20 at 09:21