ios swift concurrency operationqueue

Limiting the number of fetch requests in URLSession with SwiftUI?


I'm using an async image loader to fetch images from a URLRequest, and I'm trying to wrap my code inside of an Operation so I can use .maxConcurrentOperationCount for an OperationQueue, because I'm supposed to limit the number of downloads to 3 at a time.

I've overridden the Operation class to try to support async downloads. However, I'm not able to achieve this, and I think it's because my downloading function is inside of a Task group.

The error I get is as follows:

Invalid conversion from 'async' function of type '(URL?, URLResponse?, (any Error)?) async throws -> Void' to synchronous function type '(URL?, URLResponse?, (any Error)?) -> Void'

Here are the code snippets:

For the overridden Operation class:


class DownloadOperation: Operation {
    private var task: URLSessionDataTask!
    
    init(session: URLSession, downloadTaskURL: URLRequest, completionHandler: ((URL?, URLResponse?, Error?) -> Void)?) {
           super.init()
           
           // use weak self to prevent retain cycle
           task = session.dataTask(with: downloadTaskURL, completionHandler: { [weak self] (URLRequest, response, error) in
               
   
               
              /*
                set the operation state to finished once
                the download task is completed or have error
              */
               self?.state = .finished
           })
       }
    
    enum OperationState : Int {
            case ready
            case executing
            case finished
        }

    private var state : OperationState = .ready {
          willSet {
              self.willChangeValue(forKey: "isExecuting")
              self.willChangeValue(forKey: "isFinished")
          }
          
          didSet {
              self.didChangeValue(forKey: "isExecuting")
              self.didChangeValue(forKey: "isFinished")
          }
      }
      
      override var isReady: Bool { return state == .ready }
      override var isExecuting: Bool { return state == .executing }
      override var isFinished: Bool { return state == .finished }
    
    
    override func start() {
         /*
         if the operation or queue got cancelled even
         before the operation has started, set the
         operation state to finished and return
         */
         if(self.isCancelled) {
             state = .finished
             return
         }
         
         // set the state to executing
         state = .executing
         
         print("downloading")
               
         // start the downloading
         self.task.resume()
     }

     override func cancel() {
         super.cancel()
       
         // cancel the downloading
         self.task.cancel()
     }
}

And here is my attempt to use it inside of a Task in the loader function:

  public func loadImage(_ urlRequest: URLRequest) async throws -> UIImage {
        if let status = images[urlRequest]{
            switch status{
            case .fetched(let image):
                return image
            case .inProgress(let task):
                return try await task.value
            case .failure(let error):
                self.hasError = true
                self.error = error as? InternetError
            }
        }
        
        
        let task: Task<UIImage, Error> = Task {
            do {
                let imageQueue = OperationQueue()
                imageQueue.maxConcurrentOperationCount = 3
                
                let operation = DownloadOperation(session: URLSession.shared, downloadTaskURL: urlRequest, completionHandler: {_, response ,_ in
                    let (imageData, response) = try await URLSession.shared.data(for: urlRequest)

                    guard let httpResponse = response as? HTTPURLResponse,
                          httpResponse.statusCode == 200 else {
                        throw InternetError.invalidServerResponse
                    }
                    guard let image = UIImage(data: imageData) else {
                        throw InternetError.noInternet
                    }
                    
                })
                imageQueue.addOperation(operation)
                
                
                
               // return image
            }
            catch {
                self.hasError = true
                images[urlRequest] = .failure(error)
                print("error caught in Loader")
                let image = UIImage(systemName: "wifi.exclamationmark")!
                return image
            }
        }
        
        do{
            images[urlRequest] = .inProgress(task)
            var image = try await task.value
            if let imageFromCache = imageCache.object(forKey: urlRequest as AnyObject) as? UIImage {
                image = imageFromCache
                return image
            }
            images[urlRequest] = .fetched(image)
            //storing image in cache
            imageCache.setObject(image, forKey: urlRequest as AnyObject)
            return image
        }
    }
}

I would appreciate any help about this! Thank you!!


Solution

  • There are several issues:

    1. You are creating a new operation queue every time you call loadImage, rendering the maxConcurrentOperationCount moot. E.g., if you quickly request five images, you will end up with five operation queues, each with one operation on them, and they will run concurrently, with none of the five queues exceeding their respective maxConcurrentOperationCount.

      You must remove the local variable declaration of the operation queue from the function, and make it a property.

    2. DownloadOperation starts a dataTask but never calls the completion handler that was passed to it. Also, when you create the DownloadOperation you are supplying a completion handler in which you start yet another download. If you are going to use an Operation to encapsulate the download, you should not have any URLSession code in the completion handler. Use the parameters passed to it instead.

    3. The asynchronous operation is not thread-safe. Its state property is read and written from multiple threads, so access to this shared state variable must be synchronized.
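
    To make the first point concrete, here is a minimal, Foundation-only sketch (not the loader itself). `ConcurrencyTracker`, `makeQueue()` and `peakConcurrency(sharedQueue:jobs:)` are hypothetical helpers introduced for illustration, and a short `Thread.sleep` stands in for a download. With one shared queue, the peak cannot exceed `maxConcurrentOperationCount`; with a fresh queue per job, the limit no longer constrains the total.

```swift
import Foundation

/// Hypothetical helper: records the highest number of jobs running at once.
final class ConcurrencyTracker: @unchecked Sendable {
    private let lock = NSLock()
    private var current = 0
    private var highest = 0

    func begin() {
        lock.lock(); defer { lock.unlock() }
        current += 1
        highest = max(highest, current)
    }

    func end() {
        lock.lock(); defer { lock.unlock() }
        current -= 1
    }

    var peak: Int {
        lock.lock(); defer { lock.unlock() }
        return highest
    }
}

/// Builds a queue limited to three concurrent operations.
func makeQueue() -> OperationQueue {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3
    return queue
}

/// Runs `jobs` simulated downloads, either all on one shared queue or each
/// on its own queue, and returns the peak number that ran simultaneously.
func peakConcurrency(sharedQueue: Bool, jobs: Int) -> Int {
    let tracker = ConcurrencyTracker()
    let shared = makeQueue()
    var queues = [shared]

    for _ in 0..<jobs {
        let queue = sharedQueue ? shared : makeQueue()
        if !sharedQueue { queues.append(queue) }
        queue.addOperation {
            tracker.begin()
            Thread.sleep(forTimeInterval: 0.1)   // stand-in for a download
            tracker.end()
        }
    }

    queues.forEach { $0.waitUntilAllOperationsAreFinished() }
    return tracker.peak
}

let sharedPeak = peakConcurrency(sharedQueue: true, jobs: 6)
let perCallPeak = peakConcurrency(sharedQueue: false, jobs: 6)
print("shared queue peak:", sharedPeak)
print("one queue per call peak:", perCallPeak)
```

    The per-call figure depends on how many threads the system makes available, so no particular value is claimed for it here; the shared-queue figure is bounded by 3.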

    Thus, perhaps:

    var images: [URLRequest: ImageRequest] = [:]
    
    let queue: OperationQueue = {
        let queue = OperationQueue()
        queue.maxConcurrentOperationCount = 3
        return queue
    }()
    
    let session: URLSession = .shared
    
    public func loadImage(_ request: URLRequest) async throws -> UIImage {
        switch images[request] {
        case .fetched(let image):
            return image
    
        case .inProgress(let task):
            return try await task.value
    
        case .failure(let error):
            throw error
    
        case nil:
            let task: Task<UIImage, Error> = Task {
                try await withCheckedThrowingContinuation { continuation in
                    let operation = ImageRequestOperation(session: session, request: request) { [weak self] result in
                        DispatchQueue.main.async {
                            switch result {
                            case .failure(let error):
                                self?.images[request] = .failure(error)
                                continuation.resume(throwing: error)
    
                            case .success(let image):
                                self?.images[request] = .fetched(image)
                                continuation.resume(returning: image)
                            }
                        }
                    }
    
                    queue.addOperation(operation)
                }
            }
    
            images[request] = .inProgress(task)
    
            return try await task.value
        }
    }
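
    The bridging pattern used above (a completion-handler API wrapped in `withCheckedThrowingContinuation`) can be tried in isolation. The following is a minimal sketch, assuming a hypothetical `CallbackWorker` whose `square(_:completion:)` stands in for the download operation; neither name is part of the loader. The important property is that the continuation is resumed exactly once on every path.

```swift
import Foundation

/// Hypothetical stand-in for the download layer: does its "work" on a shared,
/// limited queue and reports back through a completion handler.
final class CallbackWorker: @unchecked Sendable {
    private let queue: OperationQueue = {
        let queue = OperationQueue()
        queue.maxConcurrentOperationCount = 3
        return queue
    }()

    /// Completion-handler API, analogous to adding an operation to the queue.
    func square(_ n: Int, completion: @escaping @Sendable (Result<Int, Error>) -> Void) {
        queue.addOperation {
            completion(.success(n * n))
        }
    }

    /// Async wrapper: suspends until the completion handler fires,
    /// then resumes the continuation exactly once with its result.
    func squareAsync(_ n: Int) async throws -> Int {
        try await withCheckedThrowingContinuation { continuation in
            square(n) { result in
                continuation.resume(with: result)
            }
        }
    }
}

let worker = CallbackWorker()
let value = try await worker.squareAsync(7)
print(value)
```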
    

    The async-await code above uses the following operation:

    class ImageRequestOperation: DataRequestOperation {
        init(session: URLSession, request: URLRequest, completionHandler: @escaping (Result<UIImage, Error>) -> Void) {
            super.init(session: session, request: request) { result in
                switch result {
                case .failure(let error):
                    DispatchQueue.main.async { completionHandler(.failure(error)) }
    
                case .success(let data):
                    guard let image = UIImage(data: data) else {
                        DispatchQueue.main.async { completionHandler(.failure(URLError(.badServerResponse))) }
                        return
                    }
    
                    DispatchQueue.main.async { completionHandler(.success(image)) }
                }
            }
        }
    }
    

    The above separates the image-related part from the network-related stuff, below. Thus:

    class DataRequestOperation: AsynchronousOperation {
        private var task: URLSessionDataTask!
    
        init(session: URLSession, request: URLRequest, completionHandler: @escaping (Result<Data, Error>) -> Void) {
            super.init()
    
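            task = session.dataTask(with: request) { data, response, error in
                // Finish the operation on every path, including failures; otherwise
                // a failed request would occupy one of the queue's three slots forever.
                defer { self.finish() }

                guard
                    let data = data,
                    let response = response as? HTTPURLResponse,
                    200 ..< 300 ~= response.statusCode
                else {
                    completionHandler(.failure(error ?? URLError(.badServerResponse)))
                    return
                }
    
                completionHandler(.success(data))
            }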
        }
    
        override func main() {
            task.resume()
        }
    
        override func cancel() {
            super.cancel()
    
            task.cancel()
        }
    }
    

    And the above inherits from an AsynchronousOperation that abstracts all of your asynchronous operation stuff, below, from the substance of what the operation does, above. Thus:

    /// AsynchronousOperation
    ///
    /// Encapsulate the basic asynchronous operation logic in its own class, to avoid cluttering
    /// your concrete implementations with a ton of boilerplate code.
    
    class AsynchronousOperation: Operation {
        enum OperationState: Int {
            case ready
            case executing
            case finished
        }
    
        @Atomic var state: OperationState = .ready {
            willSet {
                willChangeValue(forKey: #keyPath(isExecuting))
                willChangeValue(forKey: #keyPath(isFinished))
            }
    
            didSet {
                didChangeValue(forKey: #keyPath(isFinished))
                didChangeValue(forKey: #keyPath(isExecuting))
            }
        }
    
        override var isReady: Bool        { state == .ready && super.isReady }
        override var isExecuting: Bool    { state == .executing }
        override var isFinished: Bool     { state == .finished }
        override var isAsynchronous: Bool { true }
    
        override func start() {
            if isCancelled {
                state = .finished
                return
            }
    
            state = .executing
    
            main()
        }
    
        /// Subclasses should override this method, but *not* call this `super` rendition.
    
        override func main() {
            assertionFailure("The `main` method should be overridden in concrete subclasses of this abstract class.")
        }
    
        func finish() {
            state = .finished
        }
    }
    

    Note that I addressed the lack of thread-safe access to the state using this property wrapper:

    /// Atomic
    ///
    /// Property wrapper providing atomic interface.
    ///
    /// - Note: It is advised to use this with value types only. If you use reference types, the object could theoretically be mutated beyond the knowledge of this property wrapper, losing atomic behavior.
    
    @propertyWrapper
    struct Atomic<T> {
        private var _wrappedValue: T
        private let lock = NSLock()
    
        var wrappedValue: T {
            get { lock.withLock { _wrappedValue } }
            set { lock.withLock { _wrappedValue = newValue } }
        }
    
        init(wrappedValue: T) {
            _wrappedValue = wrappedValue
        }
    }
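
    One caveat with a lock-per-access wrapper like this: it synchronizes each individual read and each individual write, which is all the operation's state needs, but a compound update such as `+= 1` is a read followed by a write and takes the lock twice. The following is a minimal sketch of holding the lock across the whole update instead; `Locked` and its `mutate` method are hypothetical names introduced here, not part of the code above.

```swift
import Foundation

/// Hypothetical lock-protected box. Unlike a plain get/set wrapper, `mutate`
/// holds the lock for the entire read-modify-write.
final class Locked<Value>: @unchecked Sendable {
    private var value: Value
    private let lock = NSLock()

    init(_ value: Value) {
        self.value = value
    }

    /// Returns a snapshot of the current value.
    func read() -> Value {
        lock.lock(); defer { lock.unlock() }
        return value
    }

    /// Applies `body` to the stored value while holding the lock.
    func mutate(_ body: (inout Value) -> Void) {
        lock.lock(); defer { lock.unlock() }
        body(&value)
    }
}

let counter = Locked(0)

// 1,000 increments spread across the available cores.
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
    counter.mutate { $0 += 1 }
}

let total = counter.read()
print(total)
```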
    

    This yields asynchronous behavior with a maximum concurrency count of 3. E.g., here I download 10 images, then another 10, and then another 20:

    [Image: screenshot from the original answer, not included]