Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/upfluence/endpoint/api_endpoint.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require 'sinatra'
require 'sinatra/base'
require 'upfluence/http/endpoint/api_endpoint'

module Upfluence
Expand Down
2 changes: 1 addition & 1 deletion lib/upfluence/http/endpoint/api_endpoint.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require 'sinatra'
require 'sinatra/base'
require 'active_record'
require 'active_support/hash_with_indifferent_access'
require 'upfluence/http/endpoint/validation_error'
Expand Down
119 changes: 84 additions & 35 deletions lib/upfluence/http/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require 'rack/timeout/base'
require 'prometheus/client'
require 'prometheus/client/push'
require "prometheus/middleware/exporter"
require 'prometheus/middleware/exporter'

require 'upfluence/environment'
require 'upfluence/error_logger'
Expand Down Expand Up @@ -48,19 +48,77 @@ class Server
Instrumentation::GCInstrumenter.new,
Instrumentation::ActiveRecordPoolInstrumenter.new
],
admin_port: ENV.fetch('ADMIN_PORT', nil),
debug: ENV.fetch('DEBUG', nil)
}

def initialize(options = {}, &block)
@options = DEFAULT_OPTIONS.dup.merge(options)
opts = @options
base_handler = nil

if opts[:base_handler_klass]
base_handler = opts[:base_handler_klass].new(@options[:interfaces])
@base_handler = opts[:base_handler_klass].new(opts[:interfaces])
end

@builder = Builder.new do
@admin_builder = admin_builder(opts) if opts[:admin_port]
@production_builder = production_builder(opts, &block)
@handler = Rack::Handler.get(opts[:server])
end

def serve
ENV['RACK_ENV'] = Upfluence.env.to_s

Thread.new { run_prometheus_exporter } if @options[:push_gateway_url]

@options[:instrumentations].each(&:start)

if @admin_builder
admin_port = @options[:admin_port].to_i

Thread.new do
run_builder(@admin_builder, Port: admin_port)
end

Upfluence.logger.info(
"Admin server listening on #{@options[:Host]}:#{admin_port}"
)
end

run_builder(@production_builder)
end

class << self
def request
Thread.current[REQUEST_CONTEXT_KEY]
end

def request=(req)
Thread.current[REQUEST_CONTEXT_KEY] = req
end
end

private

def run_builder(builder, **overrides)
opts = @options.merge(overrides)

@handler.run(builder, **opts) do |server|
server.threaded = opts[:threaded] if server.respond_to? :threaded=

# Thin does not recognize the max_thread argument, howerver it has a
# threadpool_size setter. Puma on the other hand recognize max_thread.
if server.respond_to?(:threadpool_size=) && opts[:max_threads]
server.threadpool_size = opts[:max_threads]
end
end
end

def production_builder(opts, &block)
base_handler = @base_handler
has_admin = !opts[:admin_port].nil?
mount = method(:mount_admin_endpoints)

Builder.new do
use Middleware::RequestStapler
use Middleware::Logger
use Middleware::Prometheus
Expand All @@ -72,7 +130,6 @@ def initialize(options = {}, &block)
end

use Upfluence.error_logger.middleware
use Prometheus::Middleware::Exporter if opts[:prometheus_endpoint]

use Rack::ContentLength
use Rack::Chunked
Expand All @@ -86,54 +143,46 @@ def initialize(options = {}, &block)
use(*m)
end

mount.call(self, opts) unless has_admin

map '/healthcheck' do
run(opts[:healthcheck_endpoint] || Endpoint::Healthcheck.new)
end

if opts[:base_processor_klass] && base_handler
map '/base' do
run_thrift(opts[:base_processor_klass], base_handler)
end
end

map('/debug') { run(Endpoint::Profiler.new) } if opts[:debug]

instance_eval(&block)
end

@handler = Rack::Handler.get(@options[:server])
end

def serve
ENV['RACK_ENV'] = Upfluence.env.to_s

Thread.new { run_prometheus_exporter } if @options[:push_gateway_url]

@options[:instrumentations].each(&:start)
def admin_builder(opts)
mount = method(:mount_admin_endpoints)

@handler.run(@builder, **@options) do |server|
server.threaded = @options[:threaded] if server.respond_to? :threaded=
Builder.new do
use Rack::ContentLength

# Thin does not recognize the max_thread argument, howerver it has a
# threadpool_size setter. Puma on the other hand recognize max_thread.
if server.respond_to?(:threadpool_size=) && @options[:max_threads]
server.threadpool_size = @options[:max_threads]
map '/healthcheck' do
run(opts[:healthcheck_endpoint] || Endpoint::Healthcheck.new)
end

mount.call(self, opts)
end
end

class << self
def request
Thread.current[REQUEST_CONTEXT_KEY]
end
def mount_admin_endpoints(builder, opts)
base_handler = @base_handler

def request=(req)
Thread.current[REQUEST_CONTEXT_KEY] = req
builder.instance_eval do
use Prometheus::Middleware::Exporter if opts[:prometheus_endpoint]

if opts[:base_processor_klass] && base_handler
map '/base' do
run_thrift(opts[:base_processor_klass], base_handler)
end
end

map('/debug') { run(Endpoint::Profiler.new) } if opts[:debug]
end
end

private

def run_prometheus_exporter
push = Prometheus::Client::Push.new(
@options[:app_name],
Expand Down
110 changes: 110 additions & 0 deletions spec/upfluence/http/server_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
require 'spec_helper'
require 'rack/mock'
require 'upfluence/http/server'

RSpec.describe Upfluence::HTTP::Server do
def build_server(**opts, &block)
block ||= proc { run ->(_env) { [200, {}, ["app\n"]] } }

described_class.new(
prometheus_endpoint: false,
instrumentations: [],
**opts,
&block
)
end

def mock_request(builder, path, method: :get)
Rack::MockRequest.new(builder).send(method, path)
end

describe 'without admin_port' do
let(:server) { build_server(admin_port: nil) }
let(:builder) { server.instance_variable_get(:@production_builder) }

it 'does not create an admin builder' do
expect(server.instance_variable_get(:@admin_builder)).to be_nil
end

it 'serves /healthcheck' do
resp = mock_request(builder, '/healthcheck')

expect(resp.status).to eq(200)
expect(resp.body).to eq("ok\n")
end

it 'serves the app' do
resp = mock_request(builder, '/')

expect(resp.status).to eq(200)
expect(resp.body).to eq("app\n")
end

context 'with debug enabled' do
let(:server) { build_server(admin_port: nil, debug: '1') }

it 'serves /debug on the production builder' do
resp = mock_request(builder, '/debug/profile')

expect(resp.body).to eq('No profile available')
end
end

context 'with debug disabled' do
it 'does not mount /debug' do
resp = mock_request(builder, '/debug/start')

expect(resp.body).to eq("app\n")
end
end
end

describe 'with admin_port' do
let(:server) { build_server(admin_port: 9394, debug: debug) }
let(:production) { server.instance_variable_get(:@production_builder) }
let(:admin) { server.instance_variable_get(:@admin_builder) }

context 'with debug disabled' do
let(:debug) { nil }

it 'serves /healthcheck on both builders' do
expect(mock_request(production, '/healthcheck').status).to eq(200)
expect(mock_request(admin, '/healthcheck').status).to eq(200)
end

it 'serves the app on the production builder' do
resp = mock_request(production, '/')

expect(resp.status).to eq(200)
expect(resp.body).to eq("app\n")
end

it 'does not mount /debug on either builder' do
expect(mock_request(production, '/debug/start').body).to eq("app\n")
expect(mock_request(admin, '/debug/start').status).to eq(404)
end
end

context 'with debug enabled' do
let(:debug) { '1' }

it 'serves /debug only on the admin builder' do
expect(mock_request(admin, '/debug/profile').body).to eq('No profile available')
expect(mock_request(production, '/debug/profile').body).to eq("app\n")
end
end
end

describe 'custom healthcheck endpoint' do
let(:custom) { ->(_env) { [200, {}, ["custom\n"]] } }
let(:server) { build_server(healthcheck_endpoint: custom) }
let(:builder) { server.instance_variable_get(:@production_builder) }

it 'uses the custom endpoint' do
resp = mock_request(builder, '/healthcheck')

expect(resp.status).to eq(200)
expect(resp.body).to eq("custom\n")
end
end
end