1
  • I am trying to load 50000 items from the database with text in them, tag them and update the tags
  • I am using pg-promise and pg-query-stream for this purpose
  • I was able to get the streaming part working properly, but updating has become problematic with so many update statements

Here is my existing code:

const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')

function prepareText(title, content, summary) {
  let description
  if (content && content.length) {
    description = content
  } else if (summary && summary.length) {
    description = summary
  } else {
    description = ''
  }
  return title.toLowerCase() + ' ' + description.toLowerCase()
}

async function tagAll({ db, logger, tagger }) {
  // you can also use pgp.as.format(query, values, options)
  // to format queries properly, via pg-promise;
  const qs = new QueryStream(
    'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
  )
  try {
    const result = await db.stream(qs, (s) => {
      // initiate streaming into the console:
      s.pipe(JSONStream.stringify())
      s.on('data', async (item) => {
        try {
          s.pause()
          // eslint-disable-next-line camelcase
          const { feed_item_id, title, summary, content } = item

          // Process text to be tagged
          const text = prepareText(title, content, summary)
          const tags = tagger.tag(text)

          // Update tags per post
          await db.query(
            'UPDATE feed_items SET tags=$1 WHERE feed_item_id=$2',
            // eslint-disable-next-line camelcase
            [tags, feed_item_id]
          )
        } catch (error) {
          logger.error(error)
        } finally {
          s.resume()
        }
      })
    })
    logger.info(
      'Total rows processed:',
      result.processed,
      'Duration in milliseconds:',
      result.duration
    )
  } catch (error) {
    logger.error(error)
  }
}

module.exports = tagAll
  • The db object is the pg-promise database instance, while the tagger simply extracts an array of tags from the text and returns them in the tags variable
  • From what I can see in the diagnostics, too many update statements are executing. Is there a way to batch them?
PirateApp
  • You should be using [multi-row updates](https://stackoverflow.com/questions/39119922/postgresql-multi-row-updates-in-node-js/39130689#39130689), along with the approach documented in [Data Imports](https://github.com/vitaly-t/pg-promise/wiki/Data-Imports). – vitaly-t Jul 01 '20 at 16:19
  • Thanks @vitaly-t. The multi-row updates rely on accumulating one large array and inserting everything; my concern is that it will build a very massive array in memory before writing it off with the method you linked – PirateApp Jul 01 '20 at 16:22
  • `Data Imports` shows you how to partition such updates, if you follow it. – vitaly-t Jul 01 '20 at 16:33
  • [pg-iterator](https://github.com/vitaly-t/pg-iterator) might be of help, passing it `Pool` from `pg-promise` as `db.$pool` ;) It would give you a simpler syntax and better scalability. – vitaly-t Nov 23 '22 at 10:59
  • @vitaly-t how would you update inside the async for loop? It would execute too many update statements; should I batch them every 200 or 300 statements and then do it? – PirateApp Nov 26 '22 at 02:51
  • Best is to use [helpers.update](http://vitaly-t.github.io/pg-promise/helpers.html#.update) to group updates. – vitaly-t Nov 26 '22 at 03:48

2 Answers

1

If you can do everything with one SQL statement, you should! Here you are paying the price of a round trip between Node and your DB for each row of your table, and that will take up most of the time of your query.

Your request can be implemented in pure SQL:

update feed_items set tags = case
    when (content = '') is false then lower(title) || ' ' || lower(content)
    when (summary = '') is false then lower(title) || ' ' || lower(summary)
    else lower(title) end;

This statement will update your whole table at once. I'm sure it would be an order of magnitude faster than your method. On my machine, with a table containing 100000 rows, the update takes about 600 ms.

Some remarks:

  • you don't need an ORDER BY to update. As ordering is quite slow, it's better to leave it out.
  • I guess the limit part was there because it was too slow? If that is the case, you can drop it; 50000 rows is not a big table for Postgres.
  • I suspect this pg-stream thing does not really stream data out of the DB; it only lets you use a stream-like API over results it has already gathered... No problem with that, but I thought maybe there was a misconception here.
autra
  • Cannot be done in pure SQL because of this line: const tags = tagger.tag(text). The tagger is an object of a class that does a lot of work like going through the text, matching regexes and then returning an array of strings; it is a complicated operation which cannot be implemented in pure SQL. If I were to make a single array to accumulate all 50000 updates, it would be a very large array that stays entirely in memory. There must be a way to execute a few queries at a time without keeping the whole set of tags and ids in memory. Thanks for the attempt – PirateApp Jul 01 '20 at 12:11
  • Of course you can. Either reimplement the parsing in sql, or yes, put all the 50000 values in memory (if they are only tags, it's completely doable). – autra Feb 22 '21 at 08:06
  • 50000 items in memory take 400 MB. I need the titles and descriptions of each article so that I can run my tagging regex over them and extract tags; titles are on average 30 words long and descriptions on average 200 words, so it'll take a lot of memory if I don't use a stream – PirateApp Mar 01 '21 at 06:30
  • If it's only you and only once in a while, 400 MB could be OK with modern hardware. – autra Mar 01 '21 at 11:16
  • I go back to "it is a complicated operation which cannot be implemented in pure sql". PostgreSQL has a very rich set of string manipulation functions and features, including (but not limited to) full regex support. Reimplementing it in SQL is certainly possible, but of course without seeing the code, I cannot tell whether it would be long or not. – autra Mar 01 '21 at 11:17
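
To illustrate that last point, here is a rough, untested sketch of what a pure-SQL tagger could look like. The regex is only a placeholder, since the real tagging rules live in the tagger class and are not shown in the question; the fallback from content to summary mirrors prepareText:

-- hypothetical pattern: collect distinct lowercase tokens of 3+ characters as tags
update feed_items
set tags = (
  select array_agg(distinct m[1])
  from regexp_matches(
         lower(title || ' ' || coalesce(nullif(content, ''), nullif(summary, ''), '')),
         '([a-z0-9]{3,})',
         'g'
       ) as m
);

Rows where nothing matches would end up with NULL tags, so whether this is usable depends entirely on the actual tagging rules.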
0

This is the best I could come up with to batch the queries inside the stream, so that we don't need to load all the data in memory or run too many queries. If anyone knows a better way to batch, especially with t.sequence, feel free to add another answer; a rough sketch of the t.sequence idea is included after the code below.

const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')

// prepareText is the same helper as in the question
const BATCH_SIZE = 5000
async function batchInsert({ db, pgp, logger, data }) {
  try {
    // https://vitaly-t.github.io/pg-promise/helpers.ColumnSet.html
    const cs = new pgp.helpers.ColumnSet(
      [
        { name: 'feed_item_id', cast: 'uuid' },
        { name: 'tags', cast: 'varchar(64)[]' },
      ],
      {
        table: 'feed_items',
      }
    )
    const query =
      pgp.helpers.update(data, cs) + ' WHERE v.feed_item_id=t.feed_item_id'
    await db.none(query)
  } catch (error) {
    logger.error(error)
  }
}

async function tagAll({ db, pgp, logger, tagger }) {
  // you can also use pgp.as.format(query, values, options)
  // to format queries properly, via pg-promise;
  const qs = new QueryStream(
    'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
  )
  try {
    const queryValues = []
    const result = await db.stream(qs, (s) => {
      // initiate streaming into the console:
      s.pipe(JSONStream.stringify())
      s.on('data', async (item) => {
        try {
          s.pause()
          // eslint-disable-next-line camelcase
          const { feed_item_id, title, summary, content } = item

          // Process text to be tagged
          const text = prepareText(title, content, summary)
          const tags = tagger.tag(text)
          queryValues.push({ feed_item_id, tags })

          if (queryValues.length >= BATCH_SIZE) {
            const data = queryValues.splice(0, queryValues.length)
            await batchInsert({ db, pgp, logger, data })
          }
        } catch (error) {
          logger.error(error)
        } finally {
          s.resume()
        }
      })
    })
    // flush whatever is left over from the last partial batch
    if (queryValues.length) {
      await batchInsert({ db, pgp, logger, data: queryValues })
    }
    return result
  } catch (error) {
    logger.error(error)
  }
}
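
For comparison, here is a rough, untested sketch of the Data Imports / t.sequence pattern mentioned in the comments. It drops the stream entirely and pages through the table inside a single transaction; getNextBatch is a hypothetical pager (keyset or LIMIT/OFFSET) that is assumed to select the next page of rows, run the tagger over the prepared text, and resolve with an array of { feed_item_id, tags } objects, or null when there is nothing left:

async function tagAllWithSequence({ db, pgp, tagger }) {
  const cs = new pgp.helpers.ColumnSet(
    [
      { name: 'feed_item_id', cast: 'uuid' },
      { name: 'tags', cast: 'varchar(64)[]' },
    ],
    { table: 'feed_items' }
  )
  return db.tx('tag-all', (t) =>
    t.sequence((index) =>
      // getNextBatch is a hypothetical pager; it is assumed to run the
      // tagger and resolve with the next batch of { feed_item_id, tags }
      getNextBatch(t, index, tagger).then((data) => {
        if (data && data.length) {
          const update =
            pgp.helpers.update(data, cs) + ' WHERE v.feed_item_id=t.feed_item_id'
          return t.none(update)
        }
        // resolving with undefined ends the sequence
      })
    )
  )
}

Compared to the stream-plus-splice version above, this keeps at most one page of rows in memory and runs all the batched updates inside one transaction, at the cost of writing the paging query yourself.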
PirateApp