From 3897f1c6d0a4ac6e5dea9d4c735e28d5365fbfba Mon Sep 17 00:00:00 2001 From: ilkankilic Date: Fri, 5 Jun 2026 10:38:57 +0200 Subject: [PATCH 1/6] refactor SineSPec --- bluepyefe/ecode/sineSpec.py | 59 +++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 3 deletions(-) diff --git a/bluepyefe/ecode/sineSpec.py b/bluepyefe/ecode/sineSpec.py index 2a122f7..6b17b81 100644 --- a/bluepyefe/ecode/sineSpec.py +++ b/bluepyefe/ecode/sineSpec.py @@ -27,6 +27,20 @@ logger = logging.getLogger(__name__) +DEFAULT_CHIRP_FREQUENCY_BASE = 1.0 +DEFAULT_CHIRP_FREQUENCY_SCALE = 1.0 +DEFAULT_CHIRP_FREQUENCY_DENOMINATOR = 5.15 +DEFAULT_CHIRP_TIME_OFFSET = 0.1 + + +def _get_metadata_float(name, config_data, reader_data, default): + """Return a numeric metadata value, preferring config over reader data.""" + if name in config_data and config_data[name] is not None: + return float(config_data[name]) + if name in reader_data and reader_data[name] is not None: + return float(reader_data[name]) + return default + class SineSpec(Recording): @@ -51,6 +65,30 @@ def __init__( self.amp_rel = None self.hypamp_rel = None + self.chirp_frequency_base = _get_metadata_float( + "chirp_frequency_base", + self.config_data, + self.reader_data, + DEFAULT_CHIRP_FREQUENCY_BASE, + ) + self.chirp_frequency_scale = _get_metadata_float( + "chirp_frequency_scale", + self.config_data, + self.reader_data, + DEFAULT_CHIRP_FREQUENCY_SCALE, + ) + self.chirp_frequency_denominator = _get_metadata_float( + "chirp_frequency_denominator", + self.config_data, + self.reader_data, + DEFAULT_CHIRP_FREQUENCY_DENOMINATOR, + ) + self.chirp_time_offset = _get_metadata_float( + "chirp_time_offset", + self.config_data, + self.reader_data, + DEFAULT_CHIRP_TIME_OFFSET, + ) if self.t is not None and self.current is not None: self.interpret( @@ -62,7 +100,11 @@ def __init__( self.compute_spikecount(efel_settings) self.export_attr = ["ton", "toff", "tend", "amp", "hypamp", "dt", - "amp_rel", "hypamp_rel"] + "amp_rel", "hypamp_rel", + "chirp_frequency_base", + "chirp_frequency_scale", + "chirp_frequency_denominator", + "chirp_time_offset"] def get_stimulus_parameters(self): """Returns the eCode parameters""" @@ -72,6 +114,10 @@ def get_stimulus_parameters(self): "thresh_perc": self.amp_rel, "duration": self.toff - self.ton, "totduration": self.tend, + "chirp_frequency_base": self.chirp_frequency_base, + "chirp_frequency_scale": self.chirp_frequency_scale, + "chirp_frequency_denominator": self.chirp_frequency_denominator, + "chirp_time_offset": self.chirp_time_offset, } return ecode_params @@ -121,11 +167,18 @@ def generate(self): t = numpy.arange(0.0, self.tend, self.dt) t_sine = numpy.arange(0.0, self.tend / 1e3, self.dt / 1e3) + t_shifted = t_sine - self.chirp_time_offset current = self.amp * numpy.sin( 2.0 * numpy.pi - * (1.0 + (1.0 / (5.15 - (t_sine - 0.1)))) - * (t_sine - 0.1) + * ( + self.chirp_frequency_base + + ( + self.chirp_frequency_scale + / (self.chirp_frequency_denominator - t_shifted) + ) + ) + * t_shifted ) current[:ton_idx] = 0.0 From 6c42b98c85653dd061d1251205c3e5617aa75a59 Mon Sep 17 00:00:00 2001 From: ilkankilic Date: Fri, 5 Jun 2026 10:41:48 +0200 Subject: [PATCH 2/6] Map additional VU NWB protocols to eCodes --- bluepyefe/nwbreader.py | 23 +++++++++++++++++---- tests/test_nwbreader.py | 44 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/bluepyefe/nwbreader.py b/bluepyefe/nwbreader.py index 6cd251c..464f21a 100644 --- a/bluepyefe/nwbreader.py +++ b/bluepyefe/nwbreader.py @@ -5,9 +5,16 @@ PROTOCOL_VU_TO_BBP = { "X1PS_SubThresh_DA_0": "IV", - "X2LP_Search_DA_0": "IDthresh", - "X4PS_SupraThresh_DA_0": "IDrest", - "CCSteps_DA_0": "Step" + "X2LP_Search_DA_0": "IDThresh", + "X3LP_Rheo_DA_0": "IDRest", + "X4PS_SupraThresh_DA_0": "IDRest", + "X5SP_Search_DA_0": "IDThresh", + "X6SP_Rheo_DA_0": "IDRest", + "X6SQ_C2SSTRIPLE_DA_0": "SpikeRec", + "X7Ramp_DA_0": "Ramp", + "X8_CHIRP_DA_0": "SineSpec", + "CCSteps_DA_0": "Step", + "steps_DA_0": "Step", } @@ -428,6 +435,11 @@ def read(self): data = [] target_protocols = self._get_target_protocols() + target_protocols_lower = ( + [protocol.lower() for protocol in target_protocols] + if target_protocols + else None + ) for sweep_name, current_sweep in list(self.content["stimulus"]["presentation"].items()): stimulus_description = None @@ -440,7 +452,10 @@ def read(self): continue translated_name = PROTOCOL_VU_TO_BBP[stimulus_description] - if translated_name not in target_protocols: + if ( + target_protocols_lower and + translated_name.lower() not in target_protocols_lower + ): continue voltage_sweep_name = sweep_name.replace("DA", "AD") diff --git a/tests/test_nwbreader.py b/tests/test_nwbreader.py index fac3df0..658b139 100644 --- a/tests/test_nwbreader.py +++ b/tests/test_nwbreader.py @@ -13,7 +13,13 @@ nwb_reader, ) from bluepyefe.nwbreader import ( - NWBReader, AIBSNWBReader, ScalaNWBReader, BBPNWBReader, TRTNWBReader, VUNWBReader + PROTOCOL_VU_TO_BBP, + NWBReader, + AIBSNWBReader, + ScalaNWBReader, + BBPNWBReader, + TRTNWBReader, + VUNWBReader, ) @@ -175,7 +181,10 @@ def test_nwb_inspection_detects_aibs_layout(dummy_content): assert _get_nwb_protocols(dummy_content, reader_class) == ["Step"] def make_vu_content_for_step( - bias_pA=0.0, with_nans=False, stimulus_description_in_attrs=True + bias_pA=0.0, + with_nans=False, + stimulus_description_in_attrs=True, + stimulus_description="CCSteps_DA_0", ): # Voltage/data voltage_ds = DummyDS( @@ -191,9 +200,11 @@ def make_vu_content_for_step( sweep_children = {"data": current_ds} sweep_attrs = {} if stimulus_description_in_attrs: - sweep_attrs["stimulus_description"] = "CCSteps_DA_0" + sweep_attrs["stimulus_description"] = stimulus_description else: - sweep_children["stimulus_description"] = DummyDS([b"CCSteps_DA_0"], {}) + sweep_children["stimulus_description"] = DummyDS( + [stimulus_description.encode("UTF-8")], {} + ) # Group layout content = { @@ -217,6 +228,23 @@ def make_vu_content_for_step( } return content + +def test_vu_protocol_mapping_covers_supported_protocols(): + assert PROTOCOL_VU_TO_BBP == { + "X1PS_SubThresh_DA_0": "IV", + "X2LP_Search_DA_0": "IDThresh", + "X3LP_Rheo_DA_0": "IDRest", + "X4PS_SupraThresh_DA_0": "IDRest", + "X5SP_Search_DA_0": "IDThresh", + "X6SP_Rheo_DA_0": "IDRest", + "X6SQ_C2SSTRIPLE_DA_0": "SpikeRec", + "X7Ramp_DA_0": "Ramp", + "X7_Ramp_DA_0": "Ramp", + "X8_CHIRP_DA_0": "SineSpec", + "CCSteps_DA_0": "Step", + "steps_DA_0": "Step", + } + def test_vunwbreader_protocol_filter_excludes_non_matching(): # stimulus_description maps to "Step"; ask for IV -> excluded content = make_vu_content_for_step() @@ -234,6 +262,14 @@ def test_vunwbreader_accepts_protocol_list(): assert len(reader.read()) == 1 +def test_vunwbreader_matches_translated_protocol_case_insensitively(): + content = make_vu_content_for_step(stimulus_description="X2LP_Search_DA_0") + in_data = {"protocol_name": "IDthresh"} + reader = VUNWBReader(content, target_protocols=["IDthresh"], in_data=in_data) + + assert len(reader.read()) == 1 + + def test_nwb_inspection_detects_vu_layout(): content = make_vu_content_for_step() reader_class = _get_nwb_reader_class(content) From 94f7a6524ab5e2df7aa46ef005b22c7c1a71f462 Mon Sep 17 00:00:00 2001 From: ilkankilic Date: Fri, 26 Jun 2026 18:35:06 +0200 Subject: [PATCH 3/6] Support additional VU NWB protocols Add VU pink-noise and cap-check eCodes, map their NWB stimulus descriptions, and skip empty VU traces before post-processing. --- bluepyefe/ecode/__init__.py | 4 + bluepyefe/ecode/capCheck.py | 164 ++++++++++++++++++++++++++++++++++ bluepyefe/ecode/pinkNoise.py | 149 ++++++++++++++++++++++++++++++ bluepyefe/nwbreader.py | 8 ++ tests/ecode/test_vu_ecodes.py | 75 ++++++++++++++++ tests/test_nwbreader.py | 20 ++++- 6 files changed, 417 insertions(+), 3 deletions(-) create mode 100644 bluepyefe/ecode/capCheck.py create mode 100644 bluepyefe/ecode/pinkNoise.py create mode 100644 tests/ecode/test_vu_ecodes.py diff --git a/bluepyefe/ecode/__init__.py b/bluepyefe/ecode/__init__.py index 4b6623b..7c5be5e 100644 --- a/bluepyefe/ecode/__init__.py +++ b/bluepyefe/ecode/__init__.py @@ -18,7 +18,9 @@ from . import DeHyperPol from . import HyperDePol from . import SpikeRec +from . import capCheck from . import negCheops +from . import pinkNoise from . import posCheops from . import ramp from . import sAHP @@ -63,4 +65,6 @@ "poscheops": posCheops.PosCheops, "spikerec": SpikeRec.SpikeRec, "sinespec": sineSpec.SineSpec, + "vupinknoise": pinkNoise.VUPinkNoise, + "vucapcheck": capCheck.VUCapCheck, } diff --git a/bluepyefe/ecode/capCheck.py b/bluepyefe/ecode/capCheck.py new file mode 100644 index 0000000..577de6f --- /dev/null +++ b/bluepyefe/ecode/capCheck.py @@ -0,0 +1,164 @@ +"""VUCapCheck eCode class""" + +""" +Copyright (c) 2022, EPFL/Blue Brain Project + + This file is part of BluePyEfe + + This library is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License version 3.0 as published + by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + details. + + You should have received a copy of the GNU Lesser General Public License + along with this library; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +import logging +import numpy + +from ..recording import Recording +from .tools import base_current +from .tools import scipy_signal2d + +logger = logging.getLogger(__name__) + + +def _group_indexes(indexes, gap): + groups = [] + for index in indexes: + if not groups or groups[-1][-1] + gap < index: + groups.append([index]) + else: + groups[-1].append(index) + return groups + + +class VUCapCheck(Recording): + + """VU alternating square-pulse capacitance-check stimulus.""" + + def __init__( + self, + config_data, + reader_data, + protocol_name="VUCapCheck", + efel_settings=None + ): + + super(VUCapCheck, self).__init__(config_data, reader_data, protocol_name) + + self.ton = None + self.toff = None + self.tend = None + self.tpulse = [] + self.pulse_duration = None + self.pulse_amps = [] + self.amp = None + self.hypamp = None + self.dt = None + + self.amp_rel = None + self.hypamp_rel = None + + if self.t is not None and self.current is not None: + self.interpret( + self.t, self.current, self.config_data, self.reader_data + ) + + if self.voltage is not None: + self.set_autothreshold() + self.compute_spikecount(efel_settings) + + self.export_attr = ["ton", "toff", "tend", "tpulse", + "pulse_duration", "pulse_amps", "amp", + "hypamp", "dt", "amp_rel", "hypamp_rel"] + + @property + def multi_stim_start(self): + return list(self.tpulse) + + @property + def multi_stim_end(self): + return [t + self.pulse_duration for t in self.tpulse] + + def get_stimulus_parameters(self): + """Returns the eCode parameters.""" + return { + "delay": self.tpulse[0] if self.tpulse else self.ton, + "n_pulses": len(self.tpulse), + "pulse_duration": self.pulse_duration, + "pulse_amps": self.pulse_amps, + "amp": self.amp, + "thresh_perc": self.amp_rel, + "totduration": self.tend, + } + + def _detect_pulses(self, smooth_current): + deviation = numpy.abs(numpy.asarray(smooth_current) - self.hypamp) + edge = min(max(1, int(round(10.0 / self.dt))), len(deviation)) + noise_level = numpy.std( + numpy.concatenate((deviation[:edge], deviation[-edge:])) + ) + threshold = max(4.5 * noise_level, 0.1 * numpy.max(deviation), 1e-5) + active = numpy.flatnonzero(deviation > threshold) + gap = max(1, int(round(0.5 / self.dt))) + return _group_indexes(active, gap) + + def interpret(self, t, current, config_data, reader_data): + """Detect cap-check pulses from the current trace.""" + self.dt = t[1] + + smooth_current = scipy_signal2d(current, 5) + + hypamp_value = base_current(current) + self.set_amplitudes_ecode("hypamp", config_data, reader_data, hypamp_value) + + pulse_groups = self._detect_pulses(smooth_current) + + if not pulse_groups: + logger.warning( + "The automatic cap-check pulse detection failed for the " + f"recording {self.protocol_name} in files {self.files}. " + "The whole trace will be treated as one pulse." + ) + pulse_groups = [list(range(len(current)))] + + pulse_starts = [group[0] for group in pulse_groups] + pulse_ends = [group[-1] + 1 for group in pulse_groups] + + self.tpulse = [t[start] for start in pulse_starts] + durations = [ + (end - start) * self.dt + for start, end in zip(pulse_starts, pulse_ends) + ] + self.pulse_duration = float(numpy.median(durations)) + self.pulse_amps = [ + float(numpy.median(current[start:end]) - self.hypamp) + for start, end in zip(pulse_starts, pulse_ends) + ] + + amp_value = max(numpy.abs(self.pulse_amps)) if self.pulse_amps else 0.0 + self.set_amplitudes_ecode("amp", config_data, reader_data, amp_value) + + self.ton = self.tpulse[0] + toff_idx = pulse_ends[-1] + self.toff = t[toff_idx] if toff_idx < len(t) else len(t) * self.dt + self.tend = len(t) * self.dt + + def generate(self): + """Generate the cap-check current array from detected pulses.""" + time = numpy.arange(0.0, self.tend, self.dt) + current = numpy.full(time.shape, numpy.float64(self.hypamp)) + + duration = int(round(self.pulse_duration / self.dt)) + for tpulse, amp in zip(self.tpulse, self.pulse_amps): + start = int(round(tpulse / self.dt)) + end = min(start + duration, len(current)) + current[start:end] += numpy.float64(amp) + + return time, current diff --git a/bluepyefe/ecode/pinkNoise.py b/bluepyefe/ecode/pinkNoise.py new file mode 100644 index 0000000..c35feba --- /dev/null +++ b/bluepyefe/ecode/pinkNoise.py @@ -0,0 +1,149 @@ +"""VUPinkNoise eCode class""" + +""" +Copyright (c) 2022, EPFL/Blue Brain Project + + This file is part of BluePyEfe + + This library is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License version 3.0 as published + by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + details. + + You should have received a copy of the GNU Lesser General Public License + along with this library; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" +import logging +import numpy + +from ..recording import Recording +from .tools import base_current +from .tools import scipy_signal2d + +logger = logging.getLogger(__name__) + + +class VUPinkNoise(Recording): + + """VU pink-noise current stimulus.""" + + def __init__( + self, + config_data, + reader_data, + protocol_name="VUPinkNoise", + efel_settings=None + ): + + super(VUPinkNoise, self).__init__(config_data, reader_data, protocol_name) + + self.ton = None + self.toff = None + self.tend = None + self.amp = None + self.hypamp = None + self.dt = None + self.waveform = None + + self.amp_rel = None + self.hypamp_rel = None + + if self.t is not None and self.current is not None: + self.interpret( + self.t, self.current, self.config_data, self.reader_data + ) + + if self.voltage is not None: + self.set_autothreshold() + self.compute_spikecount(efel_settings) + + self.export_attr = ["ton", "toff", "tend", "amp", "hypamp", "dt", + "waveform", "amp_rel", "hypamp_rel"] + + def get_stimulus_parameters(self): + """Returns the eCode parameters.""" + return { + "delay": self.ton, + "amp": self.amp, + "thresh_perc": self.amp_rel, + "duration": self.toff - self.ton, + "totduration": self.tend, + "dt": self.dt, + "waveform": self.waveform, + } + + def _get_timing_index(self, name, config_data, reader_data): + if name in config_data and config_data[name] is not None: + return int(round(config_data[name] / self.dt)) + if name in reader_data and reader_data[name] is not None: + return int(round(reader_data[name])) + return None + + def _detect_stimulus_indexes(self, smooth_current): + deviation = numpy.abs(numpy.asarray(smooth_current) - self.hypamp) + edge = min(max(1, int(round(10.0 / self.dt))), len(deviation)) + noise_level = numpy.std( + numpy.concatenate((deviation[:edge], deviation[-edge:])) + ) + threshold = max(4.5 * noise_level, 0.02 * numpy.max(deviation), 1e-5) + active = numpy.flatnonzero(deviation > threshold) + + if len(active) == 0: + logger.warning( + "The automatic pink-noise detection failed for the recording " + f"{self.protocol_name} in files {self.files}. The whole trace " + "will be used as the stimulus waveform." + ) + return 0, len(deviation) + + return active[0], active[-1] + 1 + + def interpret(self, t, current, config_data, reader_data): + """Store the active pink-noise waveform from the recorded current.""" + self.dt = t[1] + + smooth_current = scipy_signal2d(current, 85) + + ton = self._get_timing_index("ton", config_data, reader_data) + toff = self._get_timing_index("toff", config_data, reader_data) + + hypamp_value = base_current(current, idx_ton=ton or 300) + self.set_amplitudes_ecode("hypamp", config_data, reader_data, hypamp_value) + + if ton is None or toff is None: + detected_ton, detected_toff = self._detect_stimulus_indexes(smooth_current) + ton = detected_ton if ton is None else ton + toff = detected_toff if toff is None else toff + + ton = max(0, min(ton, len(current) - 1)) + toff = max(ton + 1, min(toff, len(current))) + + stimulus = numpy.asarray(current[ton:toff]) - self.hypamp + amp_value = numpy.max(numpy.abs(stimulus)) if len(stimulus) else 0.0 + self.set_amplitudes_ecode("amp", config_data, reader_data, amp_value) + + if self.amp == 0.0: + self.waveform = numpy.zeros(stimulus.shape) + else: + self.waveform = stimulus / self.amp + + self.ton = t[ton] + self.toff = t[toff] if toff < len(t) else len(t) * self.dt + self.tend = len(t) * self.dt + + def generate(self): + """Generate the pink-noise current array from the stored waveform.""" + t = numpy.arange(0.0, self.tend, self.dt) + current = numpy.full(t.shape, numpy.float64(self.hypamp)) + + waveform = numpy.asarray(self.waveform) + ton = int(round(self.ton / self.dt)) + toff = min(ton + len(waveform), len(current)) + current[ton:toff] += numpy.float64(self.amp) * waveform[:toff - ton] + + return t, current diff --git a/bluepyefe/nwbreader.py b/bluepyefe/nwbreader.py index 464f21a..9f713c1 100644 --- a/bluepyefe/nwbreader.py +++ b/bluepyefe/nwbreader.py @@ -8,11 +8,15 @@ "X2LP_Search_DA_0": "IDThresh", "X3LP_Rheo_DA_0": "IDRest", "X4PS_SupraThresh_DA_0": "IDRest", + "X4PT_C2NSD1SHORT_DA_0": "VUPinkNoise", + "X4PU_C2NSD2SHORT_DA_0": "VUPinkNoise", "X5SP_Search_DA_0": "IDThresh", "X6SP_Rheo_DA_0": "IDRest", "X6SQ_C2SSTRIPLE_DA_0": "SpikeRec", "X7Ramp_DA_0": "Ramp", "X8_CHIRP_DA_0": "SineSpec", + "X9_C1QCAPCHK_DA_0": "VUCapCheck", + "X9_C1SQCAPCHK_DA_0": "VUCapCheck", "CCSteps_DA_0": "Step", "steps_DA_0": "Step", } @@ -470,6 +474,10 @@ def read(self): start_time=voltage_sweeps[voltage_sweep_name]["starting_time"], trace_name=sweep_name )) + if len(data[-1]["voltage"]) == 0 or len(data[-1]["current"]) == 0: + logger.info("Skipping %s because voltage or current data is empty.", sweep_name) + data.pop(-1) + continue # Shorten protocols that finish with NaNs first_nan = numpy.argmax(numpy.isnan(data[-1]["current"])) diff --git a/tests/ecode/test_vu_ecodes.py b/tests/ecode/test_vu_ecodes.py new file mode 100644 index 0000000..de3f18c --- /dev/null +++ b/tests/ecode/test_vu_ecodes.py @@ -0,0 +1,75 @@ +"""VU eCode tests.""" +import numpy + +from bluepyefe.cell import Cell +from bluepyefe.ecode import eCodes +from bluepyefe.ecode.capCheck import VUCapCheck +from bluepyefe.ecode.pinkNoise import VUPinkNoise + + +def _reader_data(current): + return { + "voltage": numpy.full(len(current), -65.0), + "current": current, + "dt": 0.0001, + "i_unit": "nA", + "v_unit": "mV", + "t_unit": "s", + } + + +def _read_recording(protocol_name, current, config=None): + config_data = { + "filepath": f"{protocol_name}.nwb", + "i_unit": "nA", + "v_unit": "mV", + "t_unit": "s", + } + if config: + config_data.update(config) + + cell = Cell("VU") + cell.read_recordings( + protocol_data=[config_data], + protocol_name=protocol_name, + recording_reader=lambda _: [_reader_data(current)], + ) + return cell.recordings[0] + + +def test_vu_ecodes_are_registered(): + assert eCodes["vupinknoise"] is VUPinkNoise + assert eCodes["vucapcheck"] is VUCapCheck + + +def test_vu_pink_noise_recording_can_be_read_and_generated(): + current = numpy.zeros(500) + current[100:400] = numpy.sin(numpy.linspace(0.0, 20.0, 300)) * 0.05 + + recording = _read_recording( + "VUPinkNoise", + current, + config={"ton": 10.0, "toff": 40.0}, + ) + generated_t, generated_current = recording.generate() + + assert isinstance(recording, VUPinkNoise) + assert recording.ton == 10.0 + assert recording.toff == 40.0 + assert recording.amp > 0.0 + assert len(generated_t) == len(generated_current) + + +def test_vu_cap_check_recording_can_be_read_and_generated(): + current = numpy.zeros(500) + current[100:120] = 0.05 + current[200:220] = -0.05 + current[300:320] = 0.05 + + recording = _read_recording("VUCapCheck", current) + generated_t, generated_current = recording.generate() + + assert isinstance(recording, VUCapCheck) + assert len(recording.tpulse) == 3 + assert recording.amp > 0.0 + assert len(generated_t) == len(generated_current) diff --git a/tests/test_nwbreader.py b/tests/test_nwbreader.py index 658b139..52f3384 100644 --- a/tests/test_nwbreader.py +++ b/tests/test_nwbreader.py @@ -185,6 +185,7 @@ def make_vu_content_for_step( with_nans=False, stimulus_description_in_attrs=True, stimulus_description="CCSteps_DA_0", + current_values=None, ): # Voltage/data voltage_ds = DummyDS( @@ -192,8 +193,10 @@ def make_vu_content_for_step( {"conversion": 1.0, "unit": "mV", "rate": 10000}, ) # Current/data with optional NaNs at tail - current_vals = np.array([0.1, 0.2, 0.3, 0.4], dtype=float) - if with_nans: + if current_values is None: + current_values = [0.1, 0.2, 0.3, 0.4] + current_vals = np.array(current_values, dtype=float) + if with_nans and current_vals.size: current_vals[-1] = np.nan current_ds = DummyDS(current_vals, {"conversion": 1.0, "unit": "pA"}) start_time_ds = DummyDS([0.0], {"rate": 10000, "unit": "s"}) @@ -235,12 +238,15 @@ def test_vu_protocol_mapping_covers_supported_protocols(): "X2LP_Search_DA_0": "IDThresh", "X3LP_Rheo_DA_0": "IDRest", "X4PS_SupraThresh_DA_0": "IDRest", + "X4PT_C2NSD1SHORT_DA_0": "VUPinkNoise", + "X4PU_C2NSD2SHORT_DA_0": "VUPinkNoise", "X5SP_Search_DA_0": "IDThresh", "X6SP_Rheo_DA_0": "IDRest", "X6SQ_C2SSTRIPLE_DA_0": "SpikeRec", "X7Ramp_DA_0": "Ramp", - "X7_Ramp_DA_0": "Ramp", "X8_CHIRP_DA_0": "SineSpec", + "X9_C1QCAPCHK_DA_0": "VUCapCheck", + "X9_C1SQCAPCHK_DA_0": "VUCapCheck", "CCSteps_DA_0": "Step", "steps_DA_0": "Step", } @@ -270,6 +276,14 @@ def test_vunwbreader_matches_translated_protocol_case_insensitively(): assert len(reader.read()) == 1 +def test_vunwbreader_skips_empty_data_trace(): + content = make_vu_content_for_step(current_values=[]) + in_data = {"protocol_name": "Step"} + reader = VUNWBReader(content, target_protocols=["Step"], in_data=in_data) + + assert reader.read() == [] + + def test_nwb_inspection_detects_vu_layout(): content = make_vu_content_for_step() reader_class = _get_nwb_reader_class(content) From af09fb4c8cdf2f8909b71d4bc11083b75603f6a0 Mon Sep 17 00:00:00 2001 From: ilkankilic Date: Fri, 26 Jun 2026 18:35:47 +0200 Subject: [PATCH 4/6] Add NWB extraction example notebook Document the NWB inspection and extraction workflow in a clean example notebook. --- README.rst | 3 +- examples/nwb_extraction.ipynb | 186 ++++++++++++++++++++++++++++++++++ 2 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 examples/nwb_extraction.ipynb diff --git a/README.rst b/README.rst index 911c910..cef91f2 100644 --- a/README.rst +++ b/README.rst @@ -61,7 +61,8 @@ To install BluePyEfe, run: Quick Start and Operating Principle =================================== -For a hands-on introduction to BluePyEfe, have a look at the notebook `examples/example_of_extraction.ipynb `_ +For a hands-on introduction to BluePyEfe, have a look at the notebook `examples/example_of_extraction.ipynb `_. +For an NWB-focused example, see `examples/nwb_extraction.ipynb `_. The goal of the present package is to extract meaningful electrophysiological features (e-features) from voltage time series. The e-features considered in the present package are the one implemented in the `eFEL python library `_. See `this pdf `_ for a list of available e-features. diff --git a/examples/nwb_extraction.ipynb b/examples/nwb_extraction.ipynb new file mode 100644 index 0000000..302c495 --- /dev/null +++ b/examples/nwb_extraction.ipynb @@ -0,0 +1,186 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Extracting efeatures from NWB files with BluePyEfe\n", + "\n", + "This notebook shows the NWB-specific workflow: inspect an NWB file, read traces through the automatic NWB reader and extract a couple of efeatures." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import pathlib\n", + "import tempfile\n", + "from pprint import pprint\n", + "\n", + "import h5py\n", + "import matplotlib\n", + "import numpy as np\n", + "\n", + "matplotlib.use(\"Agg\")\n", + "import matplotlib.pyplot as plt\n", + "\n", + "from bluepyefe.cell import Cell\n", + "from bluepyefe.reader import inspect_nwb, nwb_reader" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The examples can be run either from the repository root or from the `examples` directory." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cwd = pathlib.Path.cwd()\n", + "repo_root = cwd.parent if cwd.name == \"examples\" else cwd\n", + "\n", + "bbp_nwb_path = repo_root / \"tests\" / \"exp_data\" / \"hippocampus-portal\" / \"99111002.nwb\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Inspect the NWB file\n", + "\n", + "`inspect_nwb` chooses the reader class from the file layout and reports the protocols that BluePyEfe can parse." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "info = inspect_nwb(bbp_nwb_path)\n", + "\n", + "summary = {\n", + " \"reader\": info[\"reader\"],\n", + " \"protocols\": info[\"protocols\"],\n", + " \"trace_count\": len(info[\"traces\"]),\n", + " \"metadata\": {\n", + " key: info[\"metadata\"].get(key)\n", + " for key in (\"identifier\", \"nwb_version\", \"session_description\")\n", + " },\n", + "}\n", + "\n", + "pprint(summary)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Read and plot the traces\n", + "\n", + "`nwb_reader` returns the raw trace dictionaries that are passed into BluePyEfe recording objects." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline\n", + "traces = nwb_reader({\"filepath\": str(bbp_nwb_path), \"protocol_name\": \"Step\"})\n", + "trace = traces[0]\n", + "\n", + "t_ms = np.arange(trace[\"voltage\"].size) * trace[\"dt\"] * 1000.0\n", + "\n", + "fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(7, 5), sharex=True)\n", + "axes[0].plot(t_ms, trace[\"current\"], lw=0.8)\n", + "axes[0].set_ylabel(f\"Current ({trace['i_unit']})\")\n", + "axes[1].plot(t_ms, trace[\"voltage\"], lw=0.8)\n", + "axes[1].set_xlabel(\"Time (ms)\")\n", + "axes[1].set_ylabel(f\"Voltage ({trace['v_unit']})\")\n", + "\n", + "fig.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Extract efeatures from NWB recordings\n", + "\n", + "A `.nwb` file can be passed to `Cell.read_recordings` with the same `filepath` and `protocol_name` metadata used by `nwb_reader`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "efel_settings = {\n", + " \"strict_stiminterval\": True,\n", + " \"Threshold\": -20.0,\n", + " \"interp_step\": 0.025,\n", + "}\n", + "\n", + "cell = Cell(name=bbp_nwb_path)\n", + "cell.read_recordings(\n", + " protocol_data=[{\"filepath\": str(bbp_nwb_path), \"protocol_name\": \"Step\"}],\n", + " protocol_name=\"Step\",\n", + " efel_settings=efel_settings,\n", + ")\n", + "\n", + "cell.extract_efeatures(\n", + " protocol_name=\"Step\",\n", + " efeatures=[\"voltage_base\", \"steady_state_voltage_stimend\"],\n", + " efel_settings=efel_settings,\n", + ")\n", + "\n", + "first_recording = cell.recordings[0]\n", + "feature_preview = {\n", + " \"recording_count\": len(cell.recordings),\n", + " \"first_recording\": first_recording.name,\n", + " \"ton_ms\": float(first_recording.ton),\n", + " \"toff_ms\": float(first_recording.toff),\n", + " \"amp_nA\": float(first_recording.amp),\n", + " \"efeatures\": {\n", + " name: float(value)\n", + " for name, value in first_recording.efeatures.items()\n", + " },\n", + "}\n", + "\n", + "pprint(feature_preview)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From fa8a67967ed3d97f70113a3a60679813e49cbf52 Mon Sep 17 00:00:00 2001 From: ilkankilic Date: Tue, 30 Jun 2026 15:38:00 +0200 Subject: [PATCH 5/6] Handle scalar-like VU bias current --- bluepyefe/nwbreader.py | 3 ++- tests/test_nwbreader.py | 14 +++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/bluepyefe/nwbreader.py b/bluepyefe/nwbreader.py index 9f713c1..3feeee5 100644 --- a/bluepyefe/nwbreader.py +++ b/bluepyefe/nwbreader.py @@ -490,7 +490,8 @@ def read(self): data.pop(-1) else: # Offset the current with the holding current - holding_current = float(voltage_sweeps[voltage_sweep_name]["bias_current"][()]) * 1e-12 # in pA + bias_current = voltage_sweeps[voltage_sweep_name]["bias_current"][()] + holding_current = float(numpy.asarray(bias_current).reshape(-1)[0]) * 1e-12 # in pA data[-1]["current"] = numpy.asarray(data[-1]["current"]) + holding_current # For Step, IV and IDRest protocols, replace the first 90 ms with the value at 90 ms diff --git a/tests/test_nwbreader.py b/tests/test_nwbreader.py index 52f3384..8ca7222 100644 --- a/tests/test_nwbreader.py +++ b/tests/test_nwbreader.py @@ -182,6 +182,7 @@ def test_nwb_inspection_detects_aibs_layout(dummy_content): def make_vu_content_for_step( bias_pA=0.0, + bias_current_as_array=False, with_nans=False, stimulus_description_in_attrs=True, stimulus_description="CCSteps_DA_0", @@ -208,6 +209,9 @@ def make_vu_content_for_step( sweep_children["stimulus_description"] = DummyDS( [stimulus_description.encode("UTF-8")], {} ) + bias_current = bias_pA * 1e-12 + if bias_current_as_array: + bias_current = [bias_current] # Group layout content = { @@ -222,7 +226,7 @@ def make_vu_content_for_step( { "data": voltage_ds, "starting_time": start_time_ds, - "bias_current": DummyDS(bias_pA * 1e-12, {}), + "bias_current": DummyDS(bias_current, {}), }, attrs={}, ), @@ -284,6 +288,14 @@ def test_vunwbreader_skips_empty_data_trace(): assert reader.read() == [] +def test_vunwbreader_accepts_array_bias_current(): + content = make_vu_content_for_step(bias_pA=1.0, bias_current_as_array=True) + in_data = {"protocol_name": "Step"} + reader = VUNWBReader(content, target_protocols=["Step"], in_data=in_data) + + assert len(reader.read()) == 1 + + def test_nwb_inspection_detects_vu_layout(): content = make_vu_content_for_step() reader_class = _get_nwb_reader_class(content) From 9dd4bf3fdf948194321cdcdc902619d74c6ed3d7 Mon Sep 17 00:00:00 2001 From: ilkankilic Date: Tue, 30 Jun 2026 15:38:16 +0200 Subject: [PATCH 6/6] Refine VU eCode waveform extraction --- bluepyefe/ecode/capCheck.py | 138 +++++++++++++++------------------- bluepyefe/ecode/pinkNoise.py | 14 ++-- tests/ecode/test_vu_ecodes.py | 28 ++++--- 3 files changed, 87 insertions(+), 93 deletions(-) diff --git a/bluepyefe/ecode/capCheck.py b/bluepyefe/ecode/capCheck.py index 577de6f..5a5d0b2 100644 --- a/bluepyefe/ecode/capCheck.py +++ b/bluepyefe/ecode/capCheck.py @@ -1,7 +1,7 @@ """VUCapCheck eCode class""" """ -Copyright (c) 2022, EPFL/Blue Brain Project +Copyright 2026 Open Brain Institute This file is part of BluePyEfe @@ -23,24 +23,13 @@ from ..recording import Recording from .tools import base_current -from .tools import scipy_signal2d logger = logging.getLogger(__name__) -def _group_indexes(indexes, gap): - groups = [] - for index in indexes: - if not groups or groups[-1][-1] + gap < index: - groups.append([index]) - else: - groups[-1].append(index) - return groups - - class VUCapCheck(Recording): - """VU alternating square-pulse capacitance-check stimulus.""" + """VU capacitance-check current stimulus""" def __init__( self, @@ -55,12 +44,10 @@ def __init__( self.ton = None self.toff = None self.tend = None - self.tpulse = [] - self.pulse_duration = None - self.pulse_amps = [] self.amp = None self.hypamp = None self.dt = None + self.waveform = None self.amp_rel = None self.hypamp_rel = None @@ -74,91 +61,88 @@ def __init__( self.set_autothreshold() self.compute_spikecount(efel_settings) - self.export_attr = ["ton", "toff", "tend", "tpulse", - "pulse_duration", "pulse_amps", "amp", - "hypamp", "dt", "amp_rel", "hypamp_rel"] - - @property - def multi_stim_start(self): - return list(self.tpulse) - - @property - def multi_stim_end(self): - return [t + self.pulse_duration for t in self.tpulse] + self.export_attr = ["ton", "toff", "tend", "amp", "hypamp", "dt", + "waveform", "amp_rel", "hypamp_rel"] def get_stimulus_parameters(self): - """Returns the eCode parameters.""" - return { - "delay": self.tpulse[0] if self.tpulse else self.ton, - "n_pulses": len(self.tpulse), - "pulse_duration": self.pulse_duration, - "pulse_amps": self.pulse_amps, + """Returns the eCode parameters""" + ecode_params = { + "delay": self.ton, "amp": self.amp, "thresh_perc": self.amp_rel, + "duration": self.toff - self.ton, "totduration": self.tend, + "dt": self.dt, + "waveform": self.waveform, } + return ecode_params + + def _get_timing_index(self, name, config_data, reader_data): + if name in config_data and config_data[name] is not None: + return int(round(config_data[name] / self.dt)) + if name in reader_data and reader_data[name] is not None: + return int(round(reader_data[name])) + return None - def _detect_pulses(self, smooth_current): - deviation = numpy.abs(numpy.asarray(smooth_current) - self.hypamp) + def _detect_stimulus_indexes(self, current): + deviation = numpy.abs(numpy.asarray(current) - self.hypamp) edge = min(max(1, int(round(10.0 / self.dt))), len(deviation)) noise_level = numpy.std( numpy.concatenate((deviation[:edge], deviation[-edge:])) ) - threshold = max(4.5 * noise_level, 0.1 * numpy.max(deviation), 1e-5) + threshold = max(4.5 * noise_level, 0.02 * numpy.max(deviation), 1e-5) active = numpy.flatnonzero(deviation > threshold) - gap = max(1, int(round(0.5 / self.dt))) - return _group_indexes(active, gap) + + if len(active) == 0: + logger.warning( + "The automatic cap-check detection failed for the recording " + f"{self.protocol_name} in files {self.files}. The whole trace " + "will be used as the stimulus waveform." + ) + return 0, len(deviation) + + return active[0], active[-1] + 1 def interpret(self, t, current, config_data, reader_data): - """Detect cap-check pulses from the current trace.""" + """Analyse a current array and extract from it the parameters + needed to reconstruct the array""" self.dt = t[1] - smooth_current = scipy_signal2d(current, 5) + ton = self._get_timing_index("ton", config_data, reader_data) + toff = self._get_timing_index("toff", config_data, reader_data) - hypamp_value = base_current(current) + hypamp_value = base_current(current, idx_ton=ton or 300) self.set_amplitudes_ecode("hypamp", config_data, reader_data, hypamp_value) - pulse_groups = self._detect_pulses(smooth_current) + if ton is None or toff is None: + detected_ton, detected_toff = self._detect_stimulus_indexes(current) + ton = detected_ton if ton is None else ton + toff = detected_toff if toff is None else toff - if not pulse_groups: - logger.warning( - "The automatic cap-check pulse detection failed for the " - f"recording {self.protocol_name} in files {self.files}. " - "The whole trace will be treated as one pulse." - ) - pulse_groups = [list(range(len(current)))] - - pulse_starts = [group[0] for group in pulse_groups] - pulse_ends = [group[-1] + 1 for group in pulse_groups] - - self.tpulse = [t[start] for start in pulse_starts] - durations = [ - (end - start) * self.dt - for start, end in zip(pulse_starts, pulse_ends) - ] - self.pulse_duration = float(numpy.median(durations)) - self.pulse_amps = [ - float(numpy.median(current[start:end]) - self.hypamp) - for start, end in zip(pulse_starts, pulse_ends) - ] - - amp_value = max(numpy.abs(self.pulse_amps)) if self.pulse_amps else 0.0 + ton = max(0, min(ton, len(current) - 1)) + toff = max(ton + 1, min(toff, len(current))) + + stimulus = numpy.asarray(current[ton:toff]) - self.hypamp + amp_value = numpy.max(numpy.abs(stimulus)) if len(stimulus) else 0.0 self.set_amplitudes_ecode("amp", config_data, reader_data, amp_value) - self.ton = self.tpulse[0] - toff_idx = pulse_ends[-1] - self.toff = t[toff_idx] if toff_idx < len(t) else len(t) * self.dt + if self.amp == 0.0: + self.waveform = numpy.zeros(stimulus.shape) + else: + self.waveform = stimulus / self.amp + + self.ton = t[ton] + self.toff = t[toff] if toff < len(t) else len(t) * self.dt self.tend = len(t) * self.dt def generate(self): - """Generate the cap-check current array from detected pulses.""" - time = numpy.arange(0.0, self.tend, self.dt) - current = numpy.full(time.shape, numpy.float64(self.hypamp)) + """Generate the current array from the parameters of the ecode""" + t = numpy.arange(0.0, self.tend, self.dt) + current = numpy.full(t.shape, numpy.float64(self.hypamp)) - duration = int(round(self.pulse_duration / self.dt)) - for tpulse, amp in zip(self.tpulse, self.pulse_amps): - start = int(round(tpulse / self.dt)) - end = min(start + duration, len(current)) - current[start:end] += numpy.float64(amp) + waveform = numpy.asarray(self.waveform) + ton = int(round(self.ton / self.dt)) + toff = min(ton + len(waveform), len(current)) + current[ton:toff] += numpy.float64(self.amp) * waveform[:toff - ton] - return time, current + return t, current diff --git a/bluepyefe/ecode/pinkNoise.py b/bluepyefe/ecode/pinkNoise.py index c35feba..6070a6d 100644 --- a/bluepyefe/ecode/pinkNoise.py +++ b/bluepyefe/ecode/pinkNoise.py @@ -1,7 +1,7 @@ """VUPinkNoise eCode class""" """ -Copyright (c) 2022, EPFL/Blue Brain Project +Copyright 2026 Open Brain Institute This file is part of BluePyEfe @@ -30,7 +30,7 @@ class VUPinkNoise(Recording): - """VU pink-noise current stimulus.""" + """VU pink-noise current stimulus""" def __init__( self, @@ -66,8 +66,8 @@ def __init__( "waveform", "amp_rel", "hypamp_rel"] def get_stimulus_parameters(self): - """Returns the eCode parameters.""" - return { + """Returns the eCode parameters""" + ecode_params = { "delay": self.ton, "amp": self.amp, "thresh_perc": self.amp_rel, @@ -76,6 +76,7 @@ def get_stimulus_parameters(self): "dt": self.dt, "waveform": self.waveform, } + return ecode_params def _get_timing_index(self, name, config_data, reader_data): if name in config_data and config_data[name] is not None: @@ -104,7 +105,8 @@ def _detect_stimulus_indexes(self, smooth_current): return active[0], active[-1] + 1 def interpret(self, t, current, config_data, reader_data): - """Store the active pink-noise waveform from the recorded current.""" + """Analyse a current array and extract from it the parameters + needed to reconstruct the array""" self.dt = t[1] smooth_current = scipy_signal2d(current, 85) @@ -137,7 +139,7 @@ def interpret(self, t, current, config_data, reader_data): self.tend = len(t) * self.dt def generate(self): - """Generate the pink-noise current array from the stored waveform.""" + """Generate the current array from the parameters of the ecode""" t = numpy.arange(0.0, self.tend, self.dt) current = numpy.full(t.shape, numpy.float64(self.hypamp)) diff --git a/tests/ecode/test_vu_ecodes.py b/tests/ecode/test_vu_ecodes.py index de3f18c..922e70a 100644 --- a/tests/ecode/test_vu_ecodes.py +++ b/tests/ecode/test_vu_ecodes.py @@ -43,33 +43,41 @@ def test_vu_ecodes_are_registered(): def test_vu_pink_noise_recording_can_be_read_and_generated(): - current = numpy.zeros(500) - current[100:400] = numpy.sin(numpy.linspace(0.0, 20.0, 300)) * 0.05 + current = numpy.zeros(600) + current[80:140] = 0.03 + 0.008 * numpy.sin(numpy.linspace(0.0, 20.0, 60)) + current[240:320] = 0.06 + 0.012 * numpy.sin(numpy.linspace(0.0, 30.0, 80)) + current[420:520] = 0.09 + 0.016 * numpy.sin(numpy.linspace(0.0, 40.0, 100)) recording = _read_recording( "VUPinkNoise", current, - config={"ton": 10.0, "toff": 40.0}, + config={"ton": 8.0, "toff": 52.0}, ) generated_t, generated_current = recording.generate() assert isinstance(recording, VUPinkNoise) - assert recording.ton == 10.0 - assert recording.toff == 40.0 + numpy.testing.assert_allclose(recording.ton, 8.0) + numpy.testing.assert_allclose(recording.toff, 52.0) assert recording.amp > 0.0 assert len(generated_t) == len(generated_current) + numpy.testing.assert_allclose(generated_current[80:520], current[80:520]) def test_vu_cap_check_recording_can_be_read_and_generated(): current = numpy.zeros(500) - current[100:120] = 0.05 - current[200:220] = -0.05 - current[300:320] = 0.05 + cycle = numpy.linspace(-0.05, 0.05, 50, endpoint=False) + current[50:450] = numpy.tile(cycle, 8) - recording = _read_recording("VUCapCheck", current) + recording = _read_recording( + "VUCapCheck", + current, + config={"ton": 5.0, "toff": 45.0}, + ) generated_t, generated_current = recording.generate() assert isinstance(recording, VUCapCheck) - assert len(recording.tpulse) == 3 + numpy.testing.assert_allclose(recording.ton, 5.0) + numpy.testing.assert_allclose(recording.toff, 45.0) assert recording.amp > 0.0 assert len(generated_t) == len(generated_current) + numpy.testing.assert_allclose(generated_current[50:450], current[50:450])