
I am trying to create a CSV class that can be used inside other scripts in my application. When the CSV class is instantiated, it creates a writable stream to a file specified by the user. The write and destroy methods seem to work, but I can't figure out how to get the 'writeEnd' member on the class to execute once the data has finished writing to the file.

The 'writeEnd' member variable is a function that should be overridden by the user. For example, here is a file where I am testing out the class's functionality and overriding 'writeEnd' with a function of my own choosing.

P.S. Please see the question at the bottom!

const CSV = require('./shared/classes/csv');
const csv = new CSV(__dirname);

csv.writeEnd = () => {
  console.log('Finished!');
};

for (let i = 0; i < 1000000; i++) {
  csv.write('Hello World.');
}

I was hoping for 'Finished!' to be logged to the console, but the function does not fire at all. I hope I am doing something wrong that someone can catch pretty easily.

For your reference, here is the class file untouched:

const { createWriteStream } = require('fs');
const { Readable } = require('stream');

/**
 * @class CSV
 */
module.exports = class CSV {
  constructor(path) {
    this.readStream = new Readable({ read() {} });
    this.writeStream = createWriteStream(`${path}/csv/data.csv`);
    this.readStream.pipe(this.writeStream);

    this.writeEnd = () => {};
  }

  /**
   * @method write
   * @param {any} data
   */
  write(data) {
    this.readStream.push(`${data}\n`);
  }

  /**
   * @method destroy
   */
  destroy() {
    this.readStream.destroy();
    this.writeStream.destroy();
  }
};

Below is one of my failed attempts:

/**
 * @class CSV
 */
module.exports = class CSV {
  constructor(path) {
    this.readStream = new Readable({ read() {} });
    this.writeStream = createWriteStream(`${path}/csv/data.csv`);
    this.readStream.pipe(this.writeStream);

    // I'm wondering if this executes immediately because no writing is taking place
    // during instantiation
    this.writeStream.on('finish', this.writeEnd);
    this.writeEnd = () => {};
  }

  /**
   * @method write
   * @param {any} data
   */
  write(data) {
    this.readStream.push(`${data}\n`);
  }

  /**
   * @method destroy
   */
  destroy() {
    this.readStream.destroy();
    this.writeStream.destroy();
  }
};
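Here is a quick check of the statement order in this attempt. `on()` reads `this.writeEnd` at the moment it runs, and that happens before the assignment on the next line. So the listener it receives is `undefined`, and Node.js rejects a listener that is not a function with a `TypeError`. This is a minimal sketch; the object `self` is a hypothetical stand-in for `this` in the constructor:

```javascript
const { EventEmitter } = require('events');

// `self` is a hypothetical stand-in for `this` in the constructor.
const self = { writeStream: new EventEmitter() };

let error = null;
try {
  // Same order as the attempt: self.writeEnd is read here, while still undefined.
  self.writeStream.on('finish', self.writeEnd);
} catch (e) {
  error = e;
}
self.writeEnd = () => {};

console.log(error instanceof TypeError); // true
```

Swapping the two lines avoids the error. However, the stream would then hold a reference to the empty placeholder function, so a `writeEnd` assigned later from outside the class would still never run.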

I am wondering: do I need to listen for the very first time the readStream gets data pushed to it, and only then set the 'finish' callback?

Dan Zuzevich
  • In your code, I only see assignment of the variable `writeEnd` but it is never called. – L. Meyer Jul 28 '19 at 11:42
  • I left those parts out, because I tried a few different ways. In my first attempt I tried putting a callback on this.writeStream.on('finish') right inside the class's constructor. That did not work either. – Dan Zuzevich Jul 28 '19 at 11:44
  • Perhaps I will post some of the failed attempts. – Dan Zuzevich Jul 28 '19 at 11:44
  • Updated the post to include a failed attempt, and an additional important question at the very bottom. – Dan Zuzevich Jul 28 '19 at 11:50
  • In any case, there should be a `writeStream.end()` once it's finished, because how could it know otherwise? You could write another 'Hello World', so the stream is still open. – L. Meyer Jul 28 '19 at 11:53
  • If I put csv.writeStream.end() after the for loop, the script crashes and throws the error 'write after end'. I get that it's because streams are async, but I have no idea how to tell the stream that it's finished once the for loop is done and all the data has been written to the file. – Dan Zuzevich Jul 28 '19 at 11:59
  • What is the point of the Readable stream here? Why don't you use a Writable stream with its write method? – vbarbarosh Jul 28 '19 at 12:10
  • Prob because of me misunderstanding streams. I'll look into that. – Dan Zuzevich Jul 28 '19 at 14:11
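The 'write after end' error in the comments comes from ending `writeStream` directly while `readStream` is still piping buffered chunks into it. Below is a sketch of one way to keep the original Readable-plus-`pipe()` design. It assumes a hypothetical `end()` method and a temp-directory path in place of `${path}/csv/data.csv`.

Pushing `null` marks the end of the Readable's data. `pipe()` ends its destination by default, so it then calls `writeStream.end()` itself, and `'finish'` fires after everything pushed has been written.

```javascript
const { createWriteStream, mkdtempSync, readFileSync } = require('fs');
const { Readable } = require('stream');
const os = require('os');
const path = require('path');

// The question's class, plus a hypothetical end() method.
class CSV {
  constructor(filePath) {
    this.readStream = new Readable({ read() {} });
    this.writeStream = createWriteStream(filePath);
    // By default pipe() calls writeStream.end() when readStream emits 'end'.
    this.readStream.pipe(this.writeStream);
  }

  write(data) {
    this.readStream.push(`${data}\n`);
  }

  end() {
    // push(null) marks end-of-data; once the buffered chunks are consumed,
    // readStream emits 'end' and pipe() ends writeStream for us.
    this.readStream.push(null);
  }
}

// Assumption: a temp directory stands in for `${path}/csv/data.csv`.
const file = path.join(mkdtempSync(path.join(os.tmpdir(), 'csv-')), 'data.csv');
const csv = new CSV(file);

const done = new Promise((resolve, reject) => {
  csv.writeStream.on('finish', () => {
    console.log('Finished!');
    resolve(readFileSync(file, 'utf8'));
  });
  csv.writeStream.on('error', reject);
});

for (let i = 0; i < 1000; i++) {
  csv.write('Hello World.');
}
csv.end();
```

Dropping the Readable, as suggested in the comments, is simpler. This sketch only shows what the original design was missing: nothing ever signalled the end of the data.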

4 Answers


The problem is that the custom/overridden writeEnd method is never called, since the event emitter keeps a reference to the original handler, i.e. the function you set in your constructor: this.writeEnd = () => {};

The easiest way is to allow passing a callback function to the constructor of the CSV class and use it as the finish handler. Consider this simple example:

const EventEmitter = require("events").EventEmitter;

class CSV {

    constructor(customWriteEndCb) {
        this.writeEnd = () => {
            console.log("in original writeEnd");
        };
        this.writeEnd = customWriteEndCb || this.writeEnd;
        this.writeStream = new EventEmitter();
        this.writeStream.on('finished', this.writeEnd);
    }

    testMe() {
        this.writeStream.emit('finished');
    }

}

const customWriteEnd = () => {
    console.log("in custom writeEnd")
}

const csv = new CSV(customWriteEnd);
csv.testMe(); // will print "in custom writeEnd"
eol
  • Thank you for the great idea of adding a callback function that can be passed to the constructor. I am going to answer my own question, as I did some things differently. – Dan Zuzevich Jul 28 '19 at 14:29
  • @DanZuzevich: Happy to help! Could you please upvote my answer though? Thanks :) – eol Jul 28 '19 at 21:30
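Here is a variation on the answer above that keeps the original style of assigning `writeEnd` after construction. Instead of registering `this.writeEnd` itself, register a wrapper arrow function. The wrapper looks up `this.writeEnd` when the event is emitted, not when the listener is registered, so an override assigned later is the one that runs.

This sketch reuses the answer's plain `EventEmitter` and `'finished'` event. `calls` is a hypothetical array used only to record which handler ran.

```javascript
const { EventEmitter } = require('events');

class CSV {
  constructor() {
    this.writeEnd = () => console.log('in original writeEnd');
    this.writeStream = new EventEmitter();
    // Register a wrapper, not this.writeEnd itself: the wrapper reads
    // this.writeEnd each time the event fires, so later overrides win.
    this.writeStream.on('finished', () => this.writeEnd());
  }

  testMe() {
    this.writeStream.emit('finished');
  }
}

const calls = []; // hypothetical log of which handler ran
const csv = new CSV();
csv.writeEnd = () => calls.push('custom'); // override after construction
csv.testMe(); // emit() is synchronous, so the handler has run by now
console.log(calls); // [ 'custom' ]
```

The same wrapper works with a real write stream's 'finish' event, provided something eventually calls end() on the stream.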

So it was through a group effort of different answers and comments that landed me on a simple solution! Thank you very much to everyone who took the time to share their advice.

I removed the readable stream, as that was completely unnecessary, and simply used the write method on the writable stream. I also pass a callback function to the constructor upon instantiation.

Here is my final code:

const { createWriteStream } = require('fs');

/**
 * @class CSV
 */
module.exports = class CSV {
  constructor(path, cb) {
    this.writeStream = createWriteStream(`${path}/csv/data.csv`);
    this.writeStream.on('finish', cb);
  }

  /**
   * @method write
   * @param {any} data
   */
  write(data) {
    this.writeStream.write(`${data}\n`);
  }

  /**
   * @method end
   */
  end() {
    this.writeStream.end();
  }
};

And the test file:

const CSV = require('./shared/classes/csv');
const csv = new CSV(__dirname, cb);

function cb() {
  console.log('You win sir.');
}

for (let i = 0; i < 1000000; i++) {
  csv.write('Hello World.');
}

csv.end();
Dan Zuzevich
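The final code above ignores the return value of `writable.write()`. Every line that cannot be written immediately is therefore held in the stream's internal buffer. Below is a sketch that respects backpressure. It assumes Node.js 10.16+ or 11.13+ for `events.once`, and a temp-directory path; `writeLines` is a hypothetical helper name.

When `write()` returns `false`, the sketch waits for `'drain'` before writing more. It then calls `end()` and awaits `'finish'`.

```javascript
const { createWriteStream, mkdtempSync, readFileSync } = require('fs');
const { once } = require('events');
const os = require('os');
const path = require('path');

// Hypothetical helper: writes `count` copies of `line`, honouring backpressure.
async function writeLines(filePath, line, count) {
  const ws = createWriteStream(filePath);
  for (let i = 0; i < count; i++) {
    // write() returns false once the internal buffer reaches highWaterMark;
    // pausing until 'drain' keeps memory use bounded.
    if (!ws.write(`${line}\n`)) {
      await once(ws, 'drain');
    }
  }
  ws.end();
  // once() resolves on 'finish' and rejects if 'error' is emitted first.
  await once(ws, 'finish');
}

// Assumption: a temp directory stands in for `${path}/csv/data.csv`.
const file = path.join(mkdtempSync(path.join(os.tmpdir(), 'csv-')), 'data.csv');

const done = writeLines(file, 'Hello World.', 100000).then(() => {
  console.log('You win sir.');
  return readFileSync(file, 'utf8');
});
done.catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
```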

Correct me if I'm wrong, but here is a minimal working example:

const { createWriteStream } = require('fs');

class CSV {
  constructor(path) {
    this.writeStream = createWriteStream(`${path}/csv/data.csv`);
    this.writeEnd = () => {};
  }
  write(data) {
    this.writeStream.write(`${data}\n`)
  }
  end() {
    this.writeStream.end()
    this.writeStream.on('finish', this.writeEnd)  
  }
};

const csv = new CSV(__dirname);
csv.writeEnd = () => console.log('Finished')
for (let i = 0; i < 1000000; i++) {
  csv.write('Hello World.');
}
csv.end()

I removed the Readable stream, which I find unnecessary, and destroy() shouldn't be called here.

As the Node.js docs put it: "Use end() instead of destroy if data should flush before close."

https://nodejs.org/api/stream.html#stream_writable_destroy_error

You will see 'Finished' logged at the end.

L. Meyer
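`writable.end()` also accepts a callback that runs once the stream has finished. That callback can replace the separate `on('finish', ...)` call in the answer above. This is a minimal sketch, assuming a temp-directory path instead of `${path}/csv/data.csv`:

```javascript
const { createWriteStream, mkdtempSync, readFileSync } = require('fs');
const os = require('os');
const path = require('path');

// Assumption: a temp directory stands in for `${path}/csv/data.csv`.
const file = path.join(mkdtempSync(path.join(os.tmpdir(), 'csv-')), 'data.csv');
const ws = createWriteStream(file);

for (let i = 0; i < 1000; i++) {
  ws.write('Hello World.\n');
}

// end() flushes everything still buffered; its callback runs once the
// stream has finished. Newer Node.js versions pass an error if one occurred.
const done = new Promise((resolve, reject) => {
  ws.on('error', reject);
  ws.end((err) => {
    if (err) return reject(err);
    console.log('Finished');
    resolve(readFileSync(file, 'utf8'));
  });
});
```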

I'm not sure why you are using classes here, but anyway:

const fs = require('fs');

class CSV
{
    constructor(path) {
        this._ws = fs.createWriteStream(`${path}/csv/data.csv`);
    }

    write(data) {
        this._ws.write(`${data}\n`);
    }

    close() {
        const _this = this;
        return new Promise(function (resolve, reject) {
            _this._ws.once('finish', resolve);
            _this._ws.once('error', reject);
            _this._ws.end();
        });
    }
}

async function main()
{
    const csv = new CSV('path1');

    for (let i = 0; i < 1000000; ++i) {
        csv.write(`chunk ${i}`);
    }

    await csv.close();

    console.log('end');
}

function panic(error)
{
    console.error(error);
    process.exit(1);
}

// https://stackoverflow.com/a/46916601/1478566
main().catch(panic).finally(clearInterval.bind(null, setInterval(a=>a, 1E9)));

And here is an approach without classes:

const fs = require('fs');

async function main()
{
    const ws = fs.createWriteStream('a.txt');

    for (let i = 0; i < 1000000; ++i) {
        ws.write(`chunk ${i}\n`);
    }

    ws.end();

    await promise_from_stream(ws);

    console.log('end');
}

function panic(error)
{
    console.error(error);
    process.exit(1);
}

function promise_from_stream(stream)
{
    /**
     * https://stackoverflow.com/a/34310963/1478566
     * > end and finish are the same event BUT on different types of
     * > Streams.
     * >   * stream.Readable fires ONLY end and NEVER finish
     * >   * stream.Writable fires ONLY finish and NEVER end
     */
    return new Promise(function (resolve, reject) {
        stream.once('end', resolve);
        stream.once('finish', resolve);
        stream.once('error', reject);
    });
}

// https://stackoverflow.com/a/46916601/1478566
main().catch(panic).finally(clearInterval.bind(null, setInterval(a=>a, 1E9)));
vbarbarosh
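Node.js has a built-in helper for the job `promise_from_stream` does. `stream.finished()` calls back once a stream has finished, ended, or errored, and `util.promisify` turns it into something `await`-able. On Node.js 15+ the same function is also available pre-promisified as `require('stream/promises').finished`.

Here is a sketch of the class-free version above using it, with a temp-directory path as an assumption:

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const stream = require('stream');
const util = require('util');

// stream.finished() (Node.js 10+) calls back when the stream has finished,
// ended, or errored; promisify makes it awaitable.
const finished = util.promisify(stream.finished);

async function main() {
  // Assumption: a temp directory stands in for 'a.txt' in the working dir.
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'csv-'));
  const file = path.join(dir, 'a.txt');
  const ws = fs.createWriteStream(file);

  for (let i = 0; i < 1000; ++i) {
    ws.write(`chunk ${i}\n`);
  }

  ws.end();
  await finished(ws); // rejects if the stream emitted 'error'

  console.log('end');
  return fs.readFileSync(file, 'utf8');
}

const result = main();
result.catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
```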