I am writing a script that reads large log files, aggregates the data, stores the aggregated data into mongo, and stores the detailed data into a very large number of small gzip files.
I already have a working implementation in Perl and I wanted to try doing this in Node.js (sorry, I cannot disclose the Perl version).
While I've been able to implement the parsing, aggregation and storing into mongo, I'm a bit stuck on the "storing a very large number of small gzip files" part.
Importing process:

1. A logReader instance reads/parses a single log file asynchronously and emits a data event every now and then (pausing the read stream and waiting for a resume call), and an end event when EOF is reached.
2. An import instance listens on the data events emitted by logReader - this is where I need to drop the detailed data into the small gzip files in a synchronous way and resume the logReader afterwards (see the sketch just below the list).
3. On end, the leftovers are stored as in step 2 AND the aggregated documents are stored into mongo (the mongo storing is already done using a bulk op and the standard mongo driver).
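For context, the wiring between the two looks roughly like this (a simplified sketch, not my real code; LogReader, Importer and their method names are placeholders):

// Simplified sketch of the wiring - LogReader and Importer are placeholders
// for my own classes, shown only to illustrate the flow described above.
var logReader = new LogReader('/path/to/access.log');
var importer  = new Importer();

logReader.on('data', function(parsedLines) {
  importer.handleData(parsedLines);   // step 2: write the small gzip files, synchronously
  logReader.resume();                 // only then let logReader read further
});

logReader.on('end', function() {
  importer.flushLeftovers();          // step 3: same gzip writing as in step 2
  importer.storeAggregates();         // plus the mongo bulk op (already working)
});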
There will be multiple import processes doing this job, and they can conflict when attempting to write the same files (therefore I need to use flock via fs-ext).
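For reference, the per-file locking pattern I have in mind with fs-ext is just this (a sketch only, with a made-up path):

var fs = require('fs-ext');

// Sketch of the per-file locking I intend (made-up path):
var fd = fs.openSync('/data/details/bucket-0001.gz', 'a');  // create or append
fs.flockSync(fd, 'ex');          // blocks until the exclusive lock is granted
try {
  // ... write the (gzipped) data through this fd ...
} finally {
  fs.flockSync(fd, 'un');        // release the lock for the other import processes
  fs.closeSync(fd);
}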
Let's say the import instance is inside the logReader data event callback (step 2):
- I need to write to (create or append!) a few thousand gzip files, and for each one:
  - open the gzip file at its exact location in append mode
  - lock the file with an exclusive lock
  - seek to its end (also to find out whether there is any data in there already - for an empty file I want to store a header)
  - create a gzip transform stream which should pipe into the opened file (and I am not sure whether this will work properly in Node.js - in Perl I specified the 'Append' option when doing this)
  - write the data into the gzip stream
  - return information about the lines written and the time it took to the "for each file" loop
Simplified code below:
var fs = require('fs-ext'),
    deasync = require('deasync'),
    zlib = require('zlib');

IndexedFs.prototype.write = function(path, data) {
  var io, pos, count, t = new Date();
  io = fs.createWriteStream(path, {flags: 'a'});
  while (io.fd === null) { deasync.runLoopOnce(); }  // wait until the file is actually opened
  try {
    fs.flockSync(io.fd, 'ex');                       // exclusive lock; other import processes wait
  } catch (e) {
    console.log("Failed to lock file '%s':\n %s", path, e);
    io.end();
    return false;
  }
  try {
    pos = fs.seekSync(io.fd, 0, 2);                  // seek to end; pos === 0 means the file is empty
  } catch (e) {
    console.log("Failed to seek end in file '%s':\n %s", path, e);
    io.end();
    return false;
  }
  io = zlib.createGzip().pipe(io);
  if (pos === 0) { io.write(__HEADER.join("\t") + "\n"); }  // header only for a brand new file
  count = _writeData(io, data);                      // this just serializes and does io.write(...)
  io.end();
  return [count, new Date() - t];
};
I need the above function to be "synchronous" (I want to block the whole process anyway) and to return information about the lines written and the time it took.
I found deasync, and that helped me wait for the file to actually be opened after calling createWriteStream(). The writing works, but the files are not gzipped (so I don't even know whether the appending will work).
I understand zlib in Node.js is async only - therefore I would appreciate hints/advice/best practices on how to achieve what I'm trying to do.
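In case it helps frame the question, below is the untested direction I am considering: writing into the gzip transform itself (instead of into the value returned by pipe()) and spinning the event loop with deasync until the file stream reports 'finish'. I am assuming that appending a complete new gzip stream to an existing .gz file yields a valid multi-member gzip file, which is how I understand the Perl 'Append' option behaves - please correct me if that assumption is wrong.

var fs = require('fs-ext'),
    zlib = require('zlib'),
    deasync = require('deasync');

// Untested sketch: pipe a gzip transform into the file stream and
// block until everything is flushed to disk.
// (flock/seek/header handling omitted to keep the sketch short.)
function writeGzipAppend(path, lines) {
  var done = false;
  var file = fs.createWriteStream(path, {flags: 'a'});
  var gzip = zlib.createGzip();

  gzip.pipe(file);                                   // compressed bytes go into the file
  file.on('finish', function() { done = true; });

  lines.forEach(function(line) {
    gzip.write(line + "\n");                         // write into the gzip stream, not the file stream
  });
  gzip.end();                                        // finish this gzip member

  deasync.loopWhile(function() { return !done; });   // block the process until the write completes
}

Is something along these lines the right way to drive the gzip stream, or is there a better-established pattern for this?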