
first.js

// Inside an async method; `through2` and Ramda (`R`) are required elsewhere in the module.
let data = {}
const ingestionSelf = this
const prom = await new Promise((resolve, reject) => {
  // Pipe the incoming stream through an object-mode transform.
  data = inputs.input[0].pipe(through2.obj(function (chunk, enc, next) {
    const throughSelf = this
    ingestionSelf.myFunction(node, { input: [chunk] }, inputData, request, internalCall).then((resp) => {
      if (R.type(resp) === "String") {
        resp = JSON.parse(resp)
      }
      throughSelf.push(resp[0])
      // Resolve on the first transformed chunk so the stream can be returned.
      resolve(resp)
      next()
    })
  }))
})

if (prom) {
  return data
}

Second.js

data.on("data", (chunk) => {
      if (R.includes(R.type(chunk), ["Object", "Array"])){
        pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
      } else {
        pushToKafkaQueue(topicName, chunk, request)
      }
    })

I am getting only up to 32 records, after which the stream stops; the actual count is 5,000 records. If I write the code as below, then all 5,000 records come through.

 data.on("data", (chunk) => {
      data.pause();
      if (R.includes(R.type(chunk), ["Object", "Array"])){
        pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
      } else {
        pushToKafkaQueue(topicName, chunk, request)
      }
      setTimeout(() => {
        data.resume();
      }, 0);
    })

But this is not a proper solution: it pauses the stream for every record/chunk and then immediately resumes it. Is there a good way to resolve this issue properly?
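For illustration, a minimal sketch (not taken from the original code) of one possible direction: consuming the stream with `for await ... of`, so each chunk is awaited before the next one is read and the stream's own backpressure does the pacing. This assumes `pushToKafkaQueue` returns a Promise and that the stream supports async iteration (Node.js 10+); the wrapper name `forwardToKafka` is hypothetical.

    // Sketch only (hypothetical wrapper): await each Kafka push before pulling
    // the next chunk, so backpressure is applied instead of pause()/resume().
    async function forwardToKafka(data, topicName, request) {
      for await (const chunk of data) {
        if (R.includes(R.type(chunk), ["Object", "Array"])) {
          await pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
        } else {
          await pushToKafkaQueue(topicName, chunk, request)
        }
      }
    }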

Venkat
  • if `pushToKafkaQueue` is asynchronous, you should try `async (chunk) => { ..; await pushToKafkaQueue(...); ...; }`. If chunk is of type Buffer (passed by reference), I can imagine that it could be overwritten during async processing. – bigless Nov 20 '19 at 16:44
  • @bigless Yes, I added async and await, but it made no difference. – Venkat Nov 21 '19 at 05:27
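
For reference, a minimal sketch of the async handler suggested in the comment above, assuming `pushToKafkaQueue` returns a Promise. Note that a "data" listener is not awaited by the stream itself, which may be why this change alone did not help:

    // Sketch of the comment's suggestion: an async "data" handler that awaits the push.
    // The stream does not wait for this async callback, so it adds no backpressure by itself.
    data.on("data", async (chunk) => {
      if (R.includes(R.type(chunk), ["Object", "Array"])) {
        await pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
      } else {
        await pushToKafkaQueue(topicName, chunk, request)
      }
    })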
