I am trying to record data from an accelerometer that is connected to a NI DAQ using the python nidaqmx library and APIs. I am starting and stopping the task from a tkinter GUI, so I am using the AnalogMultiChannelReader
so that the data_read is not thread blocking and I can still use the GUI once I start reading data. My code is below, but I adapted it from this answer because my use case is basically the same as the question asker's. I am not sure what the author of that question was using the code for, but I want to write the data to a text file which is local on my desktop.
The code works for some amount of time, but it starts throwing a wall of errors with the code -200279, which I understand to be a buffer overflow error (it works longer if I configure a slower sensor sample rate, shorter if I configure a faster sensor sample rate, but I understand why the former makes the buffer overflow slower). My intuition is that this error is occurring because I am writing to a file, and this writing is slowing down the process, so writing to the file cannot keep up with the speed of data acquisition, which causes the buffer to overflow, but I am not sure how to get around this. All of the solutions that I see on the NIDAQ forum address this error for LabVIEW, but none have addressed it for Python.
Below is my code, if anyone has any insight, I'd be incredibly appreciative. Or, please let me know if you need any clarification.
import nidaqmx
import time
import threading
import numpy as np
import tkinter as tk
from datetime import datetime
from nidaqmx import constants, stream_readers
from tkinter import ttk
class DAQ:
def __init__(self) -> None:
# name of the C-Series Module
self.deviceName = "cDAQ1Mod1"
# constants for the accelerometer
self.sensitivity = 1000 # units = mV/g (specified in self.sensitivity_units)
self.sensitivity_units = constants.AccelSensitivityUnits.MILLIVOLTS_PER_G
self.measurement_range = 5.0 # units = g (specified in self.measurement_units)
self.max_val = self.measurement_range
self.min_val = self.measurement_range * -1
self.measurement_units = constants.AccelUnits.G
self.current_excit_val = 0.004 # units = Amps # spec ranges from 2 to 20 mA, this is the code default
#user input Acquisition
self.num_channels = 3
self.sample_freq = 25600 #sample frequency will be set from GUI
self.update_Hz = 50
# keep track of the task so you don't have more than 1 task, otherwise the code gets in trouble
self.task = None
def createTask(self):
if self.task is None:
""" BELOW IS CODE FROM STACKOVERFLOW """
# intialize task
self.task = nidaqmx.Task()
# add accelerometer to task
self.task.ai_channels.add_ai_accel_chan(self.deviceName + "/ai0", current_excit_val=self.current_excit_val,
min_val=self.min_val, max_val=self.max_val, units=self.measurement_units,
sensitivity=self.sensitivity, sensitivity_units=self.sensitivity_units) # X
self.task.ai_channels.add_ai_accel_chan(self.deviceName + "/ai1", current_excit_val=self.current_excit_val,
min_val=self.min_val, max_val=self.max_val, units=self.measurement_units,
sensitivity=self.sensitivity, sensitivity_units=self.sensitivity_units) # Y
self.task.ai_channels.add_ai_accel_chan(self.deviceName + "/ai2", current_excit_val=self.current_excit_val,
min_val=self.min_val, max_val=self.max_val, units=self.measurement_units,
sensitivity=self.sensitivity, sensitivity_units=self.sensitivity_units) # Z
# configure onboard clock
self.task.timing.cfg_samp_clk_timing(rate=self.sample_freq, sample_mode=constants.AcquisitionType.CONTINUOUS, source="OnboardClock")
self.update_Hz = 30 # hz update
samples_per_buffer = int(self.sample_freq // self.update_Hz) # how many samples you'll read every update
self.task.in_stream.input_buf_size = samples_per_buffer * 10 # allocate memory for the buffer, plus some extra space
print("updateHz = " + str(self.update_Hz))
print("samples per buffer = " + str(samples_per_buffer))
# stream reader for getting input
reader = stream_readers.AnalogMultiChannelReader(self.task.in_stream)
reader.verify_array_shape = False # this speeds up processing a bit
# local path to store data
path = "data_acquisition\\data\\"
datetime_str = datetime.now().strftime("%m%d%Y_%H%M%S")
self.filename = path + "acc_" + datetime_str + ".txt"
# open the file and store it locally.
# continually opening and closing the file will slow down the application and make the buffer overflow
self.file = open(self.filename, "w")
self.file.write("SAMPLERATE="+str(self.sample_freq)+"\n")
def reading_task_callback(task_idx, event_type, num_samples, callback_data=None):
"""
adapted from stackoverflow https://stackoverflow.com/questions/56366033/continuous-acquistion-with-nidaqmx
"""
buffer = np.zeros((self.num_channels, num_samples), dtype=np.float64)
reader.read_many_sample(buffer, constants.READ_ALL_AVAILABLE, timeout=constants.WAIT_INFINITELY)
# Convert the data from channel as a row order to channel as a column
data = buffer.T.astype(np.float64)
# append the data to the csv file
"""THIS IS WHERE I WRITE THE DATA"""
np.savetxt(self.file, data, fmt="%s")
# must return an integer, otherwise callback throws a wall of errors
return 0
# register the callback with the task to make sure it executes
self.task.register_every_n_samples_acquired_into_buffer_event(samples_per_buffer, reading_task_callback)
def startTask(self, sample_freq = 25600):
if self.task is None:
if sample_freq is not None and isinstance(sample_freq, int):
self.sample_freq = sample_freq
# create and start the task
self.createTask()
self.task.start()
def endTask(self):
if self.task is not None:
# self.task.wait_until_done()
self.task.stop()
self.task.close()
# close the file
self.file.close()
self.task = None
class DAQ_GUI:
def __init__(self) -> None:
"""
Create a GUI for easy start/stop of NIDAQ accelerometer data collection
THIS GUI WORKS FUNCTIONALLY
"""
self.DAQ = DAQ() # initialize DAQ object
# initialize tkinter window
self.window = tk.Tk()
self.window.title("record DAQ data")
self.window.columnconfigure(0, weight=1)
s = ttk.Style()
s.theme_use('default')
s.configure("TButton", padding=(6, 3), width=0, relief="ridge")
s.configure("TEntry")
s.configure("TLabel", padding=8)
s.configure("TLabelframe", padding=8)
self.mainFrame = ttk.Frame(self.window, padding=12)
self.mainFrame.grid(sticky=tk.NSEW)
self.mainFrame.columnconfigure(0, weight=1)
self.mainFrame.columnconfigure(1, weight=1)
self.dataCollectionLabelframe = ttk.Labelframe(self.mainFrame, text="Data Collection Frame")
self.dataCollectionLabelframe.grid(column=0, row=0)
ttk.Button(self.dataCollectionLabelframe, text="START Data Collection", command=self.startTask ).grid(row=0, column=0)
ttk.Button(self.dataCollectionLabelframe, text="STOP Data Collection" , command=self.DAQ.endTask ).grid(row=0, column=1)
ttk.Label( self.dataCollectionLabelframe, text="Sampling Rate (Hz)" ).grid(row=2, column=0)
self.samplerateVar = tk.StringVar(value = "1000")
ttk.Entry( self.dataCollectionLabelframe, textvariable=self.samplerateVar ).grid(row=2, column=1)
self.mainFrame.pack()
self.window.update_idletasks()
self.window.update()
def startTask(self):
# get the samplerate from the GUI
samplerate = self.samplerateVar.get()
if len(samplerate) > 0:
try:
samplerate = int(samplerate)
except:
samplerate = None
else:
samplerate = None
# start the DAQ
self.DAQ.startTask(samplerate)
if __name__ == "__main__":
DAQ_GUI().window.mainloop()
I have not found the error message helpful, but I've pasted it below for reference.
Status Code: -200279
Exception ignored on calling ctypes callback function: <function DAQ.createTask.<locals>.reading_task_callback at 0x000002919C86A820>
Traceback (most recent call last):
File "c:\PATH\TO\FILE.py", line 159, in reading_task_callback
reader.read_many_sample(buffer, constants.READ_ALL_AVAILABLE, timeout=constants.WAIT_INFINITELY)
File "C:\ProgramData\Anaconda3\envs\ni_daq\lib\site-packages\nidaqmx\stream_readers.py", line 331, in read_many_sample
return _read_analog_f_64(
File "C:\ProgramData\Anaconda3\envs\ni_daq\lib\site-packages\nidaqmx\_task_modules\read_functions.py", line 31, in _read_analog_f_64
check_for_error(error_code, samps_per_chan_read=samps_per_chan_read.value)
File "C:\ProgramData\Anaconda3\envs\ni_daq\lib\site-packages\nidaqmx\errors.py", line 192, in check_for_error
raise DaqReadError(error_buffer.value.decode("utf-8"), error_code, samps_per_chan_read)
nidaqmx.errors.DaqReadError: The application is not able to keep up with the hardware acquisition.
Property: DAQmx_Read_RelativeTo
Property: DAQmx_Read_Offset
Corresponding Value: 0
Task Name: _unnamedTask<1>