OK, so as you know, the problem lies in these lines:
while ... {
    ...
    try connection.raw("...").all(decoding: RecRegAction.self).wait()
    ...
}
You want to wait for a number of results, and therefore you use a while loop and .wait() for all the intermediate results. Essentially, this turns asynchronous code into synchronous code on the event loop. That is likely to lead to deadlocks and will certainly stall other connections, which is why SwiftNIO tries to detect it and gives you that error. In this answer I won't go into the details of why it stalls other connections or why it is likely to lead to deadlocks.
Let's see what options we have to fix this issue:
1. As you say, we could just run this .wait() on another thread that isn't one of the event loop threads. Any non-EventLoop thread would do: either a DispatchQueue or the BlockingIOThreadPool (which does not run on an EventLoop).
2. We could rewrite your code to be asynchronous.
Both solutions will work, but (1) is really not advisable because you would burn a whole (kernel) thread just to wait for the results. Also, both Dispatch and BlockingIOThreadPool have a finite number of threads they're willing to spawn, so if you do that often enough you might run out of threads and everything will take even longer.
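For completeness, here is a rough sketch of what option (1) looks like. It is plain Swift with Dispatch rather than Vapor: fetchChunkCount and its in-memory chunks are made-up stand-ins for your query, and a DispatchSemaphore plays the role of .wait(). Note how the worker thread spends every iteration parked on the semaphore, which is exactly the wasted thread described above.

```swift
import Dispatch

// Hypothetical asynchronous source (stands in for your query): hands out one
// chunk size per call on a background queue, then nil once it's exhausted.
let source = DispatchQueue(label: "chunk-source")
var chunks = [100, 100, 42]
func fetchChunkCount(_ callback: @escaping (Int?) -> Void) {
    source.async {
        callback(chunks.isEmpty ? nil : chunks.removeFirst())
    }
}

// Option (1): keep the blocking loop, but run it on a Dispatch worker thread
// rather than an event loop thread. The semaphore plays the role of .wait().
func countAllBlocking(completion: @escaping (Int) -> Void) {
    DispatchQueue.global().async {
        var total = 0
        while true {
            let gotChunk = DispatchSemaphore(value: 0)
            var count: Int? = nil
            fetchChunkCount { result in
                count = result
                gotChunk.signal()
            }
            gotChunk.wait() // this worker thread sits idle until the chunk arrives
            guard let chunkCount = count else { break }
            total += chunkCount
        }
        completion(total)
    }
}

// Usage: block the main thread (not an event loop) until the total is ready.
let finished = DispatchSemaphore(value: 0)
var blockingTotal = 0
countAllBlocking { sum in
    blockingTotal = sum
    finished.signal()
}
finished.wait()
print(blockingTotal) // 242
```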
So let's look into how we can call an asynchronous function multiple times whilst accumulating the intermediate results and then, once we have accumulated all of them, continue with the full set of results.
To make things easier, let's look at a function that is very similar to yours. We assume this function is provided, just like in your code:
/// delivers partial results (integers) and `nil` if no further elements are available
func deliverPartialResult() -> EventLoopFuture<Int?> {
...
}
What we would like now is a new function:
func deliverFullResult() -> EventLoopFuture<[Int]>
Please note how deliverPartialResult returns one integer each time, whereas deliverFullResult delivers an array of integers (i.e. all the integers). OK, so how do we write deliverFullResult without calling deliverPartialResult().wait()?
What about this:
func accumulateResults(eventLoop: EventLoop,
                       partialResultsSoFar: [Int],
                       getPartial: @escaping () -> EventLoopFuture<Int?>) -> EventLoopFuture<[Int]> {
    // let's run getPartial once
    return getPartial().then { partialResult in
        // we got a partial result, let's check what it is
        if let partialResult = partialResult {
            // another intermediate result, let's accumulate it and call getPartial again
            return accumulateResults(eventLoop: eventLoop,
                                     partialResultsSoFar: partialResultsSoFar + [partialResult],
                                     getPartial: getPartial)
        } else {
            // we've got all the partial results, yay, let's fulfill the overall future
            return eventLoop.newSucceededFuture(result: partialResultsSoFar)
        }
    }
}
Given accumulateResults, implementing deliverFullResult is not too hard anymore:
func deliverFullResult() -> EventLoopFuture<[Int]> {
    // myCurrentEventLoop stands for whichever EventLoop you are running on
    return accumulateResults(eventLoop: myCurrentEventLoop,
                             partialResultsSoFar: [],
                             getPartial: deliverPartialResult)
}
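If you want to play with this recursion outside of a SwiftNIO project, here is a minimal sketch of the same accumulateResults/deliverFullResult pair in plain Swift. It swaps EventLoopFuture for a completion handler (so it is not NIO's API), and the deliverPartialResult stand-in, which hands out 1, 2, 3 and then nil from a background queue, is made up for illustration.

```swift
import Dispatch

// A producer of partial results: calls back with one value, or nil when done.
// (Hypothetical alias; plays the role of () -> EventLoopFuture<Int?>.)
typealias PartialProducer = (@escaping (Int?) -> Void) -> Void

func accumulateResults(partialResultsSoFar: [Int],
                       getPartial: @escaping PartialProducer,
                       completion: @escaping ([Int]) -> Void) {
    // Ask for one partial result; nothing blocks while we wait.
    getPartial { partialResult in
        if let partialResult = partialResult {
            // Another element: remember it and ask again (the "loop" step).
            accumulateResults(partialResultsSoFar: partialResultsSoFar + [partialResult],
                              getPartial: getPartial,
                              completion: completion)
        } else {
            // nil means we are done: hand over everything collected.
            completion(partialResultsSoFar)
        }
    }
}

// Hypothetical stand-in for deliverPartialResult: 1, 2, 3, then nil,
// delivered asynchronously from a serial background queue.
let queue = DispatchQueue(label: "partial-results")
var remaining = [1, 2, 3]
func deliverPartialResult(_ callback: @escaping (Int?) -> Void) {
    queue.async {
        callback(remaining.isEmpty ? nil : remaining.removeFirst())
    }
}

func deliverFullResult(completion: @escaping ([Int]) -> Void) {
    accumulateResults(partialResultsSoFar: [], getPartial: deliverPartialResult, completion: completion)
}

// Usage: the semaphore is only here so this script waits for the result.
let done = DispatchSemaphore(value: 0)
var collected: [Int] = []
deliverFullResult { all in
    collected = all
    done.signal()
}
done.wait()
print(collected) // [1, 2, 3]
```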
But let's look more closely at what accumulateResults does:
1. It invokes getPartial once, and when that calls back it
2. checks whether we have
   - a partial result, in which case we remember it alongside the other partialResultsSoFar and go back to (1), or
   - nil, which means partialResultsSoFar is all we get, and we return a new succeeded future with everything we have collected so far.
That's really all there is to it. What we did here is turn the synchronous loop into asynchronous recursion.
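To make that transformation concrete, here is a synchronous toy version of both shapes side by side (makeProducer, collectWithLoop and collectWithRecursion are hypothetical names). The loop keeps its accumulator in a var; the recursive version turns the accumulator into a parameter and turns "go round the loop again" into "call yourself again". Once that recursive call sits inside a callback, as in accumulateResults, nothing has to block anymore.

```swift
// Hypothetical synchronous producer: returns 1, 2, 3 and then nil.
func makeProducer() -> () -> Int? {
    var next = 1
    return {
        guard next <= 3 else { return nil }
        defer { next += 1 }
        return next
    }
}

// Loop shape: the accumulator is a local var, and we go round the loop again.
func collectWithLoop(_ getPartial: () -> Int?) -> [Int] {
    var soFar: [Int] = []
    while let partial = getPartial() {
        soFar.append(partial)
    }
    return soFar
}

// Recursive shape: the accumulator is a parameter, and "go round the loop
// again" becomes "call yourself again with the bigger accumulator".
func collectWithRecursion(_ getPartial: () -> Int?, soFar: [Int] = []) -> [Int] {
    guard let partial = getPartial() else { return soFar }
    return collectWithRecursion(getPartial, soFar: soFar + [partial])
}

let viaLoop = collectWithLoop(makeProducer())
let viaRecursion = collectWithRecursion(makeProducer())
print(viaLoop, viaRecursion) // [1, 2, 3] [1, 2, 3]
```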
OK, we've looked at a lot of code, but how does it relate to your function? Believe it or not, this should actually work (untested):
accumulateResults(eventLoop: el, partialResultsSoFar: []) {
    connection.raw("FETCH \(chunkSize) FROM \(cursorName)")
        .all(decoding: RecRegAction.self)
        .map { results -> Int? in
            // an empty chunk means the cursor is exhausted, so report "no more results"
            if results.count > 0 {
                return results.count
            } else {
                return nil
            }
        }
}.map { allResults in
    return allResults.reduce(0, +)
}
The result of all this will be an EventLoopFuture<Int> that carries the sum of all the intermediate results.count values.
Sure, we first collect all your counts into an array and only sum them up at the end (allResults.reduce(0, +)), which is a bit wasteful but not the end of the world. I left it this way because it keeps accumulateResults usable in other cases where you want to accumulate partial results in an array.
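If you don't need the array, the accumulator can just as well be the running total. Below is a sketch of that variant, again with completion handlers instead of futures; fetch(chunkSize:) and its in-memory rows are a made-up stand-in for your FETCH query against the cursor, returning an empty chunk once the cursor is exhausted.

```swift
import Dispatch

// Hypothetical stand-in for the FETCH query: returns up to chunkSize rows
// from an in-memory "cursor" on a background queue, and an empty array once
// the cursor is exhausted.
let cursorQueue = DispatchQueue(label: "fake-cursor")
var rows = Array(1...25)
func fetch(chunkSize: Int, _ callback: @escaping ([Int]) -> Void) {
    cursorQueue.async {
        let chunk = Array(rows.prefix(chunkSize))
        rows.removeFirst(chunk.count)
        callback(chunk)
    }
}

// Asynchronous recursion whose accumulator is the running total itself,
// so there is no array to reduce at the end.
func countRows(chunkSize: Int, totalSoFar: Int, completion: @escaping (Int) -> Void) {
    fetch(chunkSize: chunkSize) { chunk in
        if chunk.isEmpty {
            completion(totalSoFar)  // cursor exhausted
        } else {
            countRows(chunkSize: chunkSize,
                      totalSoFar: totalSoFar + chunk.count,
                      completion: completion)
        }
    }
}

let counted = DispatchSemaphore(value: 0)
var rowCount = 0
countRows(chunkSize: 10, totalSoFar: 0) { total in
    rowCount = total
    counted.signal()
}
counted.wait()
print(rowCount) // 25
```

The trade-off is the one mentioned above: this version is tied to summing, whereas the array-based accumulateResults can be reused for anything.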
Now one last thing: a real accumulateResults function would probably be generic over the element type, and we can also eliminate the partialResultsSoFar parameter from the outer function. What about this?
func accumulateResults<T>(eventLoop: EventLoop,
                          getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
    // this inner function carries the accumulator and is hidden from the outside;
    // it reuses the outer T instead of declaring (and shadowing) its own
    func accumulateResults(eventLoop: EventLoop,
                           partialResultsSoFar: [T] /* our accumulator */,
                           getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
        // let's run getPartial once
        return getPartial().then { partialResult in
            // we got a partial result, let's check what it is
            if let partialResult = partialResult {
                // another intermediate result, let's accumulate it and call getPartial again
                return accumulateResults(eventLoop: eventLoop,
                                         partialResultsSoFar: partialResultsSoFar + [partialResult],
                                         getPartial: getPartial)
            } else {
                // we've got all the partial results, yay, let's fulfill the overall future
                return eventLoop.newSucceededFuture(result: partialResultsSoFar)
            }
        }
    }
    return accumulateResults(eventLoop: eventLoop, partialResultsSoFar: [], getPartial: getPartial)
}
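Here is the same generic shape as a self-contained sketch, with completion handlers in place of EventLoopFuture (an assumption made so it runs without NIO). The helper name step and the nextWord source are hypothetical. As in the version above, the accumulator only exists in the nested helper, which simply uses the outer T.

```swift
import Dispatch

// Generic sketch of accumulateResults<T>, with completion handlers standing in
// for EventLoopFuture so it runs without SwiftNIO.
func accumulateResults<T>(getPartial: @escaping (@escaping (T?) -> Void) -> Void,
                          completion: @escaping ([T]) -> Void) {
    // Nested helper that carries the accumulator; it uses the outer T.
    func step(_ soFar: [T]) {
        getPartial { partial in
            if let partial = partial {
                step(soFar + [partial])    // one more element, ask again
            } else {
                completion(soFar)          // nil: hand over everything collected
            }
        }
    }
    step([])
}

// Hypothetical source of partial results: three words, then nil.
let wordQueue = DispatchQueue(label: "words")
var pending = ["event", "loop", "future"]
func nextWord(_ callback: @escaping (String?) -> Void) {
    wordQueue.async {
        callback(pending.isEmpty ? nil : pending.removeFirst())
    }
}

let gotWords = DispatchSemaphore(value: 0)
var allWords: [String] = []
accumulateResults(getPartial: nextWord) { (result: [String]) in
    allWords = result
    gotWords.signal()
}
gotWords.wait()
print(allWords) // ["event", "loop", "future"]
```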
EDIT: After your edit, your question suggests that you don't actually want to accumulate the intermediate results. So my guess is that you instead want to do some processing after each intermediate result has been received. If that's what you want, maybe try this:
func processPartialResults<T, V>(eventLoop: EventLoop,
                                 process: @escaping (T) -> EventLoopFuture<V>,
                                 getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> {
    // inner function carrying the most recently processed value; it reuses the outer T and V
    func processPartialResults(eventLoop: EventLoop,
                               soFar: V?,
                               process: @escaping (T) -> EventLoopFuture<V>,
                               getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> {
        // let's run getPartial once
        return getPartial().then { partialResult in
            // we got a partial result, let's check what it is
            if let partialResult = partialResult {
                // another intermediate result, let's call the process function and move on
                return process(partialResult).then { v in
                    return processPartialResults(eventLoop: eventLoop, soFar: v, process: process, getPartial: getPartial)
                }
            } else {
                // we've got all the partial results, yay, let's fulfill the overall future
                return eventLoop.newSucceededFuture(result: soFar)
            }
        }
    }
    return processPartialResults(eventLoop: eventLoop, soFar: nil, process: process, getPartial: getPartial)
}
This will (as before) run getPartial until it returns nil, but instead of accumulating all of getPartial's results, it calls process, which receives the partial result and can do some further processing. The next getPartial call only happens once the EventLoopFuture that process returns is fulfilled. The overall future carries the value from the last process call, or nil if there were no partial results at all.
Is that closer to what you would like?
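For reference, a rough sketch of the same idea with completion handlers instead of futures (so not NIO's API). nextInput and doubleIt are hypothetical stand-ins for your query and your per-chunk processing; the events log records that each fetch only starts after the previous processing step has finished.

```swift
import Dispatch

// Sketch of processPartialResults with completion handlers in place of
// futures: fetch one partial result, process it, and only fetch the next one
// once processing has called back. Completes with the last processed value,
// or nil if there were no partial results at all.
func processPartialResults<T, V>(process: @escaping (T, @escaping (V) -> Void) -> Void,
                                 getPartial: @escaping (@escaping (T?) -> Void) -> Void,
                                 completion: @escaping (V?) -> Void) {
    func step(_ soFar: V?) {
        getPartial { partial in
            guard let partial = partial else {
                completion(soFar)      // no more partial results
                return
            }
            process(partial) { v in
                step(v)                // next fetch starts only after processing
            }
        }
    }
    step(nil)
}

// Hypothetical stand-ins: a source of three integers and a processing step
// that doubles them. Both run on one serial queue and record what they do.
let worker = DispatchQueue(label: "worker")
var inputs = [1, 2, 3]
var events: [String] = []   // only touched on `worker`
func nextInput(_ callback: @escaping (Int?) -> Void) {
    worker.async {
        events.append("fetch")
        callback(inputs.isEmpty ? nil : inputs.removeFirst())
    }
}
func doubleIt(_ x: Int, _ callback: @escaping (Int) -> Void) {
    worker.async {
        events.append("process \(x)")
        callback(x * 2)
    }
}

let processed = DispatchSemaphore(value: 0)
var last: Int? = nil
processPartialResults(process: doubleIt, getPartial: nextInput) { (result: Int?) in
    last = result
    processed.signal()
}
processed.wait()
print(events)
print(last as Any) // Optional(6)
```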
Note: I used SwiftNIO's EventLoopFuture type here; in Vapor you would just use Future instead, but the rest of the code should be the same.