python-multithreadingcan-buspython-can

threaded python-can suddenly stops sending messages


I have a strange issue with my canbus thread, maybe somebody here is able to help.

I set up a test environment with the class below and another file thats just simulating the other can module via vcan0 which works great. However the class below is what is giving me headache.

When I launch the script it will send a message only once before stopping. The expectation would be that it keeps sending and receiving data. There are no errors, I tried catching exceptions in some places especially when sending and receiving the messages but it didnt throw any errors.

Now when I am opening up another terminal and send a message to vcan0 manually, the script is actually still able to pick it up and prints it as expected, but it still won't send any messages out. So an unresponsive thread is also not the case.

What I also don't understand is, that the print("0") is also only executed once at the start and that's it. I expect it to run in a loop and the 0 should be printed over and over even though nothing is being sent or received or am I missing something?.

And the thing that got me confused the most is that the script was working this morning. I'm just getting into threading in python so I assume it might have something to do with this but I hope that somebody with more knowledge can point me in the right direction.

import threading
import time
import can
import socketio
import sys
import settings

class Config:
    def __init__(self):
        self.settings = settings.load_settings("canbus")
        self.refresh_rate = self.settings["timing"]["refresh_rate"]
        self.interval = self.settings["timing"]["interval"]
        self.msg_hs = []
        self.msg_ls = []

        self.initialize_messages()

    def initialize_messages(self):
        for key, message in self.settings['messages'].items():
            req_id = int(message['req_id'], 16)
            rep_id = int(message['rep_id'], 16)
            target = int(message['target'], 16)
            action = int(message['action'], 16)
            parameter0 = int(message['parameter'][0], 16)
            parameter1 = int(message['parameter'][1], 16)

            dlc = 0xC8 + len([byte for byte in [rep_id, target, action, parameter0, parameter1] if byte != 0])

            req_id_bytes = [req_id]
            message_bytes = [dlc, target, action, parameter0, parameter1, 0x01, 0x00, 0x00]

            rep_id_bytes = [rep_id]
            scale = message['scale']
            is_16bit = message['is_16bit']
            rtvi_id = message['rtvi_id']

            refresh_rate = message['refresh_rate']
            if refresh_rate == "high":
                self.msg_hs.append((req_id_bytes, rep_id_bytes, message_bytes, scale, is_16bit, rtvi_id))
            elif refresh_rate == "low":
                self.msg_ls.append((req_id_bytes, rep_id_bytes, message_bytes, scale, is_16bit, rtvi_id))

class CanBusThread(threading.Thread):
    def __init__(self):
        super(CanBusThread, self).__init__()
        self._stop_event = threading.Event()
        self.daemon = True
        self.client = socketio.Client()
        self.config = Config()
        self.can_bus = None
        self.isDev = True

    def run(self):
        self.connect_to_socketio()
        self.initialize_canbus()
        self.run_can_bus()

    def initialize_canbus(self):
        try:
            if(self.isDev):
                print("Using Virtual CAN.")
                self.can_bus = can.interface.Bus(channel='vcan0', bustype='socketcan', bitrate=500000)
            else:    
                self.can_bus = can.interface.Bus(channel='can0', bustype='socketcan', bitrate=500000)    
        except Exception as e:
            print(f'Error initializing CAN Bus: {e}')

    def stop_thread(self):
        self._stop_event.set()
        self.stop_canbus()

    def stop_canbus(self):
        if self.can_bus:
            self.can_bus.shutdown()

    def connect_to_socketio(self):
        max_retries = 10
        current_retry = 0
        while not self.client.connected and current_retry < max_retries:
            try:
                self.client.connect('http://localhost:4001', namespaces=['/canbus'])
            except Exception as e:
                print(f"Socket.IO connection failed. Retry {current_retry}/{max_retries}. Error: {e}")
                time.sleep(.5)
                current_retry += 1

        if self.client.connected:
            print("Socket.IO connected successfully")
        else:
            print("Failed to connect to Socket.IO.")

    def emit_data_to_frontend(self, data):
        if self.client and self.client.connected:
            self.client.emit('data', data, namespace='/canbus')

    def request(self, messages):
        for message in messages:

            msg = can.Message(arbitration_id=message[0][0], data=message[2], is_extended_id=True)
            
            try:
                received = False
                print(msg)
                #print(self.can_bus)
                self.can_bus.send(msg)
                print("1")
                data = self.can_bus.recv()
                print(data)
                print("2")
                self.emit_data_to_frontend(self.filter(data, message))
                print("3")
                    

            except can.CanError as e:
                print(f"Error sending/receiving CAN message: {e}")

    def filter(self, data, message):
        if data.arbitration_id == message[1][0] and data.data[4] == message[2][4]:
            value = (data.data[5] << 8) | data.data[6] if message[4] else data.data[5]
            converted_value = eval(message[3], {'value': value})

            data = message[5] + str(float(converted_value))
            
            sys.stdout.flush()
            return data
        else:
            return None

    def run_can_bus(self):
        x = 0
        while not self._stop_event.is_set():
            print("0")
            if x <= self.config.interval:
                self.request(self.config.msg_hs)
                time.sleep(self.config.refresh_rate)
            x += 1
            if x == self.config.interval:
                self.request(self.config.msg_ls)
                x = 0


def main():
    can_thread = CanBusThread()
    can_thread.start()

    time.sleep(30)

    # Stop the thread
    can_thread.stop_thread()
    can_thread.join()

if __name__ == "__main__":
    main()

//EDIT for the solution:

I modified the request() method like this:

def request(self, messages):
    for message in messages:
        if self._stop_event.is_set():
            break

        msg = can.Message(arbitration_id=message[0][0], data=message[2], is_extended_id=True)

        try:
            received = False
            self.can_bus.send(msg)
            print(msg)

            retries = 500

            while not received and retries > 0:
                data = self.can_bus.recv()
                print(data)

                received = self.filter(data, message)
                
                retries -= 1

            return False

        except can.CanError:
            return False

I implemented the while method and I'm returning a False when no data was received. Like this it seems to be working reliably now.


Solution

  • recv() is a blocking call. It will only return after a frame is received.

    After sending the first message with self.can_bus.send(msg), the loop gets stuck while waiting for data = self.can_bus.recv() to finish.

    But this does not explain, why no second message is sent, when you put some frames to vcan from your other console, which seem to get received fine according to your explanation. Then I suggest that you replace msg = can.Message(arbitration_id=message[0][0], data=message[2], is_extended_id=True) with a hard-coded message.

    To overcome the blocking, you could call bus.recv(timeout), to ensure the call returns after timeout, even if no message is received.