I am using RxSwift to handle a lot of async AVAssetWriterInput write operations, but I need to wait for isReadyForMoreMediaData before writing more buffers to the file. How can I handle this?
Basically, the observable receives a lot of buffers emitted by the asyncWriter subject, and I want to write all of them in the order in which I receive them.
I have this subject:
private var asyncWriter = ReplaySubject<(AVAssetWriterInput,CMSampleBuffer)>.create(bufferSize: 1)
I emit values to it with this code:
asyncWriter.onNext((videoWriterInput, buffer))
And I subscribe to it here:
disposable = asyncWriter.asObservable()
    .takeWhile { (writerPointer, _) in
        writerPointer.isReadyForMoreMediaData
    }
    .observeOn(MainScheduler.asyncInstance)
    .subscribe(onNext: { (writerPointer, buffer) in
        writerPointer.append(buffer)
    })
Here's some general information on how to handle back pressure.
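The example below polls writerPointer.isReadyForMoreMediaData every 10 milliseconds (100 times per second). While the flag is true, incoming sample buffers are passed on to the writer; while it is false, they are stored in arrival order and released again, oldest first, once the flag is true.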
import AVFoundation
import RxSwift

func example(asyncWriter: Observable<CMSampleBuffer>, writerPointer: AVAssetWriterInput) -> Disposable {
    enum Action {
        case buffer(CMSampleBuffer)
        case isReady(Bool)
    }
    // Poll the writer's readiness flag every 10 ms.
    var isReadyForMoreMediaData: Observable<Bool> {
        Observable<Int>.interval(.milliseconds(10), scheduler: MainScheduler.instance)
            .map { [writerPointer] _ in writerPointer.isReadyForMoreMediaData }
    }
    return Observable.merge(
        isReadyForMoreMediaData.map { Action.isReady($0) },
        asyncWriter.map { Action.buffer($0) }
    )
    .scan(into: (buffer: [CMSampleBuffer](), trigger: false, emission: CMSampleBuffer?.none), accumulator: { current, new in
        // Clear the previous emission so a poll tick cannot re-emit a buffer that was already written.
        current.emission = nil
        switch new {
        case let .buffer(buff):
            if current.trigger {
                if current.buffer.isEmpty {
                    // Nothing is queued: pass the new buffer straight through.
                    current.emission = buff
                }
                else {
                    // Keep arrival order: emit the oldest queued buffer and queue the new one.
                    current.emission = current.buffer[0]
                    current.buffer.removeFirst()
                    current.buffer.append(buff)
                }
            }
            else {
                // Writer is not ready: store the buffer for later.
                current.buffer.append(buff)
            }
        case let .isReady(trig):
            current.trigger = trig
            if trig && !current.buffer.isEmpty {
                // Writer is ready: release the oldest queued buffer.
                current.emission = current.buffer[0]
                current.buffer.removeFirst()
            }
        }
    })
    .compactMap { $0.emission }
    .observe(on: MainScheduler.instance)
    .subscribe(onNext: { buffer in
        writerPointer.append(buffer)
    })
}
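To make the queueing rules inside the scan easier to follow, here is a minimal sketch of the same state machine without RxSwift or AVFoundation. The type PendingQueue and its methods receive and setReady are hypothetical names introduced only for this illustration, and Int values stand in for sample buffers.

```swift
// Hypothetical, framework-free model of the scan accumulator above (illustration only).
struct PendingQueue<Element> {
    private(set) var pending: [Element] = []
    private(set) var isReady = false

    // A new element arrived. Returns the element that should be written now, if any.
    mutating func receive(_ element: Element) -> Element? {
        guard isReady else {
            pending.append(element)      // not ready: store it for later
            return nil
        }
        guard !pending.isEmpty else {
            return element               // ready and nothing queued: pass it through
        }
        pending.append(element)          // ready but older elements are waiting:
        return pending.removeFirst()     // write the oldest one first
    }

    // The readiness flag was polled. Returns the oldest queued element if ready.
    mutating func setReady(_ ready: Bool) -> Element? {
        isReady = ready
        guard ready, !pending.isEmpty else { return nil }
        return pending.removeFirst()
    }
}

var queue = PendingQueue<Int>()
var written: [Int] = []
if let e = queue.receive(1) { written.append(e) }      // not ready yet: queued
if let e = queue.receive(2) { written.append(e) }      // not ready yet: queued
if let e = queue.setReady(true) { written.append(e) }  // releases 1
if let e = queue.receive(3) { written.append(e) }      // releases 2, queues 3
if let e = queue.setReady(true) { written.append(e) }  // releases 3
if let e = queue.setReady(true) { written.append(e) }  // queue is empty: nothing written
print(written)
```

Each call returns at most one element, which mirrors how the scan sets emission at most once per event. A consequence is that a backlog drains one buffer per poll tick or per new arrival, so a large backlog takes several ticks to clear.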