diff --git a/software/test_two_button.py b/software/test_two_button.py index ff130c5..0ebb4f9 100755 --- a/software/test_two_button.py +++ b/software/test_two_button.py @@ -15,6 +15,7 @@ """Tests for two_button.py""" import tempfile +import time import unittest from gpiozero import Device @@ -42,6 +43,24 @@ deauth_command = rm -f enabled """ +SAMPLE_CONFIG_DELAYED = b""" +[pins] +on_button=Button:11:38 +off_button=Button:16:37 +enable_output=Relay:ActiveHigh:29, Relay:ActiveHigh:31 +output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 1 ] +badge_reader=HIDKeystrokingReader:badge_scanner +buzzer=Buzzer:35 +[auth] +duration=20s +warning=10s +extend=20s + +command = touch enabled +extend_command = touch enabled +deauth_command = rm -f enabled +""" + # This is the fastest way to ensure that basic logic is right, but it does not # test the use of BaseDispatcher.event_queue or the way callbacks happen on the @@ -89,3 +108,113 @@ def test_auth_flow(self): self.dispatcher.abort(None) self.assertFalse(self.dispatcher.authorized) self.assertFalse(self.is_relay_on()) + + +class DelayedOffTest(unittest.TestCase): + def setUp(self): + Device.pin_factory = MockFactory() + + try: + from authbox import fake_evdev_device_for_testing + except ModuleNotFoundError: + self.fail("Test requires evdev, but evdev is not available") + authbox.badgereader_hid_keystroking.evdev.list_devices = ( + fake_evdev_device_for_testing.list_devices + ) + authbox.badgereader_hid_keystroking.evdev.InputDevice = ( + fake_evdev_device_for_testing.InputDevice + ) + + with tempfile.NamedTemporaryFile() as f: + f.write(SAMPLE_CONFIG_DELAYED) + f.flush() + config = authbox.config.Config(f.name) + + self.dispatcher = two_button.Dispatcher(config) + for t in self.dispatcher.threads: + if t.__class__.__name__ == "Timer": + t.start() + + def _process_events(self): + while not self.dispatcher.event_queue.empty(): + item = self.dispatcher.event_queue.get_nowait() + if item is authbox.api.SHUTDOWN_SENTINEL: + break + func, args = item[0], item[1:] + func(*args) + + def is_relay_on(self, index_or_name_or_obj): + if isinstance(index_or_name_or_obj, int): + obj = self.dispatcher.outputs[index_or_name_or_obj][0] + elif isinstance(index_or_name_or_obj, str): + obj = getattr(self.dispatcher, index_or_name_or_obj) + else: + obj = index_or_name_or_obj + + if hasattr(obj, "gpio_relay"): + return obj.gpio_relay.value + elif hasattr(obj, "objs"): + return [r.gpio_relay.value for r in obj.objs] + else: + # It might be a mock object or something else + return obj.is_on if hasattr(obj, "is_on") else False # Fallback + + def test_delayed_off(self): + # Out of the box, relay should be off + self.assertFalse(self.dispatcher.authorized) + + self.assertFalse(self.is_relay_on(0)) + self.assertFalse(self.is_relay_on(1)) + + # Badge scan sets authorized flag + self.dispatcher.badge_scan("1234") + self.assertTrue(self.dispatcher.authorized) + + # "On" button pressed + self.dispatcher.on_button_down(None) + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # "Off" button pressed + self.dispatcher.abort(None) + # The dispatcher state should be not authorized + self.assertFalse(self.dispatcher.authorized) + # Main output (0) should be off immediately + self.assertFalse(self.is_relay_on(0)) + # Delayed output (1) should be ON STILL + self.assertTrue(self.is_relay_on(1)) + + # Wait for delay (1s) + buffer + time.sleep(1.5) + self._process_events() + + # Delayed output should be OFF + self.assertFalse(self.is_relay_on(1)) + + def test_cancel_delayed_off(self): + # Badge scan and turn on + self.dispatcher.badge_scan("1234") + self.dispatcher.on_button_down(None) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Abort + self.dispatcher.abort(None) + self.assertFalse(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Wait 0.5s, then turn back on + time.sleep(0.5) + self.dispatcher.on_button_down(None) # Resume! + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Wait for the original delay (1s) to show it was cancelled + time.sleep(1.5) + self._process_events() + + # Should STILL be on! + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) diff --git a/software/two_button.ini b/software/two_button.ini index 0edc4f1..65888bd 100644 --- a/software/two_button.ini +++ b/software/two_button.ini @@ -35,8 +35,11 @@ off_button = Button:16:37 # Simple beeps on J4 buzzer = Buzzer:35 badge_reader = HIDKeystrokingReader:HID OMNIKEY 5427 CK -# For Authboard v0.4 29=J13 (small relay for interlock), 31=J12 (small relay for bofa) +# For Authboard v0.4 29=J13 (small relay for interlock), 31=J12 (small relay for BOFA) enable_output = Relay:ActiveHigh:29, Relay:ActiveHigh:31 +# During the specified duration, the outputs will remain on after abort +# Uncomment to leave the BOFA filter on for a duration after the laser is deauthorized. +# output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 180 ] [auth] # EDIT ME! diff --git a/software/two_button.py b/software/two_button.py index 92b9b0b..3e8d285 100644 --- a/software/two_button.py +++ b/software/two_button.py @@ -24,7 +24,7 @@ import subprocess import sys -from authbox.api import BaseDispatcher +from authbox.api import BaseDispatcher, MultiProxy, split_escaped from authbox.config import Config from authbox.timer import Timer @@ -41,17 +41,54 @@ def __init__(self, config): self.load_config_object("badge_reader", on_scan=self.badge_scan) self.load_config_object("enable_output") self.load_config_object("buzzer") + + # Custom loading for enable_output to support delay + enable_outputs_config = list( + split_escaped(self.config.get("pins", "enable_output"), preserve=True) + ) + + self.delay_map = self._load_delay_map() + + self.outputs = [] + if isinstance(self.enable_output, MultiProxy): + objs = self.enable_output.objs + else: + objs = [self.enable_output] + + for i, obj in enumerate(objs): + if i < len(enable_outputs_config): + pin_str = enable_outputs_config[i].strip() + delay = self.delay_map.get(pin_str, 0) + self.outputs.append((obj, delay)) + else: + self.outputs.append((obj, 0)) + self.warning_timer = Timer(self.event_queue, "warning_timer", self.warning) self.expire_timer = Timer(self.event_queue, "expire_timer", self.abort) self.expecting_press_timer = Timer( self.event_queue, "expecting_press_timer", self.abort ) - # Otherwise, start them manually! + + self.timers = {} self.threads.extend( [self.warning_timer, self.expire_timer, self.expecting_press_timer] ) + for obj, delay in self.outputs: + if delay > 0: + timer_name = f"off_timer_{id(obj)}" + # Lambda captures obj correctly if we use a default arg. + timer = Timer( + self.event_queue, + timer_name, + lambda source, o=obj: self.delayed_off_generic(o), + ) + self.timers[id(obj)] = (timer, delay) + self.threads.append(timer) + self.noise = None + self.delayed_off_running = False + self.running_timers_count = 0 def _get_command_line(self, section, key, format_args): """Constructs a command line, safely. @@ -66,6 +103,43 @@ def _get_command_line(self, section, key, format_args): pieces = shlex.split(value) return [p.format(*format_args) for p in pieces] + def _load_delay_map(self): + delay_map = {} + try: + delays_str = self.config.get("pins", "output_off_delay_seconds") + delay_map = self._parse_delay_str(delays_str) + except Exception: + pass + + if not delay_map: + try: + delays_str = self.config.get("auth", "output_off_delay_seconds") + delay_map = self._parse_delay_str(delays_str) + except Exception: + pass + + return delay_map + + def _parse_delay_str(self, delays_str): + delay_map = {} + if not delays_str: + return delay_map + delays_str = delays_str.replace("[", "").replace("]", "").strip() + pairs = [p.strip() for p in delays_str.split(",")] + for pair in pairs: + if "=" in pair: + k, v = pair.split("=") + k = k.strip() + v = v.strip() + try: + # Use Config.parse_time to support suffixes + from authbox.config import Config + + delay_map[k] = Config.parse_time(v) + except Exception as e: + print("Error parsing delay for", k, v, e) + return delay_map + def badge_scan(self, badge_id): # Malicious badge "numbers" that contain spaces require this extra work. command = self._get_command_line("auth", "command", [badge_id]) @@ -94,21 +168,30 @@ def badge_scan(self, badge_id): def on_button_down(self, source): print("Button down", source) if not self.authorized: - self.off_button.blink(1) - self.buzzer.beep() - if self.noise: - self.noise.kill() - if self.config.get("sounds", "enable") == "1": - sound_command = self._get_command_line( - "sounds", "command", [self.config.get("sounds", "sad_filename")] - ) - self.noise = subprocess.Popen( - sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL - ) - return + if self.delayed_off_running: + self.authorized = True + self.delayed_off_running = False + self.running_timers_count = 0 + for timer, _ in self.timers.values(): + timer.cancel() + else: + self.off_button.blink(1) + self.buzzer.beep() + if self.noise: + self.noise.kill() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "sad_filename")] + ) + self.noise = subprocess.Popen( + sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL + ) + return self.expecting_press_timer.cancel() self.on_button.on() self.enable_output.on() + for timer, _ in self.timers.values(): + timer.cancel() self.buzzer.off() self.warning_timer.cancel() self.expire_timer.cancel() @@ -126,7 +209,21 @@ def on_button_down(self, source): def abort(self, source): print("Abort", source) - self.enable_output.off() + delayed_count = 0 + for obj, delay in self.outputs: + if delay == 0: + obj.off() + else: + timer, _ = self.timers.get(id(obj), (None, None)) + if timer: + timer.cancel() + timer.set(delay) + delayed_count += 1 + + if delayed_count > 0: + self.delayed_off_running = True + self.running_timers_count = delayed_count + if self.authorized: command = self._get_command_line("auth", "deauth_command", [self.badge_id]) subprocess.call(command) @@ -142,6 +239,14 @@ def abort(self, source): self.noise.kill() self.noise = None + def delayed_off_generic(self, obj): + print("Delayed off generic", obj) + obj.off() + self.running_timers_count -= 1 + if self.running_timers_count <= 0: + self.delayed_off_running = False + self.running_timers_count = 0 + def warning(self, unused_source): self.buzzer.beepbeep() if self.config.get("sounds", "enable") == "1":