Непрерывный сбор данных с помощью nidaqmx

Я не могу получать непрерывные данные из NI DAQ, используя nidaqxm на Python 3.

Я уже собирал конечные данные с помощью аналогичного кода, хотя я не могу понять, что мне нужно изменить, чтобы получать данные непрерывно.

import nidaqmx
from nidaqmx import constants
from nidaqmx import stream_readers
from nidaqmx import stream_writers
import matplotlib.pyplot as plt

#user input Acquisition
Ch00_name = 'A00'
Sens_Ch00 = 100#sensibilidade em mV/g
Ch01_name = 'A01'
Sens_Ch01 = 100#sensibilidade em mV/g
fs_acq = 1651 #sample frequency
t_med = 2 #time to acquire data

with nidaqmx.Task() as task:
    task.ai_channels.add_ai_accel_chan(physical_channel="cDAQ9191-1B7B393Mod1/ai0", name_to_assign_to_channel=Ch00_name,
                                       sensitivity=Sens_Ch00, min_val=-5, max_val=5, current_excit_val=0.002)
    task.ai_channels.add_ai_accel_chan(physical_channel="cDAQ9191-1B7B393Mod1/ai1", name_to_assign_to_channel=Ch01_name,
                                       sensitivity=Sens_Ch01, min_val=-5, max_val=5, current_excit_val=0.002)

    task.timing.cfg_samp_clk_timing(rate=fs_acq, sample_mode= constants.AcquisitionType.CONTINUOUS, samps_per_chan=(t_med * fs_acq),)

    reader = stream_readers.AnalogMultiChannelReader(task.in_stream)
    writer = stream_writers.AnalogMultiChannelWriter(task.out_stream)

Что мне нужно изменить в моем коде, чтобы получать непрерывные данные?


person Jose Guilherme    schedule 29.05.2019    source источник


Ответы (1)


Вам необходимо зарегистрировать функцию обратного вызова. Я предполагаю, что ваш блок работает, и мигает какой-то индикатор состояния, чтобы показать, что задача выполняется.

import nidaqmx
from nidaqmx import constants
from nidaqmx import stream_readers
from nidaqmx import stream_writers
import matplotlib.pyplot as plt

import numpy as np

#user input Acquisition
Ch00_name = 'A00'
Sens_Ch00 = 100#sensibilidade em mV/g
Ch01_name = 'A01'
Sens_Ch01 = 100#sensibilidade em mV/g
num_channels = 2
fs_acq = 1651 #sample frequency
t_med = 2 #time to acquire data


with nidaqmx.Task() as task:
    task.ai_channels.add_ai_accel_chan(physical_channel="cDAQ9191-1B7B393Mod1/ai0", name_to_assign_to_channel=Ch00_name,
                                       sensitivity=Sens_Ch00, min_val=-5, max_val=5, current_excit_val=0.002)
    task.ai_channels.add_ai_accel_chan(physical_channel="cDAQ9191-1B7B393Mod1/ai1", name_to_assign_to_channel=Ch01_name,
                                       sensitivity=Sens_Ch01, min_val=-5, max_val=5, current_excit_val=0.002)

    task.timing.cfg_samp_clk_timing(rate=fs_acq, sample_mode=constants.AcquisitionType.CONTINUOUS, 
                                    samps_per_chan=(t_med * fs_acq),) # you may not need samps_per_chan

    # I set an input_buf_size
    samples_per_buffer = int(fs_acq // 30)  # 30 hz update
    # task.in_stream.input_buf_size = samples_per_buffer * 10  # plus some extra space

    reader = stream_readers.AnalogMultiChannelReader(task.in_stream)
    writer = stream_writers.AnalogMultiChannelWriter(task.out_stream)

    def reading_task_callback(task_idx, event_type, num_samples, callback_data=None):
        """After data has been read into the NI buffer this callback is called to read in the data from the buffer.

        This callback is for working with the task callback register_every_n_samples_acquired_into_buffer_event.

        Args:
            task_idx (int): Task handle index value
            event_type (nidaqmx.constants.EveryNSamplesEventType): ACQUIRED_INTO_BUFFER
            num_samples (int): Number of samples that was read into the buffer.
            callback_data (object)[None]: No idea. Documentation says: The callback_data parameter contains the value
                you passed in the callback_data parameter of this function.
        """
        buffer = np.zeros((num_channels, num_samples), dtype=np.float32)
        reader.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)

        # Convert the data from channel as a row order to channel as a column
        data = buffer.T.astype(np.float32)

        # Do something with the data

    task.register_every_n_samples_acquired_into_buffer_event(samples_per_buffer, reading_task_callback)

Это то, что сработало для меня. Я надеюсь, что это помогает

person justengel    schedule 29.05.2019
comment
Спасибо за ответ. Я пытался запустить этот код, но это не сработало... сначала self.dtype не распознается, а также код не работает непрерывно. Как я могу получить непрерывные данные, например, распечатать данные, которые я измеряю? - person Jose Guilherme; 01.05.2020
comment
@JoseGuilherme Извините, в self.dtype остался лишний код. Это место должно быть просто результирующим типом данных, который вы хотите. Я изменил его на np.float32. Если вы хотите, чтобы это работало несколько секунд, вам нужно оставить программу запущенной. Если вы поставите time.sleep(10) после запуска задачи, вы должны увидеть мигание устройства NI в течение 10 секунд. Если вы хотите что-то сделать с данными, вам придется распечатать или добавить данные в массив, где вы видите # Что-то сделать с данными. Как правило, вы используете кольцевой буфер, к которому можно получить доступ в другой части кода с помощью таймера или потока. - person justengel; 01.05.2020