python-multithreadingtiming

Threading producer and consumer timing on live animation


I'm reading data (16 bytes, i.e. 4 floats) from my COM port; a new packet arrives every 100 µs. At the same time I have a live animation that updates at 60 fps (0.016667 s/frame) using the data available at that instant. I've attached the code below.

The idea behind the whole system is that the readport function uses pyserial to passively read and parse that data. However, I only want to use the data that shows up every 0.016667 s. While readport is running, my animation updates the frames in parallel. Right now my thought process is to have a flag that is raised when I need to grab data to update the frame. However, my current code crashes the animation. I'm new to threading, so are there any suggestions on a better method, things I might be misunderstanding, or a way to fix this?

class Pipeline:
"""
Class to allow a single element pipeline between producer and consumer.
"""
def __init__(self):
    self.message = 0
    self.flag = 0
    self.producer_lock = threading.Lock()
    self.consumer_lock = threading.Lock()
    self.flag_set_lock = threading.Lock()
    self.flag_get_lock = threading.Lock()
    self.flag_get_lock.acquire()
    self.consumer_lock.acquire()

def get_message(self, name):
    self.consumer_lock.acquire()
    message = self.message
    self.producer_lock.release()
    return message

def set_message(self, message, name):
    self.producer_lock.acquire()
    self.message = message
    self.consumer_lock.release()

def consumer_get_flag(self, name):
    self.flag_get_lock.acquire()
    flag = self.flag
    self.flag_set_lock.release()
    return flag

def set_flag(self, flag, name):
    self.flag_set_lock.acquire()
    self.flag = flag
    self.flag_get_lock.release()

def producer_get_flag(self,name):
    return self.flag

# reading port function
def readport(pipeline):
# global flag
#read incoming data coming in every 100us
with serial.Serial('COM3',baudrate=3000000,bytesize=8, timeout=None, stopbits=serial.STOPBITS_ONE, parity=serial.PARITY_NONE, rtscts=True, dsrdtr=False) as s:
    print('Opened',s.name)
    while True:
        res=s.read(16)
        #parse data
        ypos=struct.unpack('f',res[0:4])[0]
        xpos=struct.unpack('f',res[4:8])[0]
        yamp=struct.unpack('f',res[8:12])[0]
        xamp=struct.unpack('f',res[12:16])[0]
        #flag every 60 fps to set the pipline
        flag = pipline.consumer_get_flag('Flag')
        if  flag == 1:
            pipeline.set_message([xpos,ypos], "Data")
            pipeline.set_flag(0,'Flag Down')
        if xamp != 5.0:
            print('values', [xpos,ypos,xamp,yamp])
            break
            # print('something wrong')

def animation(pipline):
# global flag
running = True
simtime = 0
prevx = 0
prevy = 0
while running:
    for event in pygame.event.get():
        if event.type == QUIT or (event.type == KEYDOWN and event.key == K_ESCAPE):
            # The user closed the window or pressed escape
            running = False
    screen.fill((255, 255, 255, 255))
    # Draw the world
    for body in world.bodies:
        for fixture in body.fixtures:
            fixture.shape.draw(body, fixture)
    #update robot position from the data
    pipline.set_flag(1,'Flag Up')
    if pipline.producer_get_flag('current flag') == 1: 
        xpos = prevx
        ypos = prevy
    else:
        xpos, ypos = pipline.get_message('Robot')
        prevx = xpos
        prevy = ypos
    robot1.position = b2Vec2(xpos,ypos+12)
    world.Step(TIME_STEP, 10, 10)
    simtime = simtime + TIME_STEP
    print(simtime)
    # Flip the screen and try to keep at the target FPS
    pygame.display.flip()
    clock.tick(TARGET_FPS)
pygame.quit()
print('Done!')


# --- main game loop ---

pipline = Pipeline()
flag = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(readport, pipline)
    executor.submit(animation,pipline)

Solution

  • You can do this more simply: in Python, threads share memory. The same objects can be used across threads if you're careful. I think you can take advantage of this and eliminate the entire Pipeline class. You don't need it.

    Serial port data is always buffered by the operating system, so you have to read the port continuously anyway or else characters will pile up in the buffer. Since the port is sending you data faster than you can process it, most of this data is going to be thrown away. So just put the latest data from the port into some variables. Most of the time, you'll overwrite the last set of data even though you never used it. That does no harm at all.
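To put a rough number on "most of this data is going to be thrown away", here is a small back-of-the-envelope sketch using only the figures from the question (one 16-byte sample every 100 µs, 60 fps, 3,000,000 baud). The 10-bits-per-byte figure assumes 8 data bits, one start bit, one stop bit and no parity, which matches the serial settings shown; the constant names are just for illustration.

```python
SAMPLES_PER_SECOND = 10_000      # one 16-byte sample every 100 us
FRAMES_PER_SECOND = 60           # animation rate from the question
BYTES_PER_SAMPLE = 16            # four 4-byte floats
BITS_PER_BYTE_ON_WIRE = 10       # assumption: 8 data bits + start bit + 1 stop bit
BAUD_RATE = 3_000_000            # from the serial.Serial(...) call

# How many samples arrive while one frame is on screen
samples_per_frame = SAMPLES_PER_SECOND / FRAMES_PER_SECOND

# How much of the line rate the stream occupies
line_bits_per_second = SAMPLES_PER_SECOND * BYTES_PER_SAMPLE * BITS_PER_BYTE_ON_WIRE

print(f"samples per frame: {samples_per_frame:.1f}")
print(f"line load: {line_bits_per_second} of {BAUD_RATE} bit/s")
```

So roughly 167 samples arrive per frame and only one of them is ever drawn, which is why overwriting unused samples is harmless here.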

    When your animation loop is ready for another set of data, it simply grabs it from the same variables. The data will be there. No need for any flags or synchronization.

    One subtlety is that the data comes as a set: an x and a y coordinate. They must be updated together. So in the following listing I use a simple threading.RLock to make sure the pair is always written and read as a unit, so the animation never sees a new x combined with an old y.

    That's the idea. You also need some logic to handle when the user quits, or when something goes wrong with the serial data. I took a stab at that. Since I can't run your program, the following is just a schematic of the idea. You might have to do some debugging before it will run.

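    import struct
    import threading
    import time

    import serial  # pyserial

    # pygame, Box2D and the globals screen, world, robot1, clock, TIME_STEP
    # and TARGET_FPS are assumed to be set up as in the original program.

    class Position: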
        def __init__(self):
            self.xpos = 0
            self.ypos = 0
            self.lock = threading.RLock()
            
        def set(self, xpos, ypos):
            with self.lock:
                self.xpos = xpos
                self.ypos = ypos
                
        def get(self):
            with self.lock:
                return self.xpos, self.ypos
            
    POSITION = Position()
    
    running = True
    
    # reading port function
    def readport():
        #read incoming data coming in every 100us
        with serial.Serial('COM3', baudrate=3000000, bytesize=8, timeout=None,
                           stopbits=serial.STOPBITS_ONE, parity=serial.PARITY_NONE,
                           rtscts=True, dsrdtr=False) as s:
            print('Opened',s.name)
            while running:
                res=s.read(16)
                #parse data
                ypos = struct.unpack('f',res[0:4])[0]
                xpos = struct.unpack('f',res[4:8])[0]
                yamp = struct.unpack('f',res[8:12])[0]
                xamp = struct.unpack('f',res[12:16])[0]
                if xamp != 5.0:
                    print('values', [xpos,ypos,xamp,yamp])
                    break
                POSITION.set(xpos, ypos)
    
    def animation_wrapper(port_thread):
        global running
        try:
            animation(port_thread)
        finally:
            pygame.quit()
            print('Done!')
            running = False
            
    def animation(port_thread):
        simtime = 0
        while port_thread.is_alive():
            for event in pygame.event.get():
                if event.type == QUIT or (event.type == KEYDOWN and
                                          event.key == K_ESCAPE):
                    # The user closed the window or pressed escape
                    return
            screen.fill((255, 255, 255, 255))
            # Draw the world
            for body in world.bodies:
                for fixture in body.fixtures:
                    fixture.shape.draw(body, fixture)
            #update robot position from the data
            xpos, ypos = POSITION.get()
            robot1.position = b2Vec2(xpos, ypos+12)
            world.Step(TIME_STEP, 10, 10)
            simtime = simtime + TIME_STEP
            print(simtime)
            # Flip the screen and try to keep at the target FPS
            pygame.display.flip()
            clock.tick(TARGET_FPS)
    
    # --- main game loop ---
    port_thread = threading.Thread(target=readport)
    port_thread.start()
    # No wait loop is needed: a started thread reports is_alive() until
    # readport returns. If the port fails to open, the thread ends and
    # animation() stops at its is_alive() check.
    animation_thread = threading.Thread(target=animation_wrapper,
                                        args=(port_thread,))  # note the 1-tuple
    animation_thread.start()
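
As a side note on the parsing step in readport, the four separate struct.unpack calls can be collapsed into one. This is only a sketch: the helper name parse_frame is made up for illustration, and the format string "4f" assumes the same native byte order that the original 'f' calls use.

```python
import struct

FRAME_FORMAT = "4f"                          # four 32-bit floats, native byte order
FRAME_SIZE = struct.calcsize(FRAME_FORMAT)   # 16 bytes on common platforms


def parse_frame(frame):
    """Split one frame into (ypos, xpos, yamp, xamp), in the order the port sends them."""
    if len(frame) != FRAME_SIZE:
        # Defensive check; with timeout=None, s.read(16) normally returns all 16 bytes
        raise ValueError(f"expected {FRAME_SIZE} bytes, got {len(frame)}")
    ypos, xpos, yamp, xamp = struct.unpack(FRAME_FORMAT, frame)
    return ypos, xpos, yamp, xamp
```

Inside the read loop this would be used as `ypos, xpos, yamp, xamp = parse_frame(s.read(16))`.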
    

    I have also fixed your indentation. Please be careful about that when posting on Stack Overflow, especially with Python code, which is indentation-sensitive.
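
Since the listing above cannot be run without the serial device and the pygame/Box2D setup, here is a minimal, self-contained sketch of the same "keep only the latest value" idea that runs anywhere. Everything in it is a stand-in: fake_reader replaces the serial thread, the sleep in run_demo replaces clock.tick(TARGET_FPS), and the names LatestPosition, fake_reader and run_demo are hypothetical. It also swaps the global running flag for a threading.Event, which is one common way to ask a thread to stop, and uses a plain Lock because the lock is never re-entered.

```python
import threading
import time


class LatestPosition:
    """Holds the most recent (x, y) pair; older pairs are simply overwritten."""

    def __init__(self):
        self._lock = threading.Lock()
        self._xy = (0.0, 0.0)

    def set(self, x, y):
        with self._lock:
            self._xy = (x, y)

    def get(self):
        with self._lock:
            return self._xy


def fake_reader(position, stop_event):
    """Stand-in for the serial thread: publishes pairs where y == 2 * x."""
    n = 0
    while not stop_event.is_set():
        n += 1
        position.set(float(n), 2.0 * n)
        time.sleep(0.0001)  # roughly the 100 us sample period


def run_demo(frames=5, frame_period=1 / 60):
    """Sample the latest pair once per simulated frame and return what was seen."""
    position = LatestPosition()
    stop_event = threading.Event()
    reader = threading.Thread(target=fake_reader, args=(position, stop_event))
    reader.start()
    seen = []
    try:
        for _ in range(frames):
            time.sleep(frame_period)     # stands in for clock.tick(TARGET_FPS)
            seen.append(position.get())  # whatever is newest right now
    finally:
        stop_event.set()  # ask the reader to finish
        reader.join()
    return seen
```

Calling `run_demo()` returns one (x, y) pair per frame. Each pair should satisfy y == 2 * x, which shows the consumer never observes a half-updated pair, and the x values skip ahead between frames because the intermediate samples were overwritten unread.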