Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions BUILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ Implement the server against the proto; do not fork it.
- `RestartService` — last-resort restart of one protocol's service.
- `WatchEvents` — **server-stream**: handshake up/down, peer connect/disconnect,
errors. `helm` holds this open; this is what makes the admin UI live.
- `SetNetworkConfig` — apply the node's traffic-handling policy (DESIGN §3,
decision 16): kernel IP forwarding, source NAT, client-to-client isolation.
Idempotent — a replay of the last applied policy returns `applied=false`.

### AmneziaWG obfuscation

Expand All @@ -87,6 +90,22 @@ buoy restarts and `awg` reloads — `helm` caches them. The data-plane writer
(milestone B2) renders them into the `[Interface]` section of `awg0.conf` and
applies it; `GetStatus` reports the same persisted set.

### Network policy

`SetNetworkConfig` (decision 16) carries three booleans — `forwarding`,
`masquerade`, `isolation` — on `NetworkConfig`. Buoy applies them via an
**nftables** table named `pharos_buoy` (atomic `add table; delete table;
table { … }` reset, idempotent across runs) plus the `/proc/sys` forwarding
switches. The wire contract is the booleans; the choice of `nftables` vs
`iptables` vs other is buoy's. Helm's `netpolicy.Policy.Validate` already
rejects `masquerade`/`isolation` without `forwarding`; buoy repeats the
check defensively.

Idempotency lives in `internal/netpolicy.Manager`: an in-memory
last-applied snapshot returns `applied=false` for a duplicate call.
State is not persisted — nftables rules don't survive reboot either, so a
restart correctly re-applies on the next call.

### Data plane

Manages `awg-quick@awg0` (UDP 443) and `xray.service` (TCP 443). Peer
Expand Down
4 changes: 4 additions & 0 deletions internal/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PharosVPN/buoy/internal/awg"
"github.com/PharosVPN/buoy/internal/config"
"github.com/PharosVPN/buoy/internal/control"
"github.com/PharosVPN/buoy/internal/netpolicy"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -62,6 +63,8 @@ func newRunCmd() *cobra.Command {
"conf_path", awg.DefaultConfPath,
"applied_revision", awgManager.AppliedRevision())

netPolicy := netpolicy.NewManager(netpolicy.NewNftApplier())

srv, err := control.NewServer(control.Options{
ListenAddr: cfg.Control.ListenAddr,
NodeCertPath: cfg.NodeCertPath(),
Expand All @@ -70,6 +73,7 @@ func newRunCmd() *cobra.Command {
Version: version,
AWGNode: awgNode,
AWGManager: awgManager,
NetPolicy: netPolicy,
Log: log,
})
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion internal/control/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/PharosVPN/buoy/internal/awg"
buoyv1 "github.com/PharosVPN/buoy/internal/gen/pharos/buoy/v1"
"github.com/PharosVPN/buoy/internal/netpolicy"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
Expand Down Expand Up @@ -50,6 +51,9 @@ type Options struct {
// AWGManager owns the AmneziaWG data plane (awg0.conf, live state) and
// backs AddPeer / RemovePeer / ListPeers and the AmneziaWG ServiceStatus.
AWGManager *awg.Manager
// NetPolicy applies the node's forwarding / masquerade / isolation
// policy on SetNetworkConfig (DESIGN §3, decision 16).
NetPolicy *netpolicy.Manager
// Log receives server diagnostics.
Log *slog.Logger
}
Expand All @@ -63,7 +67,7 @@ func NewServer(opts Options) (*Server, error) {
}

gs := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsCfg)))
buoyv1.RegisterNodeControlServer(gs, newService(opts.Version, opts.AWGNode, opts.AWGManager))
buoyv1.RegisterNodeControlServer(gs, newService(opts.Version, opts.AWGNode, opts.AWGManager, opts.NetPolicy))

return &Server{addr: opts.ListenAddr, grpc: gs, log: opts.Log}, nil
}
Expand Down
19 changes: 14 additions & 5 deletions internal/control/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/PharosVPN/buoy/internal/awg"
buoyv1 "github.com/PharosVPN/buoy/internal/gen/pharos/buoy/v1"
"github.com/PharosVPN/buoy/internal/netpolicy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -72,12 +73,12 @@ func TestServeAcceptsMutualTLS(t *testing.T) {
t.Errorf("amneziawg.obfuscation invalid: %+v", obf)
}

// Unimplemented RPCs still return a clean Unimplemented over mTLS.
// SetNetworkConfig (decision 16) is a later-milestone RPC.
_, err = buoyv1.NewNodeControlClient(conn).SetNetworkConfig(context.Background(),
&buoyv1.SetNetworkConfigRequest{Config: &buoyv1.NetworkConfig{}})
// RestartService is still Unimplemented — used here as the canary that
// proves unwired RPCs still return a clean Unimplemented over mTLS.
_, err = buoyv1.NewNodeControlClient(conn).RestartService(context.Background(),
&buoyv1.RestartServiceRequest{Protocol: buoyv1.Protocol_PROTOCOL_AMNEZIAWG})
if status.Code(err) != codes.Unimplemented {
t.Errorf("SetNetworkConfig: got %v, want Unimplemented", err)
t.Errorf("RestartService: got %v, want Unimplemented", err)
}
}

Expand Down Expand Up @@ -148,10 +149,18 @@ func testOptions(t *testing.T, dir, addr string) Options {
Version: "test-version",
AWGNode: node,
AWGManager: mgr,
NetPolicy: netpolicy.NewManager(&nopApplier{}),
Log: discardLogger(),
}
}

// nopApplier accepts every Policy without touching the system — control
// tests exercise the RPC surface, not the firewall. The dedicated nft
// renderer + Manager idempotency tests live under internal/netpolicy.
type nopApplier struct{}

func (nopApplier) Apply(context.Context, netpolicy.Policy) error { return nil }

// stubRuntime is a minimal awg.Runtime for control-level tests: it reports
// the interface as down and answers Show with no peers. The data-plane
// orchestration is tested under package awg with a richer fake.
Expand Down
32 changes: 31 additions & 1 deletion internal/control/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/PharosVPN/buoy/internal/awg"
buoyv1 "github.com/PharosVPN/buoy/internal/gen/pharos/buoy/v1"
"github.com/PharosVPN/buoy/internal/netpolicy"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand All @@ -32,15 +33,17 @@ type service struct {
started time.Time
awgNode *awg.Node
awgManager *awg.Manager
netPolicy *netpolicy.Manager
}

// newService returns a NodeControl service implementation.
func newService(version string, awgNode *awg.Node, awgManager *awg.Manager) *service {
func newService(version string, awgNode *awg.Node, awgManager *awg.Manager, netPolicy *netpolicy.Manager) *service {
return &service{
version: version,
started: time.Now(),
awgNode: awgNode,
awgManager: awgManager,
netPolicy: netPolicy,
}
}

Expand Down Expand Up @@ -81,6 +84,33 @@ func (s *service) GetStatus(ctx context.Context, _ *buoyv1.GetStatusRequest) (*b
}, nil
}

// SetNetworkConfig applies the node's traffic-handling policy: kernel IP
// forwarding, source NAT for forwarded traffic, and client-to-client
// isolation (DESIGN §3, decision 16). The wire contract is just three
// booleans on NetworkConfig — buoy installs them via nftables + sysctl.
//
// The reply's `applied` field reports whether the call took effect (true)
// or was an idempotent replay of the last applied policy (false).
func (s *service) SetNetworkConfig(ctx context.Context, req *buoyv1.SetNetworkConfigRequest) (*buoyv1.SetNetworkConfigResponse, error) {
nc := req.GetConfig()
if nc == nil {
return nil, status.Error(codes.InvalidArgument, "SetNetworkConfig: missing config")
}
p := netpolicy.Policy{
Forwarding: nc.GetForwarding(),
Masquerade: nc.GetMasquerade(),
Isolation: nc.GetIsolation(),
}
if err := p.Validate(); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "SetNetworkConfig: %v", err)
}
applied, err := s.netPolicy.Apply(ctx, p)
if err != nil {
return nil, status.Errorf(codes.Internal, "SetNetworkConfig: %v", err)
}
return &buoyv1.SetNetworkConfigResponse{Applied: applied}, nil
}

// WatchEvents streams live data-plane events to helm: handshake up/down,
// peer connect/disconnect, observer errors. helm holds the stream open;
// this is what makes the admin UI live (DESIGN §7). The first events fire
Expand Down
56 changes: 56 additions & 0 deletions internal/control/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,62 @@ func TestGetMetricsRPC(t *testing.T) {
}
}

// --- SetNetworkConfig -------------------------------------------------------

func TestSetNetworkConfigAppliesAndReplays(t *testing.T) {
c := startTestServer(t)

resp, err := c.SetNetworkConfig(context.Background(), &buoyv1.SetNetworkConfigRequest{
Config: &buoyv1.NetworkConfig{Forwarding: true, Masquerade: true},
})
if err != nil {
t.Fatalf("first SetNetworkConfig: %v", err)
}
if !resp.GetApplied() {
t.Error("first call should report applied=true")
}

// An identical call is an idempotent replay.
resp, err = c.SetNetworkConfig(context.Background(), &buoyv1.SetNetworkConfigRequest{
Config: &buoyv1.NetworkConfig{Forwarding: true, Masquerade: true},
})
if err != nil {
t.Fatalf("replay SetNetworkConfig: %v", err)
}
if resp.GetApplied() {
t.Error("identical replay should report applied=false")
}

// A changed policy applies again.
resp, err = c.SetNetworkConfig(context.Background(), &buoyv1.SetNetworkConfigRequest{
Config: &buoyv1.NetworkConfig{Forwarding: true, Isolation: true},
})
if err != nil {
t.Fatalf("changed SetNetworkConfig: %v", err)
}
if !resp.GetApplied() {
t.Error("changed policy should report applied=true")
}
}

func TestSetNetworkConfigRejectsInvalid(t *testing.T) {
c := startTestServer(t)
_, err := c.SetNetworkConfig(context.Background(), &buoyv1.SetNetworkConfigRequest{
Config: &buoyv1.NetworkConfig{Masquerade: true}, // forwarding=false
})
if status.Code(err) != codes.InvalidArgument {
t.Errorf("masquerade without forwarding: got %v, want InvalidArgument", err)
}
}

func TestSetNetworkConfigMissingConfig(t *testing.T) {
c := startTestServer(t)
_, err := c.SetNetworkConfig(context.Background(), &buoyv1.SetNetworkConfigRequest{})
if status.Code(err) != codes.InvalidArgument {
t.Errorf("missing config: got %v, want InvalidArgument", err)
}
}

// --- WatchEvents ------------------------------------------------------------

// TestWatchEventsClosesOnClientCancel proves the server-stream plumbing:
Expand Down
65 changes: 65 additions & 0 deletions internal/netpolicy/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (C) 2026 The PharosVPN Authors

package netpolicy

import (
"context"
"sync"
)

// Applier installs a Policy on the host. Tests substitute a fake; the
// production wiring uses *NftApplier.
type Applier interface {
Apply(ctx context.Context, p Policy) error
}

// Manager owns the last-applied Policy and serves idempotent Apply calls.
// A call that matches the most recent applied policy returns applied=false
// without re-running the firewall transaction; anything else applies and
// records the new policy. The state lives only in memory, so a buoy
// restart re-applies on the next call (which is correct — nftables rules
// are not durable across reboots either).
type Manager struct {
applier Applier

mu sync.Mutex
last *Policy
}

// NewManager wraps an Applier.
func NewManager(a Applier) *Manager {
return &Manager{applier: a}
}

// Apply installs p if it differs from the last applied policy.
// It returns (true, nil) when the call took effect, (false, nil) on an
// idempotent replay, or (false, err) on a validation or applier error.
func (m *Manager) Apply(ctx context.Context, p Policy) (bool, error) {
if err := p.Validate(); err != nil {
return false, err
}
m.mu.Lock()
defer m.mu.Unlock()
if m.last != nil && *m.last == p {
return false, nil
}
if err := m.applier.Apply(ctx, p); err != nil {
return false, err
}
clone := p
m.last = &clone
return true, nil
}

// LastApplied returns a copy of the most-recently-applied policy, or nil
// if no policy has been applied since process start.
func (m *Manager) LastApplied() *Policy {
m.mu.Lock()
defer m.mu.Unlock()
if m.last == nil {
return nil
}
clone := *m.last
return &clone
}
Loading
Loading