python dictionary python-asyncio can-bus python-can

Concurrency issue data corruption asyncio python-can locking/queues - nested dictionaries


I am at the end of a very long journey...

Here is the long story if you are interested. https://github.com/hardbyte/python-can/issues/1336

Sorry for the incredibly long code snippet but I am not sure where I am going wrong so I thought more is more.

The code works as follows. request_inst() requests instrumentation data from an MCU using the dict request_info; the MCU responds, and the response is picked up by the listener. message_obtain() reads each message from the listener with msg = await reader.get_message() and keeps the all_data returned by store_data() in a variable named future. I attempt to serialise this process with a lock. store_data() is where I store the response data from the MCU in a dict called all_data.

When all_data is printed outside of the listener, it contains zero values, as shown below. The purpose of the code is to make all_data available outside of the event loop, but even with this implementation I cannot get all_data to appear without zero values showing up in the dict.

import asyncio
import can
from can.notifier import MessageRecipient
from typing import List

freq = 0.0003

# these are the response ids and the parameter ids of the data
# store_data() is supposed to replace each None with a value
all_data = {268439810: {16512: [None], 16513: [None], 16514: [None], 16515: [None]},
            268444162: {16512: [None], 16513: [None], 16514: [None], 16515: [None]}}

request_info = {286326784: {16512, 16513, 16514, 16515},
                287440896: {16512, 16513, 16514, 16515}}

# all the request ids that have been configured
cm4_read_ids = [286326784, 287440896]

# all the response ids that have been configured
mcu_respond_ids = [268439810, 268444162]

# finds arb id and pid and logs the data from the message
# async def store_data(arb_id, msg_data, msg_dlc):
async def store_data(msg: can.Message, lock):
    pid = int.from_bytes(msg.data[0:2], 'little')
    arb_id = msg.arbitration_id
    if arb_id in mcu_respond_ids:
        async with lock:
            if msg.dlc == 5:
                all_data[arb_id][pid][0] = int.from_bytes(msg.data[2:4], 'little', signed=False)
            elif msg.dlc == 7:
                all_data[arb_id][pid][0] = int.from_bytes(msg.data[2:6], 'little', signed=False)
    return all_data

async def request_inst(bus: can.Bus):
    print('Request inst active')
    while True:
        for key in request_info:
            for val in request_info[key]:
                pid = int(val)
                pidbytes = pid.to_bytes(2, 'little')
                msg = can.Message(arbitration_id=key, data=pidbytes)
                bus.send(msg)
                await asyncio.sleep(freq)
                # await store_data(reader)

async def message_obtain(reader: can.AsyncBufferedReader, lock):
    print('Started the get message process')
    while True:
        await asyncio.sleep(0.01)
        msg = await reader.get_message()
        future = await store_data(msg, lock)
        async with lock:

            print('This is the future')
            print(future)

async def main() -> None:
    with can.Bus(
            interface="socketcan", channel="can0", receive_own_messages=True
    ) as bus:
        # Create Notifier with an explicit loop to use for scheduling of callbacks
        loop = asyncio.get_running_loop()
        reader = can.AsyncBufferedReader()
        lock = asyncio.Lock()

        listeners: List[MessageRecipient] = [reader]

        notifier = can.Notifier(bus, listeners, loop=loop)

        try:
            task1 = asyncio.create_task(request_inst(bus))
            task2 = asyncio.create_task(message_obtain(reader, lock))
            await asyncio.gather(task1, task2)
        except KeyboardInterrupt:

            notifier.stop()
            bus.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
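
For anyone without the hardware, the receive-and-store flow above can be sketched on its own. This is only a minimal sketch under assumptions: an asyncio.Queue stands in for can.AsyncBufferedReader, and producer(), consumer(), demo() and store are hypothetical names that are not part of the code above. On a single event loop, tasks only switch at an await, so the plain dict write in the consumer cannot be interleaved with another task.

```python
import asyncio

# Hypothetical, cut-down version of all_data for one response id.
store = {268439810: {16512: [None], 16514: [None]}}

async def producer(queue: asyncio.Queue) -> None:
    # Stand-in for the MCU responses: (arb_id, 7-byte payload) pairs.
    for pid, value in [(16512, 111), (16514, 222)]:
        payload = pid.to_bytes(2, 'little') + value.to_bytes(4, 'little') + b'\x00'
        await queue.put((268439810, payload))
    await queue.put(None)  # sentinel: no more messages

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        if item is None:
            break
        arb_id, payload = item
        pid = int.from_bytes(payload[0:2], 'little')
        # No await between reading the payload and writing the value,
        # so no other task can run in the middle of this update.
        store[arb_id][pid][0] = int.from_bytes(payload[2:6], 'little')

async def demo() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))
    return store

result = asyncio.run(demo())
print(result)
```

If the stored values are still wrong in a setup like this, the cause is more likely the data structure or the incoming data than task scheduling.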

Even in this cut-down, one-page version I am seeing what I believe to be concurrency issues.

As you can see, I have tried locking, but I am still seeing these zero values appear in all_data. See below, where the value for pid 16514 becomes 0 even though the messages being returned by the MCU are not zero.

n.b. the output below where the incorrect data is shown is the output from print(future)

[Image: output of print(future), showing a 0 value for pid 16514]

The real value should never be 0 as it is a measured value.

import struct
b6 = (1016872961).to_bytes(4, 'little')
struct.unpack('<f', b6)
(0.01907348819077015,)
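
Putting the parsing from store_data() together with the float conversion above, a stored value can be checked by hand roughly like this. It is a sketch only: decode_payload() and raw_to_float() are hypothetical helper names, the trailing padding byte in the example payload is an assumption, and the float interpretation is taken from the struct.unpack('<f', ...) call above.

```python
import struct

def decode_payload(data: bytes, dlc: int):
    """Split a response payload into (pid, raw value), mirroring store_data()."""
    pid = int.from_bytes(data[0:2], 'little')
    if dlc == 5:
        raw = int.from_bytes(data[2:4], 'little', signed=False)
    elif dlc == 7:
        raw = int.from_bytes(data[2:6], 'little', signed=False)
    else:
        raw = None  # other lengths are not handled in the question's code
    return pid, raw

def raw_to_float(raw: int) -> float:
    """Reinterpret a 32-bit unsigned integer as a little-endian float."""
    return struct.unpack('<f', raw.to_bytes(4, 'little'))[0]

# Example payload: pid 16514, then the 4-byte value from above, then one padding byte.
payload = (16514).to_bytes(2, 'little') + (1016872961).to_bytes(4, 'little') + b'\x00'
pid, raw = decode_payload(payload, dlc=7)
print(pid, raw, raw_to_float(raw))
```

A raw value of 0 decodes to 0.0, which is what shows up as the unexpected zero in all_data.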

Am I doing anything very stupid? It feels like I am not accessing the data in the listener correctly despite using lock when all_data is being modified.

If I print from the listeners the data is always correct even when all_data is returning 0 values.

If anyone is able to help me it would be much appreciated.


Solution

  • Ok ok ok, it turns out I was doing something really silly. I was using nested dictionaries to store data for different ids, with the same keys under each id. After some investigation using id(id1[some_pid1]) and id(id2[some_pid1]), I discovered that the values under those keys had the same memory address, i.e. both ids pointed at the same object.

    data_dict = {id1: {some_pid1: value, some_pid2: value},
                 id2: {some_pid1: value, some_pid2: value}}
    

    To everyone this looked like a race condition, but actually I was just writing zeros (which turned out to be forced by the MCU) to the wrong id, because the value under that key was shared with the other id.

    Whoops
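
A minimal sketch of how this kind of sharing can happen and how the id() check exposes it. The way the shared dict is built here is an assumption for illustration (the real construction code is not shown above), and the names pids, shared_inner, aliased and independent are hypothetical.

```python
pids = [16512, 16513, 16514, 16515]

# Assumed reconstruction of the bug: one inner dict reused for both ids.
shared_inner = {pid: [None] for pid in pids}
aliased = {268439810: shared_inner, 268444162: shared_inner}

# Writing through one id also changes what the other id sees.
aliased[268439810][16514][0] = 0
print(aliased[268444162][16514][0])  # 0, although only the first id was written
print(id(aliased[268439810][16514]) == id(aliased[268444162][16514]))  # True

# One possible fix: build a fresh inner dict (and fresh lists) for every id.
independent = {arb_id: {pid: [None] for pid in pids}
               for arb_id in (268439810, 268444162)}
independent[268439810][16514][0] = 0
print(independent[268444162][16514][0])  # None
print(independent[268439810][16514] is independent[268444162][16514])  # False
```

copy.deepcopy() on a template dict would be another way to get independent inner containers.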