
I am receiving rows of data every second from Kafka. For each batch of data, I am inserting into my database.

My app keeps reading the last message and id of each batch. The issue here is that the promises are not running in series; they run concurrently after one batch is finished, and they keep reading the same message and id. I want each promise to have its own message and id, as defined by the order they came in from the for-loop in the first function.

I think I need to use closures; however, I am not sure how to apply them here. I don't want to use timers!
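To illustrate what I mean, here is a stripped-down sketch (with a stand-in for the async DB call, not my real code) of how a shared variable gets clobbered before the callbacks read it:

```javascript
let message; // one shared binding, standing in for my undeclared globals

function leaky(data) {
    message = data; // every call overwrites the same binding...
    return Promise.resolve().then(() => message); // ...which is read later, async
}

// All three calls run before any .then callback fires,
// so every callback sees the LAST value written.
Promise.all([leaky('a'), leaky('b'), leaky('c')])
    .then(seen => console.log(seen)); // ['c', 'c', 'c']
```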

Thanks!

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
    for (var i = 0; i < batchOfRows.rows.length; i++) {
        validate(batchOfRows.rows[i])
            .then(result => console.log(result))
            .catch(error => console.log(error));
    }
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    return new Promise((resolve, reject) => {
        message = data;
        id = message.date + message.location
        DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
            .then(result => {
                // Insert into the table at this ID
                insertIntoDB(message, id)
                    .then(result => resolve(result))
                    .catch(error => reject(error));
            })
            .catch(error => {
                reject(error);
            });
    });
}

// Inserting into DB
function insertIntoDB(message, id) {
    return new Promise((resolve, reject) => {
        query = "insert into table2 (id, messageBody) values (?, ?)";

        DB.execute(query, [id, JSON.stringify(message)])
            .then(result => resolve("Successfully inserted message ID " + id))
            .catch(error => reject("Error inserting!"));
    });
}

EDIT (danh's solution):

var kafka = require('kafka-node');
var client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
    client, [{
        topic: 'my_topic',
        partition: 0,
        offset: 0
    }], {
        fromOffset: false
    }
);

let queue = [];
let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(element.map(processElement)).then(elementResult => {
            // results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
            console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
            results = [];  
            queue.shift();
        });
    });
}

batchOfRows.on('message', function (data) {
    console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
    queue.push(batchOfRows.rows);
    processQueue();
});

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 (id, messageBody) values (?, ?)";
    return DB.execute(query, [id, JSON.stringify(message)])
        .then(result => {
            // Pushing the result here
            results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
            console.log("Test") // On the first batch prints "Test" 72 times right away
        });
}

EDIT I have modified the processQueue function just slightly by adding an element.map(processElement), because the batches received from batchOfRows are actually arrays, and I need to perform that DB query for each item inside each array.

I have also removed results.push(elementResult) because elementResult is actually undefined for some reason. I have moved the push into insertIntoDB instead, as results.push(result). This may be where the error originates (I don't know how to return the result from insertIntoDB back to the calling promise chain in processQueue).

If you take a glance at insertIntoDB: if I console.log("Test"), it prints "Test" the same number of times as there are elements in the batchOfRows array, signifying that all promises in that batch have resolved. So if the first batch/message has 72 rows, it prints "Test" 72 times right away. But if I change that console.log("Test") to results.push(result) (or even results.push("test")) and then print results.length, it still gives me 0 until the second batch completes, even though I expect the length to be 72.
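For reference, here is a minimal sketch (with made-up values, not my real rows) of how .then treats a non-function argument, which I suspect is related:

```javascript
// Per the promise spec, a non-function argument to .then is ignored:
// the previous value passes straight through, and nothing is awaited.
const batch = [Promise.resolve(1), Promise.resolve(2)];

Promise.resolve('previous value')
    .then(batch) // an array, not a function: silently ignored
    .then(v => console.log(v)); // 'previous value', not [1, 2]

// Wrapping it in a function and aggregating with Promise.all does wait:
Promise.resolve('previous value')
    .then(() => Promise.all(batch))
    .then(v => console.log(v)); // [1, 2]
```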

stark0323
  • Possible duplicate of [JavaScript ES6 promise for loop](https://stackoverflow.com/questions/40328932/javascript-es6-promise-for-loop) – joaner Jun 23 '18 at 13:55
  • Here's a worry I had while we were discussing this post: simultaneous access to a queue. See this post (attended to by the most distinguished promise tag people on SO, but unanswered). https://stackoverflow.com/questions/26756463/nodejs-readers-writers-concurrency One reason I suggested writing all the data to the db as it appeared and then handling the db-persisted queue on a separate process was to let the db enforce atomicity of those operations. I think that would be a more professional solution than the one I proposed – danh Jun 25 '18 at 19:40
  • 1
    In more direct terms, could my suggested solution run `push` on the queue at the very same moment another promise is running`shift` on it? – danh Jun 25 '18 at 19:45
  • Does this answer your question? [JavaScript closure inside loops – simple practical example](https://stackoverflow.com/questions/750486/javascript-closure-inside-loops-simple-practical-example) – Liam Oct 18 '21 at 07:50

2 Answers


It might be helpful to abstract the ideas a little bit, and represent them explicitly in data (rather than data retained implicitly in the promises). Start with a queue:

let queue = [];

Add stuff to the queue with queue.push(element) and get and remove in order of arrival with element = queue.shift()

Our goal is to process whatever's on the queue, in order, saving the results in order. The processing itself is async, and we want to finish one queue item before starting the next, so we need a chain of promises (called promises) to process the queue:

let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(() => processElement(element)).then(elementResult => {
            results.push(elementResult);
            queue.shift();
        });
    });
}

We can convince ourselves that this is right without even thinking about what processElement() does, so long as it returns a promise. (In the OP's case, that promise is a promise to deal with an array of "rows".) processElement() will do its thing, and the result (an array of results in the OP's case) will get pushed to results.
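Here's a quick sketch you can run to confirm the ordering, with a stubbed processElement that just waits a few milliseconds (a stand-in for the real DB work):

```javascript
// The stub records start/end markers so we can see that element n
// finishes before element n+1 starts.
const order = [];

function stubProcessElement(element) {
    order.push('start ' + element);
    return new Promise(resolve => setTimeout(() => {
        order.push('end ' + element);
        resolve(element);
    }, 10));
}

// Same chaining pattern as above: each link starts only after
// the previous link's promise settles.
let chain = Promise.resolve();
['a', 'b'].forEach(element => {
    chain = chain.then(() => stubProcessElement(element));
});

chain.then(() => console.log(order));
// ['start a', 'end a', 'start b', 'end b'] -- strictly serial
```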

Confident that the ordering of operations makes sense, when a new batch arrives, add it to the queue, and then process whatever's on the queue:

batchOfRows.on('message', function (data) {
    queue.push(batchOfRows.rows);
    processQueue();
});

We just need to define processElement(). Use @YuryTarabanko's helpful suggestions for that here (and leave his answer marked correct, IMO)

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 (id, messageBody) values (?, ?)";
    return DB.execute(query, [id, JSON.stringify(message)]);
}

One nice side-effect of this is that you can measure progress. If the inputs are arriving too fast then the expression:

queue.length - results.length

... will grow over time.
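If you want that visible while it runs, a hypothetical watchdog (not part of the solution proper) could log the backlog periodically; the arrays are redeclared here only so the snippet stands alone:

```javascript
let queue = [];   // stands in for the queue above
let results = []; // stands in for the results above

// Log the backlog every 5 seconds; unref() so this timer alone
// doesn't keep the node process alive.
const watchdog = setInterval(() => {
    console.log('unprocessed entries:', queue.length - results.length);
}, 5000);
watchdog.unref();
```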

EDIT Looking at the newer code, I am puzzled by why a query is done for each row (each element in batchOfRows.rows). Since the result of that query is ignored, don't do it...

function processElement(data) {
    const id = data.date + data.location
    // we know everything we need to know to call insert (data and id)
    // just call it and return what it returns :-)
    return insertIntoDB(data, id);
}

I understand now that this will be a long-running task, and it shouldn't accumulate results (even linearly). The cleaner fix for that is to remove every reference to the results array that I suggested. The minimal version of insert just inserts and returns the result of the insertion...

function insertIntoDB(message, id) {
    const query = "insert into table2 (id, messageBody) values (?, ?)";
    return DB.execute(query, [id, JSON.stringify(message)]);
}

I think you added some code to log results. (A better test that it worked would be to check the database via some outside process.) If you want to log, just remember to pass through the result value after logging:

anyPromise.then(result => {
    console.log(result);
    return result;  // IMPORTANT
})
danh
  • Thanks for this great solution! I am sure many people will find this solution as helpful as I did. I am actually going to try to implement this method too. – stark0323 Jun 23 '18 at 18:53
  • One question: Is this viable to leave running for a long, long time? (Potentially forever)? Will memory become an issue? – stark0323 Jun 23 '18 at 19:10
  • 1
    Memory growth will be a (very non-linear) function of the rate of arrival of a batch vs the time required to process it. If you can get a batch done before the next one arrives (on average), then memory won't grow at all. However, to the extent new batches show up before previous batches are finished, memory will grow out exponentially. – danh Jun 23 '18 at 19:38
  • A batch every second, with each batch containing ~70 rows. I think it should be fine? – stark0323 Jun 23 '18 at 20:06
  • 1
    My guess would be fine, too. Try it with logging to see if the queue grows. – danh Jun 23 '18 at 20:49
  • I have replaced the results.push inside processElement with console.log(results.length). I push my query result into the results array once insertIntoDB finishes. Whenever I get the first batch (ie. on the first iteration) the results array length is 0 even though console.log(batchOfRows.rows) gives the actual value for the first message. What could be wrong here? It looks like there is an offset. – stark0323 Jun 23 '18 at 21:45
  • 1
    I'm not sure. Looking over the code again, I don't really understand the data parameter to anonymous function passed to `.on('message`, function(data)...` I don't see the definition of batchOfRows, either. Maybe, add a little more context to the question? – danh Jun 23 '18 at 23:30
  • I think if I knew how to return the result from insertIntoDB back to the calling promise function processQueue it would probably fix the problem. – stark0323 Jun 24 '18 at 00:27
  • I can study the code further, but a quick look reveals that your debugging modification to `insertIntoDB` added a then block to db.execute. That db execute promise will now return whatever the then function returns, which is nothing. After `console.log("Test")`, just add a return of whatever you want to return, like `return results;` – danh Jun 24 '18 at 14:25
  • Note that single expression arrow `=>` functions return implicitly, but when there are braces `{ }`, one must return *explicitly* with a return statement. – danh Jun 24 '18 at 14:31
  • I am still getting undefined for the elementResult even after explicitly returning the result(s). Am I doing anything wrong on this line: promises.then(element.map(processElement)).then(elementResult => { //stuff here }); Note: I am also doing some small changes right before I call insertIntoDB inside the .then() of the calling db.execute, however I made sure to return insertIntoDB as well. I will update the post. – stark0323 Jun 24 '18 at 14:54
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/173714/discussion-between-danh-and-stark0323). – danh Jun 24 '18 at 14:56

You have various antipatterns in your code. First, you don't need to manually create promises here; you likely never need to call new Promise at all. Second, you are breaking promise chains by not returning the nested promise from within an onFulfill handler. And finally, you are polluting the global scope by not declaring your variables: id = message.date + message.location
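As a generic sketch of those first two points (dbCall here is just a stand-in so it runs anywhere, not your actual DB.execute):

```javascript
const dbCall = x => Promise.resolve(x); // stand-in for a driver call

// Antipattern: wrapping promises you already have in new Promise,
// and wiring resolve/reject by hand at every level of nesting.
function explicitConstruction(data) {
    return new Promise((resolve, reject) => {
        dbCall(data)
            .then(result => {
                dbCall(result)
                    .then(r => resolve(r))
                    .catch(e => reject(e));
            })
            .catch(e => reject(e));
    });
}

// Fix: return the promise you already have, and return the nested
// promise from inside .then so the chain stays connected.
function chained(data) {
    return dbCall(data).then(result => dbCall(result));
}
```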

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
let pending = Promise.resolve([]); // previous batch starting w/ resolved promise
batchOfRows.on('message', function (data) {
    // not sure where batchOfRows was coming from in your code
    const nextBatch = () => Promise.all(
      data.batchOfRows.rows.map(validate)
    );

    // reassign pending to a new promise;
    // whatever happened to the previous promise, we keep running
    pending = pending
      .then(nextBatch)
      .catch(e => console.error(e))
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

// Inserting into DB
function insertIntoDB(message, id) {
    const query = "insert into table2 (id, messageBody) values (?, ?)";
    return DB.execute(query, [id, JSON.stringify(message)]);
}
Yury Tarabanko
  • Thank you so much! Do you mind explaining to me in English how exactly const nextBatch = () => Promise.all(data.batchOfRows.rows.map(validate)); works? What's the anonymous function for? Also, you are doing () => result several times inside the returned promises; what's that for, and where do they end up? – stark0323 Jun 23 '18 at 17:47
  • @stark0323, was just reading this thread, just about to suggest a more explicit data structure (for clarity, at least), but it looks like you have it solved. This answer declares a little helper function called `nextBatch` that builds an array of promises by calling validate on each element (`map`) of `batchOfRows.rows`, then passes that array of promises to `Promise.all()` – danh Jun 23 '18 at 17:54
  • @danh I (and maybe many others) would definitely find your solution useful (if not more useful). Do you mind sharing your code for clarity? – stark0323 Jun 23 '18 at 17:56
  • I agree with this answer. I've added one below that spells out the objectives with a little bit more data represented explicitly, but borrows from the code here, which is why I'd advise leaving this one marked correct. – danh Jun 23 '18 at 18:24