From 6498952dc57ad12dc759c0eb71d1d7b944bb2b9e Mon Sep 17 00:00:00 2001 From: Brian Beck Date: Wed, 25 Mar 2026 15:47:02 -0700 Subject: [PATCH] Implement delayed-off support This allows each output pin to have a configured delay between the abort button being pressed and the pin being disabled. This supports scenarios such as keeping ventilation on longer after a tool has been deactivated. Tested: * Added unit tests for delayed off and reauth during delay timer. All tests pass * Physical hardware: confirmed that previous config file (with no changes, no delay section) continues working as before (all outputs immediately turn off) * Physical hardware: confirmed that outputs stay on during delay and turn off after delay (tested delays 10s, 180s) * Physical hardware: confirmed that deauth and then reauth during the delay timer period leaves the device continuously on with no interruption This commit was generated in part with Gemini 3.1 Pro. Prompt: ``` Write a plan to add the ability to control the two outputs separately, specifically keeping one output alive for a configurable duration after the "off" behavior is triggered. In two_button.ini the parameter `enable_output` currently toggles both `Relay:ActiveHigh:29` and `Relay:ActiveHigh:31`. The logic to read these parameters is in two_button.py It is critical that all changes be backwards compatible - that is, after the change, the old version of the config should trigger the same behavior as today (trigger both outputs out and on at the same time). There should be new syntax options that allow using this new behavior. The code should handle the case where someone triggers an `abort` and then triggers an `on_button_down` before the timer expires. In this case, the output should remain continuously on. * Add a new optional parameter for how long to keep Relay:ActiveHigh:31 after the service is disabled. This duration should be specified in seconds. * In the `abort` function in two_button.py, instead of immediately turning all options off, if a delay >0 is configured, a timer should be started. When the timer expires that output should be disabled ``` --- software/test_two_button.py | 129 ++++++++++++++++++++++++++++++++++ software/two_button.ini | 5 +- software/two_button.py | 135 ++++++++++++++++++++++++++++++++---- 3 files changed, 253 insertions(+), 16 deletions(-) diff --git a/software/test_two_button.py b/software/test_two_button.py index ff130c5..0ebb4f9 100755 --- a/software/test_two_button.py +++ b/software/test_two_button.py @@ -15,6 +15,7 @@ """Tests for two_button.py""" import tempfile +import time import unittest from gpiozero import Device @@ -42,6 +43,24 @@ deauth_command = rm -f enabled """ +SAMPLE_CONFIG_DELAYED = b""" +[pins] +on_button=Button:11:38 +off_button=Button:16:37 +enable_output=Relay:ActiveHigh:29, Relay:ActiveHigh:31 +output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 1 ] +badge_reader=HIDKeystrokingReader:badge_scanner +buzzer=Buzzer:35 +[auth] +duration=20s +warning=10s +extend=20s + +command = touch enabled +extend_command = touch enabled +deauth_command = rm -f enabled +""" + # This is the fastest way to ensure that basic logic is right, but it does not # test the use of BaseDispatcher.event_queue or the way callbacks happen on the @@ -89,3 +108,113 @@ def test_auth_flow(self): self.dispatcher.abort(None) self.assertFalse(self.dispatcher.authorized) self.assertFalse(self.is_relay_on()) + + +class DelayedOffTest(unittest.TestCase): + def setUp(self): + Device.pin_factory = MockFactory() + + try: + from authbox import fake_evdev_device_for_testing + except ModuleNotFoundError: + self.fail("Test requires evdev, but evdev is not available") + authbox.badgereader_hid_keystroking.evdev.list_devices = ( + fake_evdev_device_for_testing.list_devices + ) + authbox.badgereader_hid_keystroking.evdev.InputDevice = ( + fake_evdev_device_for_testing.InputDevice + ) + + with tempfile.NamedTemporaryFile() as f: + f.write(SAMPLE_CONFIG_DELAYED) + f.flush() + config = authbox.config.Config(f.name) + + self.dispatcher = two_button.Dispatcher(config) + for t in self.dispatcher.threads: + if t.__class__.__name__ == "Timer": + t.start() + + def _process_events(self): + while not self.dispatcher.event_queue.empty(): + item = self.dispatcher.event_queue.get_nowait() + if item is authbox.api.SHUTDOWN_SENTINEL: + break + func, args = item[0], item[1:] + func(*args) + + def is_relay_on(self, index_or_name_or_obj): + if isinstance(index_or_name_or_obj, int): + obj = self.dispatcher.outputs[index_or_name_or_obj][0] + elif isinstance(index_or_name_or_obj, str): + obj = getattr(self.dispatcher, index_or_name_or_obj) + else: + obj = index_or_name_or_obj + + if hasattr(obj, "gpio_relay"): + return obj.gpio_relay.value + elif hasattr(obj, "objs"): + return [r.gpio_relay.value for r in obj.objs] + else: + # It might be a mock object or something else + return obj.is_on if hasattr(obj, "is_on") else False # Fallback + + def test_delayed_off(self): + # Out of the box, relay should be off + self.assertFalse(self.dispatcher.authorized) + + self.assertFalse(self.is_relay_on(0)) + self.assertFalse(self.is_relay_on(1)) + + # Badge scan sets authorized flag + self.dispatcher.badge_scan("1234") + self.assertTrue(self.dispatcher.authorized) + + # "On" button pressed + self.dispatcher.on_button_down(None) + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # "Off" button pressed + self.dispatcher.abort(None) + # The dispatcher state should be not authorized + self.assertFalse(self.dispatcher.authorized) + # Main output (0) should be off immediately + self.assertFalse(self.is_relay_on(0)) + # Delayed output (1) should be ON STILL + self.assertTrue(self.is_relay_on(1)) + + # Wait for delay (1s) + buffer + time.sleep(1.5) + self._process_events() + + # Delayed output should be OFF + self.assertFalse(self.is_relay_on(1)) + + def test_cancel_delayed_off(self): + # Badge scan and turn on + self.dispatcher.badge_scan("1234") + self.dispatcher.on_button_down(None) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Abort + self.dispatcher.abort(None) + self.assertFalse(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Wait 0.5s, then turn back on + time.sleep(0.5) + self.dispatcher.on_button_down(None) # Resume! + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Wait for the original delay (1s) to show it was cancelled + time.sleep(1.5) + self._process_events() + + # Should STILL be on! + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) diff --git a/software/two_button.ini b/software/two_button.ini index 0edc4f1..65888bd 100644 --- a/software/two_button.ini +++ b/software/two_button.ini @@ -35,8 +35,11 @@ off_button = Button:16:37 # Simple beeps on J4 buzzer = Buzzer:35 badge_reader = HIDKeystrokingReader:HID OMNIKEY 5427 CK -# For Authboard v0.4 29=J13 (small relay for interlock), 31=J12 (small relay for bofa) +# For Authboard v0.4 29=J13 (small relay for interlock), 31=J12 (small relay for BOFA) enable_output = Relay:ActiveHigh:29, Relay:ActiveHigh:31 +# During the specified duration, the outputs will remain on after abort +# Uncomment to leave the BOFA filter on for a duration after the laser is deauthorized. +# output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 180 ] [auth] # EDIT ME! diff --git a/software/two_button.py b/software/two_button.py index 92b9b0b..3e8d285 100644 --- a/software/two_button.py +++ b/software/two_button.py @@ -24,7 +24,7 @@ import subprocess import sys -from authbox.api import BaseDispatcher +from authbox.api import BaseDispatcher, MultiProxy, split_escaped from authbox.config import Config from authbox.timer import Timer @@ -41,17 +41,54 @@ def __init__(self, config): self.load_config_object("badge_reader", on_scan=self.badge_scan) self.load_config_object("enable_output") self.load_config_object("buzzer") + + # Custom loading for enable_output to support delay + enable_outputs_config = list( + split_escaped(self.config.get("pins", "enable_output"), preserve=True) + ) + + self.delay_map = self._load_delay_map() + + self.outputs = [] + if isinstance(self.enable_output, MultiProxy): + objs = self.enable_output.objs + else: + objs = [self.enable_output] + + for i, obj in enumerate(objs): + if i < len(enable_outputs_config): + pin_str = enable_outputs_config[i].strip() + delay = self.delay_map.get(pin_str, 0) + self.outputs.append((obj, delay)) + else: + self.outputs.append((obj, 0)) + self.warning_timer = Timer(self.event_queue, "warning_timer", self.warning) self.expire_timer = Timer(self.event_queue, "expire_timer", self.abort) self.expecting_press_timer = Timer( self.event_queue, "expecting_press_timer", self.abort ) - # Otherwise, start them manually! + + self.timers = {} self.threads.extend( [self.warning_timer, self.expire_timer, self.expecting_press_timer] ) + for obj, delay in self.outputs: + if delay > 0: + timer_name = f"off_timer_{id(obj)}" + # Lambda captures obj correctly if we use a default arg. + timer = Timer( + self.event_queue, + timer_name, + lambda source, o=obj: self.delayed_off_generic(o), + ) + self.timers[id(obj)] = (timer, delay) + self.threads.append(timer) + self.noise = None + self.delayed_off_running = False + self.running_timers_count = 0 def _get_command_line(self, section, key, format_args): """Constructs a command line, safely. @@ -66,6 +103,43 @@ def _get_command_line(self, section, key, format_args): pieces = shlex.split(value) return [p.format(*format_args) for p in pieces] + def _load_delay_map(self): + delay_map = {} + try: + delays_str = self.config.get("pins", "output_off_delay_seconds") + delay_map = self._parse_delay_str(delays_str) + except Exception: + pass + + if not delay_map: + try: + delays_str = self.config.get("auth", "output_off_delay_seconds") + delay_map = self._parse_delay_str(delays_str) + except Exception: + pass + + return delay_map + + def _parse_delay_str(self, delays_str): + delay_map = {} + if not delays_str: + return delay_map + delays_str = delays_str.replace("[", "").replace("]", "").strip() + pairs = [p.strip() for p in delays_str.split(",")] + for pair in pairs: + if "=" in pair: + k, v = pair.split("=") + k = k.strip() + v = v.strip() + try: + # Use Config.parse_time to support suffixes + from authbox.config import Config + + delay_map[k] = Config.parse_time(v) + except Exception as e: + print("Error parsing delay for", k, v, e) + return delay_map + def badge_scan(self, badge_id): # Malicious badge "numbers" that contain spaces require this extra work. command = self._get_command_line("auth", "command", [badge_id]) @@ -94,21 +168,30 @@ def badge_scan(self, badge_id): def on_button_down(self, source): print("Button down", source) if not self.authorized: - self.off_button.blink(1) - self.buzzer.beep() - if self.noise: - self.noise.kill() - if self.config.get("sounds", "enable") == "1": - sound_command = self._get_command_line( - "sounds", "command", [self.config.get("sounds", "sad_filename")] - ) - self.noise = subprocess.Popen( - sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL - ) - return + if self.delayed_off_running: + self.authorized = True + self.delayed_off_running = False + self.running_timers_count = 0 + for timer, _ in self.timers.values(): + timer.cancel() + else: + self.off_button.blink(1) + self.buzzer.beep() + if self.noise: + self.noise.kill() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "sad_filename")] + ) + self.noise = subprocess.Popen( + sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL + ) + return self.expecting_press_timer.cancel() self.on_button.on() self.enable_output.on() + for timer, _ in self.timers.values(): + timer.cancel() self.buzzer.off() self.warning_timer.cancel() self.expire_timer.cancel() @@ -126,7 +209,21 @@ def on_button_down(self, source): def abort(self, source): print("Abort", source) - self.enable_output.off() + delayed_count = 0 + for obj, delay in self.outputs: + if delay == 0: + obj.off() + else: + timer, _ = self.timers.get(id(obj), (None, None)) + if timer: + timer.cancel() + timer.set(delay) + delayed_count += 1 + + if delayed_count > 0: + self.delayed_off_running = True + self.running_timers_count = delayed_count + if self.authorized: command = self._get_command_line("auth", "deauth_command", [self.badge_id]) subprocess.call(command) @@ -142,6 +239,14 @@ def abort(self, source): self.noise.kill() self.noise = None + def delayed_off_generic(self, obj): + print("Delayed off generic", obj) + obj.off() + self.running_timers_count -= 1 + if self.running_timers_count <= 0: + self.delayed_off_running = False + self.running_timers_count = 0 + def warning(self, unused_source): self.buzzer.beepbeep() if self.config.get("sounds", "enable") == "1":