diff --git a/messenger/_archive/messanger.go b/_archive/messenger/_archive/messanger.go similarity index 100% rename from messenger/_archive/messanger.go rename to _archive/messenger/_archive/messanger.go diff --git a/messenger/_archive/messanger_local.go b/_archive/messenger/_archive/messanger_local.go similarity index 100% rename from messenger/_archive/messanger_local.go rename to _archive/messenger/_archive/messanger_local.go diff --git a/messenger/_archive/messanger_local_test.go b/_archive/messenger/_archive/messanger_local_test.go similarity index 100% rename from messenger/_archive/messanger_local_test.go rename to _archive/messenger/_archive/messanger_local_test.go diff --git a/messenger/_archive/messanger_mqtt.go b/_archive/messenger/_archive/messanger_mqtt.go similarity index 100% rename from messenger/_archive/messanger_mqtt.go rename to _archive/messenger/_archive/messanger_mqtt.go diff --git a/messenger/_archive/messanger_mqtt_test.go b/_archive/messenger/_archive/messanger_mqtt_test.go similarity index 100% rename from messenger/_archive/messanger_mqtt_test.go rename to _archive/messenger/_archive/messanger_mqtt_test.go diff --git a/messenger/_archive/messanger_test.go b/_archive/messenger/_archive/messanger_test.go similarity index 100% rename from messenger/_archive/messanger_test.go rename to _archive/messenger/_archive/messanger_test.go diff --git a/messenger/_archive/mqtt.go b/_archive/messenger/_archive/mqtt.go similarity index 100% rename from messenger/_archive/mqtt.go rename to _archive/messenger/_archive/mqtt.go diff --git a/messenger/_archive/mqtt_mock.go b/_archive/messenger/_archive/mqtt_mock.go similarity index 100% rename from messenger/_archive/mqtt_mock.go rename to _archive/messenger/_archive/mqtt_mock.go diff --git a/messenger/_archive/mqtt_test.go b/_archive/messenger/_archive/mqtt_test.go similarity index 100% rename from messenger/_archive/mqtt_test.go rename to _archive/messenger/_archive/mqtt_test.go diff --git a/_archive/messenger/messenger.go b/_archive/messenger/messenger.go new file mode 100644 index 0000000..10a2595 --- /dev/null +++ b/_archive/messenger/messenger.go @@ -0,0 +1,362 @@ +package messenger + +import ( + "context" + "encoding/json" + "fmt" + "log" + "log/slog" + "math/rand/v2" + "net/http" + "os" + + gomqtt "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type BrokerType int + +const ( + None BrokerType = 0 + Builtin + External +) + +// MsgHandler is a callback function type for handling incoming messages. +// Subscribers provide a MsgHandler function that will be invoked when +// a message is received on a subscribed topic. The handler receives a +// pointer to the Msg and should return an error if message processing fails. +type MsgHandler func(msg *Msg) error + +// MessageHandler is an interface for types that can handle messages. +// This provides an alternative to the MsgHandler function type for +// implementing message handling logic as methods on types. +type MessageHandler interface { + HandleMsg(msg *Msg) error +} + +type Conn interface { + Connect(broker string, user string, pass string) error + Close() + Sub(string, MsgHandler) error + Unsub(...string) + PubMsg(*Msg) error + IsConnected() bool +} + +// Messenger +type Messenger struct { + ID string + Broker string + Username string + Password string + Prefix string + Published int + + BrokerType + Conn + subscriptions map[string]MsgHandler +} + +var ( + msgr *Messenger +) + +// NewMessanger creates a new Messanger instance based on the provided ID. +// It sets up the appropriate messaging backend and stores it as the singleton instance. +// +// Supported ID values: +// - "none": Creates a local in-process messanger without MQTT +// - "otto": Starts an embedded MQTT broker and creates an MQTT messanger +// - default: Creates an MQTT messanger connecting to an external broker +func NewMessenger(broker string) *Messenger { + msgr = &Messenger{} + msgr.ID = "otto-client" + msgr.Broker = broker + msgr.Username = os.Getenv("MQTT_USER") + msgr.Password = os.Getenv("MQTT_PASS") + msgr.Prefix = "o/" + msgr.subscriptions = make(map[string]MsgHandler) + + switch broker { + case "none": + msgr.Conn = &nobrokerConn{} + + case "internal": + // make sure the internal broker has been started + _, err := StartMQTTBroker(context.Background()) + if err != nil { + slog.Error("Failed to start embedded MQTT broker", "error", err) + return nil + } + msgr.BrokerType = Builtin + msgr.Conn = &connMQTT{} + + default: + msgr.BrokerType = External + msgr.Conn = &connMQTT{} + } + return msgr +} + +func SetMessenger(m *Messenger) { + msgr = m +} + +func GetMessenger() *Messenger { + return msgr +} + +func (m *Messenger) Connect() error { + err := m.Conn.Connect(m.Broker, m.Username, m.Password) + return err +} + +func (m *Messenger) Close() { + // remove the handler from the root node + var topics []string + for t := range m.subscriptions { + topics = append(topics, t) + } + if m.Conn != nil { + m.Conn.Unsub(topics...) + } +} + +func (m *Messenger) SubscribeAll(c mqtt.Client) { + slog.Info("MQTT client connection successful") + for topic, handler := range m.subscriptions { + err := m.Conn.Sub(topic, handler) + if err != nil { + slog.Error("MQTT failed to subscribe", "topic", topic, "error", err) + } + slog.Info("MQTT subscribed to", "topic", topic) + } +} + +// Sub will add the topic and handler to the client subscription list. If +// the client is connected the subscriptions will be sent to the client. +// If the client is not connected the subscriptions will be sent once the +// client has made a connection to the broker. +func (m *Messenger) Sub(topic string, handler MsgHandler) (err error) { + if m.subscriptions == nil { + m.subscriptions = make(map[string]MsgHandler) + } + m.subscriptions[topic] = handler + if m.Conn != nil { + err = m.Conn.Sub(topic, handler) + } + return nil +} + +func (m *Messenger) Unsub(topic string) { + m.Conn.Unsub(topic) +} + +// Pub takes a topic string and data, wraps those parameters into +// a Msg struct and call PubMsg(msg) +func (m *Messenger) Pub(topic string, data any) error { + b, err := Bytes(data) + if err != nil { + slog.Error("messenger failed to convert bytes", "error", err) + return err + } + msg := NewMsg(topic, b, "otto") + return m.PubMsg(msg) +} + +// Pub takes a topic string and data, wraps those parameters into +// a Msg struct and call PubMsg(msg) +func (m *Messenger) PubMsg(msg *Msg) error { + return m.Conn.PubMsg(msg) +} + +// ServeHTTP implements http.Handler to provide a REST API endpoint for +// inspecting messanger state. It returns a JSON response containing the +// messanger ID, list of subscribed topics, and count of published messages. +// +// Response format: +// +// { +// "ID": "messanger-id", +// "Subs": ["topic1", "topic2"], +// "Published": 42 +// } +// +// This is useful for debugging and monitoring the messanger's state. +func (m *Messenger) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var subs []string + for s := range m.subscriptions { + subs = append(subs, s) + } + + mbase := struct { + ID string + Subs []string + Published int + }{ + ID: m.ID, + Subs: subs, + Published: m.Published, + } + + w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode(mbase) + if err != nil { + slog.Error("MQTT.ServeHTTP failed to encode", "error", err) + } +} + +type connMQTT struct { + Debug bool + gomqtt.Client // Embedded Paho MQTT client +} + +// Connect initiates a connection to the connection broker using a +// username and password if not empty +func (m *connMQTT) Connect(b string, u string, p string) error { + m.Debug = false + if m.Debug { + gomqtt.DEBUG = log.Default() + } + gomqtt.ERROR = log.Default() + gomqtt.CRITICAL = log.Default() + gomqtt.WARN = log.Default() + + url := "tcp://" + b + ":1883" + opts := gomqtt.NewClientOptions() + opts.AddBroker(url) + opts.SetClientID("o++o" + os.Hostname() + string(rand.IntN(100))) + opts.SetAutoReconnect(true) + // opts.SetConnectRetry(true) + opts.SetCleanSession(false) + opts.SetUsername(u) + opts.SetPassword(p) + opts.SetConnectionLostHandler(func(m mqtt.Client, err error) { + slog.Info("MQTT disconnected from serrver", "error", err) + }) + opts.OnConnect = msgr.SubscribeAll + + // If we are testing m.Client will point to the mock client otherwise + // in real life a new real client will be created + if m.Client == nil { + m.Client = gomqtt.NewClient(opts) + } + + token := m.Client.Connect() + token.Wait() + if token.Error() != nil { + return token.Error() + } + slog.Info("MQTT client has connected to", "broker", b) + return nil +} + +func (m *connMQTT) IsConnected() bool { + if m.Client == nil { + return false + } + return m.Client.IsConnected() +} + +func (m *connMQTT) Close() { + if m.Client != nil { + m.Client.Disconnect(1000) + } +} + +func (m *connMQTT) Sub(topic string, f MsgHandler) error { + if m.Client == nil { + return fmt.Errorf("MQTT Client is not connected to a broker") + } + + var err error + token := m.Client.Subscribe(topic, byte(0), func(c gomqtt.Client, m gomqtt.Message) { + slog.Debug("MQTT incoming:", "topic", m.Topic(), "payload", string(m.Payload())) + msg := NewMsg(m.Topic(), m.Payload(), "mqtt-sub") + f(msg) + }) + + token.Wait() + if token.Error() != nil { + return token.Error() + } + return err + +} + +func (m *connMQTT) Unsub(topics ...string) { + var token gomqtt.Token + if m.Client == nil { + return + } + + if token = m.Unsubscribe(topics...); token.Wait() && token.Error() != nil { + slog.Error("Unsubscribe error: ", "error", token.Error()) + } +} + +func (m *connMQTT) PubMsg(msg *Msg) error { + if m.Client == nil { + return fmt.Errorf("MQTT Client is not connected to a broker") + } + + val, err := Bytes(msg.Data) + if err != nil { + return fmt.Errorf("MQTT failed to convert msg to bytes: %+v", err) + } + + var t gomqtt.Token + if t = m.Client.Publish(msg.Topic, byte(0), false, val); t == nil { + if false { + return fmt.Errorf("MQTT Pub NULL token topic %s - value: %+v", msg.Topic, val) + } + return nil + } + + t.Wait() + if t.Error() != nil { + return fmt.Errorf("MQTT Publish token error %+v", t.Error()) + } + return nil +} + +type nobrokerConn struct { + root *node +} + +func (c *nobrokerConn) Connect(b string, u string, p string) error { + return nil +} + +func (c *nobrokerConn) IsConnected() bool { + return true +} + +func (c *nobrokerConn) Close() { +} + +func (c *nobrokerConn) Sub(topic string, handler MsgHandler) error { + root.insert(topic, handler) + return nil +} + +func (c *nobrokerConn) Unsub(topics ...string) { + for _, t := range topics { + root.remove(t, nil) + } +} + +func (c *nobrokerConn) PubMsg(msg *Msg) error { + if msg == nil { + return fmt.Errorf("nil message") + } + + // look up local routing table to pass message along to subscribers + n := root.lookup(msg.Topic) + if n == nil { + return fmt.Errorf("No subscribers for %s\n", msg.Topic) + } + n.pub(msg) + return nil +} diff --git a/messenger/messenger_nodes.go b/_archive/messenger/messenger_nodes.go similarity index 100% rename from messenger/messenger_nodes.go rename to _archive/messenger/messenger_nodes.go diff --git a/messenger/messenger_nodes_test.go b/_archive/messenger/messenger_nodes_test.go similarity index 100% rename from messenger/messenger_nodes_test.go rename to _archive/messenger/messenger_nodes_test.go diff --git a/messenger/messenger_test.go b/_archive/messenger/messenger_test.go similarity index 100% rename from messenger/messenger_test.go rename to _archive/messenger/messenger_test.go diff --git a/messenger/mqtt_broker.go b/_archive/messenger/mqtt_broker.go similarity index 100% rename from messenger/mqtt_broker.go rename to _archive/messenger/mqtt_broker.go diff --git a/messenger/mqtt_broker_test.go b/_archive/messenger/mqtt_broker_test.go similarity index 100% rename from messenger/mqtt_broker_test.go rename to _archive/messenger/mqtt_broker_test.go diff --git a/messenger/msg.go b/_archive/messenger/msg.go similarity index 100% rename from messenger/msg.go rename to _archive/messenger/msg.go diff --git a/messenger/msg_test.go b/_archive/messenger/msg_test.go similarity index 100% rename from messenger/msg_test.go rename to _archive/messenger/msg_test.go diff --git a/_archive/messenger/topics.go b/_archive/messenger/topics.go new file mode 100644 index 0000000..931450a --- /dev/null +++ b/_archive/messenger/topics.go @@ -0,0 +1,190 @@ +package messenger + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strings" + "sync" + + "github.com/rustyeddy/otto/utils" +) + +// Topics manages topic formatting and usage tracking for the Otto messaging system. +// It provides helper methods for creating properly formatted topics and tracks +// how many times each topic has been used. +// +// Otto uses a standardized topic format: "ss/[c|d]/station/sensor" +// Where: +// - "o" is the namespace prefix (Smart Station) +// - "c" indicates control topics (commands to devices) +// - "d" indicates data topics (sensor readings, telemetry) +// - station is the station/device identifier +// - sensor is the sensor/actuator/command name +// +// Example topics: +// - "ss/c/station1/led" - Control topic to turn LED on/off +// - "ss/d/station1/temp" - Data topic for temperature readings +type Topics struct { + Prefix string // Prefix for all topics + Format string // Format string for topic generation (e.g., "ss/%s/%s/%s") + Map map[string]int // Map of topic to usage count + + mu *sync.RWMutex `json:"-"` +} + +var ( + topics *Topics +) + +func init() { + var mu sync.RWMutex + topics = &Topics{ + Prefix: "o/", + Format: "%s/%s/%s", + Map: make(map[string]int), + mu: &mu, + } + topics.Format = topics.Prefix + topics.Format +} + +// ValidateTopic checks if a topic string follows Otto's topic format conventions. +// A valid Otto topic must have: +// - At least 4 segments separated by '/' +// - First segment must be "ss" (namespace) +// - Second segment must be "c" (control) or "d" (data) +// - Third segment (station ID) must not be empty +// - Fourth segment (sensor/command) must not be empty +// +// Parameters: +// - topic: The topic string to validate (e.g., "ss/c/station1/temp") +// +// Returns true if the topic is valid, false otherwise. +// +// Example: +// +// valid := ValidateTopic("ss/c/station1/temp") // Returns true +// valid := ValidateTopic("invalid/topic") // Returns false +func ValidateTopic(topic string) bool { + path := strings.Split(topic, "/") + if len(path) < 4 { + return false + } + + if path[0] != "o" { + return false + } + + if path[1] != "c" && path[1] != "d" { + return false + } + + if path[2] == "" || path[3] == "" { + return false + } + + // here we have to accept the station id and topic it advertises + // because we can't know what the station IDs are. + return true +} + +// GetTopics returns the singleton Topics instance for the application. +// This provides access to topic formatting and usage tracking. +// +// Returns a pointer to the Topics instance. +// +// Example: +// +// topics := GetTopics() +// controlTopic := topics.Control("led") +func GetTopics() *Topics { + return topics +} + +// Control generates a control topic for the current station. +// Control topics are used to send commands to devices (e.g., turn on LED, set speed). +// The format is: "ss/c/{station}/{topic}" +// +// This method also increments the usage counter for the generated topic. +// +// Parameters: +// - topic: The command or actuator name (e.g., "led", "motor", "relay") +// +// Returns the fully formatted control topic string. +// +// Example: +// +// topics := GetTopics() +// ledTopic := topics.Control("led") // Returns "ss/c/mystation/led" +func (t *Topics) Control(topic string) string { + top := fmt.Sprintf(t.Format, "c", utils.StationName(), topic) + t.mu.Lock() + defer t.mu.Unlock() + t.Map[top]++ + return top +} + +func ControlTopic(topic string) string { + return topics.Control(topic) +} + +// Data generates a data topic for the current station. +// Data topics are used to publish sensor readings and telemetry. +// The format is: "ss/d/{station}/{topic}" +// +// This method also increments the usage counter for the generated topic. +// +// Parameters: +// - topic: The sensor or data stream name (e.g., "temp", "humidity", "motion") +// +// Returns the fully formatted data topic string. +// +// Example: +// +// topics := GetTopics() +// tempTopic := topics.Data("temp") // Returns "ss/d/mystation/temp" +func (t *Topics) Data(topic string) string { + top := fmt.Sprintf(t.Format, "d", utils.StationName(), topic) + t.mu.Lock() + defer t.mu.Unlock() + t.Map[top]++ + return top +} + +func DataTopic(topic string) string { + return topics.Data(topic) +} + +func Topic(topic string) string { + top := topics.Prefix + topic + topics.mu.Lock() + defer topics.mu.Unlock() + topics.Map[top]++ + return top +} + +// ServeHTTP implements http.Handler to provide a REST API endpoint for +// inspecting topic usage. Returns JSON containing the topic format and +// a map of all topics used by this station with their usage counts. +// +// Response format: +// +// { +// "TopicFmt": "ss/%s/%s/%s", +// "Topicmap": { +// "ss/c/station1/led": 5, +// "ss/d/station1/temp": 120 +// } +// } +// +// This is useful for monitoring which topics are being used and how frequently. +func (t *Topics) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + t.mu.Lock() + defer t.mu.Unlock() + err := json.NewEncoder(w).Encode(t) + if err != nil { + slog.Error("Error wrote data", "error", err) + } +} diff --git a/messenger/topics_test.go b/_archive/messenger/topics_test.go similarity index 100% rename from messenger/topics_test.go rename to _archive/messenger/topics_test.go diff --git a/cmd/otto/main.go b/cmd/otto/main.go index 4f77e17..83ea728 100644 --- a/cmd/otto/main.go +++ b/cmd/otto/main.go @@ -1,37 +1,51 @@ package main import ( - "log/slog" - "os" + "context" "os/signal" "syscall" - "github.com/rustyeddy/otto" - "github.com/rustyeddy/otto/utils" + "github.com/rustyeddy/devices/gpio" // your devices + "github.com/rustyeddy/otto/messenger" + "github.com/rustyeddy/otto/messenger/codec" + mqttpaho "github.com/rustyeddy/otto/messenger/mqtt" ) func main() { - // Set up signal handling for graceful shutdown - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - - // TODO: add command line flags - // configure logging. - o := otto.OttO{ - LogConfig: utils.DefaultLogConfig(), - } - o.Init() - o.Start() - - // Block until we receive a signal or done channel closes - select { - case sig := <-sigChan: - slog.Info("Received shutdown signal", "signal", sig) - o.Stop() - case <-o.Done(): - slog.Info("Server done channel closed") - o.Shutdown() + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + // MQTT + m := mqttpaho.New(mqttpaho.Config{ + Broker: "tcp://10.11.1.11:1883", + ClientID: "", // auto + Username: "", + Password: "", + CleanSession: true, + }) + + reg := messenger.NewRegistry(m, messenger.TopicScheme{Prefix: "otto"}) + + // IMPORTANT: resubscribe on reconnect + m.SetOnConnect(func() { reg.ResubscribeAll(ctx) }) + + if err := m.Connect(ctx); err != nil { + panic(err) } + // First connect: apply subscriptions now too + reg.ResubscribeAll(ctx) + + // Devices (examples) + btn := gpio.NewButton("button") // devices.Source[bool] + rel := gpio.NewRelay("light") // devices.Duplex[bool] + + reg.Add(btn) + reg.Add(rel) + + // Typed wiring + messenger.WireSource(ctx, reg, btn, codec.JSON[bool]{}) + messenger.WireDuplex(ctx, reg, rel, codec.JSON[bool]{}) - slog.Info("OttO server stopped") + // Run + _ = reg.Run(ctx) } diff --git a/cmd/otto/otto b/cmd/otto/otto new file mode 100755 index 0000000..5e128a7 Binary files /dev/null and b/cmd/otto/otto differ diff --git a/examples/demo/main.go b/examples/demo/main.go new file mode 100644 index 0000000..1b89507 --- /dev/null +++ b/examples/demo/main.go @@ -0,0 +1,154 @@ +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "github.com/rustyeddy/devices/drivers" + "github.com/rustyeddy/devices/gpio" + + "github.com/rustyeddy/otto/messenger" + "github.com/rustyeddy/otto/messenger/codec" + mqttpaho "github.com/rustyeddy/otto/messenger/mqtt" + "github.com/rustyeddy/otto/rules" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + broker := getenv("MQTT_BROKER", "tcp://127.0.0.1:1883") + prefix := getenv("MQTT_PREFIX", "otto") + + slog.Info("starting demo", "broker", broker, "prefix", prefix) + + // ---- MQTT client (Paho) ---- + m := mqttpaho.New(mqttpaho.Config{ + Broker: broker, + ClientID: getenv("MQTT_CLIENT_ID", ""), // auto if empty + Username: getenv("MQTT_USER", ""), + Password: getenv("MQTT_PASS", ""), + CleanSession: true, + }) + + reg := messenger.NewRegistry(m, messenger.TopicScheme{Prefix: prefix}) + reg.RetainState = true + reg.RetainMeta = true + reg.CommandTimeout = 2 * time.Second + + // important: resubscribe on connect/reconnect + m.SetOnConnect(func() { reg.ResubscribeAll(ctx) }) + + if err := m.Connect(ctx); err != nil { + slog.Error("mqtt connect failed", "error", err) + os.Exit(1) + } + reg.ResubscribeAll(ctx) + + // ---- Driver factory ---- + // For demo, use virtual GPIO you can run anywhere. + // Swap to drivers.NewGPIOCDevFactory() on Linux with real pins. + f := drivers.NewVPIOFactory() + + // ---- Devices ---- + btn := gpio.NewButton(gpio.ButtonConfig{ + Name: "demo_button", + Factory: f, + Chip: "vpio", + Offset: 1, + Bias: drivers.BiasPullUp, + Edge: drivers.EdgeBoth, + Debounce: 30 * time.Millisecond, + }) + + rel := gpio.NewRelay(gpio.RelayConfig{ + Name: "demo_relay", + Factory: f, + Chip: "vpio", + Offset: 2, + Initial: false, + }) + + reg.Add(btn) + reg.Add(rel) + + // ---- Messaging wiring ---- + messenger.WireSource(ctx, reg, btn, codec.JSON[bool]{}) + messenger.WireDuplex(ctx, reg, rel, codec.JSON[bool]{}) + + runner := rules.NewRunner() + runner.Add(rules.NewToggleOnRisingEdge("btn->toggle-relay", reg, btn, rel)) + + go func() { + _ = runner.Run(ctx) + }() + + /* + // ---- Demo behavior: press button toggles relay (in OttO, not in devices) ---- + go func() { + for { + select { + case v := <-btn.Out(): + // interpret "true" as press (depends on pull-up/down wiring) + if v { + // toggle the relay by sending into its command channel + rel.In() <- !current(rel) + slog.Info("button pressed: toggling relay") + } + case <-ctx.Done(): + return + } + } + }() + */ + // ---- Demo: simulate button presses in VPIO every 3 seconds ---- + go func() { + t := time.NewTicker(3 * time.Second) + defer t.Stop() + state := false + for { + select { + case <-t.C: + state = !state + edge := drivers.EdgeFalling + if state { + edge = drivers.EdgeRising + } + f.InjectEdge("vpio", 1, edge, state) + slog.Info("simulated button edge", "state", state, "edge", edge) + case <-ctx.Done(): + return + } + } + }() + + // ---- Run ---- + if err := reg.Run(ctx); err != nil { + slog.Error("registry exited with error", "error", err) + os.Exit(1) + } + + slog.Info("demo stopped") +} + +// current tries to read the latest state from the relay's Out channel without blocking. +// If nothing is available, it returns false (initial default). +func current(rel *gpio.Relay) bool { + select { + case v := <-rel.Out(): + return v + default: + return false + } +} + +func getenv(k, def string) string { + if v := os.Getenv(k); v != "" { + return v + } + return def +} diff --git a/examples/logging_demo.go b/examples/logging/main.go similarity index 100% rename from examples/logging_demo.go rename to examples/logging/main.go diff --git a/go.mod b/go.mod index c49301e..444769e 100644 --- a/go.mod +++ b/go.mod @@ -1,25 +1,31 @@ module github.com/rustyeddy/otto -go 1.23.3 +go 1.24.5 require ( github.com/chzyer/readline v1.5.1 github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/gorilla/websocket v1.5.3 - github.com/mochi-mqtt/server/v2 v2.7.9 + github.com/rustyeddy/devices v0.0.3 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.11.1 ) +replace github.com/rustyeddy/devices => ../devices + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rs/xid v1.4.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/warthog618/go-gpiocdev v0.9.1 // indirect golang.org/x/net v0.33.0 // indirect - golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.28.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/sys v0.29.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + periph.io/x/conn/v3 v3.7.2 // indirect + periph.io/x/devices/v3 v3.7.4 // indirect + periph.io/x/host/v3 v3.8.5 // indirect ) diff --git a/go.sum b/go.sum index 3a379ce..cbe67bb 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,7 @@ github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObk github.com/chzyer/test v1.0.0 h1:p3BQDXSxOhOG0P9z6/hGnII4LGiEPOYBhs8asl/fC04= github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= @@ -13,20 +14,19 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= -github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= +github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I= +github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI= -github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= -github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= @@ -34,15 +34,25 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/warthog618/go-gpiocdev v0.9.1 h1:pwHPaqjJfhCipIQl78V+O3l9OKHivdRDdmgXYbmhuCI= +github.com/warthog618/go-gpiocdev v0.9.1/go.mod h1:dN3e3t/S2aSNC+hgigGE/dBW8jE1ONk9bDSEYfoPyl8= +github.com/warthog618/go-gpiosim v0.1.1 h1:MRAEv+T+itmw+3GeIGpQJBfanUVyg0l3JCTwHtwdre4= +github.com/warthog618/go-gpiosim v0.1.1/go.mod h1:YXsnB+I9jdCMY4YAlMSRrlts25ltjmuIsrnoUrBLdqU= golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +periph.io/x/conn/v3 v3.7.2 h1:qt9dE6XGP5ljbFnCKRJ9OOCoiOyBGlw7JZgoi72zZ1s= +periph.io/x/conn/v3 v3.7.2/go.mod h1:Ao0b4sFRo4QOx6c1tROJU1fLJN1hUIYggjOrkIVnpGg= +periph.io/x/devices/v3 v3.7.4 h1:g9CGKTtiXS9iyDFDba4sr9pYde4dy+ZCKRPuKpKJdKo= +periph.io/x/devices/v3 v3.7.4/go.mod h1:FqFG9RotW2aCkfIlAes3qxziwgjRTncTMS5cSOcizNg= +periph.io/x/host/v3 v3.8.5 h1:g4g5xE1XZtDiGl1UAJaUur1aT7uNiFLMkyMEiZ7IHII= +periph.io/x/host/v3 v3.8.5/go.mod h1:hPq8dISZIc+UNfWoRj+bPH3XEBQqJPdFdx218W92mdc= diff --git a/messenger/codec/codec.go b/messenger/codec/codec.go new file mode 100644 index 0000000..4c2c0dd --- /dev/null +++ b/messenger/codec/codec.go @@ -0,0 +1,6 @@ +package codec + +type Codec[T any] interface { + Marshal(v T) ([]byte, error) + Unmarshal(b []byte) (T, error) +} diff --git a/messenger/codec/json.go b/messenger/codec/json.go new file mode 100644 index 0000000..fc83960 --- /dev/null +++ b/messenger/codec/json.go @@ -0,0 +1,8 @@ +package codec + +import "encoding/json" + +type JSON[T any] struct{} + +func (JSON[T]) Marshal(v T) ([]byte, error) { return json.Marshal(v) } +func (JSON[T]) Unmarshal(b []byte) (T, error) { var v T; return v, json.Unmarshal(b, &v) } diff --git a/messenger/messenger.go b/messenger/messenger.go index e855d5a..c986041 100644 --- a/messenger/messenger.go +++ b/messenger/messenger.go @@ -2,361 +2,70 @@ package messenger import ( "context" - "encoding/json" - "fmt" - "log" "log/slog" - "net/http" - "os" - "time" - - gomqtt "github.com/eclipse/paho.mqtt.golang" - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -type BrokerType int - -const ( - None BrokerType = 0 - Builtin - External + "sync" ) -// MsgHandler is a callback function type for handling incoming messages. -// Subscribers provide a MsgHandler function that will be invoked when -// a message is received on a subscribed topic. The handler receives a -// pointer to the Msg and should return an error if message processing fails. -type MsgHandler func(msg *Msg) error - -// MessageHandler is an interface for types that can handle messages. -// This provides an alternative to the MsgHandler function type for -// implementing message handling logic as methods on types. -type MessageHandler interface { - HandleMsg(msg *Msg) error -} - -type Conn interface { - Connect(broker string, user string, pass string) error - Close() - Sub(string, MsgHandler) error - Unsub(...string) - PubMsg(*Msg) error - IsConnected() bool +type subSpec struct { + topic string + qos byte + handler func(Message) } -// Messenger type Messenger struct { - ID string - Broker string - Username string - Password string - Prefix string - Published int + MQTT MQTT - BrokerType - Conn - subscriptions map[string]MsgHandler + mu sync.RWMutex + subscriptions map[string]subSpec // key=topic + unsubs map[string]func() error } -var ( - msgr *Messenger -) - -// NewMessanger creates a new Messanger instance based on the provided ID. -// It sets up the appropriate messaging backend and stores it as the singleton instance. -// -// Supported ID values: -// - "none": Creates a local in-process messanger without MQTT -// - "otto": Starts an embedded MQTT broker and creates an MQTT messanger -// - default: Creates an MQTT messanger connecting to an external broker -func NewMessenger(broker string) *Messenger { - msgr = &Messenger{} - msgr.ID = "otto-client" - msgr.Broker = broker - msgr.Username = os.Getenv("MQTT_USER") - msgr.Password = os.Getenv("MQTT_PASS") - msgr.Prefix = "o/" - msgr.subscriptions = make(map[string]MsgHandler) - - switch broker { - case "none": - msgr.Conn = &nobrokerConn{} - - case "internal": - // make sure the internal broker has been started - _, err := StartMQTTBroker(context.Background()) - if err != nil { - slog.Error("Failed to start embedded MQTT broker", "error", err) - return nil - } - msgr.BrokerType = Builtin - msgr.Conn = &connMQTT{} - - default: - msgr.BrokerType = External - msgr.Conn = &connMQTT{} +func New(mqtt MQTT) *Messenger { + return &Messenger{ + MQTT: mqtt, + subscriptions: map[string]subSpec{}, + unsubs: map[string]func() error{}, } - return msgr -} - -func SetMessenger(m *Messenger) { - msgr = m -} - -func GetMessenger() *Messenger { - return msgr } -func (m *Messenger) Connect() error { - err := m.Conn.Connect(m.Broker, m.Username, m.Password) - return err +// Register a subscription you want to always be active. +func (m *Messenger) WantSub(topic string, qos byte, handler func(Message)) { + m.mu.Lock() + defer m.mu.Unlock() + m.subscriptions[topic] = subSpec{topic: topic, qos: qos, handler: handler} } -func (m *Messenger) Close() { - // remove the handler from the root node - var topics []string - for t := range m.subscriptions { - topics = append(topics, t) - } - if m.Conn != nil { - m.Conn.Unsub(topics...) - } -} +// Apply all desired subscriptions (call on first connect and on every reconnect). +func (m *Messenger) ResubscribeAll(ctx context.Context) { + slog.Info("MQTT connected; (re)subscribing", "count", len(m.subscriptions)) -func (m *Messenger) SubscribeAll(c mqtt.Client) { - slog.Info("MQTT client connection successful") - for topic, handler := range m.subscriptions { - err := m.Conn.Sub(topic, handler) - if err != nil { - slog.Error("MQTT failed to subscribe", "topic", topic, "error", err) - } - slog.Info("MQTT subscribed to", "topic", topic) - } -} - -// Sub will add the topic and handler to the client subscription list. If -// the client is connected the subscriptions will be sent to the client. -// If the client is not connected the subscriptions will be sent once the -// client has made a connection to the broker. -func (m *Messenger) Sub(topic string, handler MsgHandler) (err error) { - if m.subscriptions == nil { - m.subscriptions = make(map[string]MsgHandler) - } - m.subscriptions[topic] = handler - if m.Conn != nil { - err = m.Conn.Sub(topic, handler) - } - return nil -} - -func (m *Messenger) Unsub(topic string) { - m.Conn.Unsub(topic) -} - -// Pub takes a topic string and data, wraps those parameters into -// a Msg struct and call PubMsg(msg) -func (m *Messenger) Pub(topic string, data any) error { - b, err := Bytes(data) - if err != nil { - slog.Error("messenger failed to convert bytes", "error", err) - return err - } - msg := NewMsg(topic, b, "otto") - return m.PubMsg(msg) -} - -// Pub takes a topic string and data, wraps those parameters into -// a Msg struct and call PubMsg(msg) -func (m *Messenger) PubMsg(msg *Msg) error { - return m.Conn.PubMsg(msg) -} - -// ServeHTTP implements http.Handler to provide a REST API endpoint for -// inspecting messanger state. It returns a JSON response containing the -// messanger ID, list of subscribed topics, and count of published messages. -// -// Response format: -// -// { -// "ID": "messanger-id", -// "Subs": ["topic1", "topic2"], -// "Published": 42 -// } -// -// This is useful for debugging and monitoring the messanger's state. -func (m *Messenger) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var subs []string - for s := range m.subscriptions { + // Snapshot desired subs under read lock, then do network work unlocked. + m.mu.RLock() + subs := make([]subSpec, 0, len(m.subscriptions)) + for _, s := range m.subscriptions { subs = append(subs, s) } + m.mu.RUnlock() - mbase := struct { - ID string - Subs []string - Published int - }{ - ID: m.ID, - Subs: subs, - Published: m.Published, - } - - w.Header().Set("Content-Type", "application/json") - err := json.NewEncoder(w).Encode(mbase) - if err != nil { - slog.Error("MQTT.ServeHTTP failed to encode", "error", err) - } -} - -type connMQTT struct { - Debug bool - gomqtt.Client // Embedded Paho MQTT client -} - -// Connect initiates a connection to the connection broker using a -// username and password if not empty -func (m *connMQTT) Connect(b string, u string, p string) error { - m.Debug = false - if m.Debug { - gomqtt.DEBUG = log.Default() - } - gomqtt.ERROR = log.Default() - gomqtt.CRITICAL = log.Default() - gomqtt.WARN = log.Default() - - url := "tcp://" + b + ":1883" - opts := gomqtt.NewClientOptions() - opts.AddBroker(url) - opts.SetClientID("o++o" + time.Now().Format(time.RFC3339)) - opts.SetAutoReconnect(true) - // opts.SetConnectRetry(true) - opts.SetCleanSession(true) - opts.SetUsername(u) - opts.SetPassword(p) - opts.SetConnectionLostHandler(func(m mqtt.Client, err error) { - slog.Info("MQTT disconnected from serrver", "error", err) - }) - opts.OnConnect = msgr.SubscribeAll - - // If we are testing m.Client will point to the mock client otherwise - // in real life a new real client will be created - if m.Client == nil { - m.Client = gomqtt.NewClient(opts) - } - - token := m.Client.Connect() - token.Wait() - if token.Error() != nil { - return token.Error() - } - slog.Info("MQTT client has connected to", "broker", b) - return nil -} - -func (m *connMQTT) IsConnected() bool { - if m.Client == nil { - return false - } - return m.Client.IsConnected() -} - -func (m *connMQTT) Close() { - if m.Client != nil { - m.Client.Disconnect(1000) - } -} - -func (m *connMQTT) Sub(topic string, f MsgHandler) error { - if m.Client == nil { - return fmt.Errorf("MQTT Client is not connected to a broker") - } - - var err error - token := m.Client.Subscribe(topic, byte(0), func(c gomqtt.Client, m gomqtt.Message) { - slog.Debug("MQTT incoming:", "topic", m.Topic(), "payload", string(m.Payload())) - msg := NewMsg(m.Topic(), m.Payload(), "mqtt-sub") - f(msg) - }) - - token.Wait() - if token.Error() != nil { - return token.Error() - } - return err - -} - -func (m *connMQTT) Unsub(topics ...string) { - var token gomqtt.Token - if m.Client == nil { - return - } - - if token = m.Unsubscribe(topics...); token.Wait() && token.Error() != nil { - slog.Error("Unsubscribe error: ", "error", token.Error()) - } -} - -func (m *connMQTT) PubMsg(msg *Msg) error { - if m.Client == nil { - return fmt.Errorf("MQTT Client is not connected to a broker") - } - - val, err := Bytes(msg.Data) - if err != nil { - return fmt.Errorf("MQTT failed to convert msg to bytes: %+v", err) - } - - var t gomqtt.Token - if t = m.Client.Publish(msg.Topic, byte(0), false, val); t == nil { - if false { - return fmt.Errorf("MQTT Pub NULL token topic %s - value: %+v", msg.Topic, val) + for _, s := range subs { + // If we already had an active sub, try to unsubscribe first (optional). + m.mu.Lock() + if u, ok := m.unsubs[s.topic]; ok && u != nil { + _ = u() + delete(m.unsubs, s.topic) } - return nil - } - - t.Wait() - if t.Error() != nil { - return fmt.Errorf("MQTT Publish token error %+v", t.Error()) - } - return nil -} - -type nobrokerConn struct { - root *node -} - -func (c *nobrokerConn) Connect(b string, u string, p string) error { - return nil -} + m.mu.Unlock() -func (c *nobrokerConn) IsConnected() bool { - return true -} - -func (c *nobrokerConn) Close() { -} - -func (c *nobrokerConn) Sub(topic string, handler MsgHandler) error { - root.insert(topic, handler) - return nil -} - -func (c *nobrokerConn) Unsub(topics ...string) { - for _, t := range topics { - root.remove(t, nil) - } -} + unsub, err := m.MQTT.Subscribe(ctx, s.topic, s.qos, s.handler) + if err != nil { + slog.Error("MQTT subscribe failed", "topic", s.topic, "error", err) + continue + } -func (c *nobrokerConn) PubMsg(msg *Msg) error { - if msg == nil { - return fmt.Errorf("nil message") - } + m.mu.Lock() + m.unsubs[s.topic] = unsub + m.mu.Unlock() - // look up local routing table to pass message along to subscribers - n := root.lookup(msg.Topic) - if n == nil { - return fmt.Errorf("No subscribers for %s\n", msg.Topic) + slog.Info("MQTT subscribed", "topic", s.topic, "qos", s.qos) } - n.pub(msg) - return nil } diff --git a/messenger/mqtt/paho.go b/messenger/mqtt/paho.go new file mode 100644 index 0000000..90b31d9 --- /dev/null +++ b/messenger/mqtt/paho.go @@ -0,0 +1,130 @@ +package mqtt + +import ( + "context" + "errors" + "log/slog" + "math/rand" + "time" + + paho "github.com/eclipse/paho.mqtt.golang" + + "github.com/rustyeddy/otto/messenger" +) + +type Paho struct { + opts *paho.ClientOptions + c paho.Client + + // Called whenever Paho connects/reconnects. + onConnect func() +} + +type Config struct { + Broker string // e.g. "tcp://10.11.0.10:1883" + ClientID string // if empty, random + Username string + Password string + + CleanSession bool +} + +func New(cfg Config) *Paho { + id := cfg.ClientID + if id == "" { + id = "otto-" + randSuffix() + } + + opts := paho.NewClientOptions(). + AddBroker(cfg.Broker). + SetClientID(id). + SetUsername(cfg.Username). + SetPassword(cfg.Password). + SetAutoReconnect(true). + SetConnectTimeout(10 * time.Second). + SetCleanSession(cfg.CleanSession) + + p := &Paho{opts: opts} + + opts.SetConnectionLostHandler(func(_ paho.Client, err error) { + slog.Info("MQTT disconnected", "error", err) + }) + + opts.OnConnect = func(_ paho.Client) { + slog.Info("MQTT connected") + if p.onConnect != nil { + p.onConnect() + } + } + + return p +} + +func randSuffix() string { + const letters = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, 8) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} + +func (p *Paho) SetOnConnect(fn func()) { + p.onConnect = fn +} + +func (p *Paho) Connect(ctx context.Context) error { + if p.c == nil { + p.c = paho.NewClient(p.opts) + } + tok := p.c.Connect() + if !tok.WaitTimeout(15 * time.Second) { + return errors.New("mqtt connect timeout") + } + return tok.Error() +} + +func (p *Paho) SetWill(topic string, payload []byte, retain bool, qos byte) error { + if p.opts == nil { + return errors.New("mqtt options not initialized") + } + // Paho expects string payload for will. + p.opts.SetWill(topic, string(payload), qos, retain) + return nil +} + +func (p *Paho) Publish(ctx context.Context, topic string, payload []byte, retain bool, qos byte) error { + tok := p.c.Publish(topic, qos, retain, payload) + // For QoS0, we usually don't need to wait. For QoS1, wait briefly. + if qos > 0 { + if !tok.WaitTimeout(5 * time.Second) { + return errors.New("mqtt publish timeout") + } + } + return tok.Error() +} + +func (p *Paho) Subscribe(ctx context.Context, topic string, qos byte, handler func(messenger.Message)) (func() error, error) { + tok := p.c.Subscribe(topic, qos, func(_ paho.Client, msg paho.Message) { + handler(messenger.Message{ + Topic: msg.Topic(), + Payload: msg.Payload(), + Retain: msg.Retained(), + QoS: msg.Qos(), + }) + }) + if !tok.WaitTimeout(10 * time.Second) { + return nil, errors.New("mqtt subscribe timeout") + } + if tok.Error() != nil { + return nil, tok.Error() + } + + return func() error { + ut := p.c.Unsubscribe(topic) + if !ut.WaitTimeout(10 * time.Second) { + return errors.New("mqtt unsubscribe timeout") + } + return ut.Error() + }, nil +} diff --git a/messenger/mqtt_client.go b/messenger/mqtt_client.go new file mode 100644 index 0000000..8131398 --- /dev/null +++ b/messenger/mqtt_client.go @@ -0,0 +1,17 @@ +package messenger + +import "context" + +type Message struct { + Topic string + Payload []byte + Retain bool + QoS byte +} + +type MQTT interface { + // Publish should be safe to call from multiple goroutines. + Publish(ctx context.Context, topic string, payload []byte, retain bool, qos byte) error + Subscribe(ctx context.Context, topic string, qos byte, handler func(Message)) (unsubscribe func() error, err error) + SetWill(topic string, payload []byte, retain bool, qos byte) error +} diff --git a/messenger/payloads.go b/messenger/payloads.go new file mode 100644 index 0000000..8ba9a5b --- /dev/null +++ b/messenger/payloads.go @@ -0,0 +1,20 @@ +package messenger + +import "time" + +type StatusPayload struct { + Status string `json:"status"` // "online"|"offline" + Time time.Time `json:"time"` +} + +type MetaPayload struct { + Name string `json:"name"` + Kind string `json:"kind"` + ValueType string `json:"value_type"` + Access string `json:"access"` // "ro"|"wo"|"rw" + Unit string `json:"unit,omitempty"` + Min *float64 `json:"min,omitempty"` + Max *float64 `json:"max,omitempty"` + Tags []string `json:"tags,omitempty"` + Attrs map[string]string `json:"attrs,omitempty"` +} diff --git a/messenger/registry.go b/messenger/registry.go new file mode 100644 index 0000000..9191baa --- /dev/null +++ b/messenger/registry.go @@ -0,0 +1,283 @@ +package messenger + +import ( + "context" + "encoding/json" + "log/slog" + "sync" + "time" + + "github.com/rustyeddy/devices" +) + +type Logger interface { + Info(msg string, args ...any) + Warn(msg string, args ...any) + Error(msg string, args ...any) +} + +type Registry struct { + MQTT MQTT + Topics TopicScheme + Log Logger + + // Defaults (override after NewRegistry if desired) + QoSState byte + QoSSet byte + QoSEvent byte + QoSStatus byte + RetainState bool + RetainMeta bool + + // Command delivery guard (prevents wedging MQTT callback path) + CommandTimeout time.Duration + + // Internal + mu sync.RWMutex + + devs []devices.Device + + // Desired subscriptions (topic -> spec) + subs map[string]subSpec + + // Active unsubscribers (topic -> unsub) + unsubs map[string]func() error + + // ---- State cache ---- + stateMu sync.RWMutex + + // raw retained payloads (JSON bytes as published) + stateRaw map[string][]byte + + // decoded state cache (optional, populated by WireSource) + stateAny map[string]any +} + +func NewRegistry(m MQTT, topics TopicScheme) *Registry { + return &Registry{ + MQTT: m, + Topics: topics, + Log: slog.Default(), + QoSState: 0, + QoSSet: 1, + QoSEvent: 0, + QoSStatus: 1, + RetainState: true, + RetainMeta: true, + CommandTimeout: 2 * time.Second, + + subs: map[string]subSpec{}, + unsubs: map[string]func() error{}, + stateRaw: make(map[string][]byte), + stateAny: make(map[string]any), + } +} + +func (r *Registry) Add(dev devices.Device) { + r.mu.Lock() + defer r.mu.Unlock() + r.devs = append(r.devs, dev) +} + +// WantSub registers a subscription that should be active whenever MQTT is connected. +// Registry will apply these on every connect/reconnect. +func (r *Registry) WantSub(topic string, qos byte, handler func(Message)) { + r.mu.Lock() + defer r.mu.Unlock() + r.subs[topic] = subSpec{topic: topic, qos: qos, handler: handler} +} + +// ResubscribeAll applies all desired subscriptions (call on connect and reconnect). +func (r *Registry) ResubscribeAll(ctx context.Context) { + r.mu.RLock() + subs := make([]subSpec, 0, len(r.subs)) + for _, s := range r.subs { + subs = append(subs, s) + } + r.mu.RUnlock() + + r.Log.Info("MQTT connected; (re)subscribing", "count", len(subs)) + + for _, s := range subs { + // Unsubscribe previous if any (best effort) + r.mu.Lock() + if u, ok := r.unsubs[s.topic]; ok && u != nil { + _ = u() + delete(r.unsubs, s.topic) + } + r.mu.Unlock() + + unsub, err := r.MQTT.Subscribe(ctx, s.topic, s.qos, s.handler) + if err != nil { + r.Log.Error("MQTT subscribe failed", "topic", s.topic, "error", err) + continue + } + r.mu.Lock() + r.unsubs[s.topic] = unsub + r.mu.Unlock() + + r.Log.Info("MQTT subscribed", "topic", s.topic, "qos", s.qos) + } +} + +func (r *Registry) publishStatus(ctx context.Context, name, status string) { + b, _ := json.Marshal(StatusPayload{Status: status, Time: time.Now()}) + _ = r.MQTT.Publish(ctx, r.Topics.Status(name), b, true, r.QoSStatus) +} + +func (r *Registry) publishMeta(ctx context.Context, dev devices.Device) { + d, ok := dev.(interface{ Descriptor() devices.Descriptor }) + if !ok { + return + } + desc := d.Descriptor() + mp := MetaPayload{ + Name: desc.Name, + Kind: desc.Kind, + ValueType: desc.ValueType, + Access: string(desc.Access), + Unit: desc.Unit, + Min: desc.Min, + Max: desc.Max, + Tags: desc.Tags, + Attrs: desc.Attributes, + } + b, err := json.Marshal(mp) + if err != nil { + r.Log.Warn("meta marshal failed", "device", dev.Name(), "error", err) + return + } + _ = r.MQTT.Publish(ctx, r.Topics.Meta(dev.Name()), b, r.RetainMeta, r.QoSStatus) +} + +func (r *Registry) wireEvents(ctx context.Context, dev devices.Device) { + name := dev.Name() + + go func() { + for { + select { + case evt, ok := <-dev.Events(): + if !ok { + return + } + + // JSON-friendly event payload + wire := map[string]any{ + "device": evt.Device, + "kind": evt.Kind, + "time": evt.Time, + "msg": evt.Msg, + "meta": evt.Meta, + } + if evt.Err != nil { + wire["err"] = evt.Err.Error() + } + + b, err := json.Marshal(wire) + if err != nil { + continue + } + _ = r.MQTT.Publish(ctx, r.Topics.Event(name), b, false, r.QoSEvent) + + case <-ctx.Done(): + return + } + } + }() +} + +// Run starts device goroutines, wires events, and publishes status/meta. +// IMPORTANT: For reconnect-resubscribe to work, your MQTT adapter must call r.ResubscribeAll(ctx) on connect. +func (r *Registry) Run(ctx context.Context) error { + // Snapshot devices + r.mu.RLock() + devs := append([]devices.Device(nil), r.devs...) + r.mu.RUnlock() + + // Prepare LWT + birth/meta and wire events + for _, dev := range devs { + name := dev.Name() + + // Set LWT (offline retained) + offline, _ := json.Marshal(StatusPayload{Status: "offline", Time: time.Now()}) + _ = r.MQTT.SetWill(r.Topics.Status(name), offline, true, r.QoSStatus) + + // Birth online + r.publishStatus(ctx, name, "online") + + // Meta retained (optional) + r.publishMeta(ctx, dev) + + // Events -> MQTT + r.wireEvents(ctx, dev) + } + + // Start devices + var wg sync.WaitGroup + errCh := make(chan error, len(devs)) + + for _, dev := range devs { + wg.Add(1) + go func(d devices.Device) { + defer wg.Done() + if err := d.Run(ctx); err != nil { + errCh <- err + } + }(dev) + } + + select { + case err := <-errCh: + // first fatal error + return err + case <-ctx.Done(): + // graceful shutdown + } + + // Best effort: unsubscribe + r.mu.Lock() + for topic, u := range r.unsubs { + _ = u() + delete(r.unsubs, topic) + } + r.mu.Unlock() + + wg.Wait() + + // Publish offline retained + for _, dev := range devs { + r.publishStatus(context.Background(), dev.Name(), "offline") + } + + return nil +} + +// StateRaw returns the last published state payload for a device. +func (r *Registry) StateRaw(name string) ([]byte, bool) { + r.stateMu.RLock() + defer r.stateMu.RUnlock() + b, ok := r.stateRaw[name] + return b, ok +} + +// StateAny returns the last decoded state value (if known). +func (r *Registry) StateAny(name string) (any, bool) { + r.stateMu.RLock() + defer r.stateMu.RUnlock() + v, ok := r.stateAny[name] + return v, ok +} + +// StateAs returns the last decoded state as a concrete type. +func StateAs[T any](r *Registry, name string) (T, bool) { + var zero T + v, ok := r.StateAny(name) + if !ok { + return zero, false + } + tv, ok := v.(T) + if !ok { + return zero, false + } + return tv, true +} diff --git a/messenger/topics.go b/messenger/topics.go index 931450a..ab831b5 100644 --- a/messenger/topics.go +++ b/messenger/topics.go @@ -1,190 +1,17 @@ package messenger -import ( - "encoding/json" - "fmt" - "log/slog" - "net/http" - "strings" - "sync" +import "path" - "github.com/rustyeddy/otto/utils" -) +// I think we should add a station to the MQTT topic path -// Topics manages topic formatting and usage tracking for the Otto messaging system. -// It provides helper methods for creating properly formatted topics and tracks -// how many times each topic has been used. -// -// Otto uses a standardized topic format: "ss/[c|d]/station/sensor" -// Where: -// - "o" is the namespace prefix (Smart Station) -// - "c" indicates control topics (commands to devices) -// - "d" indicates data topics (sensor readings, telemetry) -// - station is the station/device identifier -// - sensor is the sensor/actuator/command name -// -// Example topics: -// - "ss/c/station1/led" - Control topic to turn LED on/off -// - "ss/d/station1/temp" - Data topic for temperature readings -type Topics struct { - Prefix string // Prefix for all topics - Format string // Format string for topic generation (e.g., "ss/%s/%s/%s") - Map map[string]int // Map of topic to usage count - - mu *sync.RWMutex `json:"-"` -} - -var ( - topics *Topics -) - -func init() { - var mu sync.RWMutex - topics = &Topics{ - Prefix: "o/", - Format: "%s/%s/%s", - Map: make(map[string]int), - mu: &mu, - } - topics.Format = topics.Prefix + topics.Format -} - -// ValidateTopic checks if a topic string follows Otto's topic format conventions. -// A valid Otto topic must have: -// - At least 4 segments separated by '/' -// - First segment must be "ss" (namespace) -// - Second segment must be "c" (control) or "d" (data) -// - Third segment (station ID) must not be empty -// - Fourth segment (sensor/command) must not be empty -// -// Parameters: -// - topic: The topic string to validate (e.g., "ss/c/station1/temp") -// -// Returns true if the topic is valid, false otherwise. -// -// Example: -// -// valid := ValidateTopic("ss/c/station1/temp") // Returns true -// valid := ValidateTopic("invalid/topic") // Returns false -func ValidateTopic(topic string) bool { - path := strings.Split(topic, "/") - if len(path) < 4 { - return false - } - - if path[0] != "o" { - return false - } - - if path[1] != "c" && path[1] != "d" { - return false - } - - if path[2] == "" || path[3] == "" { - return false - } - - // here we have to accept the station id and topic it advertises - // because we can't know what the station IDs are. - return true -} - -// GetTopics returns the singleton Topics instance for the application. -// This provides access to topic formatting and usage tracking. -// -// Returns a pointer to the Topics instance. -// -// Example: -// -// topics := GetTopics() -// controlTopic := topics.Control("led") -func GetTopics() *Topics { - return topics -} - -// Control generates a control topic for the current station. -// Control topics are used to send commands to devices (e.g., turn on LED, set speed). -// The format is: "ss/c/{station}/{topic}" -// -// This method also increments the usage counter for the generated topic. -// -// Parameters: -// - topic: The command or actuator name (e.g., "led", "motor", "relay") -// -// Returns the fully formatted control topic string. -// -// Example: -// -// topics := GetTopics() -// ledTopic := topics.Control("led") // Returns "ss/c/mystation/led" -func (t *Topics) Control(topic string) string { - top := fmt.Sprintf(t.Format, "c", utils.StationName(), topic) - t.mu.Lock() - defer t.mu.Unlock() - t.Map[top]++ - return top +type TopicScheme struct { + Prefix string // e.g. "otto" or "home" } -func ControlTopic(topic string) string { - return topics.Control(topic) -} - -// Data generates a data topic for the current station. -// Data topics are used to publish sensor readings and telemetry. -// The format is: "ss/d/{station}/{topic}" -// -// This method also increments the usage counter for the generated topic. -// -// Parameters: -// - topic: The sensor or data stream name (e.g., "temp", "humidity", "motion") -// -// Returns the fully formatted data topic string. -// -// Example: -// -// topics := GetTopics() -// tempTopic := topics.Data("temp") // Returns "ss/d/mystation/temp" -func (t *Topics) Data(topic string) string { - top := fmt.Sprintf(t.Format, "d", utils.StationName(), topic) - t.mu.Lock() - defer t.mu.Unlock() - t.Map[top]++ - return top -} +func (s TopicScheme) base(name string) string { return path.Join(s.Prefix, "devices", name) } -func DataTopic(topic string) string { - return topics.Data(topic) -} - -func Topic(topic string) string { - top := topics.Prefix + topic - topics.mu.Lock() - defer topics.mu.Unlock() - topics.Map[top]++ - return top -} - -// ServeHTTP implements http.Handler to provide a REST API endpoint for -// inspecting topic usage. Returns JSON containing the topic format and -// a map of all topics used by this station with their usage counts. -// -// Response format: -// -// { -// "TopicFmt": "ss/%s/%s/%s", -// "Topicmap": { -// "ss/c/station1/led": 5, -// "ss/d/station1/temp": 120 -// } -// } -// -// This is useful for monitoring which topics are being used and how frequently. -func (t *Topics) ServeHTTP(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - t.mu.Lock() - defer t.mu.Unlock() - err := json.NewEncoder(w).Encode(t) - if err != nil { - slog.Error("Error wrote data", "error", err) - } -} +func (s TopicScheme) State(name string) string { return path.Join(s.base(name), "state") } +func (s TopicScheme) Set(name string) string { return path.Join(s.base(name), "set") } +func (s TopicScheme) Event(name string) string { return path.Join(s.base(name), "event") } +func (s TopicScheme) Status(name string) string { return path.Join(s.base(name), "status") } +func (s TopicScheme) Meta(name string) string { return path.Join(s.base(name), "meta") } diff --git a/messenger/wire_typed.go b/messenger/wire_typed.go new file mode 100644 index 0000000..23f5b42 --- /dev/null +++ b/messenger/wire_typed.go @@ -0,0 +1,80 @@ +package messenger + +import ( + "context" + "time" + + "github.com/rustyeddy/devices" + "github.com/rustyeddy/otto/messenger/codec" +) + +// WireSource publishes device.Out() to MQTT .../state (JSON-encoded). +func WireSource[T any](ctx context.Context, r *Registry, dev devices.Source[T], c codec.Codec[T]) { + name := dev.Name() + + go func() { + for { + select { + case v, ok := <-dev.Out(): + if !ok { + return + } + + // encode + b, err := c.Marshal(v) + if err != nil { + r.Log.Warn("state marshal failed", "device", name, "error", err) + continue + } + + // cache + r.stateMu.Lock() + r.stateRaw[name] = b + r.stateAny[name] = v + r.stateMu.Unlock() + + // publish + _ = r.MQTT.Publish(ctx, r.Topics.State(name), b, r.RetainState, r.QoSState) + + case <-ctx.Done(): + return + } + } + }() +} + +// WireSink subscribes to MQTT .../set and delivers decoded values into device.In(). +// Uses timeout so MQTT callback doesn't block forever. +func WireSink[T any](ctx context.Context, r *Registry, dev devices.Sink[T], c codec.Codec[T]) { + name := dev.Name() + setTopic := r.Topics.Set(name) + in := dev.In() + + r.WantSub(setTopic, r.QoSSet, func(m Message) { + v, err := c.Unmarshal(m.Payload) + if err != nil { + r.Log.Warn("set unmarshal failed", "device", name, "topic", m.Topic, "error", err) + return + } + + timeout := r.CommandTimeout + if timeout <= 0 { + timeout = 2 * time.Second + } + + select { + case in <- v: + // delivered + case <-time.After(timeout): + r.Log.Warn("set delivery timeout", "device", name, "topic", m.Topic) + case <-ctx.Done(): + return + } + }) +} + +// WireDuplex wires both state publish and set subscribe. +func WireDuplex[T any](ctx context.Context, r *Registry, dev devices.Duplex[T], c codec.Codec[T]) { + WireSource(ctx, r, dev, c) + WireSink(ctx, r, dev, c) +} diff --git a/rules/follow.go b/rules/follow.go new file mode 100644 index 0000000..c94b217 --- /dev/null +++ b/rules/follow.go @@ -0,0 +1,37 @@ +package rules + +import ( + "context" + + "github.com/rustyeddy/devices" +) + +type Follow[T any] struct { + name string + Src devices.Source[T] + Dst devices.Sink[T] +} + +func NewFollow[T any](name string, src devices.Source[T], dst devices.Sink[T]) *Follow[T] { + return &Follow[T]{name: name, Src: src, Dst: dst} +} + +func (f *Follow[T]) Name() string { return f.name } + +func (f *Follow[T]) Run(ctx context.Context) error { + for { + select { + case v, ok := <-f.Src.Out(): + if !ok { + return nil + } + select { + case f.Dst.In() <- v: + case <-ctx.Done(): + return nil + } + case <-ctx.Done(): + return nil + } + } +} diff --git a/rules/rules.go b/rules/rules.go new file mode 100644 index 0000000..2635947 --- /dev/null +++ b/rules/rules.go @@ -0,0 +1,45 @@ +package rules + +import ( + "context" + "sync" +) + +type Rule interface { + Name() string + Run(ctx context.Context) error +} + +type Runner struct { + rules []Rule +} + +func NewRunner() *Runner { return &Runner{} } + +func (r *Runner) Add(rule Rule) { r.rules = append(r.rules, rule) } + +// Run starts all rules and returns the first fatal error (or ctx cancellation). +func (r *Runner) Run(ctx context.Context) error { + errCh := make(chan error, len(r.rules)) + var wg sync.WaitGroup + + for _, rule := range r.rules { + wg.Add(1) + go func(rule Rule) { + defer wg.Done() + if err := rule.Run(ctx); err != nil { + errCh <- err + } + }(rule) + } + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + // graceful + } + + wg.Wait() + return nil +} diff --git a/rules/toggle_on_rising.go b/rules/toggle_on_rising.go new file mode 100644 index 0000000..dc479d0 --- /dev/null +++ b/rules/toggle_on_rising.go @@ -0,0 +1,75 @@ +package rules + +import ( + "context" + "time" + + "github.com/rustyeddy/devices" + "github.com/rustyeddy/otto/messenger" +) + +type ToggleOnRisingEdge struct { + name string + + Button devices.Source[bool] + Relay devices.Duplex[bool] + + Registry *messenger.Registry + + // interpret press as true or false (depends on wiring/pull-up) + PressValue bool + + // simple guard against rapid repeats + MinInterval time.Duration +} + +func NewToggleOnRisingEdge(name string, reg *messenger.Registry, btn devices.Source[bool], relay devices.Duplex[bool]) *ToggleOnRisingEdge { + return &ToggleOnRisingEdge{ + name: name, + Button: btn, + Relay: relay, + Registry: reg, + PressValue: true, + MinInterval: 150 * time.Millisecond, + } +} + +func (t *ToggleOnRisingEdge) Name() string { return t.name } + +func (t *ToggleOnRisingEdge) Run(ctx context.Context) error { + + var last time.Time + + for { + select { + case v, ok := <-t.Button.Out(): + if !ok { + return nil + } + if v != t.PressValue { + continue + } + now := time.Now() + if t.MinInterval > 0 && !last.IsZero() && now.Sub(last) < t.MinInterval { + continue + } + last = now + + // read cached relay state + cur, ok := messenger.StateAs[bool](t.Registry, t.Relay.Name()) + if !ok { + cur = false // default + } + + // Toggle + select { + case t.Relay.In() <- !cur: + case <-ctx.Done(): + return nil + } + + case <-ctx.Done(): + return nil + } + } +}