2

I have an ETL query which requires me to read a large number of rows and then apply some transformation on them and save them back to a separate table in Postgres. I am using pg-query-stream and I plan to run the test function below inside a bullmq job.

How do I measure progress (current rows processed / total number of rows) for the given stream below?

const { Pool } = require('pg');
const JSONStream = require('JSONStream');
const QueryStream = require('pg-query-stream');

/**
 * Streams up to 50 rows from `feed_items`, simulating per-row async work
 * and tracking how many rows have been processed so far.
 *
 * Fixes vs. the original:
 *  - `timeout` was never defined, so the first 'data' event raised a
 *    ReferenceError inside an async listener (an unhandled rejection).
 *  - `throw err` inside the connect callback crashed the process uncaught;
 *    log and shut down instead.
 *  - The pool was never ended, so the process could not exit cleanly.
 *  - Removed the leftover debug `console.log(stream)`.
 */
function test() {
  // Resolves after `ms` milliseconds; used to simulate async row processing.
  const timeout = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  const pool = new Pool({
    database: process.env.POSTGRES_DB,
    host: process.env.POSTGRES_HOST,
    port: +process.env.POSTGRES_PORT,
    password: process.env.POSTGRES_PASSWORD,
    ssl: process.env.POSTGRES_SSL === 'true',
    user: process.env.POSTGRES_USER,
  });
  const query = `
    SELECT 
        feed_item_id, 
        title, 
        summary, 
        content 
    FROM 
        feed_items 
    ORDER BY 
        pubdate DESC, 
        feed_item_id
    LIMIT 50
    `;

  pool.connect((err, client, done) => {
    if (err) {
      // Throwing from this callback would be an uncaught exception;
      // report the failure and release the pool instead.
      console.error(err);
      pool.end();
      return;
    }
    const queryStream = new QueryStream(query, [], {
      batchSize: 200,
    });
    const stream = client.query(queryStream);
    stream.pipe(JSONStream.stringify());
    stream.on('error', (error) => {
      console.error(error);
      done(); // return the client to the pool
      pool.end(); // close the pool so the process can exit
    });
    stream.on('end', () => {
      console.log('stream has ended');
      done();
      pool.end();
    });
    // Rows handled so far. The query is capped at LIMIT 50, so
    // progress here is `processed / 50`; for an unbounded query, run a
    // separate COUNT(*) first to obtain the denominator.
    let processed = 0;
    stream.on('data', async (row) => {
      // pause/resume provides simple backpressure while async work runs
      stream.pause();
      processed += 1;
      console.log('data received', row.feed_item_id);
      //   Simulate async task
      await timeout(10);
      stream.resume();
    });
  });
}

test();
PirateApp
  • 5,433
  • 4
  • 57
  • 90
  • 1
    Should be very easy via [pg-iterator](https://github.com/vitaly-t/pg-iterator), because you can control every next-row read. – vitaly-t Nov 23 '22 at 10:30
  • @vitaly-t i l definitely take a look into this one, thank you for the headsup – PirateApp Nov 23 '22 at 12:30

1 Answer

1

To do that, you'll first need to get the total count of rows in the table.
You can do it as described here.

And with that count you can pipe QueryStream into another stream and measure progress there:

const Stream = require('stream')

/**
 * Writable sink that counts rows piped through it and periodically logs
 * progress against an expected total.
 *
 * Bug fix: the stream must be constructed with `objectMode: true`.
 * pg-query-stream emits plain row objects, and a default (buffer-mode)
 * Writable rejects non-buffer/string chunks with ERR_INVALID_ARG_TYPE,
 * so without objectMode the pipe fails on the very first row.
 */
class ProgressStream extends Stream.Writable {
  /**
   * @param {number} total - Expected total row count (e.g. from a prior COUNT(*) query).
   */
  constructor(total) {
    super({ objectMode: true }); // accept arbitrary row objects, not just buffers
    this.i = 0; // rows seen so far
    this.total = total;
  }

  // Invoked once per row; logs on rows 1, 101, 201, ... to avoid log spam.
  _write(chunk, enc, done) {
    this.i++;
    if (this.i % 100 === 1) {
      console.log(`Processed ${this.i} out of ${this.total} of rows`);
    }
    done();
  }
}

// Wire the query stream into the progress counter; `count` comes from a
// prior COUNT(*) query and `stream` is the pg-query-stream from above.
const progressStream = new ProgressStream(count);

// 'finish' fires once every piped row has been written through the sink.
progressStream.once('finish', () =>
  console.log(`All ${progressStream.i} rows processed!`),
);

stream.pipe(progressStream);
Pavel Bely
  • 2,245
  • 1
  • 16
  • 24