swiftavfoundationrx-swiftavassetwriteravassetwriterinput

Is there any waitUntil operator or something similar in RXSwift to wait a value?


I am using RXSwift to handle with a lot of async AVAssetWriterInput write operations, but I need to wait for isReadyForMoreMediaData before writing more buffers inside the file, how can I handle this?

Basically, the observable receives a lot of buffers emitted by the asyncWriterSubject and I want to write all of them in the order that I am receiving.

I have this subject:

private var asyncWriter = ReplaySubject<(AVAssetWriterInput,CMSampleBuffer)>.create(bufferSize: 1)

I emit the values for it using this code:

asyncWriter.onNext((videoWriterInput, buffer))

And I am subscribing it here to listen:

disposable = asyncWriter.asObservable()
    .takeWhile {
        (writerPointer, _) in
            writerPointer.isReadyForMoreMediaData
        }.observeOn(MainScheduler.asyncInstance)
        .subscribe(onNext: { (writerPointer, buffer) in
            writerPointer.append(buffer)
    })

Solution

  • Here's some general information on how to handle back pressure.

    This will write a CMSampleBuffer to the writerPointer up to 100 times per second. When isReadyForMoreMediaData is false, it will store sample buffers until the Bool is true again.

    func example(asyncWriter: Observable<CMSampleBuffer>, writerPointer: AVAssetWriterInput) -> Disposable {
        enum Action {
            case buffer(CMSampleBuffer)
            case isReady(Bool)
        }
        var isReadyForMoreMediaData: Observable<Bool> {
            Observable<Int>.interval(.milliseconds(10), scheduler: MainScheduler.instance)
                .flatMap { [writerPointer] _ in Observable.just(writerPointer.isReadyForMoreMediaData) }
        }
        return Observable.merge(
            isReadyForMoreMediaData.map { Action.isReady($0) },
            asyncWriter.map { Action.buffer($0) }
        )
            .scan(into: (buffer: [CMSampleBuffer](), trigger: false, emission: CMSampleBuffer?.none), accumulator: { current, new in
                switch new {
                case let .buffer(buff):
                    if current.trigger {
                        if current.buffer.isEmpty {
                            current.emission = buff
                        }
                        else {
                            current.emission = current.buffer[0]
                            current.buffer.removeFirst()
                            current.buffer.append(buff)
                        }
                    }
                    else {
                        current.buffer.append(buff)
                    }
                case let .isReady(trig):
                    current.trigger = trig
                    if trig && !current.buffer.isEmpty {
                        current.emission = current.buffer[0]
                        current.buffer.removeFirst()
                    }
                }
            })
            .compactMap { $0.emission }
            .observe(on: MainScheduler.instance)
            .subscribe(onNext: { buffer in
                writerPointer.append(buffer)
            })
    }