As part of an application I am building, I am reading and manipulating large (approximately 5.5GB, 8 million rows) csv files using csv-parse. I have the process running relatively smoothly, but am stuck on one item - catching the errors throw by an inconsistent number of columns.
I'm using the pipe function because it works well with the rest of the application, but my question is, how can I redirect errors thrown by the parser to a log and allow the process to continue?
I recognize that I could use the relax_column_count
option to skip the records which have an inconsistent number of columns, and that option is almost sufficient. The catch is that for data quality assessment purposes I need to log those records so I can go back and review what caused the incorrect number of columns (the process is a feed with many potential fault points).
As a side note, I know the easiest way to solve this would be to clean up the data upstream of this process, but unfortunately I don't control the data source.
In the example set, for example, I get the following error:
events.js:141
throw er; // Unhandled 'error' event
Error: Number of columns on line (line number) does not match header
Sample data (not actually my data, but demonstrating the same problem):
year, month, value1, value2
2012, 10, A, B
2012, 11, B, C,
2012, 11, C, D,
2013, 11, D, E,
2013, 11, E, F,
2013, 11, F,
2013, 11, G, G,
2013, 1, H, H,
2013, 11, I, I,
2013, 12, J, J,
2014, 11, K, K,
2014, 4, L, L,
2014, 11, M, M,
2014, 5, N,
2014, 11, O, N,
2014, 6, P, O,
2015, 11, Q, P,
2015, 11, R, Q,
2015, 11, S, R,
2015, 11, T, S,
Code:
const fs = require('fs');
const parse = require('csv-parse');
const stringify = require('csv-stringify');
const transform = require('stream-transform');
const paths = {
input: './sample.csv',
output: './output.csv',
error: './errors.csv',
}
var input = fs.createReadStream(paths.input);
var output = fs.createWriteStream(paths.output);
var error = fs.createWriteStream(paths.error);
var stringifier = stringify({
header: true,
quotedString: true,
});
var parser = parse({
relax: true,
delimiter: ',',
columns: true,
//relax_column_count: true,
})
var transformer = transform((record, callback) => {
callback(null, record);
}, {parallel: 10});
input.pipe(parser).pipe(transformer).pipe(stringifier).pipe(output);
Thoughts?