Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/upfluence.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'upfluence/context'
require 'upfluence/utils'
require 'upfluence/endpoint/api_endpoint'
require 'upfluence/mixin/strong_parameters'
Expand Down
54 changes: 54 additions & 0 deletions lib/upfluence/context.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

require 'upfluence/thread'

module Upfluence
CONTEXT_KEY = :upfluence_context

class Context
attr_reader :deadline

def initialize
@deadline = nil
end

def timeout
return nil if @deadline.nil?

[(@deadline - Time.now).to_f, 0.0].max
end

def with_deadline(deadline, override: false)
previous = @deadline
@deadline = override || previous.nil? ? deadline : [previous, deadline].min

yield
ensure
@deadline = previous
end

def with_timeout(duration, override: false)
with_deadline(Time.now + duration, override: override) { yield }
end

class ThreadWrapper
class << self
def wrap_thread(thr, block)
parent_deadline = thr[CONTEXT_KEY]&.deadline

return block.call unless parent_deadline

Upfluence.context.with_deadline(parent_deadline) { block.call }
end
end
end

::Upfluence::Thread.register_wrapper :context, ThreadWrapper
end

class << self
def context
Thread.current[CONTEXT_KEY] ||= Context.new
end
end
end
10 changes: 8 additions & 2 deletions lib/upfluence/http/middleware/request_stapler.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
require 'upfluence/context'

module Upfluence
module HTTP
module Middleware
class RequestStapler
def initialize(app)
def initialize(app, timeout: nil)
@timeout = timeout
@app = app
end

def call(env)
Server.request = Rack::Request.new(env)
@app.call(env)

return @app.call(env) unless @timeout

Upfluence.context.with_timeout(@timeout) { @app.call(env) }
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/upfluence/http/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def production_builder(opts, &block)
mount = method(:mount_admin_endpoints)

Builder.new do
use Middleware::RequestStapler
use Middleware::RequestStapler, timeout: opts[:request_timeout]
use Middleware::Logger
use Middleware::Prometheus
use Middleware::ApplicationHeaders, base_handler
Expand Down
7 changes: 4 additions & 3 deletions lib/upfluence/pool.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require 'thread'
require 'timeout'
require 'upfluence/timeout'

module Upfluence
class Pool
Expand Down Expand Up @@ -39,7 +39,8 @@ def push(obj)

def pop(options = {})
timeout = options.fetch :timeout, @timeout
deadline = Time.now + timeout
effective = [timeout, Upfluence.context.timeout].compact.min
deadline = Time.now + effective

@mutex.synchronize do
loop do
Expand All @@ -49,7 +50,7 @@ def pop(options = {})
return connection if connection

to_wait = deadline - Time.now
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
raise ::Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
@resource.wait(@mutex, to_wait)
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/upfluence/timeout.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

require 'timeout'
require 'upfluence/context'

module Upfluence
module Timeout
def self.timeout(sec, klass = nil, message = nil, &block)
effective = [sec, Upfluence.context.timeout].compact.min

raise(klass || ::Timeout::Error, message) if effective <= 0

Upfluence.context.with_timeout(effective) do
::Timeout.timeout(effective, klass, message, &block)
end
end
end
end
4 changes: 3 additions & 1 deletion lib/upfluence/utils/thrift/middleware/timeout.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'upfluence/timeout'

module Upfluence
module Utils
module Thrift
Expand All @@ -9,7 +11,7 @@ def initialize(app, duration)
end

def method_missing(method, *args, &block)
::Timeout.timeout(@duration) { @app.send(method, *args, &block) }
Upfluence::Timeout.timeout(@duration) { @app.send(method, *args, &block) }
rescue ::Timeout::Error
raise ::Thrift::ApplicationException.new(
::Thrift::ApplicationException::INTERNAL_ERROR,
Expand Down
160 changes: 160 additions & 0 deletions spec/upfluence/context_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# frozen_string_literal: true

require 'spec_helper'
require 'upfluence/context'

RSpec.describe Upfluence::Context do
subject(:context) { described_class.new }

describe '#deadline' do
it { expect(context.deadline).to be_nil }
end

describe '#timeout' do
it { expect(context.timeout).to be_nil }

context 'with a deadline set' do
before { context.with_deadline(Time.now + 10) { @timeout = context.timeout } }

it { expect(@timeout).to be_within(0.1).of(10.0) }
end
end

describe '#with_deadline' do
let(:deadline) { Time.now + 30 }

it 'sets the deadline within the block' do
context.with_deadline(deadline) do
expect(context.deadline).to eq(deadline)
end
end

it 'restores the previous deadline after the block' do
context.with_deadline(deadline) {}

expect(context.deadline).to be_nil
end

it 'restores the deadline even when the block raises' do
context.with_deadline(deadline) { raise 'boom' } rescue nil

expect(context.deadline).to be_nil
end

context 'when a deadline is already set' do
let(:outer) { Time.now + 60 }

it 'uses the earlier deadline by default' do
context.with_deadline(outer) do
context.with_deadline(deadline) do
expect(context.deadline).to eq(deadline)
end
end
end

it 'keeps the outer deadline when it is earlier' do
context.with_deadline(deadline) do
context.with_deadline(outer) do
expect(context.deadline).to eq(deadline)
end
end
end

it 'overrides regardless when override: true' do
context.with_deadline(deadline) do
context.with_deadline(outer, override: true) do
expect(context.deadline).to eq(outer)
end
end
end

it 'restores the outer deadline after the block' do
context.with_deadline(outer) do
context.with_deadline(deadline) {}

expect(context.deadline).to eq(outer)
end
end
end
end

describe '#with_timeout' do
it 'sets a deadline relative to now within the block' do
context.with_timeout(10) do
expect(context.timeout).to be_within(0.1).of(10.0)
end
end

it 'restores the previous deadline after the block' do
context.with_timeout(10) {}

expect(context.deadline).to be_nil
end

context 'when a shorter deadline is already set' do
it 'keeps the shorter existing deadline' do
context.with_timeout(5) do
context.with_timeout(30) do
expect(context.timeout).to be_within(0.1).of(5.0)
end
end
end
end

context 'with override: true' do
it 'sets the deadline regardless of existing deadline' do
context.with_timeout(5) do
context.with_timeout(30, override: true) do
expect(context.timeout).to be_within(0.1).of(30.0)
end
end
end
end
end
end

RSpec.describe Upfluence do
describe '.context' do
subject { described_class.context }

it { is_expected.to be_a(Upfluence::Context) }

it 'returns the same instance within the same thread' do
expect(described_class.context).to equal(described_class.context)
end

it 'returns a different instance in a different thread' do
other = Thread.new { described_class.context }.value

expect(other).not_to equal(described_class.context)
end
end

describe 'thread wrapper' do
it 'propagates the deadline to child threads' do
deadline = Time.now + 30

described_class.context.with_deadline(deadline) do
child_deadline = Upfluence::Thread.new { described_class.context.deadline }.value

expect(child_deadline).to eq(deadline)
end
end

it 'does not propagate deadline to child threads when none is set' do
child_deadline = Upfluence::Thread.new { described_class.context.deadline }.value

expect(child_deadline).to be_nil
end

it 'does not leak the deadline back to the parent after the child exits' do
deadline = Time.now + 30

described_class.context.with_deadline(deadline) do
Upfluence::Thread.new { described_class.context.with_deadline(Time.now + 999) {} }.join
end

expect(described_class.context.deadline).to be_nil
end
end
end
28 changes: 28 additions & 0 deletions spec/upfluence/http/server_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'spec_helper'
require 'rack/mock'
require 'upfluence/http/server'
require 'upfluence/context'

RSpec.describe Upfluence::HTTP::Server do
def build_server(**opts, &block)
Expand Down Expand Up @@ -107,4 +108,31 @@ def mock_request(builder, path, method: :get)
expect(resp.body).to eq("custom\n")
end
end

describe 'request_timeout' do
let(:captured_timeout) { [] }
let(:server) do
captured = captured_timeout

build_server(request_timeout: 30, admin_port: nil) do
run ->(_env) {
captured << Upfluence.context.timeout
[200, {}, ['ok']]
}
end
end
let(:builder) { server.instance_variable_get(:@production_builder) }

it 'sets the context timeout for the duration of the request' do
mock_request(builder, '/')

expect(captured_timeout.first).to be_within(0.1).of(30.0)
end

it 'clears the context timeout after the request' do
mock_request(builder, '/')

expect(Upfluence.context.timeout).to be_nil
end
end
end
Loading