swiftfirebasegoogle-cloud-firestoreasyncstream

How to use Swift's AsyncThrowingStream with Firestore Listeners


How can I convert this code to use AsyncThrowingStream

    private var listenerRegistration: ListenerRegistration?

    func unsubscribe() {
        if listenerRegistration != nil {
            listenerRegistration?.remove()
            listenerRegistration = nil
        }
    }
    
    func subscribe() {
        if listenerRegistration != nil {
            unsubscribe()
        }
        guard let userID = Auth.auth().currentUser?.uid else { return }
        let docRef = Firestore.db.collection("mdates")
        let query = docRef.whereField(MeetDateKeys.selectedId.rawValue, isEqualTo: userID)
        
        listenerRegistration = query
            .addSnapshotListener { [weak self] querySnapshot, error in
                guard let documents = querySnapshot?.documents else {
                    print("No documents in 'reminders' collection")
                    return
                }
                
                self?.reminders = documents.compactMap { queryDocumentSnapshot in
                    let result = Result { try? queryDocumentSnapshot.data(as: MeetDate.self, decoder: Firestore.Decoder()) }
                    
                    switch result {
                    case .success(let reminder):
                        // A `Reminder` value was successfully initialized from the DocumentSnapshot.
                        return reminder
                    case .failure(let error):
                        // A `Reminder` value could not be initialized from the DocumentSnapshot.
                        switch error {
                        case DecodingError.typeMismatch(_, let context):
                            print("\(error.localizedDescription): \(context.debugDescription)")
                        case DecodingError.valueNotFound(_, let context):
                            print("\(error.localizedDescription): \(context.debugDescription)")
                        case DecodingError.keyNotFound(_, let context):
                            print("\(error.localizedDescription): \(context.debugDescription)")
                        case DecodingError.dataCorrupted(let key):
                            print("\(error.localizedDescription): \(key.debugDescription)")
                        default:
                            print("Error decoding document: \(error.localizedDescription)")
                        }
                        return nil
                    }
                }
                
                self?.thisWeek = self?.dateThisWeekFilter(dates: self?.reminders ?? [])
                self?.nextWeek = self?.dateNextWeekFilter(dates: self?.reminders ?? [])
                self?.laterWeek = self?.dateLaterFilter(dates: self?.reminders ?? [])
            }
    }

using this extension

extension Query {
    func addSnapshotListener1<T>(
        includeMetadataChanges: Bool = false
    ) -> AsyncThrowingStream<[T], Error> where T: Decodable{
        .init { continuation in
            let listener = addSnapshotListener(includeMetadataChanges: includeMetadataChanges) { result in
                do {
                    let snapshot = try result.get()
                    continuation.yield(try snapshot.documents.map { try $0.data(as: T.self) })
                } catch {
                    continuation.finish(throwing: error)
                }
            }

            continuation.onTermination = { @Sendable _ in
                listener.remove()
            }
        }
    }
}

Solution

  • You are almost there.

    extension Query {
        func addSnapshotListener1<T>(
            includeMetadataChanges: Bool = false
        ) -> AsyncThrowingStream<[T], Error> where T : Decodable{
            .init { continuation in
                let listener = addSnapshotListener(includeMetadataChanges: includeMetadataChanges) { snapshot, error in //Add Error
                    if let error {
                        continuation.finish(throwing: error)
                    } else{
                        continuation.yield(snapshot?.documents
                            .compactMap { //Will ignore any nil
                                do {
                                    return try $0.data(as: T.self)
                                } catch {
                                    print("🛑 Error \n\(error)")// Expose in the console any decoding errors, there are other more graceful ways to handle this. But this give you some objects if there is a decoding issue.
                                    
                                    // The other way give you no objects if there is a decoding issue
                                    return nil
                                }
                            } ?? [])
                    }
                }
                continuation.onTermination = { @Sendable _ in
                    listener.remove()
                }
            }
        }
    }
    

    Then you can use it something like

    func listen<T>(query: Query, type element: T) async throws where T : Decodable {
        let stream: AsyncThrowingStream<[T], Error> = query.addSnapshotListener1()
        
        for try await documents in stream {
            print("document count = \(documents.count)")
        }
    }