This is `map.pool` from my asynchronous function composition library, rubico.

You can use it like this; `sleepThenDo` is a toy function used for testing `map.pool`:

const { map } = require('rubico')

const sleepThenDo = (ms, fn) => x => new Promise(resolve => {
  setTimeout(() => {
    fn(x)
    resolve()
  }, ms)
})

map.pool(2, sleepThenDo(1000, console.log))([1, 2, 3, 4, 5, 6]) /*
*waits a bit*
1
2
*waits a bit*
3
4
*waits a bit*
5
6
*/

This is the implementation.

const mapPoolIndexedWorker = insert => (size, fn, resolve, reject, x, y, i) => {
  if (i >= x.length) return
  if (reject._called) return
  let point
  try {
    point = fn(x[i])
  } catch (err) {
    reject._called = true
    return reject(err)
  }
  if (isPromise(point)) {
    point.then(res => {
      insert(y, res, i)
      if (i === x.length - 1) {
        resolve(y)
      } else {
        mapPoolIndexedWorker(insert)(size, fn, resolve, reject, x, y, i + size)
      }
    }).catch(err => {
      reject._called = true
      reject(err)
    })
  } else {
    insert(y, point, i)
    if (i === x.length - 1) {
      resolve(y)
    } else {
      mapPoolIndexedWorker(insert)(size, fn, resolve, reject, x, y, i + size)
    }
  }
}

const mapPoolArrayWorker = mapPoolIndexedWorker((y, xi, i) => { y[i] = xi })

const mapPoolArray = (size, fn, x) => new Promise((resolve, reject) => {
  if (x.length < 1) return resolve([])
  const y = []
  for (let i = 0; i < Math.min(x.length, size); i++) {
    mapPoolArrayWorker(size, fn, resolve, reject, x, y, i) // start one worker per slot, offset by i
  }
})

`mapPoolArray` is then called if the data argument is an array.

Is there a possibility of a memory leak here? It certainly works, but I want to know whether it is okay memory-wise to think about a "pool" of workers this way. I would also like to know if there is any way to guarantee a PACKED array on return.
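
To make the second question concrete, here is a small sketch of how index assignment in completion order can leave holes, and how pre-filling avoids them at the language level. `hasHoles` is a hypothetical helper introduced only for this illustration. Whether an engine such as V8 then classifies the array as packed internally is an implementation detail that this snippet does not verify.

```javascript
// Hypothetical helper: true if any index below length is not an own property.
const hasHoles = arr => {
  for (let i = 0; i < arr.length; i++) {
    if (!(i in arr)) return true
  }
  return false
}

// Simulates the worker for index 2 finishing before the others.
const outOfOrder = []
outOfOrder[2] = 'c'
console.log(hasHoles(outOfOrder)) // true: indices 0 and 1 are missing

// Pre-filling gives every index a value before any worker writes to it.
const prefilled = Array.from({ length: 3 }, () => undefined)
prefilled[2] = 'c'
console.log(hasHoles(prefilled)) // false
```

Once every worker has written its result, the first array has no holes either; the difference is only in the intermediate states.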

richytong
  • `sleepThenDo` doesn't handle exceptions correctly – Bergi May 27 '20 at 09:50
  • The `mapPoolArray` function doesn't seem to work a) when the array is empty, b) when the workers don't finish in order, c) when the array contains an `undefined` value – Bergi May 27 '20 at 09:56
  • Why this overcomplicated code with `insert` and curried functions? Why not just put a simple closure inside `mapPoolArray`? – Bergi May 27 '20 at 09:58
  • There's a quite severe leak if some of the `fn` executions throw an exception: the other workers just continue to run. – Bergi May 27 '20 at 10:00
  • Also, it's not quite clear from the documentation whether the `fn` calls are expected to run in the order of the elements in the array. (They don't.) – Bergi May 27 '20 at 10:03
  • I use `insert` with curried functions because I want to leave the possibility to `mapPoolAnotherIndexedDataStructure`, for example `mapPoolString` if anyone ever wants that. For example, I have two functions that use a similar abstraction [for sets and maps](https://github.com/a-synchronous/rubico/blob/master/index.js#L428-L446) – richytong May 27 '20 at 14:55
  • `map.pool` only works for arrays, sets, and maps at the moment. My plan is to extend it for all iterables. For example, [map](https://github.com/a-synchronous/rubico/blob/master/index.js#L332) supports arrays, strings, sets, maps, typed arrays, async iterables, generated iterables, objects, and reducing functions – richytong May 27 '20 at 15:08
  • The reason I ask about guaranteeing `PACKED` arrays is b), when workers don't finish in order. Each worker knows the index where the evaluated value should go, and assigns it in `insert` as `(y, xi, i) => { y[i] = xi }`. When workers don't finish in order, it is possible that this assignment will create a hole. – richytong May 27 '20 at 15:42
  • I think you really have bigger problems to worry about than the microoptimisation of packed vs holey arrays. As long as the end result is not sparse and has all the desired values, you're good. (That said, it's trivial to pre-allocate the array and fill it with `undefined` values) – Bergi May 27 '20 at 15:53
  • Why is it a problem when the workers don't finish in order? @Bergi – richytong May 27 '20 at 16:49
  • I haven't tested it in detail, but I think the `if (i === x.length - 1)` check would break down – Bergi May 27 '20 at 17:00
  • thanks, I'll look into that – richytong May 27 '20 at 17:05
  • @Bergi it was indeed breaking down; I rediscovered the problem through my tests for the unordered version. Sets and Maps had incomplete returns. Arrays failed too after more rigorous tests. May I have your review? [pr](https://github.com/a-synchronous/rubico/pull/12) – richytong May 27 '20 at 19:16
  • I don't think I'd be of great help, because I would do it completely differently altogether :-) – Bergi May 27 '20 at 19:32
  • Could you please share with me how you would do it? – richytong May 27 '20 at 19:37
  • Probably something between [this](https://github.com/petkaantonov/bluebird/blob/master/src/map.js) and [that](https://github.com/bergus/creed/blob/cancellation/src/combinators.js#L12-L16). Or if not using the internals of a promise library: see [here](https://stackoverflow.com/a/38778887/1048572) (though there are [simpler ways](https://stackoverflow.com/a/39197252/1048572)) – Bergi May 27 '20 at 19:51
  • @Bergi I updated the pr with this [commit](https://github.com/a-synchronous/rubico/pull/12/commits/32cf9fe299d36b3a2f0bad5996979f0c387841ad), would you say it is more robust? – richytong May 27 '20 at 22:06
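
For illustration, the cases raised in the comments above (out-of-order completion, `undefined` values, workers continuing after an error) can be handled with a shared index counter plus a completion counter, instead of checking `i === x.length - 1`. The following is a minimal sketch of that common pattern. It is not rubico's implementation and not necessarily what the linked answers do; `mapPoolSketch` and `delay` are hypothetical names introduced here.

```javascript
// Hypothetical helper, not rubico's code: run fn over items with at most
// `size` calls in flight, returning results in input order.
const mapPoolSketch = (size, fn, items) => new Promise((resolve, reject) => {
  const n = items.length
  if (n === 0) return resolve([])
  // Pre-fill so every index exists before any worker writes to it.
  const results = Array.from({ length: n }, () => undefined)
  let nextIndex = 0  // next item to hand to a free worker
  let completed = 0  // number of items that have finished successfully
  let failed = false // set once, so no further items are started

  const runNext = () => {
    if (failed || nextIndex >= n) return
    const i = nextIndex++
    // Wrapping the call in a promise chain treats a synchronous throw
    // from fn the same way as a rejected promise.
    Promise.resolve().then(() => fn(items[i])).then(res => {
      if (failed) return
      results[i] = res
      completed += 1
      // Resolve on the count of finished items, not on the last index.
      if (completed === n) resolve(results)
      else runNext()
    }, err => {
      failed = true
      reject(err)
    })
  }

  for (let k = 0; k < Math.min(size, n); k++) runNext()
})

// Toy async function for the usage example below.
const delay = (ms, value) => new Promise(r => setTimeout(() => r(value), ms))

// Even inputs finish faster than odd ones, so completion is out of order.
mapPoolSketch(2, x => delay(x % 2 === 0 ? 10 : 30, x * 10), [1, 2, 3, 4, 5])
  .then(console.log) // [ 10, 20, 30, 40, 50 ]
```

Calls that are already in flight when an error occurs cannot be cancelled here; the `failed` flag only prevents new items from being started and late results from being written.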
