
I want to pause my Cassandra stream for some async operations before processing the next row.

Each row is received in a 'readable' event listener. I have tried calling stream.pause(), but it does not actually pause the stream. I have tried the same thing in a 'data' event listener, and that doesn't work either. I would be extremely grateful for any insight, and ideally a fix. Here is my code. Making the 'readable' handler async and awaiting the call does not stop the next row from arriving before the asynchronous function finishes.

function start() {
    let stream = client.stream('SELECT * FROM table');
    stream
        .on('end', function () {
            console.log(`Ended at ${Date.now()}`);
        })
        .on('error', function (err) {
            console.error(err);
        })
        .on('readable', function () {
            let row = this.read();
            asyncFunctionNeedTowaitForthisBeforeNextRow();
        });
}

// The version below, using pause() and await, doesn't work either

function start() {
    let stream = client.stream('SELECT * FROM table');
    stream
        .on('end', function () {
            console.log(`Ended at ${Date.now()}`);
        })
        .on('error', function (err) {
            console.error(err);
        })
        .on('readable', async function () {
            let row = this.read();
            stream.pause();
            await asyncFunctionNeedTowaitForthisBeforeNextRow();
            stream.resume();
        });
}
Naman Gupta

2 Answers


stream.pause() doesn't work because the 'readable' event fires multiple times, so the same async function is called again before the previous call has finished. The same applies to the 'data' event.

I would recommend using a custom writable stream to handle the async work properly.

The writable stream will look something like:

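const { Writable } = require('stream');

const myWritable = new Writable({
  // The driver emits rows as objects, so the writable must be in object mode too
  objectMode: true,
  async write(row, encoding, callback) {
    try {
      await asyncFunctionNeedTowaitForthisBeforeNextRow(row);
      callback(); // Row handled; the stream can deliver the next one
    } catch (err) {
      callback(err); // Surfaces as an 'error' event on the writable
    }
  }
});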

Then adjust your code to use this writable:

function start() {
  let stream = client.stream('SELECT * FROM table');
  stream.pipe(myWritable);
  stream
    .on('end', function () {
      console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
      console.error(err);
    })
}
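
The one-row-at-a-time behaviour of this approach can be checked without a Cassandra cluster. The following is a minimal sketch rather than part of the original answer: `processSequentially` is a hypothetical helper, and `Readable.from` stands in for `client.stream(...)`, on the assumption that the driver's stream is an object-mode Readable.

```javascript
const { Readable, Writable, pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Hypothetical helper: runs handleRow for every row, strictly one at a time.
function processSequentially(readable, handleRow) {
  const sink = new Writable({
    objectMode: true,
    write(row, _encoding, callback) {
      // write() is not called for the next row until callback has been invoked,
      // so the async work for each row finishes before the next one starts.
      Promise.resolve()
        .then(() => handleRow(row))
        .then(() => callback(), callback);
    },
  });
  // Resolves once the sink has finished; rejects if either stream errors.
  return pipelineAsync(readable, sink);
}

// Usage with a stand-in source instead of client.stream(...)
processSequentially(Readable.from([{ id: 1 }, { id: 2 }]), async (row) => {
  console.log('processing', row.id);
}).then(() => console.log('all rows processed'));
```

Waiting on the returned promise, rather than on the readable's 'end' event, tells you when the last row has actually been processed, not just read.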
Avraham

Note that even though you declare your 'readable' handler as an async function, the caller does not await the returned promise, because the stream expects the event handler to run synchronously.
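
This can be seen with a plain EventEmitter, which streams are built on. The sketch below is only an illustration: the 'row' event name and the `demo` and `sleep` helpers are made up for the example.

```javascript
const { EventEmitter } = require('events');

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Emits two events back to back and records when each async listener
// call starts and finishes.
async function demo() {
  const log = [];
  const emitter = new EventEmitter();

  emitter.on('row', async (n) => {
    log.push(`start ${n}`);
    await sleep(10); // emit() has already returned by the time this resolves
    log.push(`done ${n}`);
  });

  // emit() calls the listener and ignores the promise it returns
  emitter.emit('row', 1);
  emitter.emit('row', 2);

  await sleep(50); // give both listener calls time to finish
  return log;
}

demo().then((log) => console.log(log));
```

Both "start" entries are logged before either "done" entry, which is the same overlap the question describes.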

A solution could be:

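stream
  .on('end', () => {})
  .on('error', () => {})
  .on('data', row => {
    // Stop further 'data' events while this row is being processed
    stream.pause();
    // Resume only once the async work for this row has completed
    doSomethingAsync(row).then(() => stream.resume());
  });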

Note that, ideally, you should take advantage of parallelism when doing async work, so it may be better to read a few rows at a time and then pause.
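
One possible way to do that is to collect rows into small batches and process each batch concurrently. The sketch below is an assumption-laden illustration, not driver-specific code: `processInBatches` is a hypothetical helper, it relies on Readable streams being async-iterable (Node.js 10 or later), and `Readable.from` stands in for `client.stream(...)`.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: reads rows from an object-mode readable, processes
// up to batchSize rows concurrently, and waits for the whole batch before
// reading further rows.
async function processInBatches(readable, batchSize, handleRow) {
  let batch = [];
  let processed = 0;

  for await (const row of readable) {
    batch.push(row);
    if (batch.length === batchSize) {
      // No further rows are pulled from the stream while this is pending
      await Promise.all(batch.map((r) => handleRow(r)));
      processed += batch.length;
      batch = [];
    }
  }

  // Handle the final, possibly smaller, batch
  if (batch.length > 0) {
    await Promise.all(batch.map((r) => handleRow(r)));
    processed += batch.length;
  }
  return processed;
}

// Usage with a stand-in source instead of client.stream(...)
const rows = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);
processInBatches(rows, 2, async (row) => {
  console.log('processing', row.id);
}).then((count) => console.log(`processed ${count} rows`));
```

With a batch size of 1 this reduces to strictly sequential processing, so the same helper covers the behaviour asked for in the question.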

jorgebg