Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Run golang root module tests
run: |
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=github.com/roadrunner-server/grpc/v6 -coverprofile=./tests/coverage-ci/grpc_root.out -covermode=atomic ./codec ./parser ./protoc_plugins ./proxy
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=github.com/roadrunner-server/grpc/v6 -coverprofile=./tests/coverage-ci/grpc_root.out -covermode=atomic . ./codec ./parser ./protoc_plugins ./proxy
- name: Run ee tests with coverage
run: "cd tests\n\nsudo apt update\nsudo apt install -y libnss3-tools\ncurl -JLO \"https://dl.filippo.io/mkcert/latest?for=linux/amd64\"\nchmod +x mkcert-v*-linux-amd64\nsudo cp mkcert-v*-linux-amd64 /usr/local/bin/mkcert\nmkcert -install\nmkcert localhost 127.0.0.1 ::1\nmkcert -client localhost 127.0.0.1 ::1 \nmkdir test-certs\ncp -r localhost+2-client-key.pem localhost+2-client.pem localhost+2-key.pem localhost+2.pem test-certs/\ncp -r $(mkcert -CAROOT)/rootCA.pem test-certs/\n\ngo test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=github.com/roadrunner-server/grpc/v6 -coverprofile=./coverage-ci/grpc.out -covermode=atomic grpc_plugin_gzip_test.go grpc_plugin_test.go\n"
- name: Archive code coverage results
Expand Down
11 changes: 6 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpc

import (
"crypto/tls"
stderr "errors"
"math"
"os"
"strings"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (c *Config) InitDefaults() error { //nolint:gocyclo,gocognit
}

if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
if stderr.Is(err, os.ErrNotExist) {
return errors.E(op, errors.Errorf("proto file '%s' does not exists", path))
}

Expand All @@ -92,15 +93,15 @@ func (c *Config) InitDefaults() error { //nolint:gocyclo,gocognit

if c.EnableTLS() {
if _, err := os.Stat(c.TLS.Key); err != nil {
if os.IsNotExist(err) {
if stderr.Is(err, os.ErrNotExist) {
return errors.E(op, errors.Errorf("key file '%s' does not exists", c.TLS.Key))
}

return errors.E(op, err)
}

if _, err := os.Stat(c.TLS.Cert); err != nil {
if os.IsNotExist(err) {
if stderr.Is(err, os.ErrNotExist) {
return errors.E(op, errors.Errorf("cert file '%s' does not exists", c.TLS.Cert))
}

Expand All @@ -110,8 +111,8 @@ func (c *Config) InitDefaults() error { //nolint:gocyclo,gocognit
// RootCA is optional, but if provided - check it
if c.TLS.RootCA != "" {
if _, err := os.Stat(c.TLS.RootCA); err != nil {
if os.IsNotExist(err) {
return errors.E(op, errors.Errorf("root ca path provided, but key file '%s' does not exists", c.TLS.RootCA))
if stderr.Is(err, os.ErrNotExist) {
return errors.E(op, errors.Errorf("root ca path provided, but root ca file '%s' does not exists", c.TLS.RootCA))
}
return errors.E(op, err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/bmatcuk/doublestar/v4 v4.10.0
github.com/emicklei/proto v1.14.3
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2
github.com/roadrunner-server/endure/v2 v2.6.2
github.com/roadrunner-server/errors v1.5.0
Expand Down Expand Up @@ -35,7 +36,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/roadrunner-server/events v1.0.1 // indirect
Expand Down
230 changes: 230 additions & 0 deletions go.work.sum

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions health_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,26 @@ func NewHeathServer(p *Plugin, log *slog.Logger) *HealthCheckServer {
// Clients should keep in mind that the list of health services exposed by an
// application can change over the lifetime of the process.
func (h *HealthCheckServer) List(context.Context, *grpc_health_v1.HealthListRequest) (*grpc_health_v1.HealthListResponse, error) {
h.mu.Lock()
st := h.status
h.mu.Unlock()

return &grpc_health_v1.HealthListResponse{
Statuses: map[string]*grpc_health_v1.HealthCheckResponse{
"grpc": {
Status: h.status,
Status: st,
},
},
}, nil
}

func (h *HealthCheckServer) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
h.mu.Lock()
st := h.status
h.mu.Unlock()

return &grpc_health_v1.HealthCheckResponse{
Status: h.status,
Status: st,
}, nil
}

Expand Down
123 changes: 123 additions & 0 deletions health_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package grpc

import (
"context"
"log/slog"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

func discardLog() *slog.Logger { return slog.New(slog.DiscardHandler) }

// fakeWatchStream is a minimal grpc_health_v1.Health_WatchServer: it records the
// responses Watch sends and exposes a controllable context. Only Send and
// Context are used by HealthCheckServer.Watch; the embedded grpc.ServerStream
// satisfies the rest of the interface and is intentionally left nil.
type fakeWatchStream struct {
grpc.ServerStream
ctx context.Context
sent chan *grpc_health_v1.HealthCheckResponse
}

func (f *fakeWatchStream) Context() context.Context { return f.ctx }

func (f *fakeWatchStream) Send(resp *grpc_health_v1.HealthCheckResponse) error {
f.sent <- resp
return nil
}

func TestHealthServer_CheckListAndServingStatus(t *testing.T) {
h := NewHeathServer(nil, discardLog())

// A fresh server starts NOT_SERVING.
resp, err := h.Check(t.Context(), &grpc_health_v1.HealthCheckRequest{})
require.NoError(t, err)
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, resp.GetStatus())

// List reports the same status under the "grpc" key.
lst, err := h.List(t.Context(), &grpc_health_v1.HealthListRequest{})
require.NoError(t, err)
require.Contains(t, lst.GetStatuses(), "grpc")
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, lst.GetStatuses()["grpc"].GetStatus())

// Flipping to SERVING is reflected by Check.
h.SetServingStatus(grpc_health_v1.HealthCheckResponse_SERVING)
resp, err = h.Check(t.Context(), &grpc_health_v1.HealthCheckRequest{})
require.NoError(t, err)
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, resp.GetStatus())
}

func TestHealthServer_ShutdownIgnoresStatusChange(t *testing.T) {
h := NewHeathServer(nil, discardLog())
h.SetServingStatus(grpc_health_v1.HealthCheckResponse_SERVING)

h.Shutdown()

// After Shutdown, further status changes must be ignored.
h.SetServingStatus(grpc_health_v1.HealthCheckResponse_NOT_SERVING)

resp, err := h.Check(t.Context(), &grpc_health_v1.HealthCheckRequest{})
require.NoError(t, err)
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, resp.GetStatus(),
"status must not change after Shutdown")
}

func TestHealthServer_Watch(t *testing.T) {
h := NewHeathServer(nil, discardLog())

ctx, cancel := context.WithCancel(t.Context())
defer cancel()

stream := &fakeWatchStream{
ctx: ctx,
sent: make(chan *grpc_health_v1.HealthCheckResponse, 8),
}

watchErr := make(chan error, 1)
go func() { watchErr <- h.Watch(&grpc_health_v1.HealthCheckRequest{}, stream) }()

// Watch emits the current status immediately on subscribe.
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, recvStatus(t, stream.sent))

// A status change is streamed to the watcher.
h.SetServingStatus(grpc_health_v1.HealthCheckResponse_SERVING)
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, recvStatus(t, stream.sent))

// Canceling the stream context ends Watch with a Canceled status.
cancel()

select {
case err := <-watchErr:
require.Error(t, err)
assert.Equal(t, codes.Canceled, status.Code(err))
case <-time.After(5 * time.Second):
t.Fatal("Watch did not return after context cancellation")
}
}

func recvStatus(t *testing.T, ch <-chan *grpc_health_v1.HealthCheckResponse) grpc_health_v1.HealthCheckResponse_ServingStatus {
t.Helper()
select {
case resp := <-ch:
return resp.GetStatus()
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for a health status update")
return 0
}
}

func TestHealthServer_RegisterServer(t *testing.T) {
h := NewHeathServer(nil, discardLog())
srv := grpc.NewServer()
t.Cleanup(srv.Stop)

// RegisterServer must wire the health service onto the gRPC server.
require.NotPanics(t, func() { h.RegisterServer(srv) })
}
83 changes: 83 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package grpc

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/roadrunner-server/pool/v2/fsm"
"github.com/roadrunner-server/pool/v2/state/process"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// fakeInformer returns a fixed set of worker states so the StatsExporter can be
// exercised without a running worker pool.
type fakeInformer struct {
states []*process.State
}

func (f *fakeInformer) Workers() []*process.State { return f.states }

func TestStatsExporter_DescribeAndCollect(t *testing.T) {
inf := &fakeInformer{states: []*process.State{
{Pid: 1, Status: fsm.StateReady, StatusStr: "ready", MemoryUsage: 100},
{Pid: 2, Status: fsm.StateWorking, StatusStr: "working", MemoryUsage: 200},
{Pid: 3, Status: fsm.StateErrored, StatusStr: "errored", MemoryUsage: 300},
}}

exp := newStatsExporter(inf)

// Describe must announce every descriptor the collector can emit.
descCh := make(chan *prometheus.Desc, 16)
exp.Describe(descCh)
assert.Len(t, descCh, 7, "StatsExporter must describe all 7 descriptors")

// Collect through a registry so the metric families can be asserted by name.
reg := prometheus.NewRegistry()
require.NoError(t, reg.Register(exp))

mfs, err := reg.Gather()
require.NoError(t, err)

byName := make(map[string]*dto.MetricFamily, len(mfs))
for _, mf := range mfs {
byName[mf.GetName()] = mf
}

// One worker lands in each branch of Collect's switch: ready / working /
// everything-else (StateErrored falls through to the invalid bucket).
assert.Equal(t, float64(1), gaugeValue(t, byName, "rr_grpc_workers_ready"))
assert.Equal(t, float64(1), gaugeValue(t, byName, "rr_grpc_workers_working"))
assert.Equal(t, float64(1), gaugeValue(t, byName, "rr_grpc_workers_invalid"))

// Totals: 3 workers, cumulative RSS = 100 + 200 + 300.
assert.Equal(t, float64(3), gaugeValue(t, byName, "rr_grpc_total_workers"))
assert.Equal(t, float64(600), gaugeValue(t, byName, "rr_grpc_workers_memory_bytes"))

// Per-worker series: one sample per worker for state and memory.
require.Contains(t, byName, "rr_grpc_worker_state")
require.Contains(t, byName, "rr_grpc_worker_memory_bytes")
assert.Len(t, byName["rr_grpc_worker_state"].GetMetric(), 3)
assert.Len(t, byName["rr_grpc_worker_memory_bytes"].GetMetric(), 3)
}

// gaugeValue returns the single gauge sample for a label-less metric family.
func gaugeValue(t *testing.T, byName map[string]*dto.MetricFamily, name string) float64 {
t.Helper()
mf, ok := byName[name]
require.Truef(t, ok, "metric %q was not collected", name)
require.Lenf(t, mf.GetMetric(), 1, "metric %q must have a single sample", name)
return mf.GetMetric()[0].GetGauge().GetValue()
}

func TestMetricsCollector(t *testing.T) {
p := &Plugin{
statsExporter: newStatsExporter(&fakeInformer{}),
queueSize: prometheus.NewGauge(prometheus.GaugeOpts{Name: "q"}),
requestCounter: prometheus.NewCounterVec(prometheus.CounterOpts{Name: "c"}, []string{"l"}),
requestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: "d"}, []string{"l"}),
}

assert.Len(t, p.MetricsCollector(), 4)
}
27 changes: 14 additions & 13 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/roadrunner-server/pool/v2/pool/static_pool"
"github.com/roadrunner-server/tcplisten"
"go.opentelemetry.io/otel/propagation"

Expand Down Expand Up @@ -137,8 +136,12 @@ func (p *Plugin) Serve() chan error {
p.mu.Lock()
defer p.mu.Unlock()

var err error
p.gPool, err = p.rrServer.NewPool(context.Background(), &pool.Config{
// NewPool returns a concrete *static_pool.Pool; on error it is a nil
// pointer. Assigning it directly to the api.Pool interface field would
// produce a non-nil interface wrapping a nil pointer, so Stop's
// `p.gPool != nil` guard would pass and Destroy would panic. Assign the
// interface only after a successful NewPool.
gPool, err := p.rrServer.NewPool(context.Background(), &pool.Config{
Debug: p.config.GrpcPool.Debug,
Command: p.config.GrpcPool.Command,
NumWorkers: p.config.GrpcPool.NumWorkers,
Expand All @@ -151,6 +154,7 @@ func (p *Plugin) Serve() chan error {
errCh <- errors.E(op, err)
return errCh
}
p.gPool = gPool

p.server, err = p.createGRPCserver(p.interceptors)
if err != nil {
Expand Down Expand Up @@ -194,23 +198,20 @@ func (p *Plugin) Stop(ctx context.Context) error {
p.mu.Lock()
defer p.mu.Unlock()

p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_NOT_SERVING)
if p.healthServer != nil {
p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_NOT_SERVING)
}

if p.server != nil {
p.server.GracefulStop()
}

p.healthServer.Shutdown()
if p.healthServer != nil {
p.healthServer.Shutdown()
}

if p.gPool != nil {
switch pp := p.gPool.(type) {
case *static_pool.Pool:
if pp != nil {
pp.Destroy(ctx)
}
default:
// pool is nil, nothing to do
}
p.gPool.Destroy(ctx)
}

finCh <- struct{}{}
Expand Down
Loading
Loading