0

I have a Meteor app which is connected to MongoDB.
In Mongo I have a collection which has ~700k records. I have a cron job that runs each week, where I read all the records from the collection (using a Mongo cursor) and, in batches of 10k, I want to insert them into Elasticsearch so they are indexed.

// Accumulate bulk-body entries and flush them to Elasticsearch in batches.
let articles = []
Collections.Articles.find({}).forEach(function(doc) {
        // Each document contributes TWO entries to the bulk body:
        // the action/metadata line and the document source itself.
        articles.push({ 
            index: {_index: 'main', _type: 'article', _id: doc.id }
        },
        doc);
        // 10 000 documents => 20 000 bulk-body entries. The original
        // `% 10000` fired after only 5 000 documents.
        if (0 === articles.length % 20000) {
          client.bulk({   maxRetries: 5,  index: 'main',   type: 'article',    body: articles   })
          // BUG FIX: the original reset `data = []` (an implicit global),
          // so `articles` was never cleared and grew unboundedly —
          // every bulk call re-sent all previous batches too.
          articles = []
        }
    })

Since forEach is synchronous (it goes over each record before continuing) while client.bulk is async, this overloads the Elasticsearch server and it crashes with an Out of Memory exception.
Is there a way to pause the forEach while the insert is being done? I tried async/await, but that does not seem to work either.

// NOTE: making the forEach callback `async` does NOT pause the cursor —
// forEach ignores the returned promise, so every batch's bulk call is
// fired without waiting. Use `for await...of` on the cursor instead.
let articles = []
Collections.Articles.find({}).forEach(async function(doc) {
        // Two bulk-body entries per document: action line + source doc.
        articles.push({ 
            index: {_index: 'main', _type: 'article', _id: doc.id }
        },
        doc);
        // 10 000 documents => 20 000 entries (original `% 10000`
        // flushed after only 5 000 documents).
        if (0 === articles.length % 20000) {
          await client.bulk({   maxRetries: 5,  index: 'main',   type: 'article',    body: articles   })
          // BUG FIX: was `data = []`, which left `articles` growing forever.
          articles = []
        }
    })

Any way how to achieve this?

EDIT: I am trying to achieve something like this — if I use promises:

let articles = []
Collections.Articles.find({}).forEach(function(doc) {
        // Two bulk-body entries per document: action line + source doc.
        articles.push({ 
            index: {_index: 'main', _type: 'article', _id: doc.id }
        },
        doc);
        if (0 === articles.length % 20000) {
        // Pause FETCHING rows with forEach
        // (forEach cannot actually be paused by a promise — this only
        //  logs when the bulk request finishes; see the async-iteration answer.)
            client.bulk({   maxRetries: 5,  index: 'main',   type: 'article',    body: articles    }).then(() => {
                                          console.log('inserted')
                                          // RESUME FETCHING rows with forEach
                                          console.log("RESUME READING");  
            })
          // BUG FIX: was `data = []`; reset `articles` instead. Reassigning
          // (rather than mutating) is safe here because the bulk request
          // still holds a reference to the old array.
          articles = []
        }
    })
alext
  • 678
  • 1
  • 11
  • 25
  • If 10k batches produce request bodies that are too large for your receiving server, then why not lower the batch size to, say, 5k? – Jankapunkt Aug 18 '20 at 11:20
  • That is not the problem; the problem is that Elasticsearch is overloaded. I need to find a way to "pause" the DB cursor while it is inserting. – alext Aug 18 '20 at 11:30

2 Answers2

2

Managed to get this working with ES2018 Async iteration
Got an idea from Using async/await with a forEach loop
Here is the code that is working

// ES2018 async iteration: `for await` genuinely suspends cursor
// consumption while each bulk request is in flight, so Elasticsearch
// only ever sees one batch at a time.
let articles = []
let cursor = Collections.Articles.find({})

// BUG FIX: declare `doc` with `const` — the original leaked it as an
// implicit global (and throws in strict mode / ES modules).
for await (const doc of cursor) {
    // Two bulk-body entries per document: action line + source doc,
    // so 10 000 entries = 5 000 documents per batch.
    articles.push({ 
        index: {_index: 'main', _type: 'article', _id: doc.id }
    },
    doc);
    if (articles.length === 10000) {
        // BUG FIX: index/type were 'trusted'/'artikel' — a copy-paste
        // leftover inconsistent with 'main'/'article' used everywhere
        // else in this post, silently writing to the wrong index.
        await client.bulk({   maxRetries: 5,  index: 'main',   type: 'article',    body: articles   })
        articles = []
    }
}

// BUG FIX: flush the final partial batch — without this, up to the last
// 4 999 documents were never indexed.
if (articles.length > 0) {
    await client.bulk({   maxRetries: 5,  index: 'main',   type: 'article',    body: articles   })
}

This works correctly and it manages to insert all the records into Elastic Search without crashing.

alext
  • 678
  • 1
  • 11
  • 25
1

If you are concerned about the unthrottled iteration, you may use the internal Meteor._sleepForMs method, which allows you to put an async pause into your sync-styled code:

// Walk every article, pausing `timeout` ms after each one.
const allArticles = Collections.Articles.find()
allArticles.forEach((document, position) => {
  console.log(position, document._id)
  Meteor._sleepForMs(timeout)
})

Now this works fine within the Meteor environment (Meteor.startup, Meteor.methods, Meteor.publish).

Your cron job is likely not within this environment (= a Fiber), so you may write a wrapper that binds the environment:

// Wrap a callback so it runs inside the Meteor environment (Fiber),
// which fiber-based helpers like Meteor._sleepForMs require.
const bound = (fn) => Meteor.bindEnvironment(fn)

// Iterate all articles with a `timeout` ms pause between documents.
const iterateSlow = bound(function (timeout) {
  const cursor = Collections.Articles.find()
  cursor.forEach((document, position) => {
    console.log(position, document._id)
    Meteor._sleepForMs(timeout)
  })
  return true
})

iterateSlow(50) // iterates with 50ms timeout

Here is a complete minimal example, that you can reproduce with a fresh project:

// A minimal collection to demonstrate throttled iteration.
const MyDocs = new Mongo.Collection('myDocs')

// Seed the collection with 100 empty documents at startup.
Meteor.startup(() => {
  let i = 0
  while (i < 100) {
    MyDocs.insert({})
    i += 1
  }
})

// Helper: bind a callback to the Meteor environment (Fiber) so
// Meteor._sleepForMs works when invoked from outside Meteor.
const bound = (fn) => Meteor.bindEnvironment(fn)

// Iterate the docs, sleeping `timeout` ms between each one.
const iterateSlow = bound(function (timeout) {
  const cursor = MyDocs.find()
  cursor.forEach((document, position) => {
    console.log(position, document._id)
    Meteor._sleepForMs(timeout)
  })
  return true
})

// Simulate an external (non-Fiber) environment, as when cron runs.
setTimeout(() => iterateSlow(50), 2000)
Jankapunkt
  • 8,128
  • 4
  • 30
  • 59
  • Thanks for the answer, Janka — my app is within the Meteor context, so I can use the sleep. Is there some way I can "pause or stop the forEach cursor", so I can resume it within a promise's .then? – alext Aug 18 '20 at 15:01
  • Can you please add some code example, that shows how you expect this to work with promise.then? – Jankapunkt Aug 18 '20 at 16:07
  • i edited the original post, have a look at the code – alext Aug 18 '20 at 17:15