swiftvaporswift-nio

Creating and consuming a cursor with Vapor 3


This might be a can of worms, I'll do my best to describe the issue. We have a long running data processing job. Our database of actions is added to nightly and the outstanding actions are processed. It takes about 15 minutes to process nightly actions. In Vapor 2 we utilised a lot of raw queries to create a PostgreSQL cursor and loop through it until it was empty.

For the time being, we run the processing via a command line parameter. In future we wish to have it run as part of the main server so that progress can be checked while processing is being performed.

func run(using context: CommandContext) throws -> Future<Void> {
    let table = "\"RecRegAction\""
    let cursorName = "\"action_cursor\""
    let chunkSize = 10_000


    return context.container.withNewConnection(to: .psql) { connection in
        return PostgreSQLDatabase.transactionExecute({ connection -> Future<Int> in

            return connection.simpleQuery("DECLARE \(cursorName) CURSOR FOR SELECT * FROM \(table)").map { result in
                var totalResults = 0
                var finished : Bool = false

                while !finished {
                    let results = try connection.raw("FETCH \(chunkSize) FROM \(cursorName)").all(decoding: RecRegAction.self).wait()
                    if results.count > 0 {
                        totalResults += results.count
                        print(totalResults)
                        // Obviously we do our processing here
                    }
                    else {
                        finished = true
                    }
                }

                return totalResults
            }
        }, on: connection)
    }.transform(to: ())
}

Now this doesn't work because I'm calling wait() and I get the error "Precondition failed: wait() must not be called when on the EventLoop" which is fair enough. One of the issues I face is that I have no idea how you even get off the main event loop to run things like this on a background thread. I am aware of BlockingIOThreadPool, but that still seems to operate on the same EventLoop and still causes the error. While I'm able to theorise more and more complicated ways to achieve this, I'm hoping I'm missing an elegant solution which perhaps somebody with better knowledge of SwiftNIO and Fluent could help out with.

Edit: To be clear, the goal of this is obviously not to total up the number of actions in the database. The goal is to use the cursor to process every action synchronously. As I read the results in, I detect changes in the actions and then throw batches of them out to processing threads. When all the threads are busy, I don't start reading from the cursor again until they complete.

There are a LOT of these actions, up to 45 million in a single run. Aggregating promises and recursion didn't seem to be a great idea and when I tried it, just for the sake of it, the server hung.

This is a processing intensive task that can run for days on a single thread, so I'm not concerned about creating new threads. The issue is that I cannot work out how I can use the wait() function inside a Command as I need a container to create the database connection and the only one I have access to is context.container Calling wait() on this leads to the above error.

TIA


Solution

  • Ok, so as you know, the problem lies in these lines:

    while ... {
        ...
        try connection.raw("...").all(decoding: RecRegAction.self).wait()
        ...
    }
    

    you want to wait for a number of results and therefore you use a while loop and .wait() for all the intermediate results. Essentially, this is turning asynchronous code into synchronous code on the event loop. That is likely leading to deadlocks and will for sure stall other connections which is why SwiftNIO tries to detect that and give you that error. I won't go into the details why it's stalling other connections or why this is likely to lead to deadlocks in this answer.

    Let's see what options we have to fix this issue:

    1. as you say, we could just have this .wait() on another thread that isn't one of the event loop threads. For this any non-EventLoop thread would do: Either a DispatchQueue or you could use the BlockingIOThreadPool (which does not run on an EventLoop)
    2. we could rewrite your code to be asynchronous

    Both solutions will work but (1) is really not advisable as you would burn a whole (kernel) thread just to wait for the results. And both Dispatch and BlockingIOThreadPool have a finite number of threads they're willing to spawn so if you do that often enough you might run out of threads so it'll take even longer.

    So let's look into how we can call an asynchronous function multiple times whilst accumulating the intermediate results. And then if we have accumulated all the intermediate results continue with all the results.

    To make things easier let's look at a function that is very similar to yours. We assume this function to be provided just like in your code

    /// delivers partial results (integers) and `nil` if no further elements are available
    func deliverPartialResult() -> EventLoopFuture<Int?> {
        ...
    }
    

    what we would like now is a new function

    func deliverFullResult() -> EventLoopFuture<[Int]>
    

    please note how the deliverPartialResult returns one integer each time and deliverFullResult delivers an array of integers (ie. all the integers). Ok, so how do we write deliverFullResult without calling deliverPartialResult().wait()?

    What about this:

    func accumulateResults(eventLoop: EventLoop,
                           partialResultsSoFar: [Int],
                           getPartial: @escaping () -> EventLoopFuture<Int?>) -> EventLoopFuture<[Int]> {
        // let's run getPartial once
        return getPartial().then { partialResult in
            // we got a partial result, let's check what it is
            if let partialResult = partialResult {
                // another intermediate results, let's accumulate and call getPartial again
                return accumulateResults(eventLoop: eventLoop,
                                         partialResultsSoFar: partialResultsSoFar + [partialResult],
                                         getPartial: getPartial)
            } else {
                // we've got all the partial results, yay, let's fulfill the overall future
                return eventLoop.newSucceededFuture(result: partialResultsSoFar)
            }
        }
    }
    

    Given accumulateResults, implementing deliverFullResult is not too hard anymore:

    func deliverFullResult() -> EventLoopFuture<[Int]> {
        return accumulateResults(eventLoop: myCurrentEventLoop,
                                 partialResultsSoFar: [],
                                 getPartial: deliverPartialResult)
    }
    

    But let's look more into what accumulateResults does:

    1. it invokes getPartial once, then when it calls back it
    2. checks if we have
      • a partial result in which case we remember it alongside the other partialResultsSoFar and go back to (1)
      • nil which means partialResultsSoFar is all we get and we return a new succeeded future with everything we have collected so far

    that's already it really. What we did here is to turn the synchronous loop into asynchronous recursion.

    Ok, we looked at a lot of code but how does this relate to your function now?

    Believe it or not but this should actually work (untested):

    accumulateResults(eventLoop: el, partialResultsSoFar: []) {
        connection.raw("FETCH \(chunkSize) FROM \(cursorName)")
                  .all(decoding: RecRegAction.self)
                  .map { results -> Int? in
            if results.count > 0 {
                return results.count
            } else {
                return nil
            }
       }
    }.map { allResults in
        return allResults.reduce(0, +)
    }
    

    The result of all this will be an EventLoopFuture<Int> which carries the sum of all the intermediate result.count.

    Sure, we first collect all your counts into an array to then sum it up (allResults.reduce(0, +)) at the end which is a bit wasteful but also not the end of the world. I left it this way because that makes accumulateResults be usable in other cases where you want to accumulate partial results in an array.

    Now one last thing, a real accumulateResults function would probably be generic over the element type and also we can eliminate the partialResultsSoFar parameter for the outer function. What about this?

    func accumulateResults<T>(eventLoop: EventLoop,
                              getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
        // this is an inner function just to hide it from the outside which carries the accumulator
        func accumulateResults<T>(eventLoop: EventLoop,
                                  partialResultsSoFar: [T] /* our accumulator */,
                                  getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
            // let's run getPartial once
            return getPartial().then { partialResult in
                // we got a partial result, let's check what it is
                if let partialResult = partialResult {
                    // another intermediate results, let's accumulate and call getPartial again
                    return accumulateResults(eventLoop: eventLoop,
                                             partialResultsSoFar: partialResultsSoFar + [partialResult],
                                             getPartial: getPartial)
                } else {
                    // we've got all the partial results, yay, let's fulfill the overall future
                    return eventLoop.newSucceededFuture(result: partialResultsSoFar)
                }
            }
        }
        return accumulateResults(eventLoop: eventLoop, partialResultsSoFar: [], getPartial: getPartial)
    }
    

    EDIT: After your edit your question suggests that you do not actually want to accumulate the intermediate results. So my guess is that instead, you want to do some processing after every intermediate result has been received. If that's what you want to do, maybe try this:

    func processPartialResults<T, V>(eventLoop: EventLoop,
                                     process: @escaping (T) -> EventLoopFuture<V>,
                                     getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> {
        func processPartialResults<T, V>(eventLoop: EventLoop,
                                         soFar: V?,
                                         process: @escaping (T) -> EventLoopFuture<V>,
                                         getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> {
            // let's run getPartial once
            return getPartial().then { partialResult in
                // we got a partial result, let's check what it is
                if let partialResult = partialResult {
                    // another intermediate results, let's call the process function and move on
                    return process(partialResult).then { v in
                        return processPartialResults(eventLoop: eventLoop, soFar: v, process: process, getPartial: getPartial)
                    }
                } else {
                    // we've got all the partial results, yay, let's fulfill the overall future
                    return eventLoop.newSucceededFuture(result: soFar)
                }
            }
        }
        return processPartialResults(eventLoop: eventLoop, soFar: nil, process: process, getPartial: getPartial)
    }
    

    This will (as before) run getPartial until it returns nil but instead of accumulating all of getPartial's results, it calls process which gets the partial result and can do some further processing. The next getPartial call will happen when the EventLoopFuture process returns is fulfilled.

    Is that closer to what you would like?

    Notes: I used SwiftNIO's EventLoopFuture type here, in Vapor you would just use Future instead but the remainder of the code should be the same.