3

I am using client.stream() from Cassandra DB's driver to fetch large result sets using pages, then for each row result returned I am pushing it into an array defined at the top of my scope.

After the queries are finished, I want to return my array but It always returns 'undefined' and I am guessing its because fetching the queries takes a long time so Javascript proceeds with the return statement before the object is even populated.

For those not familiar with this driver: The client.stream is a function, and it takes a little while to get some data. I need to wait for this to finish before returning the object!

E.g

function foo() {
  var resultArray: [];
  var query = "select username from users where userRank = 3";
  client.stream(query, {prepare: true})
    .on('readable' function () {
      var row;
      while (row = this.read()) {
        resultArray.push(row.username); 
      }
    })
    .on('end', function () {
      return obj; // The object only exists in this scope but cant return from here
    });
}

When I call this var returned = foo(); I get undefined as the return value.

stark0323
  • 71
  • 1
  • 8

2 Answers2

2

If you want to use the stream API, you will need to create your own Promise instance and resolve it when the stream ended.

There is no point in buffering all the rows on your own and then return a Promise, the driver can do that for you. If you are not concerned about the memory consumption of having all those rows in memory, you can do:

// Disable paging
// NOTE: Memory consumption will depend on the amount of rows
// and the amount of concurrent requests
const options = { prepare: true, fetchSize: 0 };
const promise = client.execute(query, params, options);

See the documentation for more information: https://docs.datastax.com/en/developer/nodejs-driver/latest/features/paging/

jorgebg
  • 6,560
  • 1
  • 22
  • 31
1

To add to the answer, I was able to get stream to work wrapped in a Promise.

            new Promise((resolve, reject) => {
                const results = [];

                return client
                    .stream(query, params, options)
                    .on('readable', function() {
                        // 'readable' is emitted as soon a row is received and parsed
                        let row;

                        while ((row = this.read())) {
                            results.push(row);
                        }
                    })
                    .on('end', function() {
                        // Stream ended, there aren't any more rows
                        return resolve(results);
                    })
                    .on('error', function(err) {
                        return reject(err);
                    });
            }),
Jazzy
  • 6,029
  • 11
  • 50
  • 74