Background: I am familiar with concurrency using Locks and Semaphores, and am learning Swift's new concurrency features.
Here is a simplified example of an asynchronous queue in Swift using DispatchSemaphore:
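import Foundation

class AsynchronousQueue {
    var data: [MyData] = []
    var semaphore = DispatchSemaphore(value: 0)

    func push(data newData: MyData) {
        data.append(newData)
        // Wake one waiting pop(), or let the next pop() pass without waiting
        semaphore.signal()
    }

    func pop() -> MyData {
        // Blocks the calling thread until at least one element has been pushed
        semaphore.wait()
        return data.removeLast()
    }
}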
Note that this is somewhat oversimplified: I will also want to guard the data array so that it is not mutated by two threads at the same time, which can be accomplished with locks or with actors. I may also want to send some sort of "cancel" signal, which is also a relatively small change.
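As a rough sketch of the lock-based variant mentioned above, assuming NSLock for mutual exclusion: the names LockedQueue and the MyData stand-in struct are hypothetical and used only for illustration.

```swift
import Foundation

// Hypothetical element type standing in for MyData
struct MyData { let id: Int }

// Hypothetical lock-protected version of the class above
final class LockedQueue {
    private var data: [MyData] = []
    private let lock = NSLock()
    private let semaphore = DispatchSemaphore(value: 0)

    func push(_ item: MyData) {
        lock.lock()
        data.append(item)
        lock.unlock()
        // Signal after the append so a woken pop() always finds an element
        semaphore.signal()
    }

    func pop() -> MyData {
        // Blocks the calling thread until an element has been pushed
        semaphore.wait()
        lock.lock()
        defer { lock.unlock() }
        return data.removeLast()
    }
}
```

The semaphore still does the waiting; the lock only serializes access to the array. Like the original, this removes from the end of the array.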
The piece that I am not sure how to accomplish with Swift Concurrency is the role that the semaphore plays here. I want pop() to return immediately if data is available, or to wait, potentially indefinitely, if data is unavailable.
I have encountered a lot of posts discouraging the use of semaphores at all in modern Swift code, but I have not seen an example of how to do this kind of waiting with Swift concurrency features (await/async/actor) that is not significantly more complicated than using a semaphore.
Note that DispatchSemaphore can't be used from within an async function, so it seems difficult to use the new await/async/actor features together with code based on DispatchSemaphore, which is why I ask.
Technically, what you have described is a stack (or a LIFO queue).
Here is my implementation.
It supports push, blocking and non-blocking pop, as well as peek and cancel. isEmpty and stackDepth properties are available.
actor AsyncStack<Element> {
    private var storage = [Element]()
    private var awaiters = [CheckedContinuation<Element, Error>]()

    /// Push a new element onto the stack
    /// - Parameter newElement: The element to push
    /// - Returns: Void
    public func push(_ newElement: Element) async -> Void {
        if !awaiters.isEmpty {
            // Hand the element directly to the longest-waiting popOrWait call
            let awaiter = awaiters.removeFirst()
            awaiter.resume(returning: newElement)
        } else {
            storage.append(newElement)
        }
    }

    /// Pop the element at the top of the stack or wait until an element becomes available
    /// - Returns: The popped element
    /// - Throws: `AsyncQueueError.OperationCancelled` if the waiting pop is cancelled
    public func popOrWait() async throws -> Element {
        if let element = storage.popLast() {
            return element
        }
        // Stack is empty: suspend until push or cancel resumes the continuation
        return try await withCheckedThrowingContinuation { continuation in
            awaiters.append(continuation)
        }
    }

    /// Pop an element from the top of the stack if possible
    /// - Returns: An element or nil if the stack is empty
    public func pop() async -> Element? {
        return storage.popLast()
    }

    /// Return the element at the top of the stack, if any, without removing it
    /// - Returns: The element at the top of the stack or nil
    public func peek() async -> Element? {
        return storage.last
    }

    /// True if the stack is empty
    public var isEmpty: Bool {
        get async {
            return storage.isEmpty
        }
    }

    /// Current stack depth
    public var stackDepth: Int {
        get async {
            return storage.count
        }
    }

    /// Cancel all pending popOrWait operations.
    /// Pending operations will throw `AsyncQueueError.OperationCancelled`
    /// - Returns: Void
    public func cancel() async -> Void {
        for awaiter in awaiters {
            awaiter.resume(throwing: AsyncQueueError.OperationCancelled)
        }
        awaiters.removeAll()
    }

    public enum AsyncQueueError: Error {
        case OperationCancelled
    }
}
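I have used a CheckedContinuation to suspend the popOrWait operation if the stack is empty. Unlike a semaphore wait, this suspends the calling task without blocking a thread. The continuations are held in their own array to allow for multiple waiting "poppers". The cancel operation cancels all outstanding popOrWait operations. The cancelled calls will throw AsyncQueueError.OperationCancelled.
Waiting popOrWait operations complete in FIFO order, because push resumes the oldest continuation first.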
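A minimal usage sketch, assuming the AsyncStack actor above. A condensed copy of the actor (push, popOrWait and cancel only) is repeated so the snippet compiles on its own. The demo function and the short sleeps are hypothetical, added only to give the waiting tasks time to suspend before push or cancel is called.

```swift
// Condensed copy of the AsyncStack actor above, repeated so this compiles alone
actor AsyncStack<Element> {
    enum AsyncQueueError: Error { case OperationCancelled }

    private var storage = [Element]()
    private var awaiters = [CheckedContinuation<Element, Error>]()

    func push(_ newElement: Element) {
        if !awaiters.isEmpty {
            awaiters.removeFirst().resume(returning: newElement)
        } else {
            storage.append(newElement)
        }
    }

    func popOrWait() async throws -> Element {
        if let element = storage.popLast() { return element }
        return try await withCheckedThrowingContinuation { continuation in
            awaiters.append(continuation)
        }
    }

    func cancel() {
        for awaiter in awaiters {
            awaiter.resume(throwing: AsyncQueueError.OperationCancelled)
        }
        awaiters.removeAll()
    }
}

// Hypothetical driver: one waiter receives a pushed value, another is cancelled
func demo() async -> (received: Int?, wasCancelled: Bool) {
    let stack = AsyncStack<Int>()

    // The stack is empty, so this task suspends inside popOrWait()
    let consumer = Task { try await stack.popOrWait() }
    // Assumption for the demo: 0.1 s is enough for the consumer to start waiting
    try? await Task.sleep(nanoseconds: 100_000_000)
    await stack.push(42)
    let received = try? await consumer.value

    // A second waiter that never receives a value and is cancelled instead
    let waiter = Task { try await stack.popOrWait() }
    try? await Task.sleep(nanoseconds: 100_000_000)
    await stack.cancel()

    var wasCancelled = false
    do { _ = try await waiter.value } catch { wasCancelled = true }
    return (received, wasCancelled)
}
```

Note that cancel() only resumes continuations that are already waiting. If the second task had not yet reached popOrWait() when cancel() ran, it would keep waiting, which is why the sketch sleeps first.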