From 019805748ba54ca495a1b4cbef7a0260c52010cf Mon Sep 17 00:00:00 2001 From: Alexis Montagne Date: Fri, 26 Jun 2026 09:00:12 -0700 Subject: [PATCH] upfluence/http/server: Accept ADMIN_PORT to start a side server with all the control endpoints --- lib/upfluence/endpoint/api_endpoint.rb | 2 +- lib/upfluence/http/endpoint/api_endpoint.rb | 2 +- lib/upfluence/http/server.rb | 119 ++++++++++++++------ spec/upfluence/http/server_spec.rb | 110 ++++++++++++++++++ 4 files changed, 196 insertions(+), 37 deletions(-) create mode 100644 spec/upfluence/http/server_spec.rb diff --git a/lib/upfluence/endpoint/api_endpoint.rb b/lib/upfluence/endpoint/api_endpoint.rb index 30c56ef..7282e58 100644 --- a/lib/upfluence/endpoint/api_endpoint.rb +++ b/lib/upfluence/endpoint/api_endpoint.rb @@ -1,4 +1,4 @@ -require 'sinatra' +require 'sinatra/base' require 'upfluence/http/endpoint/api_endpoint' module Upfluence diff --git a/lib/upfluence/http/endpoint/api_endpoint.rb b/lib/upfluence/http/endpoint/api_endpoint.rb index 10c8487..7be46e1 100644 --- a/lib/upfluence/http/endpoint/api_endpoint.rb +++ b/lib/upfluence/http/endpoint/api_endpoint.rb @@ -1,4 +1,4 @@ -require 'sinatra' +require 'sinatra/base' require 'active_record' require 'active_support/hash_with_indifferent_access' require 'upfluence/http/endpoint/validation_error' diff --git a/lib/upfluence/http/server.rb b/lib/upfluence/http/server.rb index 8089c9a..dc1409e 100644 --- a/lib/upfluence/http/server.rb +++ b/lib/upfluence/http/server.rb @@ -3,7 +3,7 @@ require 'rack/timeout/base' require 'prometheus/client' require 'prometheus/client/push' -require "prometheus/middleware/exporter" +require 'prometheus/middleware/exporter' require 'upfluence/environment' require 'upfluence/error_logger' @@ -48,19 +48,77 @@ class Server Instrumentation::GCInstrumenter.new, Instrumentation::ActiveRecordPoolInstrumenter.new ], + admin_port: ENV.fetch('ADMIN_PORT', nil), debug: ENV.fetch('DEBUG', nil) } def initialize(options = {}, &block) @options = DEFAULT_OPTIONS.dup.merge(options) opts = @options - base_handler = nil if opts[:base_handler_klass] - base_handler = opts[:base_handler_klass].new(@options[:interfaces]) + @base_handler = opts[:base_handler_klass].new(opts[:interfaces]) end - @builder = Builder.new do + @admin_builder = admin_builder(opts) if opts[:admin_port] + @production_builder = production_builder(opts, &block) + @handler = Rack::Handler.get(opts[:server]) + end + + def serve + ENV['RACK_ENV'] = Upfluence.env.to_s + + Thread.new { run_prometheus_exporter } if @options[:push_gateway_url] + + @options[:instrumentations].each(&:start) + + if @admin_builder + admin_port = @options[:admin_port].to_i + + Thread.new do + run_builder(@admin_builder, Port: admin_port) + end + + Upfluence.logger.info( + "Admin server listening on #{@options[:Host]}:#{admin_port}" + ) + end + + run_builder(@production_builder) + end + + class << self + def request + Thread.current[REQUEST_CONTEXT_KEY] + end + + def request=(req) + Thread.current[REQUEST_CONTEXT_KEY] = req + end + end + + private + + def run_builder(builder, **overrides) + opts = @options.merge(overrides) + + @handler.run(builder, **opts) do |server| + server.threaded = opts[:threaded] if server.respond_to? :threaded= + + # Thin does not recognize the max_thread argument, howerver it has a + # threadpool_size setter. Puma on the other hand recognize max_thread. + if server.respond_to?(:threadpool_size=) && opts[:max_threads] + server.threadpool_size = opts[:max_threads] + end + end + end + + def production_builder(opts, &block) + base_handler = @base_handler + has_admin = !opts[:admin_port].nil? + mount = method(:mount_admin_endpoints) + + Builder.new do use Middleware::RequestStapler use Middleware::Logger use Middleware::Prometheus @@ -72,7 +130,6 @@ def initialize(options = {}, &block) end use Upfluence.error_logger.middleware - use Prometheus::Middleware::Exporter if opts[:prometheus_endpoint] use Rack::ContentLength use Rack::Chunked @@ -86,54 +143,46 @@ def initialize(options = {}, &block) use(*m) end + mount.call(self, opts) unless has_admin + map '/healthcheck' do run(opts[:healthcheck_endpoint] || Endpoint::Healthcheck.new) end - if opts[:base_processor_klass] && base_handler - map '/base' do - run_thrift(opts[:base_processor_klass], base_handler) - end - end - - map('/debug') { run(Endpoint::Profiler.new) } if opts[:debug] - instance_eval(&block) end - - @handler = Rack::Handler.get(@options[:server]) end - def serve - ENV['RACK_ENV'] = Upfluence.env.to_s - - Thread.new { run_prometheus_exporter } if @options[:push_gateway_url] - - @options[:instrumentations].each(&:start) + def admin_builder(opts) + mount = method(:mount_admin_endpoints) - @handler.run(@builder, **@options) do |server| - server.threaded = @options[:threaded] if server.respond_to? :threaded= + Builder.new do + use Rack::ContentLength - # Thin does not recognize the max_thread argument, howerver it has a - # threadpool_size setter. Puma on the other hand recognize max_thread. - if server.respond_to?(:threadpool_size=) && @options[:max_threads] - server.threadpool_size = @options[:max_threads] + map '/healthcheck' do + run(opts[:healthcheck_endpoint] || Endpoint::Healthcheck.new) end + + mount.call(self, opts) end end - class << self - def request - Thread.current[REQUEST_CONTEXT_KEY] - end + def mount_admin_endpoints(builder, opts) + base_handler = @base_handler - def request=(req) - Thread.current[REQUEST_CONTEXT_KEY] = req + builder.instance_eval do + use Prometheus::Middleware::Exporter if opts[:prometheus_endpoint] + + if opts[:base_processor_klass] && base_handler + map '/base' do + run_thrift(opts[:base_processor_klass], base_handler) + end + end + + map('/debug') { run(Endpoint::Profiler.new) } if opts[:debug] end end - private - def run_prometheus_exporter push = Prometheus::Client::Push.new( @options[:app_name], diff --git a/spec/upfluence/http/server_spec.rb b/spec/upfluence/http/server_spec.rb new file mode 100644 index 0000000..cb2318e --- /dev/null +++ b/spec/upfluence/http/server_spec.rb @@ -0,0 +1,110 @@ +require 'spec_helper' +require 'rack/mock' +require 'upfluence/http/server' + +RSpec.describe Upfluence::HTTP::Server do + def build_server(**opts, &block) + block ||= proc { run ->(_env) { [200, {}, ["app\n"]] } } + + described_class.new( + prometheus_endpoint: false, + instrumentations: [], + **opts, + &block + ) + end + + def mock_request(builder, path, method: :get) + Rack::MockRequest.new(builder).send(method, path) + end + + describe 'without admin_port' do + let(:server) { build_server(admin_port: nil) } + let(:builder) { server.instance_variable_get(:@production_builder) } + + it 'does not create an admin builder' do + expect(server.instance_variable_get(:@admin_builder)).to be_nil + end + + it 'serves /healthcheck' do + resp = mock_request(builder, '/healthcheck') + + expect(resp.status).to eq(200) + expect(resp.body).to eq("ok\n") + end + + it 'serves the app' do + resp = mock_request(builder, '/') + + expect(resp.status).to eq(200) + expect(resp.body).to eq("app\n") + end + + context 'with debug enabled' do + let(:server) { build_server(admin_port: nil, debug: '1') } + + it 'serves /debug on the production builder' do + resp = mock_request(builder, '/debug/profile') + + expect(resp.body).to eq('No profile available') + end + end + + context 'with debug disabled' do + it 'does not mount /debug' do + resp = mock_request(builder, '/debug/start') + + expect(resp.body).to eq("app\n") + end + end + end + + describe 'with admin_port' do + let(:server) { build_server(admin_port: 9394, debug: debug) } + let(:production) { server.instance_variable_get(:@production_builder) } + let(:admin) { server.instance_variable_get(:@admin_builder) } + + context 'with debug disabled' do + let(:debug) { nil } + + it 'serves /healthcheck on both builders' do + expect(mock_request(production, '/healthcheck').status).to eq(200) + expect(mock_request(admin, '/healthcheck').status).to eq(200) + end + + it 'serves the app on the production builder' do + resp = mock_request(production, '/') + + expect(resp.status).to eq(200) + expect(resp.body).to eq("app\n") + end + + it 'does not mount /debug on either builder' do + expect(mock_request(production, '/debug/start').body).to eq("app\n") + expect(mock_request(admin, '/debug/start').status).to eq(404) + end + end + + context 'with debug enabled' do + let(:debug) { '1' } + + it 'serves /debug only on the admin builder' do + expect(mock_request(admin, '/debug/profile').body).to eq('No profile available') + expect(mock_request(production, '/debug/profile').body).to eq("app\n") + end + end + end + + describe 'custom healthcheck endpoint' do + let(:custom) { ->(_env) { [200, {}, ["custom\n"]] } } + let(:server) { build_server(healthcheck_endpoint: custom) } + let(:builder) { server.instance_variable_get(:@production_builder) } + + it 'uses the custom endpoint' do + resp = mock_request(builder, '/healthcheck') + + expect(resp.status).to eq(200) + expect(resp.body).to eq("custom\n") + end + end +end