swiftasync-awaitactorswift-concurrency

How to support an async callback with timeout in swift actor


I have a mix of async/await (Actors) running "legacy" code (Process Pipes and callback blocks). I have a situation similar to this where my actor's function returns a value, while also storing a deferred callback once some work is completed ( actor should not block while waiting for the work to complete, the actor purpose is only to support id generation and internal updates which need to be threadsafe). Ideally I would like to structure the callback to also be ergonomic as an "async" variant I am using an actor for thread safety. One of the async functions stores a callback which is called once the work is completed or once a timeout has occurred. Currently getting a deadlock as well as "ugly" nesting.

typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?

actor Processor {
  let id = Int
  var callbacks = [Int:Block]()
  let process : Process // external process which communicates of stdin/stdout
  init(processPath: String) async throws {
    process = Process()
    process.launchPath = processPath
    process.standardInput = Pipe()
    process.standardOutput = Pipe()
    (process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
       // does not support async/await directly, wrapping in a task, perhaps better mechanism?
       Task {[weak self] in
          await self?.handleOutput(data: handler.availableData)
       }
    }
  }

  struct Result: Codable { 
     id: Int ,
     output: String
  }
  func handleOutput(data: Data) {
     if let decoded = try? JSONDecoder().decode(Result.self, from: data),
            let id = decoded.id ,
            let callback = pop(id: id) {
            await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
        }
  }
  func work(text: String, completion: @escaping Block) async -> WorkArgs {
     id += 1 // increment the running id
     // store the callback
     callbacks[id] = completion
     // timeout if the result doesn't come back in a reasonable amount of time. 
     // I will perform a check after 1 second, and pop the callback if needed
     // problematic here..., how to wait while also not blocking this function
     Task { [weak self] in
       // invalidate if needed
       try await Task.sleep(nanoseconds: 1_000_000_000)
       // dead lock here:
       if let bl = await self?.pop(id: id) {
            print("invalidated the task \(id)")
            await bl(nil)
       }
     }
     // call out to the external process with this convention, it will run async and print a result with the same id
     let command = "\(id) \(text)\n"
     let d = command(using: .utf8)

     try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
     
  }
  // pop this callback
  func pop(id: Int) -> Block? {
        return callbacks.removeValue(forKey: id)
  }
}

struct WorkArgs {
  let id: Int
  let date: Date
}

actor IdGen {
    private var id : Int64 = 0
    func next() -> Int64 {
        id += 1
        return id
    }
}

actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
    func push(_ key: Int, block: @escaping  (String) -> Void) {
        pendingCallbacks[key] = block
    }
    func pop(_ key: Int64) -> AutoCompleteBlock? {
        return pendingCallbacks.removeValue(forKey: key)
    }
}

Solution

  • Before I get to the timeout question, we should probably talk about how to wrap the Process within Swift concurrency.


    You asked:

    How to support ... timeout in swift actor

    The pattern is to wrap the request in a Task, and then start a separate task that will cancel that prior task after a Task.sleep interval.

    But this is going to be surprisingly complicated in this case, because you have to coordinate that with the separate Process which will otherwise still proceed, unaware that the Task has been canceled. That can theoretically lead to problems (e.g., the process gets backlogged, etc.).

    I would advise integrating the timeout logic in the app invoked by the Process, rather than trying to have the caller handle that. It can be done (e.g. maybe write the process app to capture and handle SIGINT and then the caller can call interrupt on the Process). But it is going to be complicated and, most likely, brittle.