Store a reference to a TaskGroup or add to it outside of scope


I'm trying to replace DispatchGroup with a TaskGroup because I can cancel a TaskGroup. However, I can't store a reference to a TaskGroup.

The methods withTaskGroup(...) and withDiscardingTaskGroup(...) seem to be built around the assumption that every child task you'll ever add to the group is ready to go right there inside the closure. But what if you have a bunch of code in unrelated places and you want to throw your tasks into a common group, so that you can track them, queue them, cancel them later, etc.? It seems you can't, which is what this question is about.

Example code:

class SHACalculationGroup {
    // 'DiscardingTaskGroup' initializer is inaccessible due to 'internal' protection level
    internal let taskGroup: TaskGroup = DiscardingTaskGroup()
    let lock = AsyncSemaphore(value: 8)

    private func calculateShaHash(forUrl url: URL) async {
        await lock.wait()
        do {
            let sha256 = try url.sha256().hexStr
            log.debug("sha256 is \(sha256 ?? "EMPTY")")
        } catch {
            log.error("Could not get sha256 of the file. \(error)")
        }
        lock.signal()
    }

    func kickoffShaCalculation(forUrl url: URL) {
        taskGroup.addTask(operation:
            Task.detached(priority: .utility) {
                await calculateShaHash(forUrl: url)
            }
        )
    }
}

I get the error on the init for DiscardingTaskGroup: 'DiscardingTaskGroup' initializer is inaccessible due to 'internal' protection level

Seriously what the heck.. how are we supposed to centralize random common operations from around our application without being able to store a reference to this thing? I'd like to know what in the heck they were thinking if you might be able to comment on that as well.

Thanks for any help!


Solution

  • Correct, you should not access that TaskGroup outside of the withTaskGroup closure. The documentation says:

    Don’t use a task group from outside the task where you created it.

    What you can do is have your task group monitor an AsyncChannel, part of Apple’s Swift Async Algorithms package:

    import AsyncAlgorithms
    
    actor SHACalculationGroup {
        private let channel = AsyncChannel<URL>()
    
        // called first, to start monitoring the channel
    
        func monitor() async {
            await withTaskGroup(of: Void.self) { group in
                var count = 0
                for await url in channel {
                    count += 1
                    if count > 8 { await group.next() }
    
                    group.addTask { await self.calculation(for: url) }
                }
            }
        }
    
        // called as you add items
    
        func add(url: URL) async {
            await channel.send(url)
        }
    
        // the calculation (nonisolated)
    
        private nonisolated func calculation(for url: URL) async {
            do {
                …
            } catch {
                log.error("Could not get sha256 of the file. \(error)")
            }
        }
    }
    

    Hopefully, this illustrates the pattern. Use a channel to submit requests later. And, for what it is worth, I personally use group.next() to constrain the degree of concurrency in a task group, not AsyncSemaphore.
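    Since the original goal was to be able to cancel the work later, here is a minimal sketch of how that could look. To keep it runnable without the package, it swaps AsyncChannel for the standard library's AsyncStream (which, unlike the channel, buffers submissions rather than applying back-pressure). The names SHACalculationQueue, cancelAll, finish and the placeholder calculation(for:) are hypothetical, not part of the answer above, and AsyncStream.makeStream assumes Swift 5.9 or later.

```swift
import Foundation

// Hypothetical stand-in for the real SHA-256 work.
func calculation(for url: URL) async {
    try? await Task.sleep(nanoseconds: 100_000_000)
    print("finished", url.lastPathComponent)
}

final class SHACalculationQueue {
    private let continuation: AsyncStream<URL>.Continuation
    private let monitorTask: Task<Void, Never>

    init(maxConcurrent: Int = 8) {
        let (stream, continuation) = AsyncStream.makeStream(of: URL.self)
        self.continuation = continuation

        // The task group lives entirely inside this one task; we keep a
        // reference to the task, never to the group itself.
        self.monitorTask = Task {
            await withTaskGroup(of: Void.self) { group in
                var running = 0
                for await url in stream {
                    // Wait for one child to finish before exceeding the limit.
                    if running >= maxConcurrent {
                        _ = await group.next()
                        running -= 1
                    }
                    running += 1
                    group.addTask { await calculation(for: url) }
                }
            }
        }
    }

    var isCancelled: Bool { monitorTask.isCancelled }

    // Submit work from anywhere, at any time.
    func add(url: URL) { continuation.yield(url) }

    // No more submissions; work already submitted still runs.
    func finish() { continuation.finish() }

    // Cancelling the parent task also cancels the group's child tasks.
    func cancelAll() { monitorTask.cancel() }

    func waitUntilDone() async { await monitorTask.value }
}
```

    Callers would hold on to one SHACalculationQueue instance, call add(url:) wherever a request originates, and call cancelAll() to stop everything. Cancellation is cooperative, so the real calculation would still need to check Task.isCancelled (or call a throwing, cancellation-aware API) to stop early.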

    I profiled this with OSSignposter (Ⓢ signposts mark where tasks were added; intervals show when they actually ran):

    [Image: profiling trace showing the signposts and intervals]


    As an aside, I assume that your calculations, while slow enough to benefit from parallelism and from getting them off the main actor, are not so slow that they will block threads from the cooperative thread pool for a prolonged period of time. We must remember that we have a contract with the cooperative thread pool to never impede forward progress. For more information, see the WWDC 2022 video Visualize and optimize Swift concurrency, which talks about getting slow, synchronous work out of the Swift concurrency system.

    For example, if we are launching a series of slow, synchronous units of work, an OperationQueue might be more appropriate. Just set the queue’s maxConcurrentOperationCount, add your operations, and you can either cancel the whole queue or individual operations if you want. By getting these synchronous/blocking tasks out of the cooperative thread pool, we can uphold our contract with Swift concurrency to never impede forward progress.
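    A rough sketch of that OperationQueue alternative might look like the following. The wrapper type SHAOperationQueue and the placeholder slowSynchronousHash(of:) are hypothetical names used only for illustration; the limit of 8 mirrors the semaphore value in the question.

```swift
import Foundation

// Hypothetical placeholder for slow, synchronous hashing work.
func slowSynchronousHash(of url: URL) -> String {
    Thread.sleep(forTimeInterval: 0.1)
    return "hash-of-\(url.lastPathComponent)"
}

final class SHAOperationQueue {
    private let queue: OperationQueue = {
        let queue = OperationQueue()
        queue.maxConcurrentOperationCount = 8   // cap the degree of concurrency
        queue.qualityOfService = .utility
        return queue
    }()

    // Returns the operation so the caller can cancel it individually.
    @discardableResult
    func add(url: URL) -> Operation {
        let operation = BlockOperation {
            let hash = slowSynchronousHash(of: url)
            print(url.lastPathComponent, hash)
        }
        queue.addOperation(operation)
        return operation
    }

    // Cancels every operation in the queue; ones that have not started never run.
    func cancelAll() { queue.cancelAllOperations() }

    // Blocks the calling thread, so avoid calling this from the main thread.
    func waitUntilFinished() { queue.waitUntilAllOperationsAreFinished() }
}
```

    Because the queue is an ordinary object, it can be stored in a property and shared across unrelated parts of the app, which is what the question was after. Note that cancelling an operation that is already running only sets its isCancelled flag; long-running work would have to check that flag itself to stop early.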