LevelUP Documentation says that pipe() can be used (https://github.com/rvagg/node-levelup/#pipes-and-node-stream-compatibility).

I've tried the following code:

db.createValueStream().pipe(response)

But it didn't work; I got this error:

events.js:72
        throw er; // Unhandled 'error' event
              ^
TypeError: Invalid non-string/buffer chunk
    at validChunk (_stream_writable.js:150:14)
    at Writable.write (_stream_writable.js:179:12)
    at write (_stream_readable.js:573:24)
    at flow (_stream_readable.js:582:7)
    at ReadStream.pipeOnReadable (_stream_readable.js:614:5)
    at ReadStream.EventEmitter.emit (events.js:92:17)
    at emitReadable_ (_stream_readable.js:408:10)
    at emitReadable (_stream_readable.js:404:5)
    at readableAddChunk (_stream_readable.js:165:9)
    at ReadStream.Readable.push (_stream_readable.js:127:10)

The actual problem is memory usage when I use the 'data' event. So I was trying to make a stream.Transform and use pipe() to do what I need, since a memory leak in the event emitter is a known problem: Memory leak when using streams in Node.js?

UPDATE

I've tried @paul-mougel's suggestion without success. The error event handlers are not called, and it crashed. This is a piece of the code:

    var rs = db.createValueStream();

    request.on('close', function(){
        rs.destroy();
        response.end();
    });

    rs.on('end', function(){
        response.end();
    });
    rs.on('error', function(err){
        console.error('READ STREAM ERROR:',err.message);
        response.end();
        rs.destroy();
    });

    response.on('error', function(err){
        console.log('RESPONSE ERROR:',err);
        rs.destroy();
    });

    rs.pipe(stringifier).pipe(response);
Leonardo
  • You're piping a stream of objects into a stream of bytes. The target stream can't handle objects. – SLaks Nov 29 '13 at 21:01

1 Answer

There are multiple things to consider.

First, you get this exception because you don't listen to the error event. With streams, always listen for it: doing so i) lets you log the issue and ii) keeps the program from crashing on an unhandled 'error' event.

var valueStream = db.createValueStream();
valueStream.on('error', function (err) {
  console.error('valueStream.on error ' + err.message);
});
// Each stream in the chain needs its own 'error' listener, attached with .on().
response.on('error', function (err) {
  console.error('response error ' + err.message);
});
valueStream.pipe(response);
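
To see why the missing listener matters, here is a minimal sketch that uses a plain EventEmitter (streams inherit from it). The helper name emitErrorThrows is made up for illustration and is not part of Node.js or LevelUP:

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: reports whether emitting 'error' on the emitter throws.
function emitErrorThrows(emitter) {
  try {
    emitter.emit('error', new Error('boom'));
    return false;
  } catch (err) {
    // With no 'error' listener, emit() throws the error itself.
    return true;
  }
}

const unguarded = new EventEmitter();
const guarded = new EventEmitter();
guarded.on('error', function (err) {
  console.error('handled: ' + err.message);
});

console.log(emitErrorThrows(unguarded)); // true
console.log(emitErrorThrows(guarded));   // false
```

An uncaught throw like the first one is what produces the "Unhandled 'error' event" crash in the stack trace from the question.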

Second, db.createValueStream() creates a readable stream in object mode (see source): it outputs JavaScript objects. Your response, on the other hand, is a writable stream in byte mode: it only accepts strings or Buffers as input, hence the "Invalid non-string/buffer chunk" error. What you can do is create a Transform stream that takes JavaScript objects as input and outputs their stringified version:

var stream = require('stream');
var stringifier = new stream.Transform();
// Only the writable (input) side is in object mode; the readable side still emits bytes.
stringifier._writableState.objectMode = true;
stringifier._transform = function (data, encoding, done) {
    // Emit one JSON document per line.
    this.push(JSON.stringify(data));
    this.push('\n');
    done();
};

valueStream.pipe(stringifier).pipe(response);

Please note that we create a transform stream that takes objects as input and outputs bytes. See the documentation for more information.
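
On more recent Node.js releases, the same idea can be expressed through constructor options rather than by setting the private _writableState field. This is a minimal sketch, assuming a Node.js version whose stream.Transform accepts the writableObjectMode option; createStringifier is a hypothetical helper name, not part of LevelUP:

```javascript
const { Transform } = require('stream');

// Hypothetical helper: builds a Transform that accepts objects and emits text.
function createStringifier() {
  return new Transform({
    // Writable (input) side accepts arbitrary JavaScript values;
    // the readable (output) side stays in byte mode.
    writableObjectMode: true,
    transform(value, encoding, done) {
      // One JSON document per line.
      done(null, JSON.stringify(value) + '\n');
    }
  });
}
```

It would be used the same way as the stringifier above: valueStream.pipe(createStringifier()).pipe(response).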

However, you'll have to tell us more about the specific problem you're trying to solve by piping the LevelUP stream into the HTTP response: the above may not be the best solution.

Third, you didn't encounter a memory leak when using .on('data'). Adding this listener switches the stream into flowing mode, which means it emits data as fast as possible, whether or not the consumer keeps up, so data that hasn't been written out yet accumulates in memory. You could always use the .pause() and .resume() methods to stop and restart the stream. But the new v0.10 stream interface (aka streams2) helps you deal with this issue, as the Readable, Writable and Transform classes take care of all of that for you when you use pipe().
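
For illustration, this is roughly the bookkeeping that pipe() spares you. It is a simplified sketch, not the actual pipe() implementation; forwardWithBackpressure is a hypothetical helper name, and it assumes the destination follows the usual Writable contract (write() returns false when its buffer is full, and 'drain' fires when it has emptied):

```javascript
// Hypothetical helper: forwards chunks by hand while honoring backpressure.
function forwardWithBackpressure(source, destination) {
  source.on('data', function (chunk) {
    // write() returns false once the destination's internal buffer is full.
    if (!destination.write(chunk)) {
      // Stop reading until the destination has flushed its buffer.
      source.pause();
      destination.once('drain', function () {
        source.resume();
      });
    }
  });
  source.on('end', function () {
    destination.end();
  });
}
```

Without the pause() and resume() calls, a fast source keeps emitting while a slow destination buffers everything, which looks like a leak even though nothing is actually leaking.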

Paul Mougel
  • Sorry, I tried both suggestions but it didn't work. The error event handler was not called. It just crashed with the same message. Thank you for your answer. – Leonardo Nov 29 '13 at 21:31
  • I'm trying to make a pipe to transform it (just like you suggested in your example) but I've got this error. – Leonardo Nov 29 '13 at 21:33
  • Updated my answer. It's indeed the `response` stream that emits the error, and not the `valueStream`. You should still tell us what your goal is in piping the valueStream into the HTTP response. – Paul Mougel Nov 29 '13 at 21:36
  • I'm trying to make a kind of dump of the entire LevelDB database. But the memory usage increases a lot. I searched a little and found that it was because of the listener on the 'data' event, or something like that. Anyway, I tested with pipe and it worked OK, without any increase in memory. Then I tried to do the same thing I was doing with the file stream with the LevelDB read stream. – Leonardo Nov 29 '13 at 21:40
  • Fair enough, then my Transform stream suggestion should work. – Paul Mougel Nov 29 '13 at 21:49
  • What's happening? Does it output an error? Can you share a [gist](https://gist.github.com) with some code so that we can look at it? – Paul Mougel Nov 29 '13 at 21:58
  • Paul, I've written some test code to reproduce the error: https://gist.github.com/leonardonsantos/7754933 – Leonardo Dec 02 '13 at 19:13
  • Alright, that was a simple mistake on my part: the Transform stream takes objects as input and outputs bytes, which means that its **writable** side is in `objectMode` and its **readable** side is in byte mode, and not the other way around. I updated my answer and [your gist](https://gist.github.com/PaulMougel/7766857). – Paul Mougel Dec 03 '13 at 10:07