Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions software/test_two_button.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Tests for two_button.py"""

import tempfile
import time
import unittest

from gpiozero import Device
Expand Down Expand Up @@ -42,6 +43,24 @@
deauth_command = rm -f enabled
"""

SAMPLE_CONFIG_DELAYED = b"""
[pins]
on_button=Button:11:38
off_button=Button:16:37
enable_output=Relay:ActiveHigh:29, Relay:ActiveHigh:31
output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 1 ]
badge_reader=HIDKeystrokingReader:badge_scanner
buzzer=Buzzer:35
[auth]
duration=20s
warning=10s
extend=20s

command = touch enabled
extend_command = touch enabled
deauth_command = rm -f enabled
"""


# This is the fastest way to ensure that basic logic is right, but it does not
# test the use of BaseDispatcher.event_queue or the way callbacks happen on the
Expand Down Expand Up @@ -89,3 +108,113 @@ def test_auth_flow(self):
self.dispatcher.abort(None)
self.assertFalse(self.dispatcher.authorized)
self.assertFalse(self.is_relay_on())


class DelayedOffTest(unittest.TestCase):
def setUp(self):
Device.pin_factory = MockFactory()

try:
from authbox import fake_evdev_device_for_testing
except ModuleNotFoundError:
self.fail("Test requires evdev, but evdev is not available")
authbox.badgereader_hid_keystroking.evdev.list_devices = (
fake_evdev_device_for_testing.list_devices
)
authbox.badgereader_hid_keystroking.evdev.InputDevice = (
fake_evdev_device_for_testing.InputDevice
)

with tempfile.NamedTemporaryFile() as f:
f.write(SAMPLE_CONFIG_DELAYED)
f.flush()
config = authbox.config.Config(f.name)

self.dispatcher = two_button.Dispatcher(config)
for t in self.dispatcher.threads:
if t.__class__.__name__ == "Timer":
t.start()

def _process_events(self):
while not self.dispatcher.event_queue.empty():
item = self.dispatcher.event_queue.get_nowait()
if item is authbox.api.SHUTDOWN_SENTINEL:
break
func, args = item[0], item[1:]
func(*args)

def is_relay_on(self, index_or_name_or_obj):
if isinstance(index_or_name_or_obj, int):
obj = self.dispatcher.outputs[index_or_name_or_obj][0]
elif isinstance(index_or_name_or_obj, str):
obj = getattr(self.dispatcher, index_or_name_or_obj)
else:
obj = index_or_name_or_obj

if hasattr(obj, "gpio_relay"):
return obj.gpio_relay.value
elif hasattr(obj, "objs"):
return [r.gpio_relay.value for r in obj.objs]
else:
# It might be a mock object or something else
return obj.is_on if hasattr(obj, "is_on") else False # Fallback

def test_delayed_off(self):
# Out of the box, relay should be off
self.assertFalse(self.dispatcher.authorized)

self.assertFalse(self.is_relay_on(0))
self.assertFalse(self.is_relay_on(1))

# Badge scan sets authorized flag
self.dispatcher.badge_scan("1234")
self.assertTrue(self.dispatcher.authorized)

# "On" button pressed
self.dispatcher.on_button_down(None)
self.assertTrue(self.dispatcher.authorized)
self.assertTrue(self.is_relay_on(0))
self.assertTrue(self.is_relay_on(1))

# "Off" button pressed
self.dispatcher.abort(None)
# The dispatcher state should be not authorized
self.assertFalse(self.dispatcher.authorized)
# Main output (0) should be off immediately
self.assertFalse(self.is_relay_on(0))
# Delayed output (1) should be ON STILL
self.assertTrue(self.is_relay_on(1))

# Wait for delay (1s) + buffer
time.sleep(1.5)
self._process_events()

# Delayed output should be OFF
self.assertFalse(self.is_relay_on(1))

def test_cancel_delayed_off(self):
# Badge scan and turn on
self.dispatcher.badge_scan("1234")
self.dispatcher.on_button_down(None)
self.assertTrue(self.is_relay_on(0))
self.assertTrue(self.is_relay_on(1))

# Abort
self.dispatcher.abort(None)
self.assertFalse(self.is_relay_on(0))
self.assertTrue(self.is_relay_on(1))

# Wait 0.5s, then turn back on
time.sleep(0.5)
self.dispatcher.on_button_down(None) # Resume!
self.assertTrue(self.dispatcher.authorized)
self.assertTrue(self.is_relay_on(0))
self.assertTrue(self.is_relay_on(1))

# Wait for the original delay (1s) to show it was cancelled
time.sleep(1.5)
self._process_events()

# Should STILL be on!
self.assertTrue(self.is_relay_on(0))
self.assertTrue(self.is_relay_on(1))
5 changes: 4 additions & 1 deletion software/two_button.ini
Comment thread
brianbeck-google marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ off_button = Button:16:37
# Simple beeps on J4
buzzer = Buzzer:35
badge_reader = HIDKeystrokingReader:HID OMNIKEY 5427 CK
# For Authboard v0.4 29=J13 (small relay for interlock), 31=J12 (small relay for bofa)
# For Authboard v0.4 29=J13 (small relay for interlock), 31=J12 (small relay for BOFA)
enable_output = Relay:ActiveHigh:29, Relay:ActiveHigh:31
# During the specified duration, the outputs will remain on after abort
# Uncomment to leave the BOFA filter on for a duration after the laser is deauthorized.
# output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 180 ]

[auth]
# EDIT ME!
Expand Down
135 changes: 120 additions & 15 deletions software/two_button.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import subprocess
import sys

from authbox.api import BaseDispatcher
from authbox.api import BaseDispatcher, MultiProxy, split_escaped
from authbox.config import Config
from authbox.timer import Timer

Expand All @@ -41,17 +41,54 @@ def __init__(self, config):
self.load_config_object("badge_reader", on_scan=self.badge_scan)
self.load_config_object("enable_output")
self.load_config_object("buzzer")

# Custom loading for enable_output to support delay
enable_outputs_config = list(
split_escaped(self.config.get("pins", "enable_output"), preserve=True)
)

self.delay_map = self._load_delay_map()

self.outputs = []
if isinstance(self.enable_output, MultiProxy):
objs = self.enable_output.objs
else:
objs = [self.enable_output]

for i, obj in enumerate(objs):
if i < len(enable_outputs_config):
pin_str = enable_outputs_config[i].strip()
delay = self.delay_map.get(pin_str, 0)
self.outputs.append((obj, delay))
else:
self.outputs.append((obj, 0))

self.warning_timer = Timer(self.event_queue, "warning_timer", self.warning)
self.expire_timer = Timer(self.event_queue, "expire_timer", self.abort)
self.expecting_press_timer = Timer(
self.event_queue, "expecting_press_timer", self.abort
)
# Otherwise, start them manually!

self.timers = {}
self.threads.extend(
[self.warning_timer, self.expire_timer, self.expecting_press_timer]
)

for obj, delay in self.outputs:
if delay > 0:
timer_name = f"off_timer_{id(obj)}"
# Lambda captures obj correctly if we use a default arg.
timer = Timer(
self.event_queue,
timer_name,
lambda source, o=obj: self.delayed_off_generic(o),
)
self.timers[id(obj)] = (timer, delay)
self.threads.append(timer)

self.noise = None
self.delayed_off_running = False
self.running_timers_count = 0

def _get_command_line(self, section, key, format_args):
"""Constructs a command line, safely.
Expand All @@ -66,6 +103,43 @@ def _get_command_line(self, section, key, format_args):
pieces = shlex.split(value)
return [p.format(*format_args) for p in pieces]

def _load_delay_map(self):
delay_map = {}
try:
delays_str = self.config.get("pins", "output_off_delay_seconds")
delay_map = self._parse_delay_str(delays_str)
except Exception:
pass

if not delay_map:
try:
delays_str = self.config.get("auth", "output_off_delay_seconds")
delay_map = self._parse_delay_str(delays_str)
except Exception:
pass

return delay_map

def _parse_delay_str(self, delays_str):
delay_map = {}
if not delays_str:
return delay_map
delays_str = delays_str.replace("[", "").replace("]", "").strip()
pairs = [p.strip() for p in delays_str.split(",")]
for pair in pairs:
if "=" in pair:
k, v = pair.split("=")
k = k.strip()
v = v.strip()
try:
# Use Config.parse_time to support suffixes
from authbox.config import Config

delay_map[k] = Config.parse_time(v)
except Exception as e:
print("Error parsing delay for", k, v, e)
return delay_map

def badge_scan(self, badge_id):
# Malicious badge "numbers" that contain spaces require this extra work.
command = self._get_command_line("auth", "command", [badge_id])
Expand Down Expand Up @@ -94,21 +168,30 @@ def badge_scan(self, badge_id):
def on_button_down(self, source):
print("Button down", source)
if not self.authorized:
self.off_button.blink(1)
self.buzzer.beep()
if self.noise:
self.noise.kill()
if self.config.get("sounds", "enable") == "1":
sound_command = self._get_command_line(
"sounds", "command", [self.config.get("sounds", "sad_filename")]
)
self.noise = subprocess.Popen(
sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL
)
return
if self.delayed_off_running:
self.authorized = True
self.delayed_off_running = False
self.running_timers_count = 0
for timer, _ in self.timers.values():
timer.cancel()
else:
self.off_button.blink(1)
self.buzzer.beep()
if self.noise:
self.noise.kill()
if self.config.get("sounds", "enable") == "1":
sound_command = self._get_command_line(
"sounds", "command", [self.config.get("sounds", "sad_filename")]
)
self.noise = subprocess.Popen(
sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL
)
return
self.expecting_press_timer.cancel()
self.on_button.on()
self.enable_output.on()
for timer, _ in self.timers.values():
timer.cancel()
self.buzzer.off()
self.warning_timer.cancel()
self.expire_timer.cancel()
Expand All @@ -126,7 +209,21 @@ def on_button_down(self, source):

def abort(self, source):
print("Abort", source)
self.enable_output.off()
delayed_count = 0
for obj, delay in self.outputs:
if delay == 0:
obj.off()
else:
timer, _ = self.timers.get(id(obj), (None, None))
if timer:
timer.cancel()
timer.set(delay)
delayed_count += 1

if delayed_count > 0:
self.delayed_off_running = True
self.running_timers_count = delayed_count

if self.authorized:
command = self._get_command_line("auth", "deauth_command", [self.badge_id])
subprocess.call(command)
Expand All @@ -142,6 +239,14 @@ def abort(self, source):
self.noise.kill()
self.noise = None

def delayed_off_generic(self, obj):
print("Delayed off generic", obj)
obj.off()
self.running_timers_count -= 1
if self.running_timers_count <= 0:
self.delayed_off_running = False
self.running_timers_count = 0

def warning(self, unused_source):
self.buzzer.beepbeep()
if self.config.get("sounds", "enable") == "1":
Expand Down
Loading