I am trying to write some logic that changes the length of minheap.heapList. I am pretty new to Promises, so I am not sure how to approach this. Within the while loop I will be calling some async functions as well, and I am not sure how to properly handle the race conditions.

    let minheap = new MinHeap();
    let i = 0;
    Promise.all(logSources.map(logSource => {
        logSource.popAsync().then(log => {
            minheap.insert({
                ts: log.date.getTime(),
                source: i++,
                data: log
            })
        });
        i += 1;
    })).then(
        while(minheap.heapList.length) {
           ...
           // async logic
           ...
        }   
    );

The goal is to convert my synchronous code to asynchronous code: .pop() from my synchronous code will be replaced with .popAsync(), which returns a promise.

Here is the synchronous logic:

    const lenLogSources = logSources.length;
    let minheap = new MinHeap();
    for (let i = 0; i < lenLogSources; i++) {
        let log = logSources[i].pop();
        minheap.insert({
            ts: log.date.getTime(),
            source: i,
            data: log
        })
    }
    while (minheap.heapList.length) {
        let heapNode = minheap.popMin();
        let currTimestamp = heapNode['ts'];
        printer.print(heapNode.data);
        let nextMinTimestamp = minheap.getPeakTimestamp();
        while (currTimestamp <= nextMinTimestamp) {
            let log = logSources[heapNode['source']].pop();
            if (log) {
                let logtime = log.date.getTime();
                if (logtime <= nextMinTimestamp) {
                    printer.print(log);
                    currTimestamp = logtime;
                } else {
                    minheap.insert({
                        ts: logtime,
                        source: heapNode["source"],
                        data: log
                    });
                    break;
                }
            } else {
                heapNode = minheap.popMin();
                printer.print(heapNode.data);
                currTimestamp = heapNode['ts'];
                if (minheap.heapList.length) {
                    nextMinTimestamp = minheap.getPeakTimestamp();
                } else {
                    while (true) {
                        let m = logSources[heapNode['source']].pop();
                        if (m) {
                            printer.print(m);
                        } else {
                            console.log('Drained: ' + heapNode['source']);
                            break;
                        }
                    }
                    break;
                }
            }
        }
    }

Liondancer
  • a) you need to pass a callback to `then`, not a loop construct b) you cannot use a `while` loop around asynchronous logic. Use a recursive approach instead. – Bergi Jan 28 '17 at 10:01

2 Answers

For Promise.all to receive an array of promises, the map callback has to return each promise:

    Promise.all(logSources.map((logSource, i) => {
        // Return the promise so Promise.all actually waits for it.
        return logSource.popAsync().then(log => {
            minheap.insert({
                ts: log.date.getTime(),
                source: i, // index supplied by map, stable per source
                data: log
            });
        });
    }))
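
To illustrate why the `return` matters, here is a minimal sketch. The `popAsync` helper below is a hypothetical stand-in (a timer-based promise), not the asker's actual log source. Without the `return`, `map` produces an array of `undefined`, so `Promise.all` has nothing to wait for.

```javascript
// Hypothetical async source: resolves with a value after a short delay.
const popAsync = (value) =>
  new Promise(resolve => setTimeout(() => resolve(value), 10));

const inputs = [1, 2, 3];

// Block body without `return`: map yields [undefined, undefined, undefined],
// so Promise.all resolves without waiting for any popAsync call.
const withoutReturn = Promise.all(inputs.map(v => { popAsync(v); }));

// Returning the promise: Promise.all waits for every popAsync call to settle.
const withReturn = Promise.all(inputs.map(v => { return popAsync(v); }));

withoutReturn.then(values => console.log('without return:', values));
withReturn.then(values => console.log('with return:', values));
// without return: [ undefined, undefined, undefined ]
// with return: [ 1, 2, 3 ]
```
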
Davin Tryon

Your question is a bit unclear, but you could do this:

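    Promise
      .all(
        logSources.map(
          logSource =>
            logSource
              .popAsync()
              .then(log =>
                minheap.insert({
                  ts: log.date.getTime(),
                  source: minheap.heapList.length,
                  data: log
                }))
        )
      )
      .then(
        () => {
          // Each step schedules the next one from a then-callback, so the
          // "loop" waits for the async work of the previous iteration.
          const fcn = () =>
            (minheap.heapList.length)
            ? minheap
                .popMinAsync()
                .then(processNode)
                .then(v => results.push(v))
                .then(fcn)
            : Promise.resolve();
          // Return the chain so callers can wait for the heap to drain.
          return fcn();
        });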

Note that your processNode function can itself be asynchronous.
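
As a rough, self-contained sketch of that pattern with an asynchronous `processNode`: the sorted-array `minheap` below is only a hypothetical stand-in for the real MinHeap class, `popMin` is used synchronously instead of `popMinAsync`, and `drain` is an illustrative name for the self-scheduling step.

```javascript
// Hypothetical minimal heap stand-in: a sorted array is enough for a sketch.
const minheap = {
  heapList: [],
  insert(node) {
    this.heapList.push(node);
    this.heapList.sort((a, b) => a.ts - b.ts);
  },
  popMin() { return this.heapList.shift(); }
};

[30, 10, 20].forEach(ts => minheap.insert({ ts }));

const results = [];

// processNode may be asynchronous: here it resolves on a later timer tick.
const processNode = node =>
  new Promise(resolve => setTimeout(() => resolve(node.ts), 1));

// Each step schedules the next one from a `then` callback, so the loop
// advances only after the previous node has been processed.
const drain = () =>
  minheap.heapList.length
    ? processNode(minheap.popMin())
        .then(v => results.push(v))
        .then(drain)
    : Promise.resolve();

const done = drain().then(() => console.log(results)); // [ 10, 20, 30 ]
```

Because `drain` returns the promise chain, a caller can attach `.then` to know when the heap is empty.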

Also note that if you do it this way, the values of source are not going to be assigned in a deterministic order.
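
If a stable `source` value is needed, one option is to take it from the index that `map` passes as its second argument rather than from the heap length. A minimal sketch, assuming a hypothetical `makeSource` helper whose first entry arrives after a configurable delay:

```javascript
// Hypothetical log source whose first entry arrives after `delayMs`.
const makeSource = (ts, delayMs) => ({
  popAsync: () =>
    new Promise(resolve =>
      setTimeout(() => resolve({ date: new Date(ts) }), delayMs))
});

// Source 0 is deliberately the slowest to resolve.
const logSources = [makeSource(300, 30), makeSource(100, 10), makeSource(200, 20)];
const arrivalOrder = [];

const seeded = Promise.all(
  logSources.map((logSource, index) =>
    logSource.popAsync().then(log => {
      arrivalOrder.push(index); // records the order in which sources resolved
      return { ts: log.date.getTime(), source: index, data: log };
    })
  )
);

seeded.then(entries => {
  console.log('arrival order:', arrivalOrder);
  console.log('source ids:', entries.map(e => e.source)); // [ 0, 1, 2 ]
});
```

Promise.all keeps its results in input order, so the `source` ids match the positions in `logSources` no matter which promise settles first.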

Edit:

The answer here suggested an improvement in my answer.

Michael Lorton
  • Could this approach potentially run out of stack space for a high number of `logSources`? – Liondancer Jan 28 '17 at 11:05
  • @Liondancer -- First: cool username. Second, I know this _looks_ recursive, but it isn't. `fcn()` isn't called by itself, but by an asynchronous callback; the real stack doesn't grow, but a stack-like linked list of closures is created in the heap. – Michael Lorton Jan 28 '17 at 11:21
  • Still stuck =[ could you take a look please? http://dpaste.com/3ZDGAAZ Trying to follow the pattern you suggested. Currently getting an infinite output of `fail` – Liondancer Jan 29 '17 at 12:43
  • This line is almost certainly wrong: `process.nextTick(whileHeapList())` It should be (I think) `process.nextTick(whileHeapList)` – Michael Lorton Jan 31 '17 at 02:27
  • Made more changes/updates to my code. It doesn't even look like this anymore haha – Liondancer Jan 31 '17 at 02:32