python parallel-processing multiprocessing synchronization

Parallel programming: Synchronizing processes


I have a program with many music decks (deck 1, deck 2, music_clip_deck, speackers_deck, ip_call_1, ip_call_2, ip_call_3). Each deck runs in a separate process. I cut the audio (MP3 files, retransmission streams, microphone input, and voice from aiortc-pyav) into chunks of 125 ms. Each process then fills its own queue, and I send the combined result to a final thread that does the last audio processing before the audio is played and transmitted to clients.

How can I synchronize all the processes so that one iteration of each process's while loop takes exactly 125 ms?

Here is a figure to help:

[figure not included]

This approach may not help at all:

class Deck_1_Proc(Process):
    ...
    ...
    ...
    def run(self):
        while True:
            t1 = time.time()
            ...
            ...
            ...
            t2 = time.time()
            if t2 - t1 < 0.125:
                time.sleep(0.125 - (t2 - t1))

Maybe a better approach would be something like JavaScript's setInterval with an interval of 125 ms:

from threading import Event, Thread

def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set

# start the repeated calls:
cancel_future_calls = call_repeatedly(0.125, run)
# stop them when the app terminates:
cancel_future_calls()

Here is a more complicated flow chart:

[flow chart not included]

With these processes running, and with Ahmed AEK's solution, I have to buffer the downsampled audio data before plotting it (this is for a sync time of 20 ms, not 125 ms). What can I do instead?


Solution

  • The main issue is that most timers drift and sleep is not accurate (not even QTimer). A stable timer, in the sense that the 100th tick lands close to 12.5 seconds, has to track an absolute deadline instead of sleeping for a fixed duration, something like this:

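    import time
    from multiprocessing import Condition

    def infinite_heartbreat(cv: Condition):
        # Absolute time of the next beat. Advancing it by a fixed step keeps
        # per-iteration sleep errors from accumulating.
        next_beat = time.time()
        while True:
            next_beat += 0.125
            time_to_sleep = next_beat - time.time()
            if time_to_sleep > 0:  # skip the sleep if we are already late
                time.sleep(time_to_sleep)
            with cv:
                cv.notify_all()  # wake every process waiting on the condition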
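
To make the drift argument concrete, here is a small arithmetic sketch that compares the question's relative-sleep loop with the absolute-deadline loop above. It does no real sleeping; it only adds up simulated time. The function names and the `work` and `oversleep` figures (50 ms of processing per chunk, 1 ms of sleep overshoot) are made-up assumptions for illustration, not measurements.

```python
def relative_sleep_total(ticks, period, work, oversleep):
    """Simulated elapsed time when each iteration sleeps for (period - work)."""
    now = 0.0
    for _ in range(ticks):
        now += work                                  # processing the chunk
        now += max(0.0, period - work) + oversleep   # sleep plus its overshoot
    return now


def absolute_deadline_total(ticks, period, work, oversleep):
    """Simulated elapsed time when each iteration sleeps until a fixed deadline."""
    now = 0.0
    next_beat = 0.0
    for _ in range(ticks):
        now += work
        next_beat += period                  # deadline advances by exactly one period
        time_to_sleep = next_beat - now
        if time_to_sleep > 0:
            now += time_to_sleep + oversleep  # the overshoot is absorbed by the next sleep
    return now


# Assumed figures: 100 ticks of 125 ms, 50 ms work, 1 ms overshoot per sleep.
drifting = relative_sleep_total(100, 0.125, 0.05, 0.001)
steady = absolute_deadline_total(100, 0.125, 0.05, 0.001)
print(f"{drifting:.3f} {steady:.3f}")  # prints 12.600 12.501
```

Under these assumptions the relative-sleep loop is 100 ms late after 100 ticks, while the deadline-based loop is never more than one overshoot (1 ms) late.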
    

    You can easily synchronize all processes to wake up at the same time using a condition variable. However, if one of them lags behind by a few milliseconds, you probably also need a multiprocessing.Value holding the current frame number, so that a process waits only when it is not lagging behind, as follows:

    import threading
    import time
    from multiprocessing import Condition, Value, Process, Event

    def infinite_heartbreat(cv: Condition, frame: Value, quit_event: Event):
        next_beat = time.time()
        while True:
            # Advance an absolute deadline so sleep errors do not accumulate.
            next_beat += 0.125
            time_to_sleep = next_beat - time.time()
            if time_to_sleep > 0:
                time.sleep(time_to_sleep)
            with cv:
                # The condition's lock also protects the unlocked frame counter.
                frame.value += 1
                cv.notify_all()
                if quit_event.is_set():
                    return

    def worker(cv, frame_number, worker_id, quit_event: Event):
        current_frame = 0
        while True:
            with cv:
                # Wait only when caught up; a lagging worker proceeds immediately.
                cv.wait_for(lambda: current_frame <= frame_number.value)
                if quit_event.is_set():
                    return
            print(f"processed frame {current_frame} in worker {worker_id}")
            current_frame += 1

    if __name__ == "__main__":
        condition = Condition()
        frame = Value('q', lock=False)
        quit_event = Event()
        processes = []
        for i in range(4):
            process = Process(target=worker, args=(condition, frame, i, quit_event))
            process.start()
            processes.append(process)
        tr = threading.Thread(target=infinite_heartbreat, args=(condition, frame, quit_event))
        tr.start()
        time.sleep(5)
        quit_event.set()
        # The next beat wakes the workers, which then see quit_event and exit.
        tr.join()
        for process in processes:
            process.join()
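
The catch-up behaviour of the frame counter can be checked in isolation. The sketch below is a single-process stand-in, not the answer's code: it uses threading.Condition and a plain dict instead of the multiprocessing objects so that it runs deterministically, and `drain_pending` is a hypothetical helper introduced only for this illustration. It uses the same predicate as the worker above, with `timeout=0` so that it stops once it has caught up instead of waiting for a future beat.

```python
import threading


def drain_pending(cv, frame, current_frame):
    """Handle every frame already announced, without waiting for new beats.

    Returns the list of frame numbers that were processed.
    """
    processed = []
    while True:
        with cv:
            # True immediately while the worker is behind the frame counter;
            # with timeout=0 it returns False as soon as the worker is caught up.
            ready = cv.wait_for(lambda: current_frame <= frame["value"], timeout=0)
        if not ready:
            return processed
        processed.append(current_frame)
        current_frame += 1


cv = threading.Condition()
frame = {"value": 0}  # stand-in for the shared multiprocessing.Value

# Simulate three heartbeats that fired while the worker was busy elsewhere.
for _ in range(3):
    with cv:
        frame["value"] += 1
        cv.notify_all()

caught_up = drain_pending(cv, frame, current_frame=0)
print(caught_up)  # prints [0, 1, 2, 3]
```

Because the worker compares its own frame number against the shared counter, beats it missed are not lost: it works through frames 0 to 3 back to back and only then would it block.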
    

    Edit: added a quit_event in the form of a multiprocessing.Event, because it is atomic (it can be set and checked safely from several processes).

    Edit 2: changed the value to a signed q with no lock, as per @Booboo's comment. This saves a file descriptor and allows negative frame numbers (-1 has its uses).
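
As a small illustration of the second edit, the sketch below shows what `Value('q', lock=False)` gives you. Starting the counter at -1 to mean "no frame yet" is only an assumed example of a negative frame number, not something the answer prescribes.

```python
from multiprocessing import Value

# 'q' is a signed 64-bit integer. With lock=False the call returns the raw
# shared ctypes object rather than the synchronized wrapper.
frame = Value('q', -1, lock=False)   # -1 as a hypothetical "no frame yet" marker

# The synchronized wrapper exposes get_lock(); the raw object does not.
uses_wrapper = hasattr(frame, "get_lock")

frame.value += 1   # not atomic on its own: guard it with another lock
print(uses_wrapper, frame.value)  # prints False 0
```

Since the raw value has no lock of its own, every read-modify-write has to happen while holding some other lock. In the answer's code that role is played by the Condition, because the counter is only incremented inside `with cv:`.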