I have a mix of async/await code (actors) and "legacy" code (Process pipes and callback blocks). My actor's function returns a value immediately while also storing a deferred callback, which is called once the work completes or once a timeout has occurred. The actor should not block while waiting for the work to complete; its only purpose is to support id generation and internal updates, which need to be thread-safe. Ideally I would also like to restructure the callback so that it is ergonomic as an "async" variant. Currently I am getting a deadlock as well as "ugly" nesting.
typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?
actor Processor {
var id = 0 // running request id; must be mutable because work(text:completion:) increments it
var callbacks = [Int:Block]()
let process : Process // external process which communicates of stdin/stdout
init(processPath: String) async throws {
process = Process()
process.launchPath = processPath
process.standardInput = Pipe()
process.standardOutput = Pipe()
(process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
// does not support async/await directly, wrapping in a task, perhaps better mechanism?
Task {[weak self] in
await self?.handleOutput(data: handler.availableData)
}
}
}
struct Result: Codable {
let id: Int
let output: String
}
func handleOutput(data: Data) async { // async because it awaits the stored callback
if let decoded = try? JSONDecoder().decode(Result.self, from: data),
let callback = pop(id: decoded.id) {
await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
}
}
func work(text: String, completion: @escaping Block) async -> WorkArgs {
id += 1 // increment the running id
// store the callback
callbacks[id] = completion
// timeout if the result doesn't come back in a reasonable amount of time.
// I will perform a check after 1 second, and pop the callback if needed
// problematic here..., how to wait while also not blocking this function
Task { [weak self] in
// invalidate if needed
try await Task.sleep(nanoseconds: 1_000_000_000)
// dead lock here:
if let bl = await self?.pop(id: id) {
print("invalidated the task \(id)")
await bl(nil)
}
}
// call out to the external process with this convention, it will run async and print a result with the same id
let command = "\(id) \(text)\n"
let d = Data(command.utf8)
try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
return WorkArgs(id: id, date: Date()) // return immediately; the completion block fires later
}
// pop this callback
func pop(id: Int) -> Block? {
return callbacks.removeValue(forKey: id)
}
}
struct WorkArgs {
let id: Int
let date: Date
}
actor IdGen {
private var id : Int64 = 0
func next() -> Int64 {
id += 1
return id
}
}
actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
func push(_ key: Int, block: @escaping (String) -> Void) {
pendingCallbacks[key] = block
}
func pop(_ key: Int) -> ((String) -> Void)? {
return pendingCallbacks.removeValue(forKey: key)
}
}
Before I get to the timeout question, we should probably talk about how to wrap the Process within Swift concurrency.
One pattern would be to use AsyncSequence (i.e., an AsyncStream) for stdout:
actor ProcessWithStream {
private let process = Process()
private let stdin = Pipe()
private let stdout = Pipe()
private let stderr = Pipe()
private var buffer = Data()
init(url: URL) {
process.standardInput = stdin
process.standardOutput = stdout
process.standardError = stderr
process.executableURL = url
}
func start() throws {
try process.run()
}
func terminate() {
process.terminate()
}
func send(_ string: String) {
let data = Data("\(string)\n".utf8)
stdin.fileHandleForWriting.write(data)
}
func stream() -> AsyncStream<Data> {
AsyncStream(Data.self) { continuation in
stdout.fileHandleForReading.readabilityHandler = { handler in
continuation.yield(handler.availableData)
}
process.terminationHandler = { handler in
continuation.finish()
}
}
}
}
Then you can for await that stream:
let process = ProcessWithStream(url: url)
override func viewDidLoad() {
super.viewDidLoad()
Task {
try await startStream()
print("done")
}
}
@IBAction func didTapSend(_ sender: Any) {
let string = textField.stringValue
Task {
await process.send(string)
}
textField.stringValue = ""
}
func startStream() async throws {
try await process.start()
let stream = await process.stream()
for await data in stream {
if let string = String(data: data, encoding: .utf8) {
print(string, terminator: "")
}
}
}
This is a simple approach. And it looks fine (because I am printing responses without a terminator).
But one needs to be careful because readabilityHandler will not always be called with the full Data of some particular output. It might be broken up or split across separate calls to the readabilityHandler.
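To illustrate the splitting problem, here is a minimal sketch of how partial chunks could be reassembled by hand. It assumes the external process terminates every response with a newline; LineBuffer is a hypothetical helper, not something from the code above.

```swift
import Foundation

/// Hypothetical helper: accumulates raw chunks and hands back only the
/// complete, newline-terminated lines received so far.
struct LineBuffer {
    private var buffer = Data()

    mutating func append(_ chunk: Data) -> [String] {
        buffer.append(chunk)
        var lines: [String] = []
        // Peel off one line at a time; bytes after the last newline stay buffered.
        while let newlineIndex = buffer.firstIndex(of: 0x0A) {
            let lineData = buffer[buffer.startIndex..<newlineIndex]
            lines.append(String(decoding: lineData, as: UTF8.self))
            buffer.removeSubrange(buffer.startIndex...newlineIndex)
        }
        return lines
    }
}

// A single response arriving in two separate readabilityHandler calls:
var lineBuffer = LineBuffer()
let first = lineBuffer.append(Data("{\"id\":1,".utf8))           // nothing complete yet
let second = lineBuffer.append(Data("\"output\":\"ok\"}\n".utf8)) // one full line
```

Inside the actor, something like this could be fed from the stream loop, so that downstream code only ever sees whole lines.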
Another pattern would be to use lines, which avoids the problem of readabilityHandler possibly being called multiple times for a given output:
actor ProcessWithLines {
private let process = Process()
private let stdin = Pipe()
private let stdout = Pipe()
private let stderr = Pipe()
private var buffer = Data()
private(set) var lines: AsyncLineSequence<FileHandle.AsyncBytes>?
init(url: URL) {
process.standardInput = stdin
process.standardOutput = stdout
process.standardError = stderr
process.executableURL = url
}
func start() throws {
lines = stdout.fileHandleForReading.bytes.lines
try process.run()
}
func terminate() {
process.terminate()
}
func send(_ string: String) {
let data = Data("\(string)\n".utf8)
stdin.fileHandleForWriting.write(data)
}
}
Then you can do:
let process = ProcessWithLines(url: url)
override func viewDidLoad() {
super.viewDidLoad()
Task {
try await startStream()
print("done")
}
}
@IBAction func didTapSend(_ sender: Any) {
let string = textField.stringValue
Task {
await process.send(string)
}
textField.stringValue = ""
}
func startStream() async throws {
try await process.start()
guard let lines = await process.lines else { return }
for try await line in lines {
print(line)
}
}
This avoids the breaking of responses mid-line.
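To tie this back to the JSON responses in the question, each line could then be decoded on its own. This is only a sketch: it assumes the external process prints exactly one JSON object per line, and ProcessResult and decodeResult(from:) are hypothetical names (the type mirrors the question's Result, renamed to avoid clashing with Swift's own Result).

```swift
import Foundation

// Mirrors the question's `Result` type; the name is an assumption.
struct ProcessResult: Codable, Equatable {
    let id: Int
    let output: String
}

/// Decodes one line of output; returns nil for lines that are not
/// valid JSON of the expected shape.
func decodeResult(from line: String) -> ProcessResult? {
    try? JSONDecoder().decode(ProcessResult.self, from: Data(line.utf8))
}

let parsed = decodeResult(from: #"{"id": 7, "output": "hello"}"#)
let garbage = decodeResult(from: "not json")
```

Within the for try await loop, a non-nil result would give you the id needed to look up whatever is waiting for that response.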
You asked:
How to support ... timeout in swift actor
The pattern is to wrap the request in a Task, and then start a separate task that will cancel that prior task after a Task.sleep interval.
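A minimal sketch of that general pattern might look like the following. The name withTimeout(seconds:operation:) is hypothetical, and the sketch assumes the wrapped operation cooperates with cancellation (e.g., it awaits something that throws CancellationError when cancelled).

```swift
import Foundation

/// Hypothetical helper: runs `operation` in its own task and cancels that
/// task if it has not finished within `seconds`.
func withTimeout<T: Sendable>(
    seconds: Double,
    operation: @escaping @Sendable () async throws -> T
) async throws -> T {
    // The request itself.
    let requestTask = Task { try await operation() }

    // A separate task that cancels the request after the interval.
    let timeoutTask = Task {
        try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
        requestTask.cancel()
    }

    // If the request finishes first, the timeout is no longer needed.
    defer { timeoutTask.cancel() }
    return try await requestTask.value
}
```

Note that this only cancels the Swift task: anything outside of Swift concurrency that the operation started is not stopped by it.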
But this is going to be surprisingly complicated in this case, because you have to coordinate that with the separate Process which will otherwise still proceed, unaware that the Task has been canceled. That can theoretically lead to problems (e.g., the process gets backlogged, etc.).
I would advise integrating the timeout logic in the app invoked by the Process, rather than trying to have the caller handle that. It can be done (e.g. maybe write the process app to capture and handle SIGINT and then the caller can call interrupt on the Process). But it is going to be complicated and, most likely, brittle.