I want to pipe data from an Amazon Kinesis stream to an S3 log or a bunyan log.

The sample works with a file write stream or stdout. How would I implement my own writable stream?

//this works
var file = fs.createWriteStream('my.log')
kinesisSource.pipe(file)

This doesn't work; it fails with an error saying the object has no method 'on':

var stream = {}; //process.stdout works however
stream.writable = true;
stream.write =function(data){
    console.log(data);
};
kinesisSource.pipe(stream);

What methods do I have to implement for my own custom writable stream? The docs seem to indicate I need to implement 'write' and not 'on'.

MonkeyBonkey

3 Answers

To create your own writable stream, you have several possibilities.

Create your own class

For this you'll need:

  1. To extend the Writable class.
  2. To call the Writable constructor in your own constructor.
  3. To define a _write() method in the prototype of your stream object.

Here's an example:

var stream = require('stream');
var util = require('util');

function EchoStream () { // step 2
  stream.Writable.call(this);
};
util.inherits(EchoStream, stream.Writable); // step 1
EchoStream.prototype._write = function (chunk, encoding, done) { // step 3
  console.log(chunk.toString());
  done();
}

var myStream = new EchoStream(); // instantiate your brand new stream
process.stdin.pipe(myStream);

Extend an empty Writable object

Instead of defining a new object type, you can instantiate an empty Writable object and implement the _write() method:

var stream = require('stream');
var echoStream = new stream.Writable();
echoStream._write = function (chunk, encoding, done) {
  console.log(chunk.toString());
  done();
};

process.stdin.pipe(echoStream);

Use the Simplified Constructor API

If you're using io.js (or Node.js 4 and later), you can use the simplified constructor API:

var stream = require('stream');
var writable = new stream.Writable({
  write: function(chunk, encoding, next) {
    console.log(chunk.toString());
    next();
  }
});

Use an ES6 class in Node 4+

const stream = require('stream');

class EchoStream extends stream.Writable {
  _write(chunk, enc, next) {
    console.log(chunk.toString());
    next();
  }
}
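
Applied to the original question, the same pattern can forward each chunk to a logger instead of the console. This is only a sketch: `LogStream` and the in-memory `logger` object are hypothetical stand-ins, and the only assumption about a real logger such as bunyan is that it exposes an `info()` method.

```javascript
const { Writable } = require('stream');

// Hypothetical stand-in for a real logger (e.g. bunyan); only info() is assumed.
const lines = [];
const logger = { info: (msg) => lines.push(msg) };

class LogStream extends Writable {
  constructor(targetLogger, options) {
    super(options); // the Writable constructor must run first
    this.targetLogger = targetLogger;
  }

  _write(chunk, encoding, callback) {
    try {
      this.targetLogger.info(chunk.toString());
      callback(); // signal that this chunk has been handled
    } catch (err) {
      callback(err); // report the failure to the stream
    }
  }
}

const logStream = new LogStream(logger);
logStream.write('first record');
logStream.end('second record');
```

Because `LogStream` is a real Writable, something like `kinesisSource.pipe(new LogStream(logger))` should work where the plain object from the question did not.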
Paul Mougel
  • To support object mode, replace `chunk.toString()` with `chunk.toString ? chunk.toString() : chunk` – Gab Aug 30 '16 at 13:53
  • What does this line do: "util.inherits(EchoStream, stream.Writable); // step 1 "? – Giorgi Moniava Feb 02 '17 at 08:25
  • It makes `EchoStream` a "sub-class" of `stream.Writable`: its prototype methods are inherited from it and `stream.Writable` is accessible using the `super_` property. See [the documentation](https://nodejs.org/docs/latest/api/util.html#util_util_inherits_constructor_superconstructor) for more information. – Paul Mougel Feb 02 '17 at 13:15
  • What's the benefit of "Create your own class" over "Extend an empty Writable object"? – bucabay Mar 23 '18 at 17:58
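
On the object mode point raised in the comments above: a Writable created with `objectMode: true` receives each written value as-is rather than as a Buffer, and calling `toString()` on a plain object yields "[object Object]", so it is often simpler to handle the object directly. A minimal sketch, where `objectSink` and `received` are illustrative names:

```javascript
const { Writable } = require('stream');

const received = [];

const objectSink = new Writable({
  objectMode: true, // chunks are passed through unchanged, not converted to Buffers
  write(record, encoding, callback) {
    received.push(record); // record is the exact object given to write()
    callback();
  }
});

objectSink.write({ level: 'info', msg: 'hello' });
objectSink.write({ level: 'warn', msg: 'careful' });
objectSink.end();
```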

Creating a writable stream is actually quite simple. Here is an example:

var fs = require('fs');
var Stream = require('stream');

var ws = new Stream;
ws.writable = true;
ws.bytes = 0;

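ws.write = function(buf) {
   ws.bytes += buf.length; // count the bytes of each chunk
   return true; // tell the source it may keep writing
}

ws.end = function(buf) {
   // end() may be called with or without a final chunk
   if (buf !== undefined) ws.write(buf);
   ws.writable = false;

   console.log('bytes length: ' + ws.bytes);
}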

fs.createReadStream('file path').pipe(ws);

Also, if you want to create your own class, @Paul gives a good answer.
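
For comparison, here is a sketch of the same byte counter built on `stream.Writable` with `_write()`, which is the approach the current Node.js documentation describes. The class name `ByteCounter` is illustrative only.

```javascript
const { Writable } = require('stream');

class ByteCounter extends Writable {
  constructor(options) {
    super(options);
    this.bytes = 0;
  }

  _write(chunk, encoding, callback) {
    // By default, strings are converted to Buffers before reaching _write()
    this.bytes += chunk.length;
    callback();
  }
}

const counter = new ByteCounter();
// 'finish' fires once end() has been called and all chunks are processed
counter.on('finish', () => console.log('bytes length: ' + counter.bytes));

counter.write('hello');
counter.end(' world');
```

A file could be counted the same way with `fs.createReadStream('file path').pipe(new ByteCounter())`.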

Ionică Bizău
TonyAdo
  • Cannot read property 'length' of undefined... at the buf.length in ws.write function definition – shrutim Feb 02 '17 at 23:59
  • should be buf.length – Syed Mishar Newaz Aug 03 '20 at 16:36
  • This may have been correct at one time, but not anymore. The Node documentation now says: "The stream.Writable class is extended to implement a Writable stream. Custom Writable streams must call the new stream.Writable([options]) constructor and implement the writable._write() and/or writable._writev() method." https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream – FlippingBinary Jul 04 '21 at 13:11
  • This is completely wrong both logically and syntactically – mstephen19 Dec 07 '22 at 12:21

Here is an example taken directly from the Node.js docs:
https://nodejs.org/api/stream.html#an-example-writable-stream

const { Writable } = require('stream');
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}
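
A short usage sketch for the class above, assuming a recent Node.js version: passing an Error to the callback makes the stream emit an 'error' event, so a listener should be attached. The `errors` array is only there to make the result observable.

```javascript
const { Writable } = require('stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}

const errors = [];
const writable = new MyWritable();

// Without an 'error' listener, the error would be thrown as an uncaught exception
writable.on('error', (err) => errors.push(err.message));

writable.write('hello');  // contains no 'a', so it is accepted
writable.write('banana'); // contains 'a', so _write() reports an error
```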
Ragnoroct