/*
OttO is a set of Go packages (framework) that help build IoT
applications. The goal is to decouple the IoT sensors, actuators,
etc. from the framework hardware interfaces such as GPIO, I2C, serial
ports, etc.

# Features include

Device-level abstraction. Each device has a name that translates into
a path that can be used by the MQTT and HTTP REST interfaces for
communication with other systems.

Device manager that keeps track of all application devices, their
configuration and status. This interface attempts to be agnostic to
the underlying drivers, including gpiocdev, I2C, periph.io, etc.

Message-based architecture that abstracts all communications into a
standard message format, with the ability to save messages for later
replay or diagnostics.

MQTT messaging built into all devices and components according to
functionality and need.

HTTP REST interface and corresponding API for all components of the
framework.

Drivers for a few different breakout boards meant to run on the
Raspberry Pi.

Station module to represent a single application on a given device or
a series of stations for a networked controller.

Messenger (not to be confused with messages) implements a Pub/Sub
(MQTT or other) interface between components of your application.

HTTP REST server for data gathering and configuration.

WebSockets for real-time bidirectional communication with a UI.

High-performance web server built in to serve interactive UIs and
modern APIs.

Station manager to manage the stations that make up an entire sensor
network.

Data manager for temporary data caching and interfaces to update
your favorite cloud-based time-series database.

Message library for standardized messages built to communicate
events and information between packages.

The primary communication model for OttO is a messaging system based
on the Pub/Sub model, defaulting to MQTT. OttO is also heavily invested
in HTTP to implement user interfaces and REST/Graph APIs.

Messaging and HTTP use paths to specify the subject of interest. These
paths can be reduced to an ordered collection of strings separated by
slashes '/'. MQTT topics, HTTP URIs and UNIX filesystems all use this
same scheme, which we use to generalize the identity of the elements
we are addressing.

For example, the following paths identify the same element:

	File: /home/rusty/data/hb/temperature
	HTTP: /api/data/hb/temperature
	MQTT: ss/station/hb/temperature

The data under the final path element, temperature, can be represented
by JSON such as `{ "fahrenheit": 100.07 }`.

### Meta Data (Station Information)

When a station comes alive, it can provide some information about
itself using the topic:

```ss/m/be:ef:ca:fe:02/station```

The station announces itself along with some meta information and
its capabilities. The body of the message might look something like
this:

```json
{
	"id": "be:ef:ca:fe:02",
	"ip": "10.11.24.24",
	"sensors": [
		"tempc",
		"humidity",
		"light"
	],
	"relays": [
		"heater",
		"light"
	]
}
```
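
A receiver could decode such an announcement along the following lines.
This is only a sketch: StationInfo and parseStationInfo are hypothetical
names chosen to mirror the fields in the example body, not types from
the OttO packages. Note that encoding/json rejects a trailing comma
after the last field of an object.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StationInfo mirrors the example announcement body. It is a
// hypothetical type used for illustration only.
type StationInfo struct {
	ID      string   `json:"id"`
	IP      string   `json:"ip"`
	Sensors []string `json:"sensors"`
	Relays  []string `json:"relays"`
}

// parseStationInfo decodes the JSON body of a station announcement.
func parseStationInfo(body []byte) (StationInfo, error) {
	var info StationInfo
	err := json.Unmarshal(body, &info)
	return info, err
}

func main() {
	body := []byte(`{
		"id": "be:ef:ca:fe:02",
		"ip": "10.11.24.24",
		"sensors": ["tempc", "humidity", "light"],
		"relays": ["heater", "light"]
	}`)
	info, err := parseStationInfo(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(info.ID, info.Sensors, info.Relays)
}
```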

### Sensor Data

Sensor data takes on the form:

```ss/d/<station>/<sensor>/<index>```

The station is the ID of the station publishing the data.

The sensor is the type of data being produced (temp, humidity,
lidar, GPS).

The index is useful in applications where there is more than one
device of the same type, such as sensors, motors, etc.

The value published by a sensor is typically floating point; however,
values may also be integers, strings or byte arrays.

### Control Data

```ss/c/<source>/<device>/<index>```

This is essentially the same as the sensor topic, except that control
commands are used to make a particular device change state, for
example turning a relay on or off.
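
Building a control topic is the mirror image of parsing a sensor topic.
In this sketch controlTopic is a hypothetical helper, and the "on"
payload is an assumption made for illustration; the payload format is
not specified here.

```go
package main

import "fmt"

// controlTopic builds an ss/c/<source>/<device>/<index> topic.
func controlTopic(source, device string, index int) string {
	return fmt.Sprintf("ss/c/%s/%s/%d", source, device, index)
}

func main() {
	// Ask relay 0 named "heater" on the given station to switch on.
	topic := controlTopic("be:ef:ca:fe:02", "heater", 0)
	fmt.Printf("publish %q to %s\n", "on", topic)
}
```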
*/
package otto

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"time"

	"github.com/rustyeddy/otto/messenger"
	"github.com/rustyeddy/otto/server"
	"github.com/rustyeddy/otto/station"
	"github.com/rustyeddy/otto/utils"
)

// Controller is a message handler that oversees all interactions
// with the application.
type Controller interface {
	Init()
	Start() error
	Stop()
	MsgHandler(m *messenger.Msg)
}

// OttO is a large wrapper around the Station, Server,
// DataManager and Messenger, including some convenience functions.
type OttO struct {
	Name string
	*utils.LogConfig
	*station.Station
	*station.StationManager
	*messenger.Messenger
	*server.Server

	Mock       bool
	MQTTBroker string // MQTT broker URL; if empty, Init uses $MQTT_BROKER, then "internal"
	done       chan any
}

// Global variables and structures.
var (
	Interactive bool
)

// Done returns the channel that Stop signals on.
func (o *OttO) Done() chan any {
	return o.done
}

// Init is a convenience function that sets up the MQTT messenger, the
// HTTP server, the station manager and the local station, then
// registers the built-in HTTP endpoints.
func (o *OttO) Init() {
	o.LogConfig = utils.DefaultLogConfig()
	utils.InitLogger(*o.LogConfig)

	slog.Info("OttO is starting")
	if o.done != nil {
		// Init has already run for this instance.
		slog.Error("OttO has already been started")
		return
	}
	o.done = make(chan any)

	if o.Messenger == nil {
		// Broker precedence: MQTTBroker field, $MQTT_BROKER, "internal".
		broker := o.MQTTBroker
		if broker == "" {
			broker = os.Getenv("MQTT_BROKER")
			if broker == "" {
				broker = "internal"
			}
		}
		o.Messenger = messenger.NewMessenger(broker)
	}

	if o.StationManager == nil {
		o.StationManager = station.GetStationManager()
		station.SetVersion(Version)
	}

	if o.Server == nil {
		o.Server = server.GetServer()
	}

	if o.Name == "" {
		o.Name = "o++o"
	}

	var err error
	if o.Station == nil {
		o.Station, err = o.StationManager.Add(o.Name)
		if err != nil {
			slog.Error("Unable to create station", "error", err)
			return
		}

		// Initialize the local station.
		o.Station.Local = true
		o.Station.Init()
		o.Station.StartTicker(o.Station.Duration)
	}

	o.Server.Register("/version", o)
	o.Server.Register("/api/log", o.LogConfig)
	o.Server.Register("/api/shutdown", o)
	o.Server.Register("/api/stations", o.StationManager)
	o.Server.Register("/api/stats", &utils.Stats{})
	o.Server.Register("/api/timers", utils.GetTickers())
	o.Server.Register("/api/topics", messenger.GetTopics())
}

// Start the OttO process. TODO: return a stop channel or context?
func (o *OttO) Start() {
	if o.Messenger != nil {
		// Connect in the background so a slow broker does not block startup.
		go func() {
			err := o.Messenger.Connect()
			if err != nil {
				slog.Error("failed to connect to broker", "broker", o.Messenger.Broker, "error", err)
				return
			}
		}()
	}

	if o.Server != nil {
		go o.Server.Start(o.done)
	}

	if o.StationManager != nil {
		go o.StationManager.Start()
	}
}

// Stop OttO gracefully, allowing it to clean up resources. TODO: this
// will probably need to be refactored and examined throughout the
// code base.
func (o *OttO) Stop() {
	slog.Info("sending true on the done channel")
	o.done <- true
}

// Shutdown closes the HTTP server and the messenger, then stops the
// MQTT broker.
func (o *OttO) Shutdown() {
	if err := server.GetServer().Close(); err != nil {
		slog.Error("Failed to close server", "error", err)
	}

	if o.Messenger != nil {
		o.Messenger.Close()
	}

	// TODO: check whether the local broker is running and stop it
	// only if it is.
	messenger.StopMQTTBroker(context.Background())
}

// AddManagedDevice creates a managed device wrapper and adds it to the station.
func (o *OttO) AddManagedDevice(name string, device any, topic string) *station.ManagedDevice {
	md := station.NewManagedDevice(name, device, topic)
	if o.Station != nil {
		o.Station.Devices.Register(md)
	}
	return md
}

// GetManagedDevice retrieves a managed device by name. It returns nil
// if no station has been set.
func (o *OttO) GetManagedDevice(name string) *station.ManagedDevice {
	if o.Station == nil {
		return nil
	}
	return o.Station.Devices.Get(name)
}

// ServeHTTP handles the /version and /api/shutdown endpoints.
func (o *OttO) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	response := ""

	// Match on the path only, so that a query string does not cause a 404.
	switch r.URL.Path {
	case "/api/shutdown":
		// Delay the stop so this response can be delivered first.
		go func() {
			time.Sleep(2 * time.Second)
			o.Stop()
		}()
		response = `{ "shutdown": "shutting down in 2 seconds" }`

	case "/version":
		vj := VersionJSON()
		w.Write(vj)

	default:
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}
	fmt.Fprint(w, response)
}