diff --git a/README.rst b/README.rst index 911c910..cef91f2 100644 --- a/README.rst +++ b/README.rst @@ -61,7 +61,8 @@ To install BluePyEfe, run: Quick Start and Operating Principle =================================== -For a hands-on introduction to BluePyEfe, have a look at the notebook `examples/example_of_extraction.ipynb `_ +For a hands-on introduction to BluePyEfe, have a look at the notebook `examples/example_of_extraction.ipynb `_. +For an NWB-focused example, see `examples/nwb_extraction.ipynb `_. The goal of the present package is to extract meaningful electrophysiological features (e-features) from voltage time series. The e-features considered in the present package are the one implemented in the `eFEL python library `_. See `this pdf `_ for a list of available e-features. diff --git a/bluepyefe/ecode/__init__.py b/bluepyefe/ecode/__init__.py index 4b6623b..7c5be5e 100644 --- a/bluepyefe/ecode/__init__.py +++ b/bluepyefe/ecode/__init__.py @@ -18,7 +18,9 @@ from . import DeHyperPol from . import HyperDePol from . import SpikeRec +from . import capCheck from . import negCheops +from . import pinkNoise from . import posCheops from . import ramp from . import sAHP @@ -63,4 +65,6 @@ "poscheops": posCheops.PosCheops, "spikerec": SpikeRec.SpikeRec, "sinespec": sineSpec.SineSpec, + "vupinknoise": pinkNoise.VUPinkNoise, + "vucapcheck": capCheck.VUCapCheck, } diff --git a/bluepyefe/ecode/capCheck.py b/bluepyefe/ecode/capCheck.py new file mode 100644 index 0000000..5a5d0b2 --- /dev/null +++ b/bluepyefe/ecode/capCheck.py @@ -0,0 +1,148 @@ +"""VUCapCheck eCode class""" + +""" +Copyright 2026 Open Brain Institute + + This file is part of BluePyEfe + + This library is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License version 3.0 as published + by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + details. + + You should have received a copy of the GNU Lesser General Public License + along with this library; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +import logging +import numpy + +from ..recording import Recording +from .tools import base_current + +logger = logging.getLogger(__name__) + + +class VUCapCheck(Recording): + + """VU capacitance-check current stimulus""" + + def __init__( + self, + config_data, + reader_data, + protocol_name="VUCapCheck", + efel_settings=None + ): + + super(VUCapCheck, self).__init__(config_data, reader_data, protocol_name) + + self.ton = None + self.toff = None + self.tend = None + self.amp = None + self.hypamp = None + self.dt = None + self.waveform = None + + self.amp_rel = None + self.hypamp_rel = None + + if self.t is not None and self.current is not None: + self.interpret( + self.t, self.current, self.config_data, self.reader_data + ) + + if self.voltage is not None: + self.set_autothreshold() + self.compute_spikecount(efel_settings) + + self.export_attr = ["ton", "toff", "tend", "amp", "hypamp", "dt", + "waveform", "amp_rel", "hypamp_rel"] + + def get_stimulus_parameters(self): + """Returns the eCode parameters""" + ecode_params = { + "delay": self.ton, + "amp": self.amp, + "thresh_perc": self.amp_rel, + "duration": self.toff - self.ton, + "totduration": self.tend, + "dt": self.dt, + "waveform": self.waveform, + } + return ecode_params + + def _get_timing_index(self, name, config_data, reader_data): + if name in config_data and config_data[name] is not None: + return int(round(config_data[name] / self.dt)) + if name in reader_data and reader_data[name] is not None: + return int(round(reader_data[name])) + return None + + def _detect_stimulus_indexes(self, current): + deviation = numpy.abs(numpy.asarray(current) - self.hypamp) + edge = min(max(1, int(round(10.0 / self.dt))), len(deviation)) + noise_level = numpy.std( + numpy.concatenate((deviation[:edge], deviation[-edge:])) + ) + threshold = max(4.5 * noise_level, 0.02 * numpy.max(deviation), 1e-5) + active = numpy.flatnonzero(deviation > threshold) + + if len(active) == 0: + logger.warning( + "The automatic cap-check detection failed for the recording " + f"{self.protocol_name} in files {self.files}. The whole trace " + "will be used as the stimulus waveform." + ) + return 0, len(deviation) + + return active[0], active[-1] + 1 + + def interpret(self, t, current, config_data, reader_data): + """Analyse a current array and extract from it the parameters + needed to reconstruct the array""" + self.dt = t[1] + + ton = self._get_timing_index("ton", config_data, reader_data) + toff = self._get_timing_index("toff", config_data, reader_data) + + hypamp_value = base_current(current, idx_ton=ton or 300) + self.set_amplitudes_ecode("hypamp", config_data, reader_data, hypamp_value) + + if ton is None or toff is None: + detected_ton, detected_toff = self._detect_stimulus_indexes(current) + ton = detected_ton if ton is None else ton + toff = detected_toff if toff is None else toff + + ton = max(0, min(ton, len(current) - 1)) + toff = max(ton + 1, min(toff, len(current))) + + stimulus = numpy.asarray(current[ton:toff]) - self.hypamp + amp_value = numpy.max(numpy.abs(stimulus)) if len(stimulus) else 0.0 + self.set_amplitudes_ecode("amp", config_data, reader_data, amp_value) + + if self.amp == 0.0: + self.waveform = numpy.zeros(stimulus.shape) + else: + self.waveform = stimulus / self.amp + + self.ton = t[ton] + self.toff = t[toff] if toff < len(t) else len(t) * self.dt + self.tend = len(t) * self.dt + + def generate(self): + """Generate the current array from the parameters of the ecode""" + t = numpy.arange(0.0, self.tend, self.dt) + current = numpy.full(t.shape, numpy.float64(self.hypamp)) + + waveform = numpy.asarray(self.waveform) + ton = int(round(self.ton / self.dt)) + toff = min(ton + len(waveform), len(current)) + current[ton:toff] += numpy.float64(self.amp) * waveform[:toff - ton] + + return t, current diff --git a/bluepyefe/ecode/pinkNoise.py b/bluepyefe/ecode/pinkNoise.py new file mode 100644 index 0000000..6070a6d --- /dev/null +++ b/bluepyefe/ecode/pinkNoise.py @@ -0,0 +1,151 @@ +"""VUPinkNoise eCode class""" + +""" +Copyright 2026 Open Brain Institute + + This file is part of BluePyEfe + + This library is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License version 3.0 as published + by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + details. + + You should have received a copy of the GNU Lesser General Public License + along with this library; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +import logging +import numpy + +from ..recording import Recording +from .tools import base_current +from .tools import scipy_signal2d + +logger = logging.getLogger(__name__) + + +class VUPinkNoise(Recording): + + """VU pink-noise current stimulus""" + + def __init__( + self, + config_data, + reader_data, + protocol_name="VUPinkNoise", + efel_settings=None + ): + + super(VUPinkNoise, self).__init__(config_data, reader_data, protocol_name) + + self.ton = None + self.toff = None + self.tend = None + self.amp = None + self.hypamp = None + self.dt = None + self.waveform = None + + self.amp_rel = None + self.hypamp_rel = None + + if self.t is not None and self.current is not None: + self.interpret( + self.t, self.current, self.config_data, self.reader_data + ) + + if self.voltage is not None: + self.set_autothreshold() + self.compute_spikecount(efel_settings) + + self.export_attr = ["ton", "toff", "tend", "amp", "hypamp", "dt", + "waveform", "amp_rel", "hypamp_rel"] + + def get_stimulus_parameters(self): + """Returns the eCode parameters""" + ecode_params = { + "delay": self.ton, + "amp": self.amp, + "thresh_perc": self.amp_rel, + "duration": self.toff - self.ton, + "totduration": self.tend, + "dt": self.dt, + "waveform": self.waveform, + } + return ecode_params + + def _get_timing_index(self, name, config_data, reader_data): + if name in config_data and config_data[name] is not None: + return int(round(config_data[name] / self.dt)) + if name in reader_data and reader_data[name] is not None: + return int(round(reader_data[name])) + return None + + def _detect_stimulus_indexes(self, smooth_current): + deviation = numpy.abs(numpy.asarray(smooth_current) - self.hypamp) + edge = min(max(1, int(round(10.0 / self.dt))), len(deviation)) + noise_level = numpy.std( + numpy.concatenate((deviation[:edge], deviation[-edge:])) + ) + threshold = max(4.5 * noise_level, 0.02 * numpy.max(deviation), 1e-5) + active = numpy.flatnonzero(deviation > threshold) + + if len(active) == 0: + logger.warning( + "The automatic pink-noise detection failed for the recording " + f"{self.protocol_name} in files {self.files}. The whole trace " + "will be used as the stimulus waveform." + ) + return 0, len(deviation) + + return active[0], active[-1] + 1 + + def interpret(self, t, current, config_data, reader_data): + """Analyse a current array and extract from it the parameters + needed to reconstruct the array""" + self.dt = t[1] + + smooth_current = scipy_signal2d(current, 85) + + ton = self._get_timing_index("ton", config_data, reader_data) + toff = self._get_timing_index("toff", config_data, reader_data) + + hypamp_value = base_current(current, idx_ton=ton or 300) + self.set_amplitudes_ecode("hypamp", config_data, reader_data, hypamp_value) + + if ton is None or toff is None: + detected_ton, detected_toff = self._detect_stimulus_indexes(smooth_current) + ton = detected_ton if ton is None else ton + toff = detected_toff if toff is None else toff + + ton = max(0, min(ton, len(current) - 1)) + toff = max(ton + 1, min(toff, len(current))) + + stimulus = numpy.asarray(current[ton:toff]) - self.hypamp + amp_value = numpy.max(numpy.abs(stimulus)) if len(stimulus) else 0.0 + self.set_amplitudes_ecode("amp", config_data, reader_data, amp_value) + + if self.amp == 0.0: + self.waveform = numpy.zeros(stimulus.shape) + else: + self.waveform = stimulus / self.amp + + self.ton = t[ton] + self.toff = t[toff] if toff < len(t) else len(t) * self.dt + self.tend = len(t) * self.dt + + def generate(self): + """Generate the current array from the parameters of the ecode""" + t = numpy.arange(0.0, self.tend, self.dt) + current = numpy.full(t.shape, numpy.float64(self.hypamp)) + + waveform = numpy.asarray(self.waveform) + ton = int(round(self.ton / self.dt)) + toff = min(ton + len(waveform), len(current)) + current[ton:toff] += numpy.float64(self.amp) * waveform[:toff - ton] + + return t, current diff --git a/bluepyefe/ecode/sineSpec.py b/bluepyefe/ecode/sineSpec.py index 2a122f7..6b17b81 100644 --- a/bluepyefe/ecode/sineSpec.py +++ b/bluepyefe/ecode/sineSpec.py @@ -27,6 +27,20 @@ logger = logging.getLogger(__name__) +DEFAULT_CHIRP_FREQUENCY_BASE = 1.0 +DEFAULT_CHIRP_FREQUENCY_SCALE = 1.0 +DEFAULT_CHIRP_FREQUENCY_DENOMINATOR = 5.15 +DEFAULT_CHIRP_TIME_OFFSET = 0.1 + + +def _get_metadata_float(name, config_data, reader_data, default): + """Return a numeric metadata value, preferring config over reader data.""" + if name in config_data and config_data[name] is not None: + return float(config_data[name]) + if name in reader_data and reader_data[name] is not None: + return float(reader_data[name]) + return default + class SineSpec(Recording): @@ -51,6 +65,30 @@ def __init__( self.amp_rel = None self.hypamp_rel = None + self.chirp_frequency_base = _get_metadata_float( + "chirp_frequency_base", + self.config_data, + self.reader_data, + DEFAULT_CHIRP_FREQUENCY_BASE, + ) + self.chirp_frequency_scale = _get_metadata_float( + "chirp_frequency_scale", + self.config_data, + self.reader_data, + DEFAULT_CHIRP_FREQUENCY_SCALE, + ) + self.chirp_frequency_denominator = _get_metadata_float( + "chirp_frequency_denominator", + self.config_data, + self.reader_data, + DEFAULT_CHIRP_FREQUENCY_DENOMINATOR, + ) + self.chirp_time_offset = _get_metadata_float( + "chirp_time_offset", + self.config_data, + self.reader_data, + DEFAULT_CHIRP_TIME_OFFSET, + ) if self.t is not None and self.current is not None: self.interpret( @@ -62,7 +100,11 @@ def __init__( self.compute_spikecount(efel_settings) self.export_attr = ["ton", "toff", "tend", "amp", "hypamp", "dt", - "amp_rel", "hypamp_rel"] + "amp_rel", "hypamp_rel", + "chirp_frequency_base", + "chirp_frequency_scale", + "chirp_frequency_denominator", + "chirp_time_offset"] def get_stimulus_parameters(self): """Returns the eCode parameters""" @@ -72,6 +114,10 @@ def get_stimulus_parameters(self): "thresh_perc": self.amp_rel, "duration": self.toff - self.ton, "totduration": self.tend, + "chirp_frequency_base": self.chirp_frequency_base, + "chirp_frequency_scale": self.chirp_frequency_scale, + "chirp_frequency_denominator": self.chirp_frequency_denominator, + "chirp_time_offset": self.chirp_time_offset, } return ecode_params @@ -121,11 +167,18 @@ def generate(self): t = numpy.arange(0.0, self.tend, self.dt) t_sine = numpy.arange(0.0, self.tend / 1e3, self.dt / 1e3) + t_shifted = t_sine - self.chirp_time_offset current = self.amp * numpy.sin( 2.0 * numpy.pi - * (1.0 + (1.0 / (5.15 - (t_sine - 0.1)))) - * (t_sine - 0.1) + * ( + self.chirp_frequency_base + + ( + self.chirp_frequency_scale + / (self.chirp_frequency_denominator - t_shifted) + ) + ) + * t_shifted ) current[:ton_idx] = 0.0 diff --git a/bluepyefe/nwbreader.py b/bluepyefe/nwbreader.py index 6cd251c..3feeee5 100644 --- a/bluepyefe/nwbreader.py +++ b/bluepyefe/nwbreader.py @@ -5,9 +5,20 @@ PROTOCOL_VU_TO_BBP = { "X1PS_SubThresh_DA_0": "IV", - "X2LP_Search_DA_0": "IDthresh", - "X4PS_SupraThresh_DA_0": "IDrest", - "CCSteps_DA_0": "Step" + "X2LP_Search_DA_0": "IDThresh", + "X3LP_Rheo_DA_0": "IDRest", + "X4PS_SupraThresh_DA_0": "IDRest", + "X4PT_C2NSD1SHORT_DA_0": "VUPinkNoise", + "X4PU_C2NSD2SHORT_DA_0": "VUPinkNoise", + "X5SP_Search_DA_0": "IDThresh", + "X6SP_Rheo_DA_0": "IDRest", + "X6SQ_C2SSTRIPLE_DA_0": "SpikeRec", + "X7Ramp_DA_0": "Ramp", + "X8_CHIRP_DA_0": "SineSpec", + "X9_C1QCAPCHK_DA_0": "VUCapCheck", + "X9_C1SQCAPCHK_DA_0": "VUCapCheck", + "CCSteps_DA_0": "Step", + "steps_DA_0": "Step", } @@ -428,6 +439,11 @@ def read(self): data = [] target_protocols = self._get_target_protocols() + target_protocols_lower = ( + [protocol.lower() for protocol in target_protocols] + if target_protocols + else None + ) for sweep_name, current_sweep in list(self.content["stimulus"]["presentation"].items()): stimulus_description = None @@ -440,7 +456,10 @@ def read(self): continue translated_name = PROTOCOL_VU_TO_BBP[stimulus_description] - if translated_name not in target_protocols: + if ( + target_protocols_lower and + translated_name.lower() not in target_protocols_lower + ): continue voltage_sweep_name = sweep_name.replace("DA", "AD") @@ -455,6 +474,10 @@ def read(self): start_time=voltage_sweeps[voltage_sweep_name]["starting_time"], trace_name=sweep_name )) + if len(data[-1]["voltage"]) == 0 or len(data[-1]["current"]) == 0: + logger.info("Skipping %s because voltage or current data is empty.", sweep_name) + data.pop(-1) + continue # Shorten protocols that finish with NaNs first_nan = numpy.argmax(numpy.isnan(data[-1]["current"])) @@ -467,7 +490,8 @@ def read(self): data.pop(-1) else: # Offset the current with the holding current - holding_current = float(voltage_sweeps[voltage_sweep_name]["bias_current"][()]) * 1e-12 # in pA + bias_current = voltage_sweeps[voltage_sweep_name]["bias_current"][()] + holding_current = float(numpy.asarray(bias_current).reshape(-1)[0]) * 1e-12 # in pA data[-1]["current"] = numpy.asarray(data[-1]["current"]) + holding_current # For Step, IV and IDRest protocols, replace the first 90 ms with the value at 90 ms diff --git a/examples/nwb_extraction.ipynb b/examples/nwb_extraction.ipynb new file mode 100644 index 0000000..302c495 --- /dev/null +++ b/examples/nwb_extraction.ipynb @@ -0,0 +1,186 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Extracting efeatures from NWB files with BluePyEfe\n", + "\n", + "This notebook shows the NWB-specific workflow: inspect an NWB file, read traces through the automatic NWB reader and extract a couple of efeatures." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import pathlib\n", + "import tempfile\n", + "from pprint import pprint\n", + "\n", + "import h5py\n", + "import matplotlib\n", + "import numpy as np\n", + "\n", + "matplotlib.use(\"Agg\")\n", + "import matplotlib.pyplot as plt\n", + "\n", + "from bluepyefe.cell import Cell\n", + "from bluepyefe.reader import inspect_nwb, nwb_reader" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The examples can be run either from the repository root or from the `examples` directory." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cwd = pathlib.Path.cwd()\n", + "repo_root = cwd.parent if cwd.name == \"examples\" else cwd\n", + "\n", + "bbp_nwb_path = repo_root / \"tests\" / \"exp_data\" / \"hippocampus-portal\" / \"99111002.nwb\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Inspect the NWB file\n", + "\n", + "`inspect_nwb` chooses the reader class from the file layout and reports the protocols that BluePyEfe can parse." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "info = inspect_nwb(bbp_nwb_path)\n", + "\n", + "summary = {\n", + " \"reader\": info[\"reader\"],\n", + " \"protocols\": info[\"protocols\"],\n", + " \"trace_count\": len(info[\"traces\"]),\n", + " \"metadata\": {\n", + " key: info[\"metadata\"].get(key)\n", + " for key in (\"identifier\", \"nwb_version\", \"session_description\")\n", + " },\n", + "}\n", + "\n", + "pprint(summary)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Read and plot the traces\n", + "\n", + "`nwb_reader` returns the raw trace dictionaries that are passed into BluePyEfe recording objects." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline\n", + "traces = nwb_reader({\"filepath\": str(bbp_nwb_path), \"protocol_name\": \"Step\"})\n", + "trace = traces[0]\n", + "\n", + "t_ms = np.arange(trace[\"voltage\"].size) * trace[\"dt\"] * 1000.0\n", + "\n", + "fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(7, 5), sharex=True)\n", + "axes[0].plot(t_ms, trace[\"current\"], lw=0.8)\n", + "axes[0].set_ylabel(f\"Current ({trace['i_unit']})\")\n", + "axes[1].plot(t_ms, trace[\"voltage\"], lw=0.8)\n", + "axes[1].set_xlabel(\"Time (ms)\")\n", + "axes[1].set_ylabel(f\"Voltage ({trace['v_unit']})\")\n", + "\n", + "fig.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Extract efeatures from NWB recordings\n", + "\n", + "A `.nwb` file can be passed to `Cell.read_recordings` with the same `filepath` and `protocol_name` metadata used by `nwb_reader`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "efel_settings = {\n", + " \"strict_stiminterval\": True,\n", + " \"Threshold\": -20.0,\n", + " \"interp_step\": 0.025,\n", + "}\n", + "\n", + "cell = Cell(name=bbp_nwb_path)\n", + "cell.read_recordings(\n", + " protocol_data=[{\"filepath\": str(bbp_nwb_path), \"protocol_name\": \"Step\"}],\n", + " protocol_name=\"Step\",\n", + " efel_settings=efel_settings,\n", + ")\n", + "\n", + "cell.extract_efeatures(\n", + " protocol_name=\"Step\",\n", + " efeatures=[\"voltage_base\", \"steady_state_voltage_stimend\"],\n", + " efel_settings=efel_settings,\n", + ")\n", + "\n", + "first_recording = cell.recordings[0]\n", + "feature_preview = {\n", + " \"recording_count\": len(cell.recordings),\n", + " \"first_recording\": first_recording.name,\n", + " \"ton_ms\": float(first_recording.ton),\n", + " \"toff_ms\": float(first_recording.toff),\n", + " \"amp_nA\": float(first_recording.amp),\n", + " \"efeatures\": {\n", + " name: float(value)\n", + " for name, value in first_recording.efeatures.items()\n", + " },\n", + "}\n", + "\n", + "pprint(feature_preview)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/tests/ecode/test_vu_ecodes.py b/tests/ecode/test_vu_ecodes.py new file mode 100644 index 0000000..922e70a --- /dev/null +++ b/tests/ecode/test_vu_ecodes.py @@ -0,0 +1,83 @@ +"""VU eCode tests.""" +import numpy + +from bluepyefe.cell import Cell +from bluepyefe.ecode import eCodes +from bluepyefe.ecode.capCheck import VUCapCheck +from bluepyefe.ecode.pinkNoise import VUPinkNoise + + +def _reader_data(current): + return { + "voltage": numpy.full(len(current), -65.0), + "current": current, + "dt": 0.0001, + "i_unit": "nA", + "v_unit": "mV", + "t_unit": "s", + } + + +def _read_recording(protocol_name, current, config=None): + config_data = { + "filepath": f"{protocol_name}.nwb", + "i_unit": "nA", + "v_unit": "mV", + "t_unit": "s", + } + if config: + config_data.update(config) + + cell = Cell("VU") + cell.read_recordings( + protocol_data=[config_data], + protocol_name=protocol_name, + recording_reader=lambda _: [_reader_data(current)], + ) + return cell.recordings[0] + + +def test_vu_ecodes_are_registered(): + assert eCodes["vupinknoise"] is VUPinkNoise + assert eCodes["vucapcheck"] is VUCapCheck + + +def test_vu_pink_noise_recording_can_be_read_and_generated(): + current = numpy.zeros(600) + current[80:140] = 0.03 + 0.008 * numpy.sin(numpy.linspace(0.0, 20.0, 60)) + current[240:320] = 0.06 + 0.012 * numpy.sin(numpy.linspace(0.0, 30.0, 80)) + current[420:520] = 0.09 + 0.016 * numpy.sin(numpy.linspace(0.0, 40.0, 100)) + + recording = _read_recording( + "VUPinkNoise", + current, + config={"ton": 8.0, "toff": 52.0}, + ) + generated_t, generated_current = recording.generate() + + assert isinstance(recording, VUPinkNoise) + numpy.testing.assert_allclose(recording.ton, 8.0) + numpy.testing.assert_allclose(recording.toff, 52.0) + assert recording.amp > 0.0 + assert len(generated_t) == len(generated_current) + numpy.testing.assert_allclose(generated_current[80:520], current[80:520]) + + +def test_vu_cap_check_recording_can_be_read_and_generated(): + current = numpy.zeros(500) + cycle = numpy.linspace(-0.05, 0.05, 50, endpoint=False) + current[50:450] = numpy.tile(cycle, 8) + + recording = _read_recording( + "VUCapCheck", + current, + config={"ton": 5.0, "toff": 45.0}, + ) + generated_t, generated_current = recording.generate() + + assert isinstance(recording, VUCapCheck) + numpy.testing.assert_allclose(recording.ton, 5.0) + numpy.testing.assert_allclose(recording.toff, 45.0) + assert recording.amp > 0.0 + assert len(generated_t) == len(generated_current) + numpy.testing.assert_allclose(generated_current[50:450], current[50:450]) diff --git a/tests/test_nwbreader.py b/tests/test_nwbreader.py index fac3df0..8ca7222 100644 --- a/tests/test_nwbreader.py +++ b/tests/test_nwbreader.py @@ -13,7 +13,13 @@ nwb_reader, ) from bluepyefe.nwbreader import ( - NWBReader, AIBSNWBReader, ScalaNWBReader, BBPNWBReader, TRTNWBReader, VUNWBReader + PROTOCOL_VU_TO_BBP, + NWBReader, + AIBSNWBReader, + ScalaNWBReader, + BBPNWBReader, + TRTNWBReader, + VUNWBReader, ) @@ -175,7 +181,12 @@ def test_nwb_inspection_detects_aibs_layout(dummy_content): assert _get_nwb_protocols(dummy_content, reader_class) == ["Step"] def make_vu_content_for_step( - bias_pA=0.0, with_nans=False, stimulus_description_in_attrs=True + bias_pA=0.0, + bias_current_as_array=False, + with_nans=False, + stimulus_description_in_attrs=True, + stimulus_description="CCSteps_DA_0", + current_values=None, ): # Voltage/data voltage_ds = DummyDS( @@ -183,17 +194,24 @@ def make_vu_content_for_step( {"conversion": 1.0, "unit": "mV", "rate": 10000}, ) # Current/data with optional NaNs at tail - current_vals = np.array([0.1, 0.2, 0.3, 0.4], dtype=float) - if with_nans: + if current_values is None: + current_values = [0.1, 0.2, 0.3, 0.4] + current_vals = np.array(current_values, dtype=float) + if with_nans and current_vals.size: current_vals[-1] = np.nan current_ds = DummyDS(current_vals, {"conversion": 1.0, "unit": "pA"}) start_time_ds = DummyDS([0.0], {"rate": 10000, "unit": "s"}) sweep_children = {"data": current_ds} sweep_attrs = {} if stimulus_description_in_attrs: - sweep_attrs["stimulus_description"] = "CCSteps_DA_0" + sweep_attrs["stimulus_description"] = stimulus_description else: - sweep_children["stimulus_description"] = DummyDS([b"CCSteps_DA_0"], {}) + sweep_children["stimulus_description"] = DummyDS( + [stimulus_description.encode("UTF-8")], {} + ) + bias_current = bias_pA * 1e-12 + if bias_current_as_array: + bias_current = [bias_current] # Group layout content = { @@ -208,7 +226,7 @@ def make_vu_content_for_step( { "data": voltage_ds, "starting_time": start_time_ds, - "bias_current": DummyDS(bias_pA * 1e-12, {}), + "bias_current": DummyDS(bias_current, {}), }, attrs={}, ), @@ -217,6 +235,26 @@ def make_vu_content_for_step( } return content + +def test_vu_protocol_mapping_covers_supported_protocols(): + assert PROTOCOL_VU_TO_BBP == { + "X1PS_SubThresh_DA_0": "IV", + "X2LP_Search_DA_0": "IDThresh", + "X3LP_Rheo_DA_0": "IDRest", + "X4PS_SupraThresh_DA_0": "IDRest", + "X4PT_C2NSD1SHORT_DA_0": "VUPinkNoise", + "X4PU_C2NSD2SHORT_DA_0": "VUPinkNoise", + "X5SP_Search_DA_0": "IDThresh", + "X6SP_Rheo_DA_0": "IDRest", + "X6SQ_C2SSTRIPLE_DA_0": "SpikeRec", + "X7Ramp_DA_0": "Ramp", + "X8_CHIRP_DA_0": "SineSpec", + "X9_C1QCAPCHK_DA_0": "VUCapCheck", + "X9_C1SQCAPCHK_DA_0": "VUCapCheck", + "CCSteps_DA_0": "Step", + "steps_DA_0": "Step", + } + def test_vunwbreader_protocol_filter_excludes_non_matching(): # stimulus_description maps to "Step"; ask for IV -> excluded content = make_vu_content_for_step() @@ -234,6 +272,30 @@ def test_vunwbreader_accepts_protocol_list(): assert len(reader.read()) == 1 +def test_vunwbreader_matches_translated_protocol_case_insensitively(): + content = make_vu_content_for_step(stimulus_description="X2LP_Search_DA_0") + in_data = {"protocol_name": "IDthresh"} + reader = VUNWBReader(content, target_protocols=["IDthresh"], in_data=in_data) + + assert len(reader.read()) == 1 + + +def test_vunwbreader_skips_empty_data_trace(): + content = make_vu_content_for_step(current_values=[]) + in_data = {"protocol_name": "Step"} + reader = VUNWBReader(content, target_protocols=["Step"], in_data=in_data) + + assert reader.read() == [] + + +def test_vunwbreader_accepts_array_bias_current(): + content = make_vu_content_for_step(bias_pA=1.0, bias_current_as_array=True) + in_data = {"protocol_name": "Step"} + reader = VUNWBReader(content, target_protocols=["Step"], in_data=in_data) + + assert len(reader.read()) == 1 + + def test_nwb_inspection_detects_vu_layout(): content = make_vu_content_for_step() reader_class = _get_nwb_reader_class(content)