I am receiving rows of data every second from Kafka. For each batch of data, I am inserting into my database.
My app keeps reading the last message
and id
of each batch. The issue here is that the promises are not running in series, but running concurrently after one batch is finished, and they keep reading the same message
and id
. I want each promise to have it's own message
and id
, as defined by the order they came in from the for-loop in the first function.
I think I need to use closures, however I am not sure how I can apply them here. I don't want to use timers!
Thanks!
// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
for (var i = 0; i < batchOfRows.rows.length; i++) {
validate(batchOfRows.rows[i])
.then(result => console.log(result))
.catch(error => console.log(error));
}
});
// For each row received, give it an ID and then insert into the DB
function validate(data) {
return new Promise((resolve, reject) => {
message = data;
id = message.date + message.location
DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
.then(result => {
// Insert into the table at this ID
insertIntoDB(message, id)
.then(result => resolve(result))
.catch(error => reject(error));
})
.catch(error => {
reject(error);
});
});
}
// Inserting into DB
function insertIntoDB(message, id) {
return new Promise((resolve, reject) => {
query = "insert into table2 where id = ? and messageBody = ?";
DB.execute(query, [id, JSON.Stringify(message)])
.then(result => resolve("Successfully inserted message ID " + id))
.catch(error => reject("Error inserting!"));
});
}
EDIT (danh's solution):
var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
client, [{
topic: 'my_topic',
partition: 0,
offset: 0
}], {
fromOffset: false
}
);
let results = [];
let promises = Promise.resolve();
function processQueue() {
queue.forEach(element => {
promises = promises.then(element.map(processElement)).then(elementResult => {
// results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
results = [];
queue.shift();
});
});
}
batchOfRows.on('message', function (data) {
console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
queue.push(batchOfRows.rows);
processQueue();
});
function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
.then(result => {
// Pushing the result here
results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
console.log("Test") // On the first batch prints "Test" 72 times right away
});
}
EDIT I have modified the processQueue function just slightly by adding a element.map(processUpdate) because the batches received from batchOfRows are actually arrays, and I need to perform that DB query for each item inside that array.
I have also removed results.push(elementResult) because elementResult is actually undefined for some reason. I have moved results.push(elementResult) into insertIntoDB and named it as results.push(result). This may be where the error originates (I don't know how to return the result from insertIntoDB back to the calling promise function processQueue).
If you take a glance at insertIntoDB, if I console.log("test") it will print test the same number of times as there are elements in the batchOfRows array, signifying that it has resolved all promises in that batch. So on the first batch/message, if there are 72 rows, it will print "Test" 72 times. But if I change that console.log("Test") to simply results.push(result), or even results.push("test"), and then print results.length it will still give me 0 until the second batch completes even though I expect the length to be 72.