3

Inspired by the answer to this question, I have tried the following code:

import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants
import time

sfreq = 1000
bufsize = 100

data = np.zeros((1, 1), dtype = np.float32)  # initializes total data file

with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
    task.timing.cfg_samp_clk_timing(rate = sfreq, sample_mode = constants.AcquisitionType.CONTINUOUS,
                                    samps_per_chan = bufsize)  # unclear samps_per_chan is needed or why it would be different than bufsize
    stream = stream_readers.AnalogMultiChannelReader(task.in_stream)

    def reading_task_callback(task_id, event_type, num_samples, callback_data=None):  # num_samples is set to bufsize
        buffer = np.zeros((1, num_samples), dtype = np.float32)  # probably better to define it here inside the callback
        stream.read_many_sample(buffer, num_samples, timeout = constants.WAIT_INFINITELY)
        data = np.append(data, buffer, axis = 1)  # hopping to retrieve this data after the read is stopped

    task.register_every_n_samples_acquired_into_buffer_event(bufsize, reading_task_callback)

Expected behavior: it reads continuously from a channel. I am not even trying to get it to do something specific yet (such as plotting in real time), but I would expect the python console to run until one stops it, since the goal is to read continuously.

Observed behavior: running this code proceeds quickly and the console prompt is returned.

Problem: it seems to me this is not reading continuously at all. Furthermore, the data variable does not get appended like I would like it to (I know that retrieving a certain number of data samples does not require such convoluted code with nidaqmx; this is just one way I thought I could try and see if this is doing what I wanted, i.e. read continuously and continuously append the buffered sample values to data, so that I can then look at the total data acquired).

Any help would be appreciated. I am essentially certain the way to achieve this is by making use of these callbacks which are part of nidaqmx, but somehow I do not seem to manage them well. Note I have been able to read a predefined and finite amount of data samples from analog input channels by making use of read_many_sample.

Details: NI cDAQ 9178 with NI 9205 module inserted, on Lenovo laptop running Windows Home 10, python 3.7 and nidaqmx package for python.

EDIT: for anyone interested, I now have this working in the following way, with a live visual feedback using matplotlib, and - not 100% percent sure yet - it seems there no buffer problems even if one aims at long acquisitions (>10 minutes). Here is the code (not cleaned, sorry):

"""
Analog data acquisition for QuSpin's OPMs via National Instruments' cDAQ unit
The following assumes:
"""

# Imports
import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx import constants
# from nidaqmx import stream_readers  # not needed in this script
# from nidaqmx import stream_writers  # not needed in this script

import threading
import pickle
from datetime import datetime
import scipy.io


# Parameters
sampling_freq_in = 1000  # in Hz
buffer_in_size = 100
bufsize_callback = buffer_in_size
buffer_in_size_cfg = round(buffer_in_size * 1)  # clock configuration
chans_in = 3  # set to number of active OPMs (x2 if By and Bz are used, but that is not recommended)
refresh_rate_plot = 10  # in Hz
crop = 10  # number of seconds to drop at acquisition start before saving
my_filename = 'test_3_opms'  # with full path if target folder different from current folder (do not leave trailing /)



# Initialize data placeholders
buffer_in = np.zeros((chans_in, buffer_in_size))
data = np.zeros((chans_in, 1))  # will contain a first column with zeros but that's fine


# Definitions of basic functions
def ask_user():
    global running
    input("Press ENTER/RETURN to stop acquisition and coil drivers.")
    running = False


def cfg_read_task(acquisition):  # uses above parameters
    acquisition.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1:3")  # has to match with chans_in
    acquisition.timing.cfg_samp_clk_timing(rate=sampling_freq_in, sample_mode=constants.AcquisitionType.CONTINUOUS,
                                           samps_per_chan=buffer_in_size_cfg)


def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsize_callback is passed to num_samples
    global data
    global buffer_in

    if running:
        # It may be wiser to read slightly more than num_samples here, to make sure one does not miss any sample,
        # see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
        buffer_in = np.zeros((chans_in, num_samples))  # double definition ???
        stream_in.read_many_sample(buffer_in, num_samples, timeout=constants.WAIT_INFINITELY)

        data = np.append(data, buffer_in, axis=1)  # appends buffered data to total variable data

    return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).


# Configure and setup the tasks
task_in = nidaqmx.Task()
cfg_read_task(task_in)
stream_in = AnalogMultiChannelReader(task_in.in_stream)
task_in.register_every_n_samples_acquired_into_buffer_event(bufsize_callback, reading_task_callback)


# Start threading to prompt user to stop
thread_user = threading.Thread(target=ask_user)
thread_user.start()


# Main loop
running = True
time_start = datetime.now()
task_in.start()


# Plot a visual feedback for the user's mental health
f, (ax1, ax2, ax3) = plt.subplots(3, 1, sharex='all', sharey='none')
while running:  # make this adapt to number of channels automatically
    ax1.clear()
    ax2.clear()
    ax3.clear()
    ax1.plot(data[0, -sampling_freq_in * 5:].T)  # 5 seconds rolling window
    ax2.plot(data[1, -sampling_freq_in * 5:].T)
    ax3.plot(data[2, -sampling_freq_in * 5:].T)
    # Label and axis formatting
    ax3.set_xlabel('time [s]')
    ax1.set_ylabel('voltage [V]')
    ax2.set_ylabel('voltage [V]')
    ax3.set_ylabel('voltage [V]')
    xticks = np.arange(0, data[0, -sampling_freq_in * 5:].size, sampling_freq_in)
    xticklabels = np.arange(0, xticks.size, 1)
    ax3.set_xticks(xticks)
    ax3.set_xticklabels(xticklabels)

    plt.pause(1/refresh_rate_plot)  # required for dynamic plot to work (if too low, nulling performance bad)


# Close task to clear connection once done
task_in.close()
duration = datetime.now() - time_start


# Final save data and metadata ... first in python reloadable format:
filename = my_filename
with open(filename, 'wb') as f:
    pickle.dump(data, f)
'''
Load this variable back with:
with open(name, 'rb') as f:
    data_reloaded = pickle.load(f)
'''
# Human-readable text file:
extension = '.txt'
np.set_printoptions(threshold=np.inf, linewidth=np.inf)  # turn off summarization, line-wrapping
with open(filename + extension, 'w') as f:
    f.write(np.array2string(data.T, separator=', '))  # improve precision here!
# Now in matlab:
extension = '.mat'
scipy.io.savemat(filename + extension, {'data':data})


# Some messages at the end
num_samples_acquired = data[0,:].size
print("\n")
print("OPM acquisition ended.\n")
print("Acquisition duration: {}.".format(duration))
print("Acquired samples: {}.".format(num_samples_acquired - 1))


# Final plot of whole time course the acquisition
plt.close('all')
f_tot, (ax1, ax2, ax3) = plt.subplots(3, 1, sharex='all', sharey='none')
ax1.plot(data[0, 10:].T)  # note the exclusion of the first 10 iterations (automatically zoomed in plot)
ax2.plot(data[1, 10:].T)
ax3.plot(data[2, 10:].T)
# Label formatting ...
ax3.set_xlabel('time [s]')
ax1.set_ylabel('voltage [V]')
ax2.set_ylabel('voltage [V]')
ax3.set_ylabel('voltage [V]')
xticks = np.arange(0, data[0, :].size, sampling_freq_in)
xticklabels = np.arange(0, xticks.size, 1)
ax3.set_xticks(xticks)
ax3.set_xticklabels(xticklabels)
plt.show()

Of course comments are appreciated. This is probably still suboptimal.

  • 1
    Do not know much about nidaqmx but looking at the code I have the feeling that you are creating the task without running it. I would expect another command like task.exec() or task.wait(). – Marco Mar 31 '20 at 09:35
  • Aouch, you're absolutely right of course. Sorry, I don't have a lot of experience using high-level Python packages such as this one (it's also just a very poorly documented one I find). Thank you, this helps me obtain different errors, which is progress! – Gustavo Lucena Gómez Mar 31 '20 at 13:43
  • Did you solve your problem @Gustavo Lucena Gómez? I am having the same issue... – Jose Guilherme May 01 '20 at 19:11
  • Sure did, at least in a first approximation the problem was that (dumb me) I wasn't even starting the task. See @Marco 's reply here above! Now I get tons of buffer errors and I'm trying to get to make it read continuously and smoothly with no errors nor warnings, and saving the data from time to time. Will work on that next week in fact for the whole week. – Gustavo Lucena Gómez May 02 '20 at 08:51

0 Answers0