Tags: swift, swift-concurrency

JobScheduler implemented with Swift Concurrency needs improvements


I implemented a job scheduler with Swift Concurrency. The jobs are simply closures. The scheduler runs a certain number of jobs in parallel and makes any additional jobs wait. It uses an actor to encapsulate all the mutable data.

I managed to make it work, but it feels very cumbersome. How can I make it better? Can I implement it in a different way? All suggestions are warmly welcome.

import Foundation

class CustomJob {
    var completion: () -> ()
    init(completion: @escaping () -> Void) {
        self.completion = completion
    }
}

actor JobQueue {
    private var maxRunningCount: Int
    private var runningJobCount = 0
    private var pendingJobs = [CustomJob]()
    
    init(maxRunningCount: Int) {
        self.maxRunningCount = maxRunningCount
    }
    
    func addJob(job: CustomJob) {
        pendingJobs.append(job)
    }
    
    // I found that I need to increment the runningJobCount here.
    func nextJob() -> CustomJob? {
        if runningJobCount == maxRunningCount {
            print("The next job needs to wait")
            return nil
        }
        if runningJobCount < maxRunningCount && pendingJobs.count > 0 {
            runningJobCount += 1
            return pendingJobs.removeFirst()
        } else {
            return nil
        }
    }
    
    
    func finishOneJob() {
        runningJobCount -= 1
    }
}

class JobScheduler {
    
    let jobQueue: JobQueue
    
    init(maxRunningCount: Int) {
        jobQueue = JobQueue(maxRunningCount: maxRunningCount)
    }
    
    func scheduleJob(job: @escaping () -> ()) {
        Task {
            await jobQueue.addJob(job: CustomJob(completion: job))
            run()
        }
    }
    
    private func run() {
        Task {
            if let job = await jobQueue.nextJob() {
                Task {
                    await self.executeJob(job: job)
                    await self.jobQueue.finishOneJob()
                    run()
                }
            }
        }
    }
    
    private func executeJob(job: CustomJob) async {
        return await withCheckedContinuation { continuation in
            job.completion()
            continuation.resume()
        }
    }
}

I tested by scheduling five jobs and using a dispatch group to wait for them to finish.

// MARK: - TEST

let processor = JobScheduler(maxRunningCount: 2)
let group = DispatchGroup()

for job in 1...5 {
    group.enter()
    print("Job \(job) scheduled")
    processor.scheduleJob {
        print("Job \(job) starts")
        sleep(2)
        print("Job \(job) complete")
        group.leave()
    }
}

group.wait()
print("Done")


Solution

  • I will first illustrate how you can do this in Swift concurrency with asynchronous work, and then explain why you might not want to do this with synchronous work. The unfortunate reality is that the correct solution varies with the sort of work being launched. You probably do not want a one-size-fits-all general scheduler; instead, think about the particular needs of your app and pick the pattern suited to your particular use case.


    So, first, how do we do this within Swift concurrency for asynchronous tasks?

    1. Constrain concurrency within Swift concurrency.

      To constrain concurrency in Swift concurrency, we would use a task group, but call next for every iteration above a certain threshold:

      await withTaskGroup(of: Void.self) { group in
          var index = 0
          for await work in sequence {
              index += 1
              if index > 4 { await group.next() }
      
              group.addTask { await work() }
          }
      }
      

      See How to constrain concurrency (like maxConcurrentOperationCount) with Swift Concurrency?

    2. How do we add work to an AsyncSequence after the fact?

      One easy way to add work to an AsyncSequence after the fact is to use an AsyncChannel from the Swift Async Algorithms package. E.g.:

      import AsyncAlgorithms

      actor Scheduler {
          typealias Work = @Sendable () async -> Void
      
          private let channel = AsyncChannel<Work>()
      
          func start() async {
              await withTaskGroup(of: Void.self) { group in
                  var index = 0
                  for await work in channel {
                      index += 1
                      if index > 4 { await group.next() }
      
                      group.addTask { await work() }
                  }
              }
          }
      
          func addWork(_ work: @escaping Work) async {
              await channel.send(work)
          }
      }
      

      Then we can do things like:

      import SwiftUI

      struct ContentView: View {
          let scheduler = Scheduler()
      
          var body: some View {
              VStack {
                  Button("Channel") {
                      Task {
                          await scheduler.addWork {
                              …
                          }
                      }
                  }
              }
              .task {
                  await scheduler.start()
              }
          }
      }
      

      So, that starts the scheduler when the view appears and adds work every time you tap the button. To see the behavior in Instruments, I posted a signpost ⓢ every time the button was tapped and an “interval” while each piece of work was running. I then ran the app, tapped the button ten times in a row, and you can see that it only runs four tasks at a time:

      [Instruments timeline: a ⓢ signpost at each of the ten button taps, with at most four “Work” intervals running at any one time]
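
      In case it helps, here is a minimal sketch of how such signposts can be emitted with OSSignposter. The subsystem and category strings are placeholders, and the two-second sleep merely simulates work:

      import os

      let signposter = OSSignposter(subsystem: "com.example.scheduler", category: "Jobs") // placeholder names

      // Inside the button’s Task from the view above:
      signposter.emitEvent("Button tapped")                      // the ⓢ point event

      await scheduler.addWork {
          let id = signposter.makeSignpostID()
          let state = signposter.beginInterval("Work", id: id)   // interval while the work runs
          try? await Task.sleep(nanoseconds: 2_000_000_000)      // simulated work
          signposter.endInterval("Work", state)
      }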


    Having outlined how you might do this with asynchronous work within Swift concurrency, we should note that your example was for processing synchronous tasks. But we must refrain from calling slow, synchronous functions from within Swift concurrency. Swift concurrency’s cooperative thread pool only has one thread per CPU core. The whole concurrency system is predicated upon a contract to never impede forward progress on any thread from the cooperative thread pool.

    There are two possible solutions:

    First, if this is some computational algorithm of your own, you could periodically yield, which keeps the work from tying up threads of the cooperative pool and thereby prevents the Swift concurrency system from ever deadlocking.
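
    For example, a CPU-bound routine might yield every so often, so that it never monopolizes a cooperative-pool thread for long stretches. A sketch (the function name and the chunk size of 1,000 are purely illustrative):

    func sumOfSquares(of values: [Double]) async -> Double {
        var total = 0.0
        for (index, value) in values.enumerated() {
            total += value * value                        // stand-in for the real computation
            if index % 1_000 == 0 { await Task.yield() }  // periodically let other tasks run
        }
        return total
    }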

    Second, if that is not possible, Apple explicitly advises moving this outside the Swift concurrency system (e.g., moving it to GCD). You can then wrap that legacy pattern within a “continuation” to bridge back to the Swift concurrency codebase. (Usually, we advise avoiding mixing GCD with Swift concurrency, but this is the exception to the rule.) See point 3 in Convert a sync function to async using new concurrency in Swift. For more information, see WWDC 2022 video Visualize and optimize Swift concurrency, which talks about getting slow synchronous work out of the Swift concurrency system.
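
    A sketch of that second pattern, with a hypothetical slowSynchronousWork() standing in for your real routine:

    import Foundation

    // Hypothetical blocking routine standing in for the real synchronous work:
    func slowSynchronousWork() -> Int {
        sleep(2)
        return 42
    }

    // A dedicated GCD queue keeps the blocking call off the cooperative thread pool:
    let workQueue = DispatchQueue(label: "com.example.slow-work")

    func performSlowWork() async -> Int {
        await withCheckedContinuation { continuation in
            workQueue.async {
                continuation.resume(returning: slowSynchronousWork())
            }
        }
    }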


    For what it is worth, if you really are trying to constrain the concurrency of slow, synchronous tasks, the simple legacy solution is an operation queue:

    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 4
    

    And:

    queue.addOperation { … }
    

    This is the simple solution for constrained parallelism of synchronous work, where you might be launching the work in a just-in-time manner. That yields the exact same behavior:

    [Instruments timeline: the operation queue likewise runs at most four operations at a time]
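
    For completeness, your original five-job test collapses to something like this (a sketch, using your original width of two):

    import Foundation

    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 2

    for job in 1...5 {
        print("Job \(job) scheduled")
        queue.addOperation {
            print("Job \(job) starts")
            sleep(2)
            print("Job \(job) complete")
        }
    }

    queue.waitUntilAllOperationsAreFinished()   // replaces the DispatchGroup
    print("Done")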

    The other legacy approach is GCD’s concurrentPerform, but that is only used when launching a fixed number of synchronous work items up-front. Or in Combine, we might use flatMap(maxPublishers:transform:). For a survey of some legacy options, see How to avoid thread explosion when using DispatchSemaphores?
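
    For example, a concurrentPerform sketch of the same five synchronous jobs might look like this (note that concurrentPerform blocks the calling thread until every iteration finishes, so dispatch it to a background queue rather than calling it on the main thread):

    import Foundation

    // The whole batch is launched up front; GCD limits the parallelism to the
    // number of available CPU cores.
    DispatchQueue.concurrentPerform(iterations: 5) { index in
        print("Job \(index + 1) starts")
        sleep(2)
        print("Job \(index + 1) complete")
    }
    print("Done")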