I'm building some scripting routines via the nidaqmx module developed by NI. I use 2 NI PXI 44-98 (32 channels) acquisition cards.
We would like to run a monitoring experiment over long periods (10 hours) with a sampling rate of 200 kHz.
For the moment, I'm struggling with the intrinsic limits of Python and the logic of the nidaqmx module.
I have so far coded a continuous acquisition routine with a limited number of sensors.
import nidaqmx
from nidaqmx.constants import AcquisitionType
import matplotlib.pyplot as plt
import numpy as np
# Acquisition settings
sample_time = 600  # units = seconds
s_freq = 200000  # sampling rate passed to the sample clock
num_samples = sample_time * s_freq  # samples requested per channel
dt = 1 / s_freq  # time between two consecutive samples

print('go acquisition !')
with nidaqmx.Task() as task:
    # Accelerometer channels selected by the physical-channel string.
    task.ai_channels.add_ai_accel_chan(
        "PXI1Slot2_3/ai1:3",
        sensitivity=10000.0,
        max_val=1,
        min_val=-1,
    )
    task.timing.cfg_samp_clk_timing(
        s_freq,
        sample_mode=AcquisitionType.CONTINUOUS,
    )
    # One blocking read for the whole record: all `num_samples` samples of
    # every channel are requested in a single call and bound to `data`.
    data = task.read(
        number_of_samples_per_channel=num_samples,
        timeout=nidaqmx.constants.WAIT_INFINITELY,
    )
print('I do it right !')
But with this very simple routine, I can't record for more than 10 minutes: Python runs out of memory, which seems entirely logical to me.
I checked the buffer logic on the NI website, but I didn't clearly understand how to implement it here...
I can't figure out how to fit into this little routine a write to disk every X MB of data recorded by the task, while still monitoring and emptying the "data" array to avoid an overflow, and I haven't found a suitable answer for my case on Stack Overflow.
If you have already encountered this problem and have a solution, I am interested.
Thanks for reading
I finally overcame my problems; here is a solution for continuously monitoring a large array of sensors at a high sampling rate.
import matplotlib.pyplot as plt
import numpy as np
import nidaqmx
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx import constants
import threading
from datetime import datetime
import os
# --- Acquisition parameters ---
sampling_freq_in = 200000  # sampling rate, in Hz
buffer_in_size = 800000  # column count of the initial buffer_in placeholder
bufsize_callback = 200000  # sample count given when registering the callback
# Ten times buffer_in_size; handed to the sample-clock configuration as
# samps_per_chan.
buffer_in_size_cfg = 10 * round(buffer_in_size)
chans_in = 32  # number of channels
refresh_rate_plot = 100000  # in Hz
crop = 0  # number of seconds to drop at acquisition start before saving

# --- Data placeholders ---
buffer_in = np.zeros(shape=(chans_in, buffer_in_size))
# Starts as a single column of zeros, so anything appended to it keeps a
# leading zero sample per channel (accepted by the original author).
data = np.zeros(shape=(chans_in, 1))
print(data.size)
# Definitions of basic functions
def ask_user():
    """Wait for the user to press ENTER, then clear the ``running`` flag.

    The call blocks on ``input`` and, once it returns, sets the
    module-level ``running`` variable to ``False``. It is used as the
    target of a separate thread so the prompt does not block the script.
    """
    global running
    prompt = "Press ENTER/RETURN to stop acquisition and coil drivers."
    input(prompt)
    running = False
def cfg_read_task(acquisition):
    """Configure *acquisition* for continuous accelerometer sampling.

    Adds the accelerometer channels ``ai0:15`` of ``Dev3`` and ``Dev4`` to
    the task, then configures continuous sample-clock timing at
    ``sampling_freq_in`` with ``samps_per_chan=buffer_in_size_cfg``.

    Args:
        acquisition: the task to configure (a ``nidaqmx.Task`` at the
            call site).
    """
    # NOTE(review): these two assignments target the channel collection and
    # happen before any channel has been added; whether they influence the
    # hardware configuration at all could not be confirmed -- TODO verify.
    acquisition.ai_channels.ai_gain = 100
    acquisition.ai_channels.ai_max = 100

    # Same settings for both cards, so add them in a loop.
    for physical_channels in ("Dev3/ai0:15", "Dev4/ai0:15"):  # Cards1, Cards2
        acquisition.ai_channels.add_ai_accel_chan(
            physical_channels,
            sensitivity=1000.0,
            max_val=1,
            min_val=-1,
        )

    acquisition.timing.cfg_samp_clk_timing(
        rate=sampling_freq_in,
        sample_mode=constants.AcquisitionType.CONTINUOUS,
        samps_per_chan=buffer_in_size_cfg,
    )
def reading_task_callback(task_idx, event_type, num_samples, callback_data):
    """Read the newly acquired samples and dump them to a ``.npy`` file.

    Registered as the "every N samples acquired into buffer" callback of
    the input task. While the module-level ``running`` flag is set, each
    call reads ``num_samples`` samples per channel through ``stream_in``,
    appends them to the ``data`` placeholder, saves the result under a
    time-stamped file name and resets ``data`` to a single zero column.

    Args:
        task_idx: unused.
        event_type: unused.
        num_samples: number of samples per channel to read.
        callback_data: unused.

    Returns:
        Always ``0``.
    """
    global data
    global buffer_in

    if not running:
        return 0

    # Make sure the output directory is there before writing into it.
    path = r'D:\Experience\caca\Acc/'
    if not os.path.exists(path):
        os.makedirs(path)

    # A fresh buffer sized for this chunk replaces the module-level
    # placeholder, whose column count is buffer_in_size, not num_samples.
    buffer_in = np.zeros((chans_in, num_samples))
    stream_in.read_many_sample(buffer_in, num_samples,
                               timeout=constants.WAIT_INFINITELY)

    # Put the chunk just read after whatever `data` currently holds
    # (a single zero column after every reset).
    data = np.concatenate((data, buffer_in), axis=1)

    # NOTE(review): "%h" is not among the strftime directives documented by
    # Python, so its output depends on the platform -- confirm it is wanted.
    stamp = datetime.now().strftime("%m%d%h%H%M%S%f")
    np.save(path + 'Acc_' + stamp + '.npy', data)

    # f=np.fft.rfftfreq(data[4][1::].size,d=1/100000)
    # P=abs(np.fft.rfft(data[4][1::]))
    # plt.plot(f[f>200] ,P[f>200],'k')
    # plt.xlim(100,10000)
    # plt.plot(data[0][1::],'k--')

    # Start the next chunk from the one-column placeholder again.
    data = np.zeros((chans_in, 1))
    return 0
# Configure and setup the tasks
task_in = nidaqmx.Task()
try:
    cfg_read_task(task_in)
    stream_in = AnalogMultiChannelReader(task_in.in_stream)
    task_in.register_every_n_samples_acquired_into_buffer_event(
        bufsize_callback, reading_task_callback)

    # Raise the flag BEFORE the prompt thread exists, so that an early
    # ENTER can never be overwritten by a later `running = True`.
    running = True

    # Start threading to prompt user to stop
    thread_user = threading.Thread(target=ask_user)
    thread_user.start()

    # Main loop
    time_start = datetime.now()
    task_in.start()

    # Plot a visual feedback for the user's mental health
    # f, (ax1, ax2, ax3) = plt.subplots(3, 1, sharex='all', sharey='none')
    while running:  # make this adapt to number of channels automatically
        # Wait on the prompt thread instead of spinning: an empty busy loop
        # keeps a core at 100 % and competes with the acquisition callback
        # for the interpreter. The timeout keeps the loop available for the
        # optional live plot below.
        thread_user.join(timeout=0.1)
        # ax1.clear()
        # ax2.clear()
        # ax3.clear()
        # ax1.plot(data[0, -sampling_freq_in * 5:].T,'r') # 5 seconds rolling window
        # ax2.plot(data[1, -sampling_freq_in * 5:].T,'k')
        # ax3.plot(data[2, -sampling_freq_in * 5:].T,'b')
        # Label and axis formatting
        # ax3.set_xlabel('time [s]')
        # ax1.set_ylabel('m/s**2')
        # ax2.set_ylabel('m/s**2')
        # ax3.set_ylabel('m/s**2')
        # xticks = np.arange(0, data[0, -sampling_freq_in * 5:].size, sampling_freq_in)
        # xticklabels = np.arange(0, xticks.size, 1)
        # ax3.set_xticks(xticks)
        # ax3.set_xticklabels(xticklabels)
        # plt.pause(1/refresh_rate_plot) # required for dynamic plot to work (if too low, nulling performance bad)
finally:
    # Close task to clear connection once done -- also when configuration
    # or acquisition raised, so the task is never leaked.
    task_in.close()

duration = datetime.now() - time_start
print(duration, 'Pan t es mort')
# NOTE(review): this list mixes a timedelta and an int, so it is presumably
# stored as an object array that needs allow_pickle=True to load -- verify
# against whatever reads info.npy.
experience_file = [duration, sampling_freq_in]
np.save('info.npy', experience_file)
# savefile:
# Some messages at the end
# print("\n")
# print("OPM acquisition ended.\n")
# print("Acquisition duration: {}.".format(duration))