Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions egress/tunnel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (C) 2026 The PharosVPN Authors

package egress

import (
"context"
"fmt"
"io"
"net"
"time"

"github.com/hashicorp/yamux"
)

// Tunnel yamux tunables, matching the beacon ingress tunnel: a 20s keep-alive
// detects a dead far side within ~20s without waiting on TCP RST (which can lag
// minutes behind a NAT), and a 10s write timeout bounds a stalled peer.
func yamuxCfg() *yamux.Config {
cfg := yamux.DefaultConfig()
cfg.KeepAliveInterval = 20 * time.Second
cfg.ConnectionWriteTimeout = 10 * time.Second
cfg.LogOutput = io.Discard
return cfg
}

// AcceptAndServe runs the relay side of one egress tunnel. conn is an accepted
// coxswain connection (already TLS-terminated by the caller); this wraps it as
// a yamux server and serves every substream coxswain opens by reading its
// CONNECT target and dialing it with dial. It blocks until the session tears
// down (coxswain disconnects, keep-alive fails, or ctx is cancelled), then
// returns so the caller can accept the next coxswain connection.
//
// The substream direction is the inverse of the beacon ingress tunnel: there
// the relay opens streams toward coxswain; here coxswain opens streams toward
// the relay and the relay dials out (DESIGN §3, decision 19).
func AcceptAndServe(ctx context.Context, conn net.Conn, dial Dialer) error {
sess, err := yamux.Server(conn, yamuxCfg())
if err != nil {
_ = conn.Close()
return fmt.Errorf("egress: yamux server: %w", err)
}
defer sess.Close()

for {
stream, err := sess.Accept()
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("egress: accept substream: %w", err)
}
go func() { _ = Serve(ctx, stream, dial) }()
}
}

// RunRelay serves egress tunnels accepted on lis, one coxswain at a time (v1:
// one controller per relay, matching the beacon ingress tunnel). When a
// coxswain session ends it loops to accept the next. It returns when ctx is
// cancelled or lis stops accepting. dial is how the relay reaches nodes
// (typically (&net.Dialer{Timeout: …}).DialContext).
func RunRelay(ctx context.Context, lis net.Listener, dial Dialer, logf func(string, ...any)) error {
if logf == nil {
logf = func(string, ...any) {}
}
go func() {
<-ctx.Done()
_ = lis.Close() // unblock Accept on cancel
}()
for {
conn, err := lis.Accept()
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("egress: accept coxswain: %w", err)
}
logf("[egress] coxswain connected from %s", conn.RemoteAddr())
if err := AcceptAndServe(ctx, conn, dial); err != nil && ctx.Err() == nil {
logf("[egress] session ended: %v", err)
}
}
}

// ClientSession is coxswain's side of an egress tunnel: a yamux client over an
// already-established (TLS) connection to the relay. Each OpenStream returns a
// fresh substream that egress.Open writes a CONNECT target to. It satisfies
// StreamOpener.
type ClientSession struct {
sess *yamux.Session
}

// NewClientSession wraps an established relay connection in a yamux client.
func NewClientSession(conn net.Conn) (*ClientSession, error) {
sess, err := yamux.Client(conn, yamuxCfg())
if err != nil {
return nil, fmt.Errorf("egress: yamux client: %w", err)
}
return &ClientSession{sess: sess}, nil
}

// OpenStream opens a new substream to the relay.
func (c *ClientSession) OpenStream(context.Context) (net.Conn, error) {
return c.sess.Open()
}

// IsClosed reports whether the session has torn down (so a caller can re-dial).
func (c *ClientSession) IsClosed() bool { return c.sess == nil || c.sess.IsClosed() }

// Close tears the session down.
func (c *ClientSession) Close() error {
if c.sess == nil {
return nil
}
return c.sess.Close()
}
93 changes: 93 additions & 0 deletions egress/tunnel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (C) 2026 The PharosVPN Authors

package egress

import (
"bytes"
"context"
"fmt"
"io"
"net"
"testing"
"time"
)

// newEchoServer starts a TCP echo backend and returns its address. It stands in
// for a buoy node the relay dials.
func newEchoServer(t *testing.T) string {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = ln.Close() })
go func() {
for {
c, err := ln.Accept()
if err != nil {
return
}
go func(c net.Conn) { _, _ = io.Copy(c, c); _ = c.Close() }(c)
}
}()
return ln.Addr().String()
}

// TestTunnelRoundTrip wires the whole egress transport over a TCP socket pair:
// coxswain (yamux client) opens substreams to the relay (AcceptAndServe), which
// dials the echo backend. Two independent streams prove multiplexing — one
// tunnel carries many concurrent coxswain→node dials.
func TestTunnelRoundTrip(t *testing.T) {
backend := newEchoServer(t)

ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
relayConnCh := make(chan net.Conn, 1)
go func() {
if c, err := ln.Accept(); err == nil {
relayConnCh <- c
}
}()
coxConn, err := net.Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatal(err)
}
relayConn := <-relayConnCh

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dial := func(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, address)
}
go func() { _ = AcceptAndServe(ctx, relayConn, dial) }()

client, err := NewClientSession(coxConn)
if err != nil {
t.Fatal(err)
}
defer client.Close()

for i := 0; i < 2; i++ {
conn, err := Open(ctx, client, backend)
if err != nil {
t.Fatalf("Open #%d: %v", i, err)
}
msg := []byte(fmt.Sprintf("ping-%d", i))
if _, err := conn.Write(msg); err != nil {
t.Fatalf("write #%d: %v", i, err)
}
got := make([]byte, len(msg))
_ = conn.SetReadDeadline(time.Now().Add(3 * time.Second))
if _, err := io.ReadFull(conn, got); err != nil {
t.Fatalf("read #%d: %v", i, err)
}
if !bytes.Equal(got, msg) {
t.Errorf("stream #%d echo = %q, want %q", i, got, msg)
}
_ = conn.Close()
}
}
Loading