pythoncallbackadspython-decoratorstwincat-ads

Return value from Callback notification Pyads


I'm using pyads to pull data from a PLC via ADS and process this data in python. Since I have to pull the data quite fast (200 ms cycle) I use a ADS notification to get the values always when they change which is every 200 ms or faster.

In a while Loop I'm waiting for data coming in and putting them together to an array. I have to go that path because the amount of data is high. 1000 values per cycle and I have to put them together so I get an array of 100000 values per signal. There can be 15 signals together. Below you can find the example from the pyads documentation. What I'm looking for is nice Pythonic way to get the values from the callback function inside a while loop and build them together to the 100000 value array. I thought about creating a class which handles what I need but I couldn't find a way to make it work. Only option I can imagine is using a global variable which I don't like to do.

structure_def = (
    ('nVar', pyads.PLCTYPE_DINT, 1000),
    ('nVar2', pyads.PLCTYPE_DINT, 1000),
    ('nVar3', pyads.PLCTYPE_DINT, 1000),
    ('nVar4', pyads.PLCTYPE_DINT, 1000),
    ('nVar5', pyads.PLCTYPE_DINT, 1000))

size_of_struct = pyads.size_of_structure(structure_def)

@plc.notification(ctypes.c_ubyte * size_of_struct)
def callback(handle, name, timestamp, value):
    values = pyads.dict_from_bytes(value, structure_def)
    print(values)

attr = pyads.NotificationAttrib(size_of_struct)
plc.add_device_notification('global.sample_structure', attr, callback)

Use of a global variable will work of course. I tried to pull 100000 values at once but it fails when the amount of signals increases. So I have to pull smaller packets.


Solution

  • You can use a class, you just can't use the decorator inside a class unfortunately.

    See this example

    class Example:
        def __init__:
            plc = pyads.Conection(...)
            heartbeat = 0
            set_up_notifications()
    
        def set_up_notifications(self):
            self.plc.add_device_notification(
                VAR_NAME,
                pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_BOOL)),
                self.on_plc_heartbeat,
            )
    
        def on_plc_heartbeat(self, *_):
            self.heartbeat += 1
    

    An example for detecting the plc status change:

            self._plc.add_device_notification(
                    (int("0xF100", 16), int("0x0000", 16)),  # ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE
                    pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT)),
                    self._on_plc_status_change,
    
        def _on_plc_status_change(self, notification, _):
            """Notification callback for a change in PLC state (run to config etc)."""
            *_, value = self._plc.parse_notification(notification, pyads.PLCTYPE_INT)  # type: ignore
            if value != pyads.ADSSTATE_RUN:
                self._plc_fatal_com_error_message = "PLC exited run mode"
    

    Doing it in a class means you can then have more methods for doing the signal processing etc.

    Also depending on what kind of data you are pulling, there can be more efficient ways of converting the byte data back to python types which is useful for data logging applications. Using Read structure by name means pyads does all the conversion for you on a variable by variable basis. You can do a read_by_name with return_ctypes=True and parse the data yourself. This can be orders of magnitude quicker if your data is all the same type as you can do the conversion using numpy for example. see https://github.com/stlehmann/pyads/issues/289 and https://github.com/stlehmann/pyads/issues/63