I'm using the JavaScript fetch streams API to consume chunked JSON asynchronously, like in this answer.
My application may be receiving up to 25 small JSON objects per second (one for each frame in a video) over the span of an hour.
When the incoming chunks are large (1000+ JSON objects per chunk), the code performs well: it's fast, uses minimal memory, and can reliably receive 1,000,000 JSON objects.
When the incoming chunks are small (5 JSON objects per chunk), the code performs poorly: it's slow, memory consumption climbs steadily, and the browser crashes at around 50,000 JSON objects.
After a lot of debugging in the developer tools, the problem appears to lie in the recursive nature of the code.
I tried to remove the recursion, but it seems required, because the API relies on my code returning a promise from each read handler in order to chain the next read.
How do I remove this recursion, or should I use something other than fetch?
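Reduced to its skeleton, the pattern in question looks like this (each handler returns the promise from the next read(), which is what makes it recursive):

reader.read().then(function pump({ done, value }) {
    if (done) return;
    // ... consume value ...
    return reader.read().then(pump);   // handler re-registers itself: the "recursion"
});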
Code with recursion (works)
// Helper: replace every occurrence of `search` (no regex escaping; only ever called with "\n").
String.prototype.replaceAll = function(search, replacement) {
    return this.replace(new RegExp(search, 'g'), replacement);
};
const results = [];

fetch('http://localhost:9999/').then(response => {
    const reader = response.body.getReader();
    const td = new TextDecoder("utf-8");
    let buffer = "";
    reader.read().then(function processText({ done, value }) {
        if (done) {
            console.log("Stream done.");
            return;
        }
        try {
            const decoded = td.decode(value);
            buffer += decoded;
            // A short read (less than a full 65536-byte chunk) is taken as the end of a batch:
            // wrap the buffered newline-delimited objects in [ ] and parse them as one array.
            if (decoded.length != 65536) {
                const toParse = "[" + buffer.trim().replaceAll("\n", ",") + "]";
                const result = JSON.parse(toParse);
                results.push(...result);
                console.log("Received " + results.length.toString() + " objects");
                buffer = "";
            }
        }
        catch (e) {
            // Ignored: a partial JSON object stays in the buffer and is parsed on a later chunk.
        }
        return reader.read().then(processText);
    });
});
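As an aside, the stream is effectively newline-delimited JSON, so the replaceAll/JSON.parse step could equally be written by splitting the buffer on newlines. A sketch (not what I'm running; drainBuffer is just an illustrative name):

// Sketch: parse every complete NDJSON line in the buffer, keep the trailing partial line.
function drainBuffer(buffer, results) {
    const lines = buffer.split("\n");
    const rest = lines.pop();                 // last element may be a partial JSON object
    for (const line of lines) {
        if (line.trim()) results.push(JSON.parse(line));
    }
    return rest;                              // carried over into the next chunk
}

The code above instead batches everything and relies on the 65536-length heuristic to decide when the buffer is complete.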
Code without recursion (doesn't work)
String.prototype.replaceAll = function(search, replacement) {
    return this.replace(new RegExp(search, 'g'), replacement);
};

const results = [];
let finished = false;

fetch('http://localhost:9999/').then(response => {
    const reader = response.body.getReader();
    const td = new TextDecoder("utf-8");
    let buffer = "";
    let lastResultSize = -1;
    // Keep issuing reads until the stream reports done, one read per batch of new results.
    while (!finished) {
        if (lastResultSize < results.length) {
            lastResultSize = results.length;
            reader.read().then(function processText({ done, value }) {
                if (done) {
                    console.log("Stream done.");
                    finished = true;
                    return;
                }
                try {
                    const decoded = td.decode(value);
                    buffer += decoded;
                    if (decoded.length != 65536) {
                        const toParse = "[" + buffer.trim().replaceAll("\n", ",") + "]";
                        const result = JSON.parse(toParse);
                        results.push(...result);
                        console.log("Received " + results.length.toString() + " objects");
                        buffer = "";
                    }
                }
                catch (e) {
                    // Ignored: a partial JSON object stays in the buffer and is parsed on a later chunk.
                }
            });
        }
    }
});
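What I'm effectively trying to write is a flat read loop, something like this async/await version (a sketch of the intent, assuming async/await is available; I haven't confirmed it avoids the memory problem):

async function consume(url) {
    const response = await fetch(url);
    const reader = response.body.getReader();
    const td = new TextDecoder("utf-8");
    let buffer = "";
    while (true) {
        const { done, value } = await reader.read();   // one read at a time, no visible recursion
        if (done) break;
        buffer += td.decode(value, { stream: true });
        // ... same newline-delimited JSON parsing as above ...
    }
    console.log("Stream done.");
}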
For completeness, here is the Python code for my test server. Note the sleep call, which changes the chunking behavior:
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from time import sleep

class TestServer(BaseHTTPRequestHandler):
    def do_GET(self):
        args = urllib.parse.parse_qs(self.path[2:])
        args = {i: args[i][0] for i in args}
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Transfer-Encoding', 'chunked')
        self.end_headers()
        for i in range(1000000):
            self.wfile.write(bytes(f'{{"x":{i}, "text":"fred!"}}\n', 'utf-8'))
            sleep(0.001)  # Comment this out for bigger chunks sent to the client!

def main(server_port: "Port to serve on." = 9999, server_address: "Local server name." = ''):
    httpd = HTTPServer((server_address, server_port), TestServer)
    print(f'Serving on http://{httpd.server_name}:{httpd.server_port} ...')
    httpd.serve_forever()

if __name__ == '__main__':
    main()