swift async-await semaphore swift-concurrency dispatchsemaphore

How to implement an asynchronous queue in Swift concurrency?


Background: I am familiar with concurrency using Locks and Semaphores, and am learning Swift's new concurrency features.

Here is a simplified example of an asynchronous queue in Swift using DispatchSemaphore:

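import Dispatch

// MyData is a placeholder for the element type.
class AsynchronousQueue {
    var data: [MyData] = []
    let semaphore = DispatchSemaphore(value: 0)

    func push(data newElement: MyData) {
        data.append(newElement)
        // Wake one waiting pop(), or let the next pop() pass straight through.
        semaphore.signal()
    }

    func pop() -> MyData {
        // Blocks the calling thread until push() has signalled at least once.
        semaphore.wait()
        return data.removeLast()
    }
}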

Note that this is somewhat oversimplified: I will also want to guard the data array so that it's not mutated by two threads at the same time, which can be accomplished with locks or with actors. I may also want to send some sort of "cancel" signal, which is a relatively small change.

The piece that I am not sure how to accomplish with Swift Concurrency is the role that the Semaphore plays here. I want pop() to return immediately if data is available, or to wait, potentially indefinitely, if data is unavailable.

I have encountered a lot of posts discouraging the use of semaphores at all in modern Swift code, but I have not seen an example of how to do this kind of waiting with Swift concurrency features (async/await/actor) that is not significantly more complicated than using a semaphore.

Note that DispatchSemaphore can't be used from within an async function, so it seems difficult to use the new await/async/actor features together with code based on DispatchSemaphore, which is why I ask.


Solution

  • Technically, what you have described is a stack (or a LIFO queue).

    Here is my implementation.

    It supports push, blocking and non-blocking pop as well as peek and cancel.

    isEmpty and stackDepth properties are available.

    actor AsyncStack<Element> {
        private var storage = [Element]()
        private var awaiters = [CheckedContinuation<Element,Error>]()
        
        
        /// Push a new element onto the stack
        /// - Parameter newElement: The element to push
        /// - Returns: Void
    
        public func push(_ newElement: Element) async -> Void {
            if !awaiters.isEmpty {
                let awaiter = awaiters.removeFirst()
                awaiter.resume(returning: newElement)
            } else {
                storage.append(newElement)
            }
        }
        
        /// Pop the element at the top of the stack or wait until an element becomes available
        /// - Returns: The popped element
        /// - Throws: An AsyncQueueError if the waiting pop is cancelled
    
        public func popOrWait() async throws -> Element {
            if let element = storage.popLast() {
                return element
            }
            return try await withCheckedThrowingContinuation { continuation in
                awaiters.append(continuation)
            }
        }
        
        /// Pop an element from the top of the stack if possible
        /// - Returns: An element or nil if the stack is empty
    
        public func pop() async -> Element? {
            return storage.popLast()
        }
        
        /// Return the element at the top of the stack, if any, without removing it
        /// - Returns: The element at the top of the stack or nil
        public func peek() async -> Element? {
            return storage.last
        }
        
        /// True if the stack is empty
    
        public var isEmpty: Bool {
            get async {
                return storage.isEmpty
            }
        }
    
        /// Current stack depth
    
        public var stackDepth: Int {
            get async {
                return storage.count
            }
        }
        
        /// Cancel all pending popOrWait operations.
        /// Pending operations will throw `AsyncQueueError.OperationCancelled`
        /// - Returns: Void
    
        public func cancel() async -> Void {
            for awaiter in awaiters {
                awaiter.resume(throwing: AsyncQueueError.OperationCancelled)
            }
            awaiters.removeAll()
        }
        
        public enum AsyncQueueError: Error {
            case OperationCancelled
        }
    }
    

    I have used a CheckedContinuation to suspend the pop operation if the stack is empty; the calling task waits without blocking a thread. The continuations are held in their own array to allow for multiple waiting "poppers". The cancel operation cancels all outstanding popOrWait operations. The cancelled calls will throw.

    Blocked pop operations complete in FIFO order.
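
    Since the question asked for a queue rather than a stack, here is a rough sketch of how the same continuation technique could be adapted to hand out elements in FIFO order. The names `AsyncFIFOQueue`, `QueueCancelled`, `enqueue`, `next`, `cancelAll` and `demo` are my own placeholders, not part of any library, and the `Sendable` constraint is an assumption made so the sketch also compiles under strict concurrency checking.

    ```swift
    // Hypothetical error type thrown to waiters when the queue is cancelled.
    struct QueueCancelled: Error {}

    // A minimal FIFO sketch; not a definitive implementation.
    actor AsyncFIFOQueue<Element: Sendable> {
        private var storage: [Element] = []
        private var waiters: [CheckedContinuation<Element, Error>] = []

        /// Hand the element to the oldest waiter, or store it if nobody is waiting.
        func enqueue(_ element: Element) {
            if waiters.isEmpty {
                storage.append(element)
            } else {
                waiters.removeFirst().resume(returning: element)
            }
        }

        /// Return the oldest stored element, or suspend until one is enqueued.
        func next() async throws -> Element {
            if !storage.isEmpty {
                return storage.removeFirst()
            }
            return try await withCheckedThrowingContinuation { continuation in
                waiters.append(continuation)
            }
        }

        /// Make every suspended `next()` call throw `QueueCancelled`.
        func cancelAll() {
            for waiter in waiters {
                waiter.resume(throwing: QueueCancelled())
            }
            waiters.removeAll()
        }
    }

    // Usage: two elements are already stored, the third is requested by a
    // consumer task that may have to wait for it.
    func demo() async throws -> [Int] {
        let queue = AsyncFIFOQueue<Int>()
        await queue.enqueue(1)
        await queue.enqueue(2)
        let first = try await queue.next()
        let second = try await queue.next()

        let consumer = Task { try await queue.next() }
        await queue.enqueue(3)
        let third = try await consumer.value
        return [first, second, third]   // [1, 2, 3]
    }
    ```

    The only structural change from the stack is `removeFirst()` in place of `popLast()`. On a large array `removeFirst()` is O(n), so a real implementation would probably want a different backing store.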