4

I am stuck on an architectural decision. I have a Node + Express app with an API to upload files. After the upload is done, the response is closed and the uploaded file is processed batch-wise by FFMPEG with the help of Bull Queue + Redis. This structure works fine, but recently I started experimenting with Server-Sent Events to give the end user updates about processing. However, I am unable to pass the response object to the Bull Queue to write regular updates from the server to the user.

1. Imports

import childProcess from 'child_process';
import Bull from 'bull'
// Bull queue backed by Redis; connection settings come from environment
// variables. NOTE(review): the env var names are lowercase (`port`, `host`,
// `password`) — confirm they match what is actually exported in the environment.
const Queue = new Bull('background_job', {redis: {port: process.env.port, host: process.env.host, password: process.env.password}});

2. Upload function

// Express handler: authenticates the user, validates the uploaded file,
// inserts the post (which also switches `res` into an SSE stream), then
// enqueues the background FFMPEG job. Any rejection falls through to a 403.
const uploadVideo = async(req, res) => {
    try{
        const result = await authUser(req);
        const result2 = await checkUploadFile(result);
        const result3 = await insertPost(result2, res);
        await Queue.add(result3.data, result3.opts)
    } catch(err){
        // NOTE(review): insertPost may already have called res.writeHead(200),
        // in which case res.status(403) here would fail with
        // "Cannot set headers after they are sent" — confirm the error path.
        res.status(403).send(err);
    }
}

3. Promises

// Authenticates the request. Stubbed out in this excerpt — as written the
// executor never calls resolve/reject, so the promise would hang forever.
const authUser = (req) => {
    return new Promise((resolve, reject) => {
      //do some work
    })
}

// Validates the uploaded file. Stubbed out in this excerpt — as written the
// executor never calls resolve/reject, so the promise would hang forever.
const checkUploadFile = (result) => {
    return new Promise((resolve, reject) => {
      //do some more work
    })
}

// Inserts the post record, then switches the still-open response into a
// Server-Sent Events stream and writes an initial event. Finally it tries to
// hand the live `res` object to Bull — which is exactly what fails: Bull
// serializes job data to JSON for Redis, and `res` contains circular
// references (Socket -> HTTPParser -> Socket), hence the TypeError below.
const insertPost= (result, res) => {
    return new Promise((resolve, reject) => {
      //do final work
       ...........
      //preparing server side events
       const headers = {
            'Content-Type': 'text/event-stream',
            'Connection': 'keep-alive',
            'Cache-Control': 'no-cache',
            'Access-Control-Allow-Origin': '*'
        };
        res.writeHead(200, headers);
        res.write(JSON.stringify({status: true, id: 1})); //testing server side events for the first time

        //Let's continue to Bull
        const data = {res: res} <- error here: TypeError: Converting circular structure to JSON 
        const opts = {removeOnComplete: true, removeOnFail: true}
        resolve({data: data, opts: opts});
    })
}

4. Queue Process

// Worker: runs the FFMPEG shell script for a queued job, then tries to push
// an SSE update through the response object it expects in job.data — which
// can never arrive intact, since Bull round-trips job data through Redis.
Queue.process((job, done) => {
    const res = job.data.res
    // NOTE(review): Node's execFile callback signature is (error, stdout,
    // stderr) — the parameter names here are swapped (stderr/stdout).
    childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
        if(err){
            done(new Error("Failed: " + err))
            // NOTE(review): status flags look inverted — {status: true} is
            // sent on failure and {status: false} on success. Confirm intent.
            res.write(JSON.stringify({status: true, id: 2})); //here using SSE
            res.end()
        } else {
            done()
            res.write(JSON.stringify({status: false})); //here using SSE
            res.end()
        }
    })
})

5. Error logged by PM2

TypeError: Converting circular structure to JSON
    --> starting at object with constructor 'Socket'
    |     property 'parser' -> object with constructor 'HTTPParser'
    --- property 'socket' closes the circle

I tried to use JSON.stringify(res) to pass the response object as JSON, but that didn't work either. Now I'm wondering whether this approach is right, or whether I should go with Socket.io (which seems like overkill for simple Server-Sent Events).

Thank you

2 Answers

4

Why do you even write this line:

const data = {res: res} <- error here: TypeError: Converting circular structure to JSON.

You still have access to the response object in the uploadVideo function where you call insertPost. So it can simply be:

await Queue.add(res, result3.opts).

For instance:

// Answer snippet: keep using `res` inside uploadVideo and pass it straight
// to Queue.add instead of wrapping it in {res: res}.
// NOTE(review): this still cannot work — Bull serializes job data to JSON
// for Redis, so handing it the response object raises the same
// circular-structure TypeError the question reports.
const uploadVideo = async(req, res) => {
    try{
        const result = await authUser(req);
        const result2 = await checkUploadFile(result);
        const result3 = await insertPost(result2, res);
        await Queue.add(res, result3.opts); // still have access to res
    } catch(err){
        res.status(403).send(err);
    }
} // closing brace was missing in the original snippet
Remove this line:

const data = {res: res} <- error here: TypeError: Converting circular structure to JSON 

Just use the response:

// Suggested worker — NOTE(review): Bull invokes the processor with a `job`
// object as the first argument, not the Express response; naming it `res`
// here only "works" if a response object could be stored as the job data,
// which the JSON serialization error shows it cannot.
Queue.process((res, done) => {
    //const res = job.data.res
    childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
        if(err){
            done(new Error("Failed: " + err))
            res.write(JSON.stringify({status: true, id: 2})); //here using SSE
            res.end()
        } else {
            done()
            res.write(JSON.stringify({status: false})); //here using SSE
            res.end()
        }
    })
});

Edit:

I see what you mean. I had a look at the bull module. Why can't you do something like this:

// Revised answer: keep `res` out of the queue entirely. Tag the request with
// a job id, enqueue only serializable data, and listen for the queue's
// "completed" event to write the SSE update back on the matching response.
const uploadVideo = async(req, res) => {
  try{
      res.jobId = 0; // we need a way to know if job completed is our request
      // (this statement had been fused into the comment above in the original)
      const result = await authUser(req);
      const result2 = await checkUploadFile(result);
      const result3 = await insertPost(result2, res);
      Queue.add({id: res.jobId, somedatafromresult3: 'result3.somedata' }, result3.opts);
      // NOTE(review): Bull's "completed" event actually emits (job, result),
      // not (err, data) — confirm against the bull docs before relying on it.
      // Also, registering a new listener on every request leaks listeners;
      // remove it (e.g. Queue.off / once) after res.end().
      Queue.on("completed", (err, data) => {
        if (data.id === res.jobId) { // check to see if completed job is our one.
          res.write(JSON.stringify(data)); //here using SSE
          res.end()
        }
        console.log(data);
      });
  } catch(err){
      res.status(403).send(err);
  }
}

Then in your process function, simply return the data which will be emitted, i.e.:

  // Worker: run the shell script, then pass a plain serializable result to
  // done() so Bull emits it with the "completed" event.
  // NOTE(review): `videoQueue` here doesn't match the `Queue` name used in
  // the snippet above — presumably they are meant to be the same queue.
  videoQueue.process(function(job, done){
  childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
    if(err){
        done(err, {status: true, id: job.data.id});
    } else {
      done(null, {status: false, id: job.data.id});
    }
})
})

;

kg99
  • 746
  • 2
  • 14
  • This doesn't work. It's throwing the same error. And I am not directly passing `res` from `uploadVideo` function because I need to pass other data too in the `Queue.add` method. – Abhinandan Chakraborty Nov 28 '20 at 09:50
  • How can it throw the same error since "res" is not assigned anywhere. The problem was const data = {res: res}. SO if you removed it how can the error still be there. Yes, but you can extend the parameters that Queue.add takes. – kg99 Nov 28 '20 at 09:53
  • On which line is it causing the error now? – kg99 Nov 28 '20 at 09:53
  • Also where is the definition of Queue.add() cause i mistook Queue.process for Queue.add. – kg99 Nov 28 '20 at 09:55
  • `Queue.add(yourData, bullsOptions)` is provided by `Bull`. Bull waits for the queue to finish any existing task and then forward this to `Queue.process` which provides two callback parameters `job` & `done`. `job` can now be used to access custom data which was passed from `Queue.add` like this `const myObj = job.data.YourCustomObjName`. – Abhinandan Chakraborty Nov 28 '20 at 10:15
  • Hi, thanks for your response, I am accepting your answer. I just made a little change in my source code. That is I added `Queue.add` & `Queue.process` inside a new function `startQueue(res, data, opts)`. The idea is to allow Bull to manage random jobId, have access to `res` object & to execute more shell script inside the `Queue.process`. However, my only doubt is if calling this function everytime a new API request is made will make the Bull lose track of existing task queue? or increase memory usage? – Abhinandan Chakraborty Nov 28 '20 at 13:55
  • Try it and have a look at the memory footprint. – kg99 Nov 28 '20 at 14:12
  • Thanks, so far everything looks great in first couple of test. Thanks again for your help! – Abhinandan Chakraborty Nov 28 '20 at 14:14
0

You can use job.progress() to communicate with the route that's connected to the client via SSE. Update the progress with job.progress(percent), passing in a number. The Express route scope can then spin on this and emit SSE events to the client as the job progresses.

Here's a basic runnable example as a proof of concept you can add your processing, error handling and job.progress and SSE logic onto.

const express = require("express");
const fs = require("fs").promises;
const path = require("path");
const Queue = require("bull");

// Awaitable pause: resolves (with undefined) after `ms` milliseconds,
// defaulting to one second.
const sleep = (ms = 1000) => new Promise((resolve) => {
  setTimeout(resolve, ms);
});

// Worker: processes up to 4 jobs concurrently. Each job ticks once per
// second for job.data.seconds seconds, reporting an integer percentage via
// job.progress() so the HTTP route below can poll it.
const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
  for (let i = 1; i <= job.data.seconds; i++) {
    // `| 0` truncates the percentage to an integer
    await job.progress(i / job.data.seconds * 100 | 0);
    await sleep();
  }

  return Promise.resolve(`job ${job.id} complete!`);
});

// HTTP server: a single SSE route that enqueues a job sized by ?seconds=N,
// then polls its progress once per second and streams each value to the
// client until the job hits 100% or the client disconnects.
const app = express();
app
  .set("port", process.env.PORT || 5000)
  .get("/", async (req, res) => {
    try {
      // Standard SSE response headers; flush immediately so the client
      // sees the stream open before the first payload arrives.
      res.set({
        "Access-Control-Allow-Origin": "*",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Content-Type": "text/event-stream",
      });
      res.flushHeaders();

      // Job length comes from the query string, falling back to 10 seconds.
      const job = await queue.add({
        seconds: Math.abs(+req.query.seconds) || 10,
      });

      // Stop polling if the client goes away.
      let connected = true;
      res.on("close", () => {
        connected = false;
      });
  
      // Poll the stored job each second and stream its progress value.
      for (; connected; await sleep()) {
        const j = await queue.getJob(job.id);
        const progress = await j.progress();
        res.write(`${progress}\n`);

        if (progress >= 100) { // TODO handle job errors
          break;
        }
      }

      // Forward the worker's return value once the job settles.
      res.write(await job.finished());
    }
    catch (err) {
      res.write(err.message);
    }
    finally {
      res.end();
    }
  })
  .listen(app.get("port"), () =>
    console.log(`server listening on port ${app.get("port")}`)
  )
;

Sample run:

$ curl localhost:5000
0
10
20
30
40
50
60
70
80
90
100
job 64 complete!

See also How to use server-sent-events in express.js which has a sample client that can read the response stream.

ggorlen
  • 44,755
  • 7
  • 76
  • 106