This is a working example, and I am looking for ways to improve it further: first with a performance boost at the code level, and failing that, with other scalability options.
Things that I applied:
- Node.js streams -> improved the read and parse of the big CSV file, with back-pressure.
- An await loop (for await) for batching -> handles back-pressure and removed the Node.js heap out-of-memory error (a condensed sketch of this pattern follows the list).
- Passing the tasks to Promise.all for async execution.
- Database pooling -> improved performance, although the max pool size needs care.
- Child processes to split the file and then apply the operations above -> no performance boost achieved from this yet.
- Better error handling for each async operation, to see which rows succeed and which throw errors.
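Below is a condensed, illustrative sketch of the stream + batch pattern described in the list (function names, types and the batch size are placeholders; the full version is in insertBig.ts further down):

```typescript
import * as fs from 'fs';
import Papa from 'papaparse';

const BATCH_SIZE = 400;

// Placeholder for the real per-row work (validation read + insert).
async function processRow(row: unknown): Promise<void> {
  // ...
}

async function runBatched(filePath: string): Promise<void> {
  const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, { header: true });
  fs.createReadStream(filePath).pipe(parseStream);

  let buffer: unknown[] = [];
  for await (const row of parseStream) {
    buffer.push(row);
    if (buffer.length === BATCH_SIZE) {
      // Awaiting here pauses the for-await iteration, which is what applies
      // back-pressure to the parse stream and keeps memory bounded.
      await Promise.all(buffer.map(processRow));
      buffer = [];
    }
  }
  if (buffer.length > 0) {
    await Promise.all(buffer.map(processRow)); // flush the last partial batch
  }
}
```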
I need to process the data in a CSV file that can be as big as 1 million rows.
After processing each row, I have to check values in the database for validation and then insert the processed data into different tables.
I started with a simple Node stream and the async library, but the speed was way too low (2k rows in 3-5 minutes) and it sometimes threw a Node.js memory error.
After some iteration and research I have improved the speed to 5k rows in 20-25 seconds and avoided the out-of-memory error.
The bottleneck is that even if I change the batch size, the time does not change; I have already added pooling for the database.
I can increase the pool size to increase the speed, but I need to know how to decide the max pool size: is the pool's connection limit per process/pool or an overall server limit, given that I cannot change the MySQL server's connection limit because I don't have admin access to it?
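To be concrete about the server-side limit I cannot change: this is the value I mean (an illustrative, read-only check that should need no admin rights), and the question is how each process's connectionLimit should be sized against it:

```typescript
import pool from './dbConfig';

// Read-only look at the server-wide connection cap that my pool(s) have to
// share with every other client of this MySQL server.
pool.query(`SHOW VARIABLES LIKE 'max_connections'`, (error, results) => {
  if (error) {
    console.error('Could not read max_connections', error);
    return;
  }
  console.log('Server max_connections:', results[0] && results[0].Value);
});
```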
What are some ways to increase this speed?
Here is the code:
insertBig.ts
import { Request, Response } from 'express';
import * as fs from 'fs';
import * as path from 'path';
import { getProductById, insertProduct } from '../repo';
import { performance } from 'perf_hooks';
import Papa from 'papaparse';
let processedNum = 0;
const insertBig = async (req: Request, res: Response) => {
try {
const filePath = path.join(__dirname, '..', 'assets', 'test.csv');
importCSV(filePath).catch((error) => {
return res.status(500).send('Some error here in isert biG 123');
});
return res.status(200).json({ data: 'Data send for processing 123' });
} catch (error) {
return res.status(500).send('Some error here in isert biG 123');
}
};
async function importCSV(filePath: fs.PathLike) {
let parsedNum = 0;
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {
header: true
});
dataStream.pipe(parseStream);
let buffer = [];
let totalTime = 0;
const startTime = performance.now();
for await (const row of parseStream) {
//console.log('PA#', parsedNum, ': parsed');
buffer.push(row);
parsedNum++;
if (parsedNum % 400 == 0) {
await dataForProcessing(buffer);
buffer = [];
}
}
// flush the final partial batch so trailing rows are not dropped
if (buffer.length > 0) {
await dataForProcessing(buffer);
}
totalTime = totalTime + (performance.now() - startTime);
console.log(`Parsed ${parsedNum} rows and took ${totalTime} milliseconds`);
}
const wrapTask = async (promise: any) => {
try {
return await promise;
} catch (e) {
return e;
}
};
const handle = async (promise: Promise<any>) => {
try {
const data = await promise;
return [data, undefined];
} catch (error) {
return await Promise.resolve([undefined, error]);
}
};
const dataForProcessing = async (arrayItems: any) => {
const tasks = arrayItems.map(task);
const startTime = performance.now();
console.log(`Tasks starting...`);
console.log('DW#', processedNum, ': dirty work START');
try {
await Promise.all(tasks.map(wrapTask));
console.log(
`Tasks finished in ${performance.now() - startTime} milliseconds`
);
processedNum++;
} catch (e) {
console.log('should not happen but we never know', e);
}
};
const task = async (item: any) => {
let table = 'Product';
if (item.contactNumber == '9999999999') {
table = 'random table'; // to create read error
}
if (item.contactNumber == '11111111111') {
item.randomRow = 'random'; // to create insert error
}
// To add some read process
const [data, readError] = await handle(getProductById(2, table));
if (readError) {
return 'Some error in read of table';
}
//console.log(JSON.parse(JSON.stringify(data))[0]['customerName']);
void data; // mark the read result as intentionally unused; only readError matters here
// To add some write process
const [insertId, insertErr] = await handle(insertProduct(item));
if (insertErr) {
return `Some error to log and continue process for ${item}`;
}
return `Done for ${insertId}`;
};
export { insertBig };
repo.ts
import pool from './dbConfig';
const getProducts = (table: any) => {
return new Promise((resolve, reject) => {
pool.query(`SELECT * FROM ${table}`, (error, results) => {
if (error) {
return reject(error);
}
resolve(results);
});
});
};
const getProductById = (id: any, table: any) => {
return new Promise((resolve, reject) => {
// The table name cannot be bound with a ? placeholder, so it stays interpolated;
// the id value uses one.
pool.query(`SELECT * FROM ${table} WHERE id = ?`, [id], (error, results) => {
if (error) {
return reject(error);
}
resolve(results);
});
});
};
const insertProduct = (data: any) => {
return new Promise((resolve, reject) => {
pool.query(
`INSERT INTO Product SET ?`,
[
{
...data
}
],
(error, results) => {
if (error) {
return reject(error);
}
resolve(results);
}
);
});
};
export { insertProduct, getProducts, getProductById };
dbConfig.ts
import mysql from 'mysql';
import * as dotenv from 'dotenv';
dotenv.config();
const dbConn = {
connectionLimit: 80,
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME
};
const pool = mysql.createPool(dbConn);
pool.getConnection((err, connection) => {
if (err) {
if (err.code === 'PROTOCOL_CONNECTION_LOST') {
console.error('Database connection was closed.');
}
if (err.code === 'ER_CON_COUNT_ERROR') {
console.error('Database has too many connections');
}
if (err.code === 'ECONNREFUSED') {
console.error('Database connection was refused');
}
return;
}
if (connection) {
connection.release();
}
console.log('DB pool is connected');
});
// pool.query = promisify(pool.query);
export default pool;
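The commented-out promisify line above hints at a promise-based query wrapper; a minimal sketch of that idea (note the bind, which keeps the pool as this for the promisified query; the cast is only there to give the overloaded mysql query function a simple callable shape):

```typescript
import { promisify } from 'util';
import pool from './dbConfig';

// Promise-returning query helper on top of the same pool.
const query = promisify(pool.query).bind(pool) as unknown as (
  sql: string,
  values?: any
) => Promise<any>;

// Usage sketch:
// const rows = await query('SELECT * FROM Product WHERE id = ?', [2]);
```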
seed data
import { OkPacket } from 'mysql';
import pool from './dbConfig';
const seed = () => {
const queryString = `CREATE TABLE IF NOT EXISTS Product (
id int(11) NOT NULL,
customerName varchar(100) DEFAULT NULL,
contactNumber varchar(100) DEFAULT NULL,
modelName varchar(255) NOT NULL,
retailerName varchar(100) NOT NULL,
dateOfPurchase varchar(100) NOT NULL,
voucherCode varchar(100) NOT NULL,
voucherValue int(10) DEFAULT NULL,
surveyUrl varchar(255) NOT NULL,
surveyId varchar(255) NOT NULL,
createdAt timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
updatedAt timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`;
pool.query(queryString, (err, result) => {
if (err) {
console.log(err);
return;
}
const insertId = (<OkPacket>result).insertId;
console.log(insertId);
});
};
export default seed;
Indexing can be done to speed up the reads from MySQL.
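For illustration: the seed above creates Product without any primary key or index, so every validation SELECT ... WHERE id = ... scans the whole table. An index on the lookup column could be added through the same pool (the index name here is just a placeholder):

```typescript
import pool from './dbConfig';

// One-off: index the column used by getProductById so each validation read
// becomes an index lookup instead of a full table scan.
pool.query('CREATE INDEX idx_product_id ON Product (id)', (error) => {
  if (error) {
    console.error('Could not create index', error);
    return;
  }
  console.log('Created idx_product_id on Product (id)');
});
```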
Will increasing the number of child processes make any impact, given that Node.js itself already calls dataForProcessing asynchronously with all the available pool connections?
As per one comment, I broke the file into 8 parts; it actually slowed the processing down and seems irrelevant as a performance boost.
child.ts
import * as fs from 'fs';
import * as path from 'path';
import { getProductById, insertProduct } from '../repo';
import { performance } from 'perf_hooks';
import Papa from 'papaparse';
let processedNum = 0;
process.on('message', async function (message: any) {
console.log('[child] received message from server:', message);
JSON.stringify(process.argv);
const filePath = path.join(__dirname, '..', 'output', `output.csv.${message}`);
let time = await importCSV(filePath, message);
if (process.send) {
process.send({
child: process.pid,
result: message + 1,
time: time
});
}
process.disconnect();
});
async function importCSV(filePath: fs.PathLike, message: any) {
let parsedNum = 0;
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {
header: true
});
dataStream.pipe(parseStream);
let buffer = [];
let totalTime = 0;
const startTime = performance.now();
for await (const row of parseStream) {
// console.log('Child Server # :', message, 'PA#', parsedNum, ': parsed');
buffer.push(row);
parsedNum++;
if (parsedNum % 400 == 0) {
await dataForProcessing(buffer, message);
buffer = [];
}
}
// flush the final partial batch so trailing rows are not dropped
if (buffer.length > 0) {
await dataForProcessing(buffer, message);
}
totalTime = totalTime + (performance.now() - startTime);
// console.log(
// `Child Server ${message} : Parsed ${parsedNum} rows and took ${totalTime} seconds`
// );
return totalTime;
}
const wrapTask = async (promise: any) => {
try {
return await promise;
} catch (e) {
return e;
}
};
const handle = async (promise: Promise<any>) => {
try {
const data = await promise;
return [data, undefined];
} catch (error) {
return await Promise.resolve([undefined, error]);
}
};
const dataForProcessing = async (arrayItems: any, message: any) => {
const tasks = arrayItems.map(task);
const startTime = performance.now();
console.log(`Tasks starting... from server ${message}`);
console.log('CS#: ', message, 'DW#:', processedNum, ': dirty work START');
try {
await Promise.all(tasks.map(wrapTask));
console.log(
`Tasks finished in ${performance.now() - startTime} milliseconds`
);
processedNum++;
} catch (e) {
console.log('should not happen but we never know', e);
}
};
const task = async (item: any) => {
let table = 'Product';
if (item.contactNumber == '8800210524') {
table = 'random table'; // to create read error
}
if (item.contactNumber == '9134743017') {
item.randomRow = 'random'; // to create insert error
}
// To add some read process
const [data, readError] = await handle(getProductById(2, table));
if (readError) {
return 'Some error in read of table';
}
//console.log(JSON.parse(JSON.stringify(data))[0]['customerName']);
void data; // mark the read result as intentionally unused; only readError matters here
// To add some write process
const [insertId, insertErr] = await handle(insertProduct(item));
if (insertErr) {
return `Some error to log and continue process for ${item}`;
}
return `Done for ${insertId}`;
};
parent.ts
import { Request, Response } from 'express';
import * as child_process from 'child_process';
import * as os from 'os';
const insertBigChildProcess = async (req: Request, res: Response) => {
try {
const numchild = os.cpus().length;
let done = 0;
const totalProcessTime: any[] = [];
for (let i = 1; i <= numchild; i++) {
const child = child_process.fork(__dirname + '/child.ts');
child.send(i);
child.on('message', function (message: any) {
console.log('[parent] received message from child:', message);
totalProcessTime.push(message.time);
const sum = totalProcessTime.reduce(
(partial_sum, a) => partial_sum + a,
0
);
console.log(sum);
console.log(totalProcessTime);
done++;
if (done === numchild) {
console.log('[parent] received all results');
}
});
}
return res.status(200).json({ data: 'Data send for processing 123' });
} catch (error) {
return res.status(500).send('Some error here in isert biG 123');
}
};