I have a program which has a lot of music decks (deck 1, deck 2, music_clip_deck, speackers_deck, ip_call_1, ip_call_2, ip_call_3). Each deck works in a seperate process. The chunk time I use to crop the mp3 files/retransmitions stream/voice from microphone/voice from aiortc-pyav is 125msec. After that I fill some queues (one for each seperate process) and I send the final queue to the final thread for the final audio processing before hearing and transmitted to clients.
How can I synchronize all the process together, so one while run time of each process takes exactly 125 msec?
Here is one figure for help:
This approach may not help at all:
class Deck_1_Proc(Process):
...
...
...
def run(self):
while(True):
t1 = time.time()
...
...
...
t2 = time.time()
if t2 - t1 < 0.125:
time.sleep(0.125 - (t2 - t1))
Maybe a better approach should be use something like javascript setInterval with time parameter: 125msec
from threading import Event, Thread
def call_repeatedly(interval, func, *args):
stopped = Event()
def loop():
while not stopped.wait(interval): # the first call is in `interval` secs
func(*args)
Thread(target=loop).start()
return stopped.set
#call:
cancel_future_calls = call_repeatedly(0.125, run)
#stopping to app termination:
cancel_future_calls()
Here is a more complicated flow chart:
With this processes run, and with Ahmed AEK solution i have to buffer the downsampling audio data before plot it (this is for sync time: 20msec, not for 125msec). What can i do instead?
the main issue is that most timers drift, and sleep
is not accurate, not even QTimer, so a stable timer (in a sense that the 100th tick is close to 12.5 seconds) would have to do something like this.
import time
from multiprocessing import Condition
def infinite_heartbreat(cv: Condition):
next_beat = time.time()
while True:
next_beat += 0.125
time_to_sleep = next_beat - time.time()
if time_to_sleep > 0:
time.sleep(time_to_sleep)
with cv:
cv.notify_all()
you can easily synchronize all processes to wake up at the same time using a Condition Variable, but if one of them lags behind for a few milliseconds you probably need a multiprocessing.Value to ensure they only wait if they are not lagging behind as follows:
import threading
import time
from multiprocessing import Condition, Value, Process, Event
def infinite_heartbreat(cv: Condition, frame: Value, quit_event: Event):
next_beat = time.time()
while True:
next_beat += 0.125
time_to_sleep = next_beat - time.time()
if time_to_sleep > 0:
time.sleep(time_to_sleep)
with cv:
frame.value += 1
cv.notify_all()
if quit_event.is_set():
return
def worker(cv, frame_number, worker_id, quit_event: Event):
current_frame = 0
while True:
with cv:
cv.wait_for(lambda: current_frame <= frame_number.value)
if quit_event.is_set():
return
print(f"processed frame {current_frame} in worker {worker_id}")
current_frame += 1
if __name__ == "__main__":
condition = Condition()
frame = Value('q', lock=False)
quit_event = Event()
processes = []
for i in range(4):
process = Process(target=worker, args=(condition, frame, i, quit_event))
process.start()
processes.append(process)
tr = threading.Thread(target=infinite_heartbreat, args=(condition,frame, quit_event))
tr.start()
time.sleep(5)
quit_event.set()
Edit: added a quit_event
in the form of a multiprocessing.Event, because it is an atomic.
Edit2: changed the value to be signed q
with no lock, as per @Booboo comment, this saves a file descriptor and allows negative frame numbers (as -1 has its uses).