
This might be a can of worms, but I'll do my best to describe the issue. We have a long-running data processing job. Our database of actions is added to nightly, and the outstanding actions are then processed. Processing the nightly actions takes about 15 minutes. In Vapor 2 we utilised a lot of raw queries to create a PostgreSQL cursor and loop through it until it was empty.

For the time being, we run the processing via a command line parameter. In future we wish to have it run as part of the main server so that progress can be checked while processing is being performed.

func run(using context: CommandContext) throws -> Future<Void> {
    let table = "\"RecRegAction\""
    let cursorName = "\"action_cursor\""
    let chunkSize = 10_000


    return context.container.withNewConnection(to: .psql) { connection in
        return PostgreSQLDatabase.transactionExecute({ connection -> Future<Int> in

            return connection.simpleQuery("DECLARE \(cursorName) CURSOR FOR SELECT * FROM \(table)").map { result in
                var totalResults = 0
                var finished : Bool = false

                while !finished {
                    let results = try connection.raw("FETCH \(chunkSize) FROM \(cursorName)").all(decoding: RecRegAction.self).wait()
                    if results.count > 0 {
                        totalResults += results.count
                        print(totalResults)
                        // Obviously we do our processing here
                    }
                    else {
                        finished = true
                    }
                }

                return totalResults
            }
        }, on: connection)
    }.transform(to: ())
}

Now this doesn't work because I'm calling wait() and I get the error "Precondition failed: wait() must not be called when on the EventLoop" which is fair enough. One of the issues I face is that I have no idea how you even get off the main event loop to run things like this on a background thread. I am aware of BlockingIOThreadPool, but that still seems to operate on the same EventLoop and still causes the error. While I'm able to theorise more and more complicated ways to achieve this, I'm hoping I'm missing an elegant solution which perhaps somebody with better knowledge of SwiftNIO and Fluent could help out with.

Edit: To be clear, the goal of this is obviously not to total up the number of actions in the database. The goal is to use the cursor to process every action synchronously. As I read the results in, I detect changes in the actions and then throw batches of them out to processing threads. When all the threads are busy, I don't start reading from the cursor again until they complete.
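The reading pattern described in this edit (read a chunk, hand batches to worker threads, and stop reading while every worker is busy) can be sketched with a counting semaphore. This is a minimal sketch under assumptions: `processWithBackpressure` is a hypothetical name, `fetchNext` is a hypothetical blocking call standing in for the cursor FETCH (so it must run on a thread that is allowed to block, not an EventLoop), and Dispatch supplies the worker threads.

```swift
import Foundation

/// Hypothetical helper: fetch batches one at a time and process each on a worker thread,
/// never letting more than `maxInFlight` batches be processed at once.
/// Must be called from a thread that is allowed to block (not an EventLoop).
func processWithBackpressure(maxInFlight: Int,
                             fetchNext: () -> [Int]?,
                             process: @escaping ([Int]) -> Void) {
    let slots = DispatchSemaphore(value: maxInFlight)   // one permit per worker
    let group = DispatchGroup()
    while true {
        slots.wait()                        // blocks while every worker is busy
        guard let batch = fetchNext() else {
            slots.signal()                  // cursor exhausted: give the permit back
            break
        }
        group.enter()
        DispatchQueue.global().async {
            process(batch)
            slots.signal()                  // a worker is free, so reading can resume
            group.leave()
        }
    }
    group.wait()                            // wait for the last batches to finish
}

// Usage with fake data: 20 batches of 100 "action ids", at most 4 batches in flight.
let lock = NSLock()
var inFlight = 0
var peakInFlight = 0
var processed = 0
var pending = (0..<20).map { i in Array(i * 100 ..< (i + 1) * 100) }

processWithBackpressure(maxInFlight: 4,
                        fetchNext: { pending.isEmpty ? nil : pending.removeFirst() }) { batch in
    lock.lock(); inFlight += 1; peakInFlight = max(peakInFlight, inFlight); lock.unlock()
    Thread.sleep(forTimeInterval: 0.001)    // stand-in for real processing work
    lock.lock(); inFlight -= 1; processed += batch.count; lock.unlock()
}
print(processed)  // 2000
```

The semaphore starts with one permit per worker: the reader takes a permit before fetching, and each worker returns its permit when it finishes, so the reader sits blocked in `slots.wait()` whenever `maxInFlight` batches are already being processed.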

There are a LOT of these actions, up to 45 million in a single run. Aggregating promises and recursion didn't seem to be a great idea, and when I tried it, just for the sake of it, the server hung.

This is a processing-intensive task that can run for days on a single thread, so I'm not concerned about creating new threads. The issue is that I cannot work out how to use the wait() function inside a Command: I need a container to create the database connection, and the only one I have access to is context.container. Calling wait() on this leads to the above error.

TIA

David Monagle
  • Hmm, for the server load it shouldn't matter if you run this synchronously (blocking a thread) or asynchronously. In either case, one will be processed at a time. The `wait()` function will also still run it asynchronously but on top of that it will block the calling thread. So your while loop and `wait` won't consume fewer resources than the asynchronous version (it's the opposite). But given that you asked how you can run the wait without blocking: – Johannes Weiss Aug 01 '18 at 17:43
  • Sorry, I can't edit my above comment anymore. I meant to say: given that you asked how to run `wait()` on something, you just need to send it to any thread that isn't an `EventLoop`. For example, this will do it: `DispatchQueue.global().async { while ... { someFuture.wait() } }`. But this is not a good idea; you will just increase the overhead, both in thread hops and in the number of threads you need. The work itself will still be done on the event loop; the result will just be sent on to another thread that is waiting for it. – Johannes Weiss Aug 01 '18 at 17:50
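A self-contained illustration of the approach in the comment above, using toy stand-ins rather than the NIO/Vapor API: a serial DispatchQueue plays the EventLoop, `fetchChunk` is a hypothetical asynchronous FETCH that completes on that queue, and `fetchChunkAndWait` is a hypothetical blocking wrapper playing the role of `wait()`. It sketches the idea only; it is not how NIO implements `wait()`.

```swift
import Foundation

// A serial queue standing in for an EventLoop (toy model, not the NIO type).
let loopKey = DispatchSpecificKey<Bool>()
let eventLoopQueue = DispatchQueue(label: "toy.event-loop")
eventLoopQueue.setSpecific(key: loopKey, value: true)

// Hypothetical cursor state; only ever touched on eventLoopQueue.
var remainingChunks = [10_000, 10_000, 3_512]

/// Hypothetical asynchronous "FETCH n FROM cursor": answers later, on the loop queue.
func fetchChunk(_ completion: @escaping (Int?) -> Void) {
    eventLoopQueue.async {
        completion(remainingChunks.isEmpty ? nil : remainingChunks.removeFirst())
    }
}

/// Hypothetical blocking wrapper playing the role of `future.wait()`. It refuses to block
/// the loop itself, because the loop is what has to deliver the result.
func fetchChunkAndWait() -> Int? {
    precondition(DispatchQueue.getSpecific(key: loopKey) != true,
                 "wait() must not be called when on the EventLoop")
    let done = DispatchSemaphore(value: 0)
    var result: Int?
    fetchChunk { chunk in
        result = chunk
        done.signal()
    }
    done.wait()          // this thread now sits idle until the loop answers
    return result
}

// Run the synchronous while loop on a Dispatch worker thread instead of on the loop.
var total = 0
let finished = DispatchSemaphore(value: 0)
DispatchQueue.global().async {
    while let count = fetchChunkAndWait() {
        total += count
    }
    finished.signal()
}
finished.wait()
print(total)  // 23512
```

This also shows the cost the comment points out: the fetch itself still runs on the loop queue, and a second thread sits blocked in `done.wait()` for every chunk.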

2 Answers


Ok, so as you know, the problem lies in these lines:

while ... {
    ...
    try connection.raw("...").all(decoding: RecRegAction.self).wait()
    ...
}

You want to wait for a number of results, and therefore you use a while loop and .wait() for all the intermediate results. Essentially, this turns asynchronous code into synchronous code on the event loop. That is likely to lead to deadlocks and will certainly stall other connections, which is why SwiftNIO tries to detect it and gives you that error. I won't go into the details of why it stalls other connections or why it is likely to lead to deadlocks in this answer.

Let's see what options we have to fix this issue:

  1. as you say, we could just run this .wait() on another thread that isn't one of the event loop threads. For this, any non-EventLoop thread would do: either a DispatchQueue or the BlockingIOThreadPool (which does not run on an EventLoop)
  2. we could rewrite your code to be asynchronous

Both solutions will work, but (1) is really not advisable, as you would burn a whole (kernel) thread just to wait for the results. Also, both Dispatch and BlockingIOThreadPool have a finite number of threads they're willing to spawn, so if you do that often enough you might run out of threads and everything will take even longer.

So let's look into how we can call an asynchronous function multiple times whilst accumulating the intermediate results. And then if we have accumulated all the intermediate results continue with all the results.

To make things easier, let's look at a function that is very similar to yours. We assume this function is provided, just like in your code:

/// delivers partial results (integers) and `nil` if no further elements are available
func deliverPartialResult() -> EventLoopFuture<Int?> {
    ...
}

What we would like now is a new function:

func deliverFullResult() -> EventLoopFuture<[Int]>

Please note how deliverPartialResult returns one integer each time, while deliverFullResult delivers an array of integers (i.e. all the integers). OK, so how do we write deliverFullResult without calling deliverPartialResult().wait()?

What about this:

func accumulateResults(eventLoop: EventLoop,
                       partialResultsSoFar: [Int],
                       getPartial: @escaping () -> EventLoopFuture<Int?>) -> EventLoopFuture<[Int]> {
    // let's run getPartial once
    return getPartial().then { partialResult in
        // we got a partial result, let's check what it is
        if let partialResult = partialResult {
            // another intermediate result, let's accumulate it and call getPartial again
            return accumulateResults(eventLoop: eventLoop,
                                     partialResultsSoFar: partialResultsSoFar + [partialResult],
                                     getPartial: getPartial)
        } else {
            // we've got all the partial results, yay, let's fulfill the overall future
            return eventLoop.newSucceededFuture(result: partialResultsSoFar)
        }
    }
}

Given accumulateResults, implementing deliverFullResult is not too hard anymore:

func deliverFullResult() -> EventLoopFuture<[Int]> {
    return accumulateResults(eventLoop: myCurrentEventLoop,
                             partialResultsSoFar: [],
                             getPartial: deliverPartialResult)
}

But let's look more into what accumulateResults does:

  1. it invokes getPartial once, then when it calls back it
  2. checks if we have
    • a partial result in which case we remember it alongside the other partialResultsSoFar and go back to (1)
    • nil which means partialResultsSoFar is all we get and we return a new succeeded future with everything we have collected so far

That's really all there is to it. What we did here is turn the synchronous loop into asynchronous recursion.
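To see the loop-to-recursion transformation in isolation, here is a self-contained sketch in plain Swift that uses completion handlers instead of NIO futures. `ToyEventLoop` and `makeFakeCursor` are hypothetical stand-ins (a FIFO task queue, and a cursor that yields chunk sizes and then nil); they are not the NIO or Fluent API.

```swift
/// Toy FIFO task queue standing in for an EventLoop (not the NIO type).
final class ToyEventLoop {
    private var tasks: [() -> Void] = []
    func execute(_ task: @escaping () -> Void) { tasks.append(task) }
    func run() {
        while !tasks.isEmpty {
            let task = tasks.removeFirst()
            task()
        }
    }
}

/// Hypothetical stand-in for the cursor: each call asynchronously (via the loop)
/// delivers the next chunk size, or nil once everything has been read.
func makeFakeCursor(chunks: [Int], on loop: ToyEventLoop) -> (@escaping (Int?) -> Void) -> Void {
    var remaining = chunks
    return { completion in
        loop.execute { completion(remaining.isEmpty ? nil : remaining.removeFirst()) }
    }
}

/// accumulateResults with completion handlers instead of futures: call getPartial once,
/// and when it calls back either recurse with the new element or finish on nil.
func accumulateResults<T>(soFar: [T] = [],
                          getPartial: @escaping (@escaping (T?) -> Void) -> Void,
                          completion: @escaping ([T]) -> Void) {
    getPartial { partial in
        if let partial = partial {
            accumulateResults(soFar: soFar + [partial], getPartial: getPartial, completion: completion)
        } else {
            completion(soFar)
        }
    }
}

let loop = ToyEventLoop()
var collected: [Int] = []
accumulateResults(getPartial: makeFakeCursor(chunks: [10_000, 10_000, 3_512], on: loop)) { all in
    collected = all
}
loop.run()  // nothing is fetched until the loop runs the queued tasks
print(collected)  // [10000, 10000, 3512]
```

Each call to accumulateResults only queues the next fetch and returns, so while the loop is running the queued tasks the recursion never piles up on the stack.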

Ok, we looked at a lot of code but how does this relate to your function now?

Believe it or not, this should actually work (untested):

accumulateResults(eventLoop: el, partialResultsSoFar: []) {
    connection.raw("FETCH \(chunkSize) FROM \(cursorName)")
        .all(decoding: RecRegAction.self)
        .map { results -> Int? in
            // an empty chunk means the cursor is exhausted, so signal "no more" with nil
            if results.count > 0 {
                return results.count
            } else {
                return nil
            }
        }
}.map { allResults in
    return allResults.reduce(0, +)
}

The result of all this will be an EventLoopFuture<Int> which carries the sum of all the intermediate results.count values.

Sure, we first collect all your counts into an array and only then sum them up (allResults.reduce(0, +)), which is a bit wasteful but also not the end of the world. I left it this way because it makes accumulateResults usable in other cases where you want to accumulate partial results in an array.
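If only the running total is needed, a variant can carry the accumulated value itself instead of an array. That also avoids `partialResultsSoFar + [partialResult]`, which builds a new array on every step (quadratic copying overall when there are many chunks). This is a sketch in plain Swift with completion handlers; `foldPartialResults`, `ToyEventLoop` and `makeFakeCursor` are hypothetical names, not NIO or Vapor API.

```swift
/// Toy FIFO task queue standing in for an EventLoop (not the NIO type).
final class ToyEventLoop {
    private var tasks: [() -> Void] = []
    func execute(_ task: @escaping () -> Void) { tasks.append(task) }
    func run() {
        while !tasks.isEmpty {
            let task = tasks.removeFirst()
            task()
        }
    }
}

/// Hypothetical cursor stand-in: asynchronously yields chunk sizes, then nil.
func makeFakeCursor(chunks: [Int], on loop: ToyEventLoop) -> (@escaping (Int?) -> Void) -> Void {
    var remaining = chunks
    return { completion in
        loop.execute { completion(remaining.isEmpty ? nil : remaining.removeFirst()) }
    }
}

/// Like accumulateResults, but folds each partial result into a single running value.
func foldPartialResults<T, Acc>(_ accumulated: Acc,
                                getPartial: @escaping (@escaping (T?) -> Void) -> Void,
                                combine: @escaping (Acc, T) -> Acc,
                                completion: @escaping (Acc) -> Void) {
    getPartial { partial in
        if let partial = partial {
            foldPartialResults(combine(accumulated, partial), getPartial: getPartial,
                               combine: combine, completion: completion)
        } else {
            completion(accumulated)
        }
    }
}

let loop = ToyEventLoop()
var total = 0
foldPartialResults(0, getPartial: makeFakeCursor(chunks: [10_000, 10_000, 3_512], on: loop),
                   combine: { sum, count in sum + count }) { total = $0 }
loop.run()
print(total)  // 23512
```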

Now one last thing: a real accumulateResults function would probably be generic over the element type, and we can also eliminate the partialResultsSoFar parameter from the outer function. What about this?

func accumulateResults<T>(eventLoop: EventLoop,
                          getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
    // this is an inner function just to hide it from the outside which carries the accumulator
    func accumulateResults<T>(eventLoop: EventLoop,
                              partialResultsSoFar: [T] /* our accumulator */,
                              getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
        // let's run getPartial once
        return getPartial().then { partialResult in
            // we got a partial result, let's check what it is
            if let partialResult = partialResult {
                // another intermediate result, let's accumulate it and call getPartial again
                return accumulateResults(eventLoop: eventLoop,
                                         partialResultsSoFar: partialResultsSoFar + [partialResult],
                                         getPartial: getPartial)
            } else {
                // we've got all the partial results, yay, let's fulfill the overall future
                return eventLoop.newSucceededFuture(result: partialResultsSoFar)
            }
        }
    }
    return accumulateResults(eventLoop: eventLoop, partialResultsSoFar: [], getPartial: getPartial)
}

EDIT: After your edit, your question suggests that you do not actually want to accumulate the intermediate results. My guess is that instead you want to do some processing after each intermediate result has been received. If that's what you want, maybe try this:

func processPartialResults<T, V>(eventLoop: EventLoop,
                                 process: @escaping (T) -> EventLoopFuture<V>,
                                 getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> {
    func processPartialResults<T, V>(eventLoop: EventLoop,
                                     soFar: V?,
                                     process: @escaping (T) -> EventLoopFuture<V>,
                                     getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> {
        // let's run getPartial once
        return getPartial().then { partialResult in
            // we got a partial result, let's check what it is
            if let partialResult = partialResult {
                // another intermediate result, let's call the process function and move on
                return process(partialResult).then { v in
                    return processPartialResults(eventLoop: eventLoop, soFar: v, process: process, getPartial: getPartial)
                }
            } else {
                // we've got all the partial results, yay, let's fulfill the overall future
                return eventLoop.newSucceededFuture(result: soFar)
            }
        }
    }
    return processPartialResults(eventLoop: eventLoop, soFar: nil, process: process, getPartial: getPartial)
}

This will (as before) run getPartial until it returns nil, but instead of accumulating all of getPartial's results, it calls process, which receives the partial result and can do some further processing. The next getPartial call happens only once the EventLoopFuture returned by process is fulfilled.
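The same control flow in plain Swift, as a hypothetical sketch with completion handlers instead of futures. `ToyEventLoop`, `makeFakeCursor` and this simplified `processPartialResults` are illustrative stand-ins; unlike the version above, this one does not pass a result value through. The log shows that the next fetch only starts after the previous chunk has finished processing, which is the backpressure the question asks for.

```swift
/// Toy FIFO task queue standing in for an EventLoop (not the NIO type).
final class ToyEventLoop {
    private var tasks: [() -> Void] = []
    func execute(_ task: @escaping () -> Void) { tasks.append(task) }
    func run() {
        while !tasks.isEmpty {
            let task = tasks.removeFirst()
            task()
        }
    }
}

/// Hypothetical cursor stand-in: asynchronously yields chunk sizes, then nil.
func makeFakeCursor(chunks: [Int], on loop: ToyEventLoop) -> (@escaping (Int?) -> Void) -> Void {
    var remaining = chunks
    return { completion in
        loop.execute { completion(remaining.isEmpty ? nil : remaining.removeFirst()) }
    }
}

/// Simplified processPartialResults: fetch a chunk, hand it to `process`, and only fetch
/// the next chunk once `process` reports (through its callback) that it has finished.
func processPartialResults<T>(getPartial: @escaping (@escaping (T?) -> Void) -> Void,
                              process: @escaping (T, @escaping () -> Void) -> Void,
                              completion: @escaping () -> Void) {
    getPartial { partial in
        guard let partial = partial else {
            completion()          // cursor exhausted
            return
        }
        process(partial) {
            processPartialResults(getPartial: getPartial, process: process, completion: completion)
        }
    }
}

let loop = ToyEventLoop()
let cursor = makeFakeCursor(chunks: [3, 2], on: loop)
var log: [String] = []
var done = false

processPartialResults(getPartial: { (completion: @escaping (Int?) -> Void) in
    log.append("fetch")
    cursor(completion)
}, process: { chunk, finished in
    log.append("start \(chunk)")
    loop.execute {                // pretend the processing itself is asynchronous
        log.append("end \(chunk)")
        finished()
    }
}, completion: { done = true })

loop.run()
print(log)  // ["fetch", "start 3", "end 3", "fetch", "start 2", "end 2", "fetch"]
```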

Is that closer to what you would like?

Note: I used SwiftNIO's EventLoopFuture type here; in Vapor you would just use Future instead, but the remainder of the code should be the same.

Johannes Weiss
  • Thanks for taking the time to write out that answer. I've updated my question to clarify some of this, but using recursion is not really much of an option. If recursion worked, I probably could have just used a big query and chunked the result. Really I don't see any way around being able to call wait on the query, and I'm happy to do it as it's running off the command line. I would just need a way to run the query off the main event loop from the Command. – David Monagle Aug 01 '18 at 10:40
  • @DavidMonagle sorry, I can't quite follow. Why do you think that the recursiveness of this makes it any different? In both cases, we process one query at a time. In fact, when using `wait()` from another thread you'll do exactly the work that is done in my proposed function and a little bit more. Why do you think you need to `wait()` on the results? – Johannes Weiss Aug 01 '18 at 17:48
  • @DavidMonagle After re-reading your question, I think you want to process the intermediate results immediately rather than accumulating them. I added `processPartialResults` as a way that allows you to do that. – Johannes Weiss Aug 01 '18 at 18:07
  • Thanks for your help Johannes. I've used a slightly less generic version of what you have posted above, and it gets all the way through every record. My concerns about recursion and what it might do to the stack seem to be unfounded, as the Swift compiler seems to be clever enough to realise there is no need to keep every iteration on the heap here. – David Monagle Aug 02 '18 at 23:25
  • @DavidMonagle it’s actually not the Swift compiler in this case. If a result is not available immediately, the stack will unwind and NIO will wait for events and call out later. Because none of your results can be available immediately (you need networking to get them), this won't be a problem. If you had a long chain of Futures that were all fulfilled before you `.then` them, you would run into issues. Those are quite easy to fix though: wrap the recursion in `eventloop.submit { innerFuture.then { $0 } }`. (There’s a NIO PR open to allow you to get rid of the odd `.then { $0 }`.) – Johannes Weiss Aug 03 '18 at 09:02
  • If you use `eventloop.submit { ... }` you force the stack to _always_ unwind and go back into the event loop. We don't always do this automatically because it's slightly less efficient than nesting the stacks if and only if the result is immediately available. – Johannes Weiss Aug 03 '18 at 09:04
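A toy demonstration, in plain Swift, of the stack behavior described in these comments: when every partial result is already available, recursing directly nests one call per chunk, while queueing each step on the loop first (the idea behind `eventloop.submit { ... }`) lets the stack unwind between steps. `ToyEventLoop`, `makeImmediateSource` and `drain` are hypothetical names, and the depth counters exist only to make the effect measurable.

```swift
/// Toy FIFO task queue standing in for an EventLoop (not the NIO type).
final class ToyEventLoop {
    private var tasks: [() -> Void] = []
    func execute(_ task: @escaping () -> Void) { tasks.append(task) }
    func run() {
        while !tasks.isEmpty {
            let task = tasks.removeFirst()
            task()
        }
    }
}

var currentDepth = 0
var maxDepth = 0

/// Hypothetical source whose results are always available immediately: the callback runs
/// synchronously (like chaining onto futures that are already fulfilled).
func makeImmediateSource(count: Int) -> (@escaping (Int?) -> Void) -> Void {
    var delivered = 0
    return { completion in
        if delivered < count {
            delivered += 1
            completion(delivered)
        } else {
            completion(nil)
        }
    }
}

/// Recursive consumer. `hop` decides how the next step runs: directly (nested on the
/// current stack) or queued on the loop so that the current stack unwinds first.
func drain(seen: Int = 0,
           getPartial: @escaping (@escaping (Int?) -> Void) -> Void,
           hop: @escaping (@escaping () -> Void) -> Void,
           completion: @escaping (Int) -> Void) {
    currentDepth += 1
    maxDepth = max(maxDepth, currentDepth)
    defer { currentDepth -= 1 }
    getPartial { partial in
        guard partial != nil else {
            completion(seen)
            return
        }
        hop { drain(seen: seen + 1, getPartial: getPartial, hop: hop, completion: completion) }
    }
}

// 1. Recurse directly: each step runs inside the previous step's stack frames.
var nestedCount = 0
drain(getPartial: makeImmediateSource(count: 500), hop: { step in step() }) { nestedCount = $0 }
let nestedMaxDepth = maxDepth

// 2. Queue each step on the loop first (the `submit` idea): the stack unwinds in between.
maxDepth = 0
let loop = ToyEventLoop()
var hoppedCount = 0
drain(getPartial: makeImmediateSource(count: 500), hop: { step in loop.execute(step) }) { hoppedCount = $0 }
loop.run()
let hoppedMaxDepth = maxDepth

print(nestedCount, nestedMaxDepth)  // 500 501
print(hoppedCount, hoppedMaxDepth)  // 500 1
```

With many millions of already-fulfilled steps, the first variant would eventually overflow the stack, while the second stays at constant depth at the price of one extra trip through the queue per step.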

Here's the generic solution, rewritten for NIO 2.16/Vapor 4 as an extension to EventLoop:

extension EventLoop {

    func accumulateResults<T>(getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
        // this is an inner function just to hide it from the outside which carries the accumulator
        func accumulateResults<T>(partialResultsSoFar: [T] /* our accumulator */,
                                  getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
            // let's run getPartial once
            return getPartial().flatMap { partialResult in
                // we got a partial result, let's check what it is
                if let partialResult = partialResult {
                    // another intermediate result, let's accumulate it and call getPartial again
                    return accumulateResults(partialResultsSoFar: partialResultsSoFar + [partialResult],
                                             getPartial: getPartial)
                } else {
                    // we've got all the partial results, yay, let's fulfill the overall future
                    return self.makeSucceededFuture(partialResultsSoFar)
                }
            }
        }
        return accumulateResults(partialResultsSoFar: [], getPartial: getPartial)
    }
}