
This is a working example, and I am looking for ways to improve it further and get a performance boost at the code level, or, if that is not possible, for other scalability options.

Things that I applied:

  • Node.js streams -> improved the read and parse operation for the big CSV file, with back pressure.
  • Used an awaited loop for batching -> handles back pressure and removed the Node.js memory heap error.
  • Passing tasks to Promise.all for async operation.
  • Database pooling -> improved performance, however I need to be cautious with the max pool size.
  • Child processes to split the file and then apply the operations above -> could not achieve any performance boost yet.
  • Better error handling for each async operation, to see which tasks succeed and which throw errors.

I need to process the data in a CSV file which can be as big as 1 million rows.

After processing each row, I have to check values from the database for some validation and then insert the processed data into different tables.

I started with a simple Node stream and the async library, however the speed was way too low, 2k rows in 3-5 minutes, and sometimes it threw a Node.js memory error.

After some iteration and research I have improved the speed to 5k rows in 20-25 seconds and avoided the out-of-memory error.

The bottleneck is that even if I change the batch size the time does not change; I have added pooling for the database.

I can increase the pool size to increase the speed, however I need to know how to decide the max pool size, and whether pool connections are counted per pool or against the overall MySQL connection limit, given that I cannot change the default connection limit for MySQL as I don't have admin access to it.
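For context, here is a minimal sketch (using the same mysql pool exported from dbConfig.ts below; checkConnectionLimits is a hypothetical helper name) of reading the server-side limit and current usage without admin access. Note that connectionLimit applies per pool, i.e. per Node process, so every child process adds its own set of connections.

import pool from './dbConfig';

// Hypothetical helper: read the server-side cap and current usage so that
// connectionLimit (or the sum of limits across child processes) stays below it.
const checkConnectionLimits = () => {
  pool.query(`SHOW VARIABLES LIKE 'max_connections'`, (error, results) => {
    if (error) {
      return console.error('Could not read max_connections', error);
    }
    console.log('Server max_connections:', results[0].Value);
  });
  pool.query(`SHOW STATUS LIKE 'Threads_connected'`, (error, results) => {
    if (error) {
      return console.error('Could not read Threads_connected', error);
    }
    console.log('Connections currently open on the server:', results[0].Value);
  });
};

export { checkConnectionLimits };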

What are some ways to increase this speed?

Here is the code:

insertBig.ts

import { Request, Response } from 'express';
import * as fs from 'fs';
import * as path from 'path';
import { getProductById, insertProduct } from '../repo';
import { performance } from 'perf_hooks';

import Papa from 'papaparse';

let processedNum = 0;

const insertBig = async (req: Request, res: Response) => {
  try {
    const filePath = path.join(__dirname, '..', 'assets', 'test.csv');
    // Fire and forget: the 200 response below is sent immediately, so this
    // catch can only log; it must not try to send a second response.
    importCSV(filePath).catch((error) => {
      console.error('Error while importing CSV', error);
    });
    return res.status(200).json({ data: 'Data sent for processing 123' });
  } catch (error) {
    return res.status(500).send('Some error here in insert big 123');
  }
};

async function importCSV(filePath: fs.PathLike) {
  let parsedNum = 0;
  const dataStream = fs.createReadStream(filePath);
  const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {
    header: true
  });
  dataStream.pipe(parseStream);
  let buffer = [];
  let totalTime = 0;
  const startTime = performance.now();
  for await (const row of parseStream) {
    //console.log('PA#', parsedNum, ': parsed');
    buffer.push(row);
    parsedNum++;
    if (parsedNum % 400 == 0) {
      await dataForProcessing(buffer);
      buffer = [];
    }
  }
  // Flush any leftover rows that did not fill a complete batch of 400.
  if (buffer.length > 0) {
    await dataForProcessing(buffer);
  }
  totalTime = totalTime + (performance.now() - startTime);
  console.log(`Parsed ${parsedNum} rows and took ${totalTime} milliseconds`);
}

const wrapTask = async (promise: any) => {
  try {
    return await promise;
  } catch (e) {
    return e;
  }
};

const handle = async (promise: Promise<any>) => {
  try {
    const data = await promise;
    return [data, undefined];
  } catch (error) {
    return await Promise.resolve([undefined, error]);
  }
};

const dataForProcessing = async (arrayItems: any) => {
  const tasks = arrayItems.map(task);
  const startTime = performance.now();
  console.log(`Tasks starting...`);
  console.log('DW#', processedNum, ': dirty work START');
  try {
    await Promise.all(tasks.map(wrapTask));

    console.log(
      `Tasks finished in ${performance.now() - startTime} milliseconds`
    );
    processedNum++;
  } catch (e) {
    console.log('should not happen but we never know', e);
  }
};

const task = async (item: any) => {
  let table = 'Product';

  if (item.contactNumber == '9999999999') {
    table = 'random table'; // to create read error
  }
  if (item.contactNumber == '11111111111') {
    item.randomRow = 'random'; // to create insert error
  }

  // To add some read process
  const [data, readError] = await handle(getProductById(2, table));
  if (readError) {
    return 'Some error in read of table';
  }
  //console.log(JSON.parse(JSON.stringify(data))[0]['customerName']);
  void data; // the read result is not used further; this just silences the unused-expression warning
  // To add some write process
  const [insertId, insertErr] = await handle(insertProduct(item));
  if (insertErr) {
    return `Some error to log and continue process for ${item}`;
  }
  return `Done for ${insertId}`;
};

export { insertBig };

repo.ts

import pool from './dbConfig';

const getProducts = (table: any) => {
  return new Promise((resolve, reject) => {
    // ?? lets the driver escape the table identifier instead of interpolating it.
    pool.query('SELECT * FROM ??', [table], (error, results) => {
      if (error) {
        return reject(error);
      }
      resolve(results);
    });
  });
};
const getProductById = (id: any, table: any) => {
  return new Promise((resolve, reject) => {
    pool.query('SELECT * FROM ?? WHERE id = ?', [table, id], (error, results) => {
      if (error) {
        return reject(error);
      }
      resolve(results);
    });
  });
};

const insertProduct = (data: any) => {
  return new Promise((resolve, reject) => {
    pool.query(
      `INSERT INTO Product SET ?`,
      [
        {
          ...data
        }
      ],
      (error, results) => {
        if (error) {
          return reject(error);
        }
        resolve(results);
      }
    );
  });
};

export { insertProduct, getProducts, getProductById };

dbConfig.ts

import mysql from 'mysql';
import * as dotenv from 'dotenv';
dotenv.config();

const dbConn = {
  connectionLimit: 80,
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME
};

const pool = mysql.createPool(dbConn);

pool.getConnection((err, connection) => {
  if (err) {
    if (err.code === 'PROTOCOL_CONNECTION_LOST') {
      console.error('Database connection was closed.');
    }
    if (err.code === 'ER_CON_COUNT_ERROR') {
      console.error('Database has too many connections');
    }
    if (err.code === 'ECONNREFUSED') {
      console.error('Database connection was refused');
    }
    return;
  }

  if (connection) {
    connection.release();
  }
  console.log('DB pool is connected');
});

// pool.query = promisify(pool.query);

export default pool;

seed data

import { OkPacket } from 'mysql';
import pool from './dbConfig';

const seed = () => {
  const queryString = `CREATE TABLE IF NOT EXISTS Product (
  id int(11) NOT NULL,
  customerName varchar(100) DEFAULT NULL,
  contactNumber varchar(100) DEFAULT NULL,
  modelName varchar(255) NOT NULL,
  retailerName varchar(100) NOT NULL,
  dateOfPurchase varchar(100) NOT NULL,
  voucherCode varchar(100) NOT NULL,
  voucherValue int(10) DEFAULT NULL,
  surveyUrl varchar(255) NOT NULL,
  surveyId varchar(255) NOT NULL,
  createdAt timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updatedAt timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`;

  pool.query(queryString, (err, result) => {
    if (err) {
      return console.log(err);
    }

    const insertId = (<OkPacket>result).insertId;
    console.log(insertId);
  });
};

export default seed;

Indexing can be added to speed up the reads from MySQL.
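For example, a one-off sketch along these lines (the exact columns to index depend on what the validation reads filter on; here I assume the id lookup used by getProductById, and idx_product_id is a hypothetical name):

import pool from './dbConfig';

// Sketch: add an index for the column the validation read filters on.
// Run once; ALTER TABLE on a large table can take a while.
pool.query(`ALTER TABLE Product ADD INDEX idx_product_id (id)`, (error) => {
  if (error) {
    return console.error('Could not add index (it may already exist)', error);
  }
  console.log('Index idx_product_id created');
});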

Will increasing the number of child processes make any impact, given that Node.js itself is already calling dataForProcessing asynchronously with all the available pool connections?

As per one comment: I broke the file into 8 parts, but it actually slowed the processing down and seems irrelevant in terms of a performance boost.

child.ts

import * as fs from 'fs';
import * as path from 'path';
import { getProductById, insertProduct } from '../repo';
import { performance } from 'perf_hooks';

import Papa from 'papaparse';

let processedNum = 0;

process.on('message', async function (message: any) {
  console.log('[child] received message from server:', message);
  const filePath = path.join(__dirname, '..', 'output', `output.csv.${message}`);

  let time = await importCSV(filePath, message);
  if (process.send) {
    process.send({
      child: process.pid,
      result: message + 1,
      time: time
    });
  }

  process.disconnect();
});

async function importCSV(filePath: fs.PathLike, message: any) {
  let parsedNum = 0;
  const dataStream = fs.createReadStream(filePath);
  const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {
    header: true
  });
  dataStream.pipe(parseStream);
  let buffer = [];
  let totalTime = 0;
  const startTime = performance.now();
  for await (const row of parseStream) {
    // console.log('Child Server # :', message, 'PA#', parsedNum, ': parsed');
    buffer.push(row);
    parsedNum++;
    if (parsedNum % 400 == 0) {
      await dataForProcessing(buffer, message);
      buffer = [];
    }
  }
  // Flush any leftover rows that did not fill a complete batch of 400.
  if (buffer.length > 0) {
    await dataForProcessing(buffer, message);
  }

  totalTime = totalTime + (performance.now() - startTime);
  //   console.log(
  //     `Child Server ${message} : Parsed ${parsedNum} rows and took ${totalTime} milliseconds`
  //   );
  return totalTime;
}

const wrapTask = async (promise: any) => {
  try {
    return await promise;
  } catch (e) {
    return e;
  }
};

const handle = async (promise: Promise<any>) => {
  try {
    const data = await promise;
    return [data, undefined];
  } catch (error) {
    return await Promise.resolve([undefined, error]);
  }
};

const dataForProcessing = async (arrayItems: any, message: any) => {
  const tasks = arrayItems.map(task);
  const startTime = performance.now();
  console.log(`Tasks starting... from server ${message}`);
  console.log('CS#: ', message, 'DW#:', processedNum, ': dirty work START');
  try {
    await Promise.all(tasks.map(wrapTask));

    console.log(
      `Tasks finished in ${performance.now() - startTime} milliseconds`
    );
    processedNum++;
  } catch (e) {
    console.log('should not happen but we never know', e);
  }
};

const task = async (item: any) => {
  let table = 'Product';

  if (item.contactNumber == '8800210524') {
    table = 'random table'; // to create read error
  }
  if (item.contactNumber == '9134743017') {
    item.randomRow = 'random'; // to create insert error
  }

  // To add some read process
  const [data, readError] = await handle(getProductById(2, table));
  if (readError) {
    return 'Some error in read of table';
  }
  //console.log(JSON.parse(JSON.stringify(data))[0]['customerName']);
  void data; // the read result is not used further; this just silences the unused-expression warning
  // To add some write process
  const [insertId, insertErr] = await handle(insertProduct(item));
  if (insertErr) {
    return `Some error to log and continue process for ${item}`;
  }
  return `Done for ${insertId}`;
};

parent.ts

import { Request, Response } from 'express';
import * as child_process from 'child_process';
import * as os from 'os';

const insertBigChildProcess = async (req: Request, res: Response) => {
  try {
    const numchild = os.cpus().length;
    let done = 0;
    const totalProcessTime: any[] = [];
    for (let i = 1; i <= numchild; i++) {
      const child = child_process.fork(__dirname + '/child.ts');
      child.send(i);
      child.on('message', function (message: any) {
        console.log('[parent] received message from child:', message);
        totalProcessTime.push(message.time);
        const sum = totalProcessTime.reduce(
          (partial_sum, a) => partial_sum + a,
          0
        );
        console.log('Total child processing time so far (ms):', sum);
        console.log(totalProcessTime);
        done++;
        if (done === numchild) {
          console.log('[parent] received all results');
        }
      });
    }

  
    return res.status(200).json({ data: 'Data sent for processing 123' });
  } catch (error) {
    return res.status(500).send('Some error here in insert big 123');
  }
};
Mr X

2 Answers


After processing each row, have to check the values from database for some validation and then insert the processed data to different tables.

Is it possible to do this validation from a cache? It could improve performance, as hitting the DB for every row's validation will take a very long time when we are talking about millions of rows.
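A minimal sketch of that idea, assuming the validation only needs to know which ids already exist (cacheValidIds and the column it reads are placeholders for whatever the real validation checks):

import pool from './dbConfig';

// Load the values needed for validation once, before streaming the CSV,
// instead of issuing one SELECT per row.
const cacheValidIds = (): Promise<Set<number>> => {
  return new Promise((resolve, reject) => {
    pool.query(`SELECT id FROM Product`, (error, results) => {
      if (error) {
        return reject(error);
      }
      resolve(new Set(results.map((row: any) => row.id)));
    });
  });
};

// Usage sketch inside the existing flow:
// const validIds = await cacheValidIds();
// ...later, per row:
// if (!validIds.has(Number(item.id))) { /* handle the invalid row */ }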

You can divide the CSV file into smaller CSV files and then process these files in parallel. You can use this package to achieve it.

Once you do that, run these files using child processes in parallel.

// parent.js
var child_process = require('child_process');

var numchild  = require('os').cpus().length;
var done      = 0;

for (var i = 0; i < numchild; i++){
  var child = child_process.fork('./child');
  child.send((i + 1) * 1000);
  child.on('message', function(message) {
    console.log('[parent] received message from child:', message);
    done++;
    if (done === numchild) {
      console.log('[parent] received all results');
      ...
    }
  });
}

// child.js
process.on('message', function(message) {
  console.log('[child] received message from server:', message);
  setTimeout(function() {
    process.send({
      child   : process.pid,
      result  : message + 1
    });
    process.disconnect();
  }, (0.5 + Math.random()) * 5000);
});

Copied from this thread. You can give it a try and see how much time it takes now.

Apoorva Chikara
  • Also, which hosting service will you be using? AWS? – Apoorva Chikara Jan 08 '22 at 13:08
  • Which "pool" are you talking about? The "connection pool" is probably irrelevant. The "buffer_pool" should be set based on the amount of RAM available; it helps most processing. Doing things in parallel is probably useless -- they are likely to be stumbling over each other, thereby providing no performance boost. – Rick James Jan 08 '22 at 18:57
  • @ApoorvaChikara I implemented that, but from what I can tell the connection pool is causing too many connections and the results are also erratic, even though I am using the same functions inside the child processes. – Mr X Jan 08 '22 at 19:09

I approach such a task by asking "can I do it all in SQL?" That is likely to be the most performant approach. I would pick between two ways to do the "validation" and "processing":

During LOAD DATA

    LOAD DATA ...
        ( ... @a, ..., @b, ...)
        SET cola = ... @a ...,
            colb = ... @b ...

To explain:

  • As the rows are read, some columns are put into @ variables.
  • Then those variables are used in expressions/functions to compute the desired values for the actual columns.
  • Note that this is also a way to 'ignore' columns (by not using them in SET) or to combine columns; a concrete sketch follows below.
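A concrete sketch of that pattern for the Product table above, sent through the same pool as the rest of the code (it assumes the CSV columns arrive in this order, that the first line is a header, and that both the server and the client permit LOCAL INFILE):

    import pool from './dbConfig';

    // Sketch: let MySQL read the whole CSV and do the per-row processing
    // in the SET clause, instead of one INSERT per row from Node.
    const loadSql = `
      LOAD DATA LOCAL INFILE ?
      INTO TABLE Product
      FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
      IGNORE 1 LINES
      (id, customerName, @contact, modelName, retailerName,
       @purchaseDate, voucherCode, voucherValue, surveyUrl, surveyId)
      SET contactNumber = TRIM(@contact),
          dateOfPurchase = TRIM(@purchaseDate)`;

    pool.query(loadSql, ['assets/test.csv'], (error, results) => {
      if (error) {
        return console.error('LOAD DATA failed', error);
      }
      console.log('Rows loaded:', results.affectedRows);
    });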

After LOADing

Run UPDATE statement(s) to do the post-processing en masse. This is likely to be much faster than doing fixups one row at a time.
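And a sketch of this second approach, assuming a hypothetical Product_staging table created with the same structure as Product: load the raw rows there first, then run the fix-ups and the copy as a few set-based statements instead of per row.

    import pool from './dbConfig';

    // Sketch: post-process the freshly loaded rows en masse, then copy them over.
    const postProcess = () => {
      pool.query(
        `UPDATE Product_staging
            SET contactNumber = TRIM(contactNumber),
                voucherValue  = IFNULL(voucherValue, 0)`,
        (error) => {
          if (error) {
            return console.error('Post-processing UPDATE failed', error);
          }
          pool.query(
            `INSERT INTO Product SELECT * FROM Product_staging`,
            (error2, results) => {
              if (error2) {
                return console.error('Copy from staging failed', error2);
              }
              console.log('Rows copied:', results.affectedRows);
            }
          );
        }
      );
    };

    export { postProcess };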

Rick James
  • So it's like there is a limitation at the application level that can't be improved further due to the query bottleneck, and I need to think about optimising the multiple service calls, the database, and the querying? – Mr X Jan 08 '22 at 19:15
  • @MrAJ - Sometimes the best optimization involves redesigning the entire application. My suggestions are compromises somewhere in the middle. Needing to load a million-row file that needs editing sounds like an architectural design flaw. – Rick James Jan 09 '22 at 01:01