Suppose we have a Mac app that uses Process (formerly NSTask) to run some other executable which writes data to both STDOUT and STDERR. This process might take a long time to do its work. Prior to Swift Concurrency, we would do this:
// Assume the object with this function is bound to the MainActor:
func doSomeWork() {
    DispatchQueue.global(qos: .default).async {
        let task = Process()
        let errorPipe = Pipe()
        let errorFileHandle = errorPipe.fileHandleForReading
        task.standardError = errorPipe
        let outputPipe = Pipe()
        let outputFileHandle = outputPipe.fileHandleForReading
        task.standardOutput = outputPipe

        // Need to read the file handles as data arrives, because if more than 64 KB
        // is written to a pipe, the child process blocks until we drain it.
        var outputData = Data(capacity: 1000)
        var errorData = Data(capacity: 1000)

        // Semaphore to make sure our readability handlers exit before we continue.
        // NSTask does not guarantee they'll finish before -waitUntilExit fires.
        let sema = DispatchSemaphore(value: 0)

        outputFileHandle.readabilityHandler = { handle in
            let newData = handle.availableData
            if newData.count == 0 {
                // End-of-data signal is an empty Data object.
                outputFileHandle.readabilityHandler = nil
                sema.signal()
            } else {
                outputData.append(newData)
            }
        }

        errorFileHandle.readabilityHandler = { handle in
            let newData = handle.availableData
            if newData.count == 0 {
                // End-of-data signal is an empty Data object.
                errorFileHandle.readabilityHandler = nil
                sema.signal()
            } else {
                errorData.append(newData)
            }
        }

        task.terminationHandler = { task in
            sema.signal()
        }

        defer {
            errorFileHandle.closeFile()
            outputFileHandle.closeFile()
        }

        do {
            try task.run()
        } catch {
            // Handle the error.
        }

        // Wait for both readability handlers and the termination handler.
        sema.wait()
        sema.wait()
        sema.wait()

        // Use outputData and errorData to do whatever...
    }
}
Note that the readabilityHandlers make no guarantee about which thread they're executed on. And Process makes no guarantee that each readabilityHandler is done writing data by the time task.terminationHandler is called. Process also makes no guarantee that the readabilityHandlers are finished by the time waitUntilExit() returns. Do not suggest that alternative; I've been there and I have the t-shirt.

Therefore, I use a semaphore to ensure that all three pieces of the puzzle are 100% finished before I continue.
outputData and errorData are shared mutable state across threads, but because of the semaphore, it's guaranteed that nothing will read from either object until all the writes from the two readabilityHandlers have finished.
I understand that I can use an Actor to wrap outputData and errorData to satisfy the compiler's error about "Mutation of captured var outputData in concurrently-executing code" and to formalize my currently "manual" guard against race conditions (at the expense of a lot more context-switching overhead).
In Swift concurrency, threads are always supposed to make forward progress, which means no semaphores. So even if I move the Process and the readabilityHandlers to an Actor, how do I properly "await" all three pieces being done?
As a general rule, where we might have reached for a semaphore, we would now frequently await a task or continuation. If wrapping some traditional asynchronous API that does not yet support Swift concurrency, we would often do this with withCheckedContinuation or withCheckedThrowingContinuation. But, as you correctly note, we would scrupulously avoid semaphores in Swift concurrency.
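For example, a single-shot callback-based API can be bridged with a continuation instead of a semaphore. A minimal sketch, where fetchValue(completion:) stands in for whatever legacy API you are wrapping:

```swift
import Foundation
import Dispatch

// A hypothetical callback-based API we want to bridge into Swift concurrency.
func fetchValue(completion: @escaping (Int) -> Void) {
    DispatchQueue.global().async { completion(42) }
}

// Instead of blocking on a semaphore, suspend on a continuation and
// resume it exactly once from the completion handler.
func fetchValue() async -> Int {
    await withCheckedContinuation { continuation in
        fetchValue { value in
            continuation.resume(returning: value)
        }
    }
}
```

Note that the continuation must be resumed exactly once; the "checked" variants trap on a double resume and log if a continuation is never resumed, which makes them the safer choice while developing.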
But, in this case, when dealing with a series of asynchronous events, I would probably make an AsyncSequence for the availableData:
extension Pipe {
    struct AsyncAvailableData: AsyncSequence {
        typealias Element = Data

        let pipe: Pipe

        func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
            AsyncStream { continuation in
                pipe.fileHandleForReading.readabilityHandler = { @Sendable handle in
                    let data = handle.availableData
                    guard !data.isEmpty else {
                        continuation.finish()
                        return
                    }
                    continuation.yield(data)
                }
                continuation.onTermination = { _ in
                    pipe.fileHandleForReading.readabilityHandler = nil
                }
            }.makeAsyncIterator()
        }
    }

    var availableData: AsyncAvailableData { AsyncAvailableData(pipe: self) }
}
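To see the AsyncStream mechanics above in isolation: yield delivers elements to the consumer and finish() is what ends a for await loop, exactly like the empty-Data case in the readabilityHandler. A toy illustration (numbers(upTo:) is just a name for this sketch):

```swift
// A minimal AsyncStream: yield supplies elements and finish() ends iteration,
// mirroring how the readabilityHandler feeds data and signals end-of-stream.
func numbers(upTo limit: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for n in 1...limit {
            continuation.yield(n)
        }
        continuation.finish()
    }
}
```

By default an AsyncStream buffers yielded elements without bound, so nothing is lost if the producer runs ahead of the consumer.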
I would then use async let (SE-0317) to run these concurrently:
func doSomeWork() async throws {
    let process = Process()
    process.executableURL = command

    let inputPipe = Pipe()
    let outputPipe = Pipe()
    let errorPipe = Pipe()
    process.standardError = errorPipe
    process.standardOutput = outputPipe
    process.standardInput = inputPipe

    // Optionally, you might want to return whatever non-zero termination status code the process returned.
    process.terminationHandler = { process in
        if process.terminationStatus != 0 {
            exit(process.terminationStatus)
        }
    }

    async let outputTask = outputPipe.availableData.joined()
    async let errorTask = errorPipe.availableData.joined()

    try process.run()

    let outputData = await outputTask
    let errorData = await errorTask

    …
}
Where:
extension Pipe.AsyncAvailableData {
    func joined() async -> Data {
        var result = Data()
        for await data in self {
            result.append(data)
        }
        return result
    }
}
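The key property of async let here is that each child task starts running as soon as it is declared; the awaits at the end only collect the results, so the two joined() calls drain their pipes concurrently while the process runs. A toy sketch of that behavior (value(_:) and combined() are illustrative names):

```swift
// Simulate a slow asynchronous unit of work.
func value(_ n: Int) async -> Int {
    try? await Task.sleep(nanoseconds: 10_000_000)
    return n
}

func combined() async -> Int {
    async let a = value(1)  // child task starts immediately
    async let b = value(2)  // runs concurrently with the first
    return await a + b      // suspend until both children finish
}
```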
Note, I created an asynchronous sequence of the Data representing the availableData supplied to the readabilityHandler. Often, I would create an AsyncSequence of the individual bytes of data, as shown in https://stackoverflow.com/a/76941591/1271826. That way, if you want to process the output line by line, you get that behavior for free. But if you just load the whole stream into memory at once, there is no need to add the overhead of converting the sequence of Data to a stream of bytes and then back into one large Data.
While I hope I answered your question above, for the sake of future readers, I must note that there are serious downsides to attempting to load all of the standardOutput and standardError into their respective Data objects: you have to hold the full output in memory at the same time. We would generally want to process the data as it comes in, rather than ever trying to hold the whole thing in memory at once. In trivial use cases, loading everything into RAM works fine, but as the output grows, it becomes less and less practical. Caveat emptor.
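A sketch of that incremental style, using a plain AsyncStream<Data> as a stand-in for the pipe's availableData sequence above (processChunks(_:handler:) is an illustrative name, not an API):

```swift
import Foundation

// Consume each chunk as it arrives instead of joining everything into one Data;
// memory usage stays proportional to the largest chunk, not the whole output.
func processChunks(_ chunks: AsyncStream<Data>, handler: (Data) -> Void) async {
    for await chunk in chunks {
        handler(chunk)  // e.g. parse it, append it to a file, update progress
    }
}
```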