pythonmultithreadingmatplotlibexceptnidaqmx

Timers cannot be stopped from another thread (short example with nidaqmx-python and callbacks)


I have seen other questions on the topic on this forum, but none have helped me understand how to deal with this. Most of them also seem to me to be about quite intricated and long code. I believe I am doing something rather simple / would like to do something rather simple. I hope someone can help! Here below extensive explanations and then my current code.

NOTE: please do not delete this question. I have given the following a lot of thought and I have carefully gone through related threads, but to no avail. I also believe it makes sense to post this because it is partly related to a more generic question: that of how to plot in real-time while having callbacks running in the background (see summary at the end), which could be summarized to be my general goal.

Setup and goal: National Instruments acquisition module (this matters litlle) NI cDAQ9178, interfaced via nidaqmx-python, a package maintained by NI with documentation here. Some analog signal is inputed in there, and the goal is to acquire it continuously (until I decide to stop the acquisition) at a certain sampling rate (approximately 1000 Hz) while plotting the signal in real time. The plotting needs not be refreshed nearly so often (10Hz refresh rate would even be fine). I'm using Windows 10 with Python 3.7 in a conda virtual environment, and editing is done in PyCharm. Ideally things should work both in PyCharm and any terminal.

Situation: nidaqmx-python provides high-level functions that allow one to register callbacks (which one defines as one wishes), which are called everytime a certain number of samples (in my case 100, but that's not strict) fills the PC buffer. The idea is that the callback, defined below, reads the buffer at that point, and does something (in my case some low-pass filtering, which I have taken out for conciseness, some storing into a global variable data, and maybe plotting - see below).

Problem: I have been fooling around with having whatever plots the data in real time be included in the callback, but with matplotlib that is a nightmare because the callback uses threads other than the main one, and matplotlib does not like to be called from anywhere outside the main thread. I've googled the heck out of other libraries optimized for real-time plotting (and, I was thinking, hopefully thread safe) but it's not so easy: I cannot get vispy to work and I cannot get pyqtgraph to even install, just to give you some examples. Then I saw several posts on the internet of people actually managing pretty decent real-time animations with matplotlib, despite it having been developped with publication in mind and not these applications; so I thought let's give it a go.

My take: Since I could not have matplotlib do the work from inside the callback, I did the following (which is the code you see below): after the callback and after the task is started with task.start() (that's specific to nidaqmx-python), I just create a while loop which plots the global variable buffer. I thought it was a nice trick: see, buffer is updated (call it that) by the callback every 0.1 seconds or so (does not matter) and, on the side, the while loop is plotting the buffer variable over and over, erasing everytime before plotting, effectively yielding a real-time like plot.

NOTE: I am perfectly aware the plotting part is not nearly as good as it could be made (I probably should use the ax API of matplotlib and the subplots, not to mention animation), but I do not care for the moment. I'll deal with that later and refine it to make it more efficient.

What I want: this actually does what I want ... except, in order to stop it, I introduced the try: and except: statements around the while loop, as you see in the code below. Naturally, pressing CTRL+C does break the loop ... but it then also breaks the whole running script and leaves me with the following error: forrtl: error (200): program aborting due to control-C event, in PyCharm, and the following precision when run from a terminal:

Image              PC                Routine            Line        Source
libifcoremd.dll    00007FFECF413B58  Unknown               Unknown  Unknown
KERNELBASE.dll     00007FFF219F60A3  Unknown               Unknown  Unknown
KERNEL32.DLL       00007FFF23847BD4  Unknown               Unknown  Unknown
ntdll.dll          00007FFF240CCED1  Unknown               Unknown  Unknown
QObject::~QObject: Timers cannot be stopped from another thread

The inconvenience is that I then have no choice but to close the python shell (thinking PyCharm again), and I do not have access to my precious variable data, containing ... well, my data.

Guess: obviously, the callback does not like to be stopped in this fahsion. The nidaqmx_python task should be stopped with task.stop(). I try putting task.stop() right after the KeyboardInterrupt except:, but it does not help, since CTRL+C stops the script on top / instead of breaking the while loop. I believe some more sofisticated method of stopping my task is required. I have been thinking about this for days but can't figure out a way of having both things: a task I can stop, and at the same time real-time plotting. Note that, without the plotting, it is easy to stop the task upon ENTER keypress: one simply writes at the end

input('Press ENTER to stop task')
task.stop()

But of course simply doing the above does not allow me to include the real-time plotting part.

Summary: I could not call matplotlib from the callback which reads the data continuously, so I wrote a while loop for real-time plotting in a separate block, but then I see no way of stopping that while loop without getting the above error (which complains that the callback was stop from a different thread, I think).

I hope I am being clear and if not, please do ask!

Code: I've cleaned it to get it down to as close as can be to an MWE that shows the problem, although of course I realize most of you don't have an NI daq to play around and connect so as to be able to run this. Anyway ... here it is:

import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants

sfreq = 1000
bufsize = 100

with nidaqmx.Task() as task:

    # Here we set up the task ... nevermind
    task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
    task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
                                    samps_per_chan=bufsize)
    # Here we define a stream to be read continuously
    stream = stream_readers.AnalogMultiChannelReader(task.in_stream)

    data = np.zeros((1, 0))  # initializing an empty numpy array for my total data
    buffer = np.zeros((1, bufsize))  # defined so that global buffer can be written to by the callback

    # This is my callback to read data continuously
    def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsize is passed to num_samples when this is called
        global data
        global buffer

        buffer = np.zeros((1, num_samples))

        # This is the reading part
        stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
        data = np.append(data, buffer, axis=1)  # appends buffered data to variable data

        return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).

    # Here is the heavy lifting I believe: the above callback is registered
    task.register_every_n_samples_acquired_into_buffer_event(bufsize, reading_task_callback)
    task.start()  # The task is started (callback called periodically)

    print('Acquiring sensor data. Press CTRL+C to stop the run.\n')  # This should work ...

    fig = plt.figure()
    try:
        while True:
            # Poor's man plot updating
            plt.clf()
            plt.plot(buffer.T)
            plt.show()
            plt.pause(0.01)  # 100 Hz refresh rate
    except KeyboardInterrupt:  # stop loop with CTRL+C ... or so I thought :-(
        plt.close(fig)
        pass

    task.stop()  # I believe I never get to this part after pressing CTRL+C ...

    # Some prints at the end ... nevermind
    print('Total number of acquired samples: ', len(data.T),'\n')
    print('Sampling frequency: ', sfreq, 'Hz\n')
    print('Buffer size: ', bufsize, '\n')
    print('Acquisition duration: ', len(data.T)/sfreq, 's\n')

Any input would be appreciated. Thank you in advance folks!

EDIT: after the accepted answer here below, I rewrote the code above and came up with the following, which works as intended now (sorry, this time I have not cleaned it up, and some lines are irrelevant for the present question):

# Stream read from a task that is set up to read continuously
import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants

from scipy import signal

import threading

running = True

sfreq = 1000
bufsize = 100
bufsizeb = 100

global task

def askUser():  # it might be better to put this outside of task
    global running
    input("Press return to stop.")
    running = False

def main():
    global running

    global data
    global buffer
    global data_filt
    global buffer_filt

    global b
    global z

    print('Acquiring sensor data...')

    with nidaqmx.Task() as task:  # maybe we can use target as above

        thread = threading.Thread(target=askUser)
        thread.start()

        task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
        task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
                                        samps_per_chan=bufsize)
        # unclear samps_per_chan is needed here above or why it would be different than bufsize
        stream = stream_readers.AnalogMultiChannelReader(task.in_stream)

        data = np.zeros((1, 0))  # probably not the most elegant way of initializing an empty numpy array
        buffer = np.zeros((1, bufsizeb))  # defined so that global buffer can be written in the callback
        data_filt = np.zeros((1, 0))  # probably not the most elegant way of initializing an empty numpy array
        buffer_filt = np.zeros((1, bufsizeb))  # defined so that global buffer can be written in the callback

        b = signal.firwin(150, 0.004)
        z = signal.lfilter_zi(b, 1)

        def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsizeb is passed to num_samples
            global data
            global buffer
            global data_filt
            global buffer_filt
            global z
            global b

            if running:
                # It may be wiser to read slightly more than num_samples here, to make sure one does not miss any sample,
                # see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
                buffer = np.zeros((1, num_samples))
                stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
                data = np.append(data, buffer, axis=1)  # appends buffered data to variable data

                # IIR Filtering, low-pass
                buffer_filt = np.zeros((1, num_samples))
                for i, x in enumerate(np.squeeze(buffer)):  # squeeze required for x to be just a scalar (which lfilter likes)
                    buffer_filt[0,i], z = signal.lfilter(b, 1, [x], zi=z)

                data_filt = np.append(data_filt, buffer_filt, axis=1)  # appends buffered filtered data to variable data_filt

            return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).

        task.register_every_n_samples_acquired_into_buffer_event(bufsizeb, reading_task_callback)  # bufsizeb instead

        task.start()
        while running:  # this is perfect: it "stops" the console just like sleep in a way that the task does not stop
            plt.clf()
            plt.plot(buffer.T)
            plt.draw()
            plt.pause(0.01)  # 100 Hz refresh rate
        # plt.close(fig)  # maybe no need to close it for now

        # task.join()  # this is for threads I guess ... (seems useless to my case?)

        # Some prints at the end ...
    print('Total number of acquired samples:', len(data.T))
    print('Sampling frequency:', sfreq, 'Hz')
    print('Buffer size:', bufsize)
    print('Acquisition duration:', len(data.T)/sfreq, 's')

if __name__ == '__main__':
    main()

Note that I do not need a task.stop() after all because the way continuous acquisition tasks work with this package is that reading any line of code after task.start() which is not a sleep or something like that makes the task stop (well that's my understanding at least).


Solution

  • The first thing I did was get rid of the keyboard interrupt loop. I replaced it with a global variable running, and another thread that sets the variable to False when returned from.

    def askUser():
      global running
      input("Press return to stop.")
      running = False
    

    Then, before the while loop, created a new thread that will execute this function.

    askUserThread = threading.Thread(target=askUser)
    askUserThread.start()
    

    And for the while loop, getting rid of the try catch statement:

    while running:
      plt.clf()
      plt.plot(buffer.T)
      plt.draw()          # Note: this got changed because .show wasn't working.
      plt.pause(0.01)
    

    This still didn't work for me because I had to close the plot window for a new one to show up. So from this answer, I changed it from .show to .draw.

    My end code was a little different (since I sampled random data) but here it is.

    # sampling.py
    # by Preston Hager
    
    import matplotlib.pyplot as plt
    import numpy as np
    
    import threading
    
    sfreq = 1000
    bufsize = 100
    
    running = True
    
    data = np.zeros((1, 0))  # initializing an empty numpy array for my total data
    buffer = np.zeros((1, bufsize))  # defined so that global buffer can be written to by the callback
    
    def askUser():
        global running
    
        input("Press return to stop.")
        running = False
    
    def readingTask():
        global data
        global buffer
    
        while running:
            buffer = np.random.rand(1, bufsize)
            # This is the reading part
            data = np.append(data, buffer, axis=1)  # appends buffered data to variable data
    
    def main():
        global running
    
        print('Acquiring sensor data.')
    
        thread = threading.Thread(target=askUser)
        thread.start()
        task = threading.Thread(target=readingTask)
        task.start()
    
        fig = plt.figure()
        while running:
            # Poor's man plot updating
            plt.clf()
            plt.plot(buffer.T)
            plt.draw()
            plt.pause(0.01)  # 100 Hz refresh rate
        plt.close(fig)
    
        task.join()
    
        # Some prints at the end ... nevermind
        print('Total number of acquired samples:', len(data.T))
        print('Sampling frequency:', sfreq, 'Hz')
        print('Buffer size:', bufsize)
        print('Acquisition duration:', len(data.T)/sfreq, 's')
    
    if __name__ == '__main__':
        main()