The data is received over a socket with no container wrapped around it: it is a pure stream of I, P and B frames, each beginning with a NAL header (something like 00 00 00 01). I am now using PyAV to decode the frames, but I can only decode the data after the second PPS info (in a key frame) has been received (so that the chunk of data I send to my decode thread begins with the PPS and SPS); otherwise decode() or demux() returns the error "non-existing PPS 0 referenced decode_slice_header error".
I want to feed data to a persistent decoder that remembers the previous P frame, so that after being fed one B frame the decoder returns a decoded video frame. Alternatively, I would like some form of IO object that can be opened as a container while another thread keeps writing data into it.
Here is my key code:
# NOTE(review): excerpt as posted in the question -- indentation was lost and
# '....' marks code the author left out, so this is not runnable as-is.
# Reader thread: read from the socket until a key frame is seen, then start a
# new io.BytesIO() to store the data that follows.
rawFrames = io.BytesIO()
while flag_get_keyFrame:()
....
# Receive up to 2048 bytes from the socket and write them into the buffer.
content= socket.recv(2048)
rawFrames.write(content)
....
# Decoder thread: decode the content buffered between two key frames.
....
# Rewind so the container is opened from the start of the buffered bytes.
rawFrames.seek(0)
container = av.open(rawFrames)
# Demux the buffer into packets and collect every decoded frame.
for packet in container.demux():
for frame in packet.decode():
self.frames.append(frame)
....
My code plays the video, but with a 3~4 second delay. I am not putting all of it here, because I know it does not actually do what I want to achieve. I want to start playing the video after receiving the first key frame and decode the following frames right after receiving them. PyAV, OpenCV, FFmpeg or something else: how can I achieve my goal?
After spending hours looking for an answer to this as well, I figured it out myself.
For a single thread, you can do the following:
# NOTE(review): snippet as posted -- indentation was lost, and `websocket` and
# `self` come from an enclosing coroutine/class that is not shown.
# One in-memory buffer is opened once as a raw H.264 input container and then
# reused for every chunk that arrives.
rawData = io.BytesIO()
container = av.open(rawData, format="h264", mode='r')
# Running total of the sizes of the packets demuxed so far; used as the
# position to seek back to before each demux pass.
cur_pos = 0
while True:
data = await websocket.recv()
# Write the new chunk at the buffer's current position, then move the read
# position to cur_pos before demuxing.
rawData.write(data)
rawData.seek(cur_pos)
for packet in container.demux():
# Empty packets are skipped (presumably the demuxer's flush/end-of-data
# packet -- TODO confirm against the PyAV documentation).
if packet.size == 0:
continue
cur_pos += packet.size
# Decode the packet and keep every resulting frame.
for frame in packet.decode():
self.frames.append(frame)
That is the basic idea. I have worked out a generic version that has the receiving thread and the decoding thread separated. The code will also skip frames if the CPU cannot keep up with the decoding speed, and will resume decoding from the next key frame (so you will not get the torn green-screen effect). Here is the full version of the code:
import asyncio
import av
import cv2
import io
from multiprocessing import Process, Queue, Event
import time
import websockets
def display_frame(frame, start_time, pts_offset, frame_rate):
    """Show ``frame`` in the 'Video' window unless it is not yet due.

    Args:
        frame: Decoded video frame exposing ``pts``, ``time_base`` and
            ``to_ndarray``.
        start_time: ``time.time()`` value taken when playback began, or
            ``None`` to display without pacing.
        pts_offset: Timestamp subtracted from ``frame.pts`` so playback
            time starts at zero.
        frame_rate: Frames per second; a frame that is due more than one
            frame interval in the future is not shown.

    Returns:
        ``False`` when the frame is too early and was not shown, ``True``
        once the frame has been handed to ``cv2.imshow``.
    """
    pts = frame.pts
    if pts is not None:
        # Convert the rebased timestamp from time_base units to seconds.
        time_base = frame.time_base
        play_time = (pts - pts_offset) * time_base.numerator / time_base.denominator
        if start_time is not None:
            elapsed = time.time() - start_time
            lead = play_time - elapsed
            if lead > 1 / frame_rate:
                # More than one frame interval early: let the caller retry.
                return False
            if lead > 0:
                # Slightly early: wait until the frame is due.
                time.sleep(lead)
    cv2.imshow('Video', frame.to_ndarray(format='bgr24'))
    return True
def get_pts(frame):
    """Return the ``pts`` attribute of ``frame`` (used as a sort key)."""
    pts = frame.pts
    return pts
def _queue_backlog(data_queue):
    """Return the approximate number of queued chunks, or 0 if unknown."""
    try:
        return data_queue.qsize()
    except NotImplementedError:
        # multiprocessing.Queue.qsize() is not implemented on some platforms
        # (e.g. macOS); frame skipping is then simply disabled.
        return 0


def _show_due_frames(frames_buffer, start_time, pts_offset, frame_rate):
    """Try to display every buffered frame in order.

    Returns:
        A ``(pending, quit_requested)`` tuple: ``pending`` holds the frames
        that ``display_frame`` declined to show yet (plus any not reached
        because of a quit), and ``quit_requested`` is ``True`` when 'q' was
        pressed in the video window.
    """
    pending = []
    for index, frame in enumerate(frames_buffer):
        if not display_frame(frame, start_time, pts_offset, frame_rate):
            pending.append(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            pending.extend(frames_buffer[index + 1:])
            return pending, True
    return pending, False


def render(terminated, data_queue):
    """Decode and display encoded video chunks taken from ``data_queue``.

    Runs until ``terminated`` is set or 'q' is pressed in the video window.
    Every chunk is written to one growing in-memory buffer that is opened
    once as an input container; each pass demuxes only the newly written
    bytes. When more than 8 chunks are waiting in the queue, non-key-frame
    packets are dropped and decoding resumes at the next key frame.

    Args:
        terminated: Event shared with the producer; set on exit so the
            producer stops as well.
        data_queue: Queue of encoded byte chunks supplied by the producer.
    """
    from queue import Empty  # raised by multiprocessing.Queue.get on timeout

    rawData = io.BytesIO()
    cur_pos = 0  # total number of bytes written to rawData so far
    frames_buffer = []
    start_time = None
    pts_offset = None
    got_key_frame = False
    quit_requested = False
    try:
        while not terminated.is_set() and not quit_requested:
            try:
                # Wait briefly instead of polling; only an empty queue is an
                # expected condition here.
                data = data_queue.get(timeout=0.01)
            except Empty:
                continue
            rawData.write(data)
            # Rewind to the start of the chunk just written so the demuxer
            # reads only the new bytes.
            rawData.seek(cur_pos)
            if cur_pos == 0:
                # First chunk: open the container and create a separate
                # decoder context for the detected video codec.
                container = av.open(rawData, mode='r')
                original_codec_ctx = container.streams.video[0].codec_context
                codec = av.codec.CodecContext.create(original_codec_ctx.name, 'r')
            cur_pos += len(data)
            dts = None
            for packet in container.demux():
                if packet.size == 0:
                    continue
                dts = packet.dts
                if pts_offset is None:
                    pts_offset = packet.pts
                if not got_key_frame and packet.is_keyframe:
                    got_key_frame = True
                if _queue_backlog(data_queue) > 8 and not packet.is_keyframe:
                    # Falling behind: drop packets until the next key frame.
                    got_key_frame = False
                    continue
                if not got_key_frame:
                    continue
                frames = codec.decode(packet)
                if start_time is None:
                    start_time = time.time()
                frames_buffer += frames
                frames_buffer.sort(key=get_pts)
                # Rebuild the buffer instead of removing items while
                # iterating it, which skipped every frame that followed a
                # displayed one.
                frames_buffer, quit_requested = _show_due_frames(
                    frames_buffer, start_time, pts_offset, codec.framerate
                )
                if quit_requested:
                    # Leave the packet loop too; otherwise the key press is
                    # consumed here and the quit request is lost.
                    break
            if quit_requested:
                break
            if dts is not None:
                # NOTE(review): presumably this seek resets the demuxer's
                # end-of-file state so the next demux() call picks up newly
                # written bytes; the value 25000 is unexplained -- TODO
                # confirm against the PyAV documentation.
                container.seek(25000)
            rawData.seek(cur_pos)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always signal the producer and close the window, even if decoding
        # raised.
        terminated.set()
        cv2.destroyAllWindows()
async def receive_encoded_video(websocket, path):
    """Forward encoded video chunks from ``websocket`` to a render process.

    Starts ``render`` in a separate process and puts every received message
    on a queue shared with it, until the render process sets the shared
    ``terminated`` event or receiving from the websocket fails.

    Args:
        websocket: Connection object whose ``recv()`` coroutine yields the
            encoded chunks.
        path: Unused here (presumably required by the websockets handler
            signature -- TODO confirm).
    """
    data_queue = Queue()
    terminated = Event()
    p = Process(
        target=render,
        args=(terminated, data_queue)
    )
    p.start()
    try:
        while not terminated.is_set():
            try:
                data = await websocket.recv()
            except Exception:
                # Connection closed or failed: stop forwarding. Cancellation
                # and KeyboardInterrupt are not Exception subclasses on
                # Python 3.8+ and are allowed to propagate.
                break
            data_queue.put(data)
    finally:
        # Always tell the render process to stop, whatever ended the loop.
        terminated.set()