diff --git a/lib/upfluence.rb b/lib/upfluence.rb index 4754a89..fc26b7b 100644 --- a/lib/upfluence.rb +++ b/lib/upfluence.rb @@ -1,3 +1,4 @@ +require 'upfluence/context' require 'upfluence/utils' require 'upfluence/endpoint/api_endpoint' require 'upfluence/mixin/strong_parameters' diff --git a/lib/upfluence/context.rb b/lib/upfluence/context.rb new file mode 100644 index 0000000..2f71f71 --- /dev/null +++ b/lib/upfluence/context.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +require 'upfluence/thread' + +module Upfluence + CONTEXT_KEY = :upfluence_context + + class Context + attr_reader :deadline + + def initialize + @deadline = nil + end + + def timeout + return nil if @deadline.nil? + + [(@deadline - Time.now).to_f, 0.0].max + end + + def with_deadline(deadline, override: false) + previous = @deadline + @deadline = override || previous.nil? ? deadline : [previous, deadline].min + + yield + ensure + @deadline = previous + end + + def with_timeout(duration, override: false) + with_deadline(Time.now + duration, override: override) { yield } + end + + class ThreadWrapper + class << self + def wrap_thread(thr, block) + parent_deadline = thr[CONTEXT_KEY]&.deadline + + return block.call unless parent_deadline + + Upfluence.context.with_deadline(parent_deadline) { block.call } + end + end + end + + ::Upfluence::Thread.register_wrapper :context, ThreadWrapper + end + + class << self + def context + Thread.current[CONTEXT_KEY] ||= Context.new + end + end +end diff --git a/lib/upfluence/http/middleware/request_stapler.rb b/lib/upfluence/http/middleware/request_stapler.rb index de9a44b..0fe2a0d 100644 --- a/lib/upfluence/http/middleware/request_stapler.rb +++ b/lib/upfluence/http/middleware/request_stapler.rb @@ -1,14 +1,20 @@ +require 'upfluence/context' + module Upfluence module HTTP module Middleware class RequestStapler - def initialize(app) + def initialize(app, timeout: nil) + @timeout = timeout @app = app end def call(env) Server.request = Rack::Request.new(env) - @app.call(env) + + return @app.call(env) unless @timeout + + Upfluence.context.with_timeout(@timeout) { @app.call(env) } end end end diff --git a/lib/upfluence/http/server.rb b/lib/upfluence/http/server.rb index dc1409e..d4d4367 100644 --- a/lib/upfluence/http/server.rb +++ b/lib/upfluence/http/server.rb @@ -119,7 +119,7 @@ def production_builder(opts, &block) mount = method(:mount_admin_endpoints) Builder.new do - use Middleware::RequestStapler + use Middleware::RequestStapler, timeout: opts[:request_timeout] use Middleware::Logger use Middleware::Prometheus use Middleware::ApplicationHeaders, base_handler diff --git a/lib/upfluence/pool.rb b/lib/upfluence/pool.rb index d3cbf60..59cedd8 100644 --- a/lib/upfluence/pool.rb +++ b/lib/upfluence/pool.rb @@ -1,5 +1,5 @@ require 'thread' -require 'timeout' +require 'upfluence/timeout' module Upfluence class Pool @@ -39,7 +39,8 @@ def push(obj) def pop(options = {}) timeout = options.fetch :timeout, @timeout - deadline = Time.now + timeout + effective = [timeout, Upfluence.context.timeout].compact.min + deadline = Time.now + effective @mutex.synchronize do loop do @@ -49,7 +50,7 @@ def pop(options = {}) return connection if connection to_wait = deadline - Time.now - raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 + raise ::Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 @resource.wait(@mutex, to_wait) end end diff --git a/lib/upfluence/timeout.rb b/lib/upfluence/timeout.rb new file mode 100644 index 0000000..bdf0109 --- /dev/null +++ b/lib/upfluence/timeout.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +require 'timeout' +require 'upfluence/context' + +module Upfluence + module Timeout + def self.timeout(sec, klass = nil, message = nil, &block) + effective = [sec, Upfluence.context.timeout].compact.min + + raise(klass || ::Timeout::Error, message) if effective <= 0 + + Upfluence.context.with_timeout(effective) do + ::Timeout.timeout(effective, klass, message, &block) + end + end + end +end diff --git a/lib/upfluence/utils/thrift/middleware/timeout.rb b/lib/upfluence/utils/thrift/middleware/timeout.rb index 856cf8a..46b60f9 100644 --- a/lib/upfluence/utils/thrift/middleware/timeout.rb +++ b/lib/upfluence/utils/thrift/middleware/timeout.rb @@ -1,3 +1,5 @@ +require 'upfluence/timeout' + module Upfluence module Utils module Thrift @@ -9,7 +11,7 @@ def initialize(app, duration) end def method_missing(method, *args, &block) - ::Timeout.timeout(@duration) { @app.send(method, *args, &block) } + Upfluence::Timeout.timeout(@duration) { @app.send(method, *args, &block) } rescue ::Timeout::Error raise ::Thrift::ApplicationException.new( ::Thrift::ApplicationException::INTERNAL_ERROR, diff --git a/spec/upfluence/context_spec.rb b/spec/upfluence/context_spec.rb new file mode 100644 index 0000000..f98205f --- /dev/null +++ b/spec/upfluence/context_spec.rb @@ -0,0 +1,160 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'upfluence/context' + +RSpec.describe Upfluence::Context do + subject(:context) { described_class.new } + + describe '#deadline' do + it { expect(context.deadline).to be_nil } + end + + describe '#timeout' do + it { expect(context.timeout).to be_nil } + + context 'with a deadline set' do + before { context.with_deadline(Time.now + 10) { @timeout = context.timeout } } + + it { expect(@timeout).to be_within(0.1).of(10.0) } + end + end + + describe '#with_deadline' do + let(:deadline) { Time.now + 30 } + + it 'sets the deadline within the block' do + context.with_deadline(deadline) do + expect(context.deadline).to eq(deadline) + end + end + + it 'restores the previous deadline after the block' do + context.with_deadline(deadline) {} + + expect(context.deadline).to be_nil + end + + it 'restores the deadline even when the block raises' do + context.with_deadline(deadline) { raise 'boom' } rescue nil + + expect(context.deadline).to be_nil + end + + context 'when a deadline is already set' do + let(:outer) { Time.now + 60 } + + it 'uses the earlier deadline by default' do + context.with_deadline(outer) do + context.with_deadline(deadline) do + expect(context.deadline).to eq(deadline) + end + end + end + + it 'keeps the outer deadline when it is earlier' do + context.with_deadline(deadline) do + context.with_deadline(outer) do + expect(context.deadline).to eq(deadline) + end + end + end + + it 'overrides regardless when override: true' do + context.with_deadline(deadline) do + context.with_deadline(outer, override: true) do + expect(context.deadline).to eq(outer) + end + end + end + + it 'restores the outer deadline after the block' do + context.with_deadline(outer) do + context.with_deadline(deadline) {} + + expect(context.deadline).to eq(outer) + end + end + end + end + + describe '#with_timeout' do + it 'sets a deadline relative to now within the block' do + context.with_timeout(10) do + expect(context.timeout).to be_within(0.1).of(10.0) + end + end + + it 'restores the previous deadline after the block' do + context.with_timeout(10) {} + + expect(context.deadline).to be_nil + end + + context 'when a shorter deadline is already set' do + it 'keeps the shorter existing deadline' do + context.with_timeout(5) do + context.with_timeout(30) do + expect(context.timeout).to be_within(0.1).of(5.0) + end + end + end + end + + context 'with override: true' do + it 'sets the deadline regardless of existing deadline' do + context.with_timeout(5) do + context.with_timeout(30, override: true) do + expect(context.timeout).to be_within(0.1).of(30.0) + end + end + end + end + end +end + +RSpec.describe Upfluence do + describe '.context' do + subject { described_class.context } + + it { is_expected.to be_a(Upfluence::Context) } + + it 'returns the same instance within the same thread' do + expect(described_class.context).to equal(described_class.context) + end + + it 'returns a different instance in a different thread' do + other = Thread.new { described_class.context }.value + + expect(other).not_to equal(described_class.context) + end + end + + describe 'thread wrapper' do + it 'propagates the deadline to child threads' do + deadline = Time.now + 30 + + described_class.context.with_deadline(deadline) do + child_deadline = Upfluence::Thread.new { described_class.context.deadline }.value + + expect(child_deadline).to eq(deadline) + end + end + + it 'does not propagate deadline to child threads when none is set' do + child_deadline = Upfluence::Thread.new { described_class.context.deadline }.value + + expect(child_deadline).to be_nil + end + + it 'does not leak the deadline back to the parent after the child exits' do + deadline = Time.now + 30 + + described_class.context.with_deadline(deadline) do + Upfluence::Thread.new { described_class.context.with_deadline(Time.now + 999) {} }.join + end + + expect(described_class.context.deadline).to be_nil + end + end +end diff --git a/spec/upfluence/http/server_spec.rb b/spec/upfluence/http/server_spec.rb index cb2318e..c313798 100644 --- a/spec/upfluence/http/server_spec.rb +++ b/spec/upfluence/http/server_spec.rb @@ -1,6 +1,7 @@ require 'spec_helper' require 'rack/mock' require 'upfluence/http/server' +require 'upfluence/context' RSpec.describe Upfluence::HTTP::Server do def build_server(**opts, &block) @@ -107,4 +108,31 @@ def mock_request(builder, path, method: :get) expect(resp.body).to eq("custom\n") end end + + describe 'request_timeout' do + let(:captured_timeout) { [] } + let(:server) do + captured = captured_timeout + + build_server(request_timeout: 30, admin_port: nil) do + run ->(_env) { + captured << Upfluence.context.timeout + [200, {}, ['ok']] + } + end + end + let(:builder) { server.instance_variable_get(:@production_builder) } + + it 'sets the context timeout for the duration of the request' do + mock_request(builder, '/') + + expect(captured_timeout.first).to be_within(0.1).of(30.0) + end + + it 'clears the context timeout after the request' do + mock_request(builder, '/') + + expect(Upfluence.context.timeout).to be_nil + end + end end diff --git a/spec/upfluence/timeout_spec.rb b/spec/upfluence/timeout_spec.rb new file mode 100644 index 0000000..c425fc4 --- /dev/null +++ b/spec/upfluence/timeout_spec.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'upfluence/timeout' + +RSpec.describe Upfluence::Timeout do + describe '.timeout' do + subject(:call) { described_class.timeout(sec, &block) } + + let(:block) { ->(_sec) { :ok } } + + context 'with no context deadline' do + let(:sec) { 5 } + + it { is_expected.to eq(:ok) } + + it 'uses the requested duration' do + expect(::Timeout).to receive(:timeout).with(5, nil, nil) + + call + end + end + + context 'with a context deadline shorter than the requested duration' do + let(:sec) { 30 } + + before { Upfluence.context.with_deadline(Time.now + 5) { @result = described_class.timeout(sec, &block) } } + + it 'uses the context timeout' do + expect(::Timeout).to receive(:timeout).with(be_within(0.1).of(5.0), nil, nil) + + Upfluence.context.with_deadline(Time.now + 5) { call } + end + end + + context 'with a context deadline longer than the requested duration' do + let(:sec) { 5 } + + it 'uses the requested duration' do + expect(::Timeout).to receive(:timeout).with(5, nil, nil) + + Upfluence.context.with_deadline(Time.now + 30) { call } + end + end + + context 'with a custom exception class' do + let(:sec) { 5 } + let(:klass) { Class.new(StandardError) } + + it 'passes it through to Timeout' do + expect(::Timeout).to receive(:timeout).with(5, klass, nil) + + described_class.timeout(sec, klass, &block) + end + end + + context 'with a custom message' do + let(:sec) { 5 } + + it 'passes it through to Timeout' do + expect(::Timeout).to receive(:timeout).with(5, nil, 'too slow') + + described_class.timeout(sec, nil, 'too slow', &block) + end + end + + context 'when the timeout fires' do + let(:sec) { 0.01 } + let(:block) { ->(_sec) { sleep 1 } } + + it 'raises Timeout::Error' do + expect { call }.to raise_error(::Timeout::Error) + end + end + + context 'when effective timeout is already 0' do + let(:sec) { 30 } + let(:block) { ->(_sec) { :ok } } + + it 'raises Timeout::Error immediately' do + expect { + Upfluence.context.with_deadline(Time.now - 1) { call } + }.to raise_error(::Timeout::Error) + end + + context 'with a custom exception class' do + let(:klass) { Class.new(StandardError) } + + it 'raises the custom class immediately' do + expect { + Upfluence.context.with_deadline(Time.now - 1) do + described_class.timeout(sec, klass, &block) + end + }.to raise_error(klass) + end + end + end + end +end