Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 92 additions & 59 deletions lib/timeout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,92 @@ def self.handle_timeout(message) # :nodoc:
end

# :stopdoc:
CONDVAR = ConditionVariable.new
QUEUE = Queue.new
QUEUE_MUTEX = Mutex.new
TIMEOUT_THREAD_MUTEX = Mutex.new
@timeout_thread = nil
private_constant :CONDVAR, :QUEUE, :QUEUE_MUTEX, :TIMEOUT_THREAD_MUTEX

# We keep a private reference so that time mocking libraries won't break Timeout.
GET_TIME = Process.method(:clock_gettime)
if defined?(Ractor.make_shareable)
# Ractor.make_shareable(Method) only works on Ruby 4+
Ractor.make_shareable(GET_TIME) rescue nil
end
private_constant :GET_TIME

class State
attr_reader :condvar, :queue, :queue_mutex # shared with Timeout.timeout()

def initialize
@condvar = ConditionVariable.new
@queue = Queue.new
@queue_mutex = Mutex.new

@timeout_thread = nil
@timeout_thread_mutex = Mutex.new
end

if defined?(Ractor.store_if_absent) && defined?(Ractor.shareable?) && Ractor.shareable?(GET_TIME)
# Ractor support if
# 1. Ractor.store_if_absent is available
# 2. Method object can be shareable (4.0~)
def self.instance
Ractor.store_if_absent :timeout_gem_state do
State.new
end
end
else
GLOBAL_STATE = State.new

def self.instance
GLOBAL_STATE
end
end

def create_timeout_thread
watcher = Thread.new do
requests = []
while true
until @queue.empty? and !requests.empty? # wait to have at least one request
req = @queue.pop
requests << req unless req.done?
end
closest_deadline = requests.min_by(&:deadline).deadline

now = 0.0
@queue_mutex.synchronize do
while (now = GET_TIME.call(Process::CLOCK_MONOTONIC)) < closest_deadline and @queue.empty?
@condvar.wait(@queue_mutex, closest_deadline - now)
end
end

requests.each do |req|
req.interrupt if req.expired?(now)
end
requests.reject!(&:done?)
end
end

if !watcher.group.enclosed? && (!defined?(Ractor.main?) || Ractor.main?)
ThreadGroup::Default.add(watcher)
end

watcher.name = "Timeout stdlib thread"
watcher.thread_variable_set(:"\0__detached_thread__", true)
watcher
end

def ensure_timeout_thread_created
unless @timeout_thread&.alive?
# If the Mutex is already owned we are in a signal handler.
# In that case, just return and let the main thread create the Timeout thread.
return if @timeout_thread_mutex.owned?

@timeout_thread_mutex.synchronize do
unless @timeout_thread&.alive?
@timeout_thread = create_timeout_thread
end
end
end
end
end
private_constant :State

class Request
attr_reader :deadline
Expand Down Expand Up @@ -91,55 +171,6 @@ def finished
end
private_constant :Request

def self.create_timeout_thread
watcher = Thread.new do
requests = []
while true
until QUEUE.empty? and !requests.empty? # wait to have at least one request
req = QUEUE.pop
requests << req unless req.done?
end
closest_deadline = requests.min_by(&:deadline).deadline

now = 0.0
QUEUE_MUTEX.synchronize do
while (now = GET_TIME.call(Process::CLOCK_MONOTONIC)) < closest_deadline and QUEUE.empty?
CONDVAR.wait(QUEUE_MUTEX, closest_deadline - now)
end
end

requests.each do |req|
req.interrupt if req.expired?(now)
end
requests.reject!(&:done?)
end
end
ThreadGroup::Default.add(watcher) unless watcher.group.enclosed?
watcher.name = "Timeout stdlib thread"
watcher.thread_variable_set(:"\0__detached_thread__", true)
watcher
end
private_class_method :create_timeout_thread

def self.ensure_timeout_thread_created
unless @timeout_thread and @timeout_thread.alive?
# If the Mutex is already owned we are in a signal handler.
# In that case, just return and let the main thread create the @timeout_thread.
return if TIMEOUT_THREAD_MUTEX.owned?
TIMEOUT_THREAD_MUTEX.synchronize do
unless @timeout_thread and @timeout_thread.alive?
@timeout_thread = create_timeout_thread
end
end
end
end
private_class_method :ensure_timeout_thread_created

# We keep a private reference so that time mocking libraries won't break
# Timeout.
GET_TIME = Process.method(:clock_gettime)
private_constant :GET_TIME

# :startdoc:

# Perform an operation in a block, raising an error if it takes longer than
Expand Down Expand Up @@ -178,12 +209,14 @@ def self.timeout(sec, klass = nil, message = nil, &block) #:yield: +sec+
return scheduler.timeout_after(sec, klass || Error, message, &block)
end

ensure_timeout_thread_created
state = State.instance
state.ensure_timeout_thread_created

perform = Proc.new do |exc|
request = Request.new(Thread.current, sec, exc, message)
QUEUE_MUTEX.synchronize do
QUEUE << request
CONDVAR.signal
state.queue_mutex.synchronize do
state.queue << request
state.condvar.signal
end
begin
return yield(sec)
Expand Down
20 changes: 20 additions & 0 deletions test/test_timeout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,24 @@ def test_handling_enclosed_threadgroup
}.join
end;
end

def test_ractor
assert_separately(%w[-rtimeout -W0], <<-'end;')
r = Ractor.new do
Timeout.timeout(1) { 42 }
end.value

assert_equal 42, r

r = Ractor.new do
begin
Timeout.timeout(0.1) { sleep }
rescue Timeout::Error
:ok
end
end.value

assert_equal :ok, r
end;
end if defined?(::Ractor) && RUBY_VERSION >= '4.0'
end