Description
Certain access patterns, pool sizes, and numbers of fibers will result in fiber starvation when sharing a connection pool. The starvation occurs when the pool size is smaller than the number of fibers, and the access pattern of using pooled resources and performing IO causes a waiting fiber to only be resumed at times when a pooled resource is unavailable, so it must wait again. The sequence continues until some timeout is reached.
Reproduction script
worker-1 is always signaled when the resource is released after "Sleep with resource #1", but is resumed during "Sleep with resource #2", when the resource has already been re-acquired, so it is never able to make progress. worker-0 and worker-2 make progress by being resumed during the "Sleep without resource" step.
```ruby
require 'async'
require 'colorize'
require_relative 'resource_pool'

POOL = ResourcePool.new(pool_size: 1, timeout: 0.1)
WORKER_COUNT = 3
MAX_TEST_DURATION = 2.0
LOG_COLORS = [:light_blue, :light_magenta, :light_green, :light_red, :light_cyan, :light_yellow,
              :blue, :magenta, :green, :red, :cyan, :yellow]

class Logger
  def self.debug(message)
    task = Async::Task.current
    fiber = Fiber.current
    color = Thread.current[:log_color]
    puts "[#{Time.now}] #{task} on Fiber 0x#{fiber.object_id.to_s(16)}: #{message}".colorize(color)
  end
end

Async do
  clock = Async::Clock.new
  clock.start!

  WORKER_COUNT.times do |n|
    Async(annotation: "worker-#{n}") do
      Thread.current[:log_color] = LOG_COLORS[n]

      begin
        while clock.total < MAX_TEST_DURATION do
          POOL.with_resource do
            Logger.debug('Sleep with resource #1')
            sleep(0.001) # simulates a DB call
          end

          POOL.with_resource do
            Logger.debug('Sleep with resource #2')
            sleep(0.001) # simulates a DB call
          end

          Logger.debug('Sleep without resource')
          sleep(0.001) # simulates some other IO
        end
      rescue ResourcePool::TimeoutError => e
        Logger.debug("Timed out. Aborting test after #{clock.total} seconds")
        puts "#{e.class} #{e.message}"
        puts e.backtrace
        STDOUT.flush
        Kernel.exit!
      end
    end
  end
end
```

resource_pool.rb
```ruby
# Uses the same acquire/release flow as Sequel::ThreadedConnectionPool
class ResourcePool
  class TimeoutError < StandardError; end

  def initialize(pool_size:, timeout:)
    @available_resources = pool_size.times.map { |n| "resource-#{n}" }
    @timeout = timeout
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
  end

  def with_resource
    resource = acquire
    yield resource
  ensure
    if resource
      release(resource)
    end
  end

  private

  def acquire
    if resource = sync_next_available
      Logger.debug('Pool: Acquired resource without waiting')
      return resource
    end

    timeout = @timeout
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    @mutex.synchronize do
      Logger.debug('Pool: Waiting')
      @waiter.wait(@mutex, timeout)
      if resource = next_available
        Logger.debug('Pool: Acquired resource after waiting')
        return resource
      end
    end

    until resource = sync_next_available
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time

      if elapsed > timeout
        raise TimeoutError, "Unable to acquire resource after #{elapsed} seconds"
      end

      # We get here when the resource was released and this fiber was unblocked by the signal,
      # but the resource was immediately re-acquired by the fiber that sent the signal before
      # this fiber could be resumed. Effectively a race condition.
      @mutex.synchronize do
        Logger.debug('Pool: Woken by signal but resource unavailable. Waiting again.')
        @waiter.wait(@mutex, timeout - elapsed)
        if resource = next_available
          Logger.debug('Pool: Acquired resource after multiple waits')
          return resource
        end
      end
    end

    Logger.debug('Pool: Acquired resource after waiting')
    resource
  end

  def release(resource)
    @mutex.synchronize do
      @available_resources << resource
      Logger.debug('Pool: Released resource. Signaling.')
      @waiter.signal
    end
  end

  def sync_next_available
    @mutex.synchronize do
      next_available
    end
  end

  def next_available
    @available_resources.pop
  end
end
```

My environment and output
```
$ uname -a
Linux toppy 4.15.0-135-generic #139-Ubuntu SMP Mon Jan 18 17:38:24 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ ruby -v
ruby 3.0.0p0 (2020-12-25 revision 95aff21468) [x86_64-linux]

$ gem list --exact async

*** LOCAL GEMS ***

async (1.28.5)

$ ruby async_resource_pool_test.rb # line added between fibers for clarity
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #1

[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Waiting

[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Waiting

[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #2

[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Woken by signal but resource unavailable. Waiting again.

[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep without resource

[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Acquired resource after waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep with resource #1

[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Waiting

[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep with resource #2

[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Woken by signal but resource unavailable. Waiting again.

[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep without resource

[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource after waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #1

... sequence continues until ...

[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Timed out. Aborting test after 0.10122425699955784 seconds
ResourcePool::TimeoutError Unable to acquire resource after 0.1010006020005676 seconds
/home/brendan/projects/async-experiments/resource_pool.rb:44:in `acquire'
/home/brendan/projects/async-experiments/resource_pool.rb:12:in `with_resource'
async_resource_pool_test.rb:30:in `block (3 levels) in <main>'
/home/brendan/.rvm/gems/ruby-3.0.0/gems/async-1.28.5/lib/async/task.rb:265:in `block in make_fiber'
```

Note that in about 1 run out of 10, the script will run for longer than 0.1 seconds, so far always timing out at 0.2 seconds, but could presumably run for longer. I believe this occurs when the wait timeout triggers at a time that causes worker-1 to be resumed when the resource is available during the "Sleep without resource" step. The system falls back into a starvation pattern after that though.
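The interleaving in the log can also be reproduced deterministically, without async or real timing. The following is only a toy model under stated assumptions: `ToyPool` is a made-up name, a FIFO run queue stands in for the event loop, every sleep has the same length (so sleepers resume in the order they went to sleep), and there are no timeouts. It is not how async schedules fibers internally.

```ruby
# Toy model of the starvation pattern, using plain Fibers and a FIFO run queue.
# ToyPool is hypothetical; it only mimics the signal-then-reacquire ordering.
class ToyPool
  attr_reader :acquisitions

  def initialize
    @ready = []    # runnable fibers, resumed in FIFO order
    @waiters = []  # fibers parked until a release signals them
    @free = true   # a single pooled resource
    @acquisitions = Hash.new(0)
  end

  def worker(name)
    @ready << Fiber.new do
      loop do
        2.times { with_resource(name) { pause } } # two "DB calls" back to back
        pause                                      # "other IO" without the resource
      end
    end
  end

  # Resume at most `steps` fibers, one at a time, in queue order.
  def run(steps)
    steps.times do
      break if @ready.empty?
      @ready.shift.resume
    end
  end

  private

  # Models sleep/IO: go to the back of the run queue and give up control.
  def pause
    @ready << Fiber.current
    Fiber.yield
  end

  def with_resource(name)
    until @free
      @waiters << Fiber.current # park until signaled, then re-check
      Fiber.yield
    end
    @free = false
    @acquisitions[name] += 1
    yield
    @free = true
    woken = @waiters.shift      # "signal": the waiter becomes runnable...
    @ready << woken if woken    # ...but the releasing fiber keeps running first
  end
end

pool = ToyPool.new
3.times { |n| pool.worker("worker-#{n}") }
pool.run(300)
p pool.acquisitions
```

In this model worker-0 and worker-2 keep acquiring the resource while worker-1 never appears in the tally: it is always made runnable by the release after the first "DB call" and always resumed during the second one.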
I'm interested to hear your thoughts on what should be done to avoid this. Is this something that ruby or async should prevent? Or should a fiber-safe resource pool be written differently from a thread-safe resource pool? (e.g. explicitly yielding after releasing the resource if others are waiting)
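One way a pool could be written differently is sketched below. Note that this uses direct hand-off rather than the yield-after-release idea mentioned above: `release` gives the resource straight to the longest-waiting caller, so the releasing fiber cannot re-acquire it first. `FairResourcePool` and `Waiter` are hypothetical names, logging is left out, and the sketch has only been reasoned about with plain threads; whether it behaves the same under the async scheduler is an assumption that would need checking.

```ruby
# Hypothetical sketch: FairResourcePool is not part of async or Sequel.
class FairResourcePool
  class TimeoutError < StandardError; end

  # One entry per blocked caller; `resource` is filled in by #release.
  Waiter = Struct.new(:condition, :resource)

  def initialize(pool_size:, timeout:)
    @available = Array.new(pool_size) { |n| "resource-#{n}" }
    @timeout = timeout
    @mutex = Mutex.new
    @waiters = [] # FIFO queue of Waiter structs
  end

  def with_resource
    resource = acquire
    yield resource
  ensure
    release(resource) if resource
  end

  private

  def acquire
    @mutex.synchronize do
      # Fast path only when nobody is queued ahead of this caller.
      return @available.pop if @waiters.empty? && !@available.empty?

      waiter = Waiter.new(ConditionVariable.new, nil)
      @waiters << waiter
      deadline = monotonic_now + @timeout

      # Loop guards against spurious wakeups and tracks the remaining time.
      until waiter.resource
        remaining = deadline - monotonic_now
        if remaining <= 0
          @waiters.delete_if { |w| w.equal?(waiter) }
          raise TimeoutError, "Unable to acquire resource within #{@timeout} seconds"
        end
        waiter.condition.wait(@mutex, remaining)
      end

      waiter.resource
    end
  end

  def release(resource)
    @mutex.synchronize do
      if (waiter = @waiters.shift)
        # Hand the resource to the longest-waiting caller instead of
        # returning it to the pool, so the releaser cannot grab it again.
        waiter.resource = resource
        waiter.condition.signal
      else
        @available << resource
      end
    end
  end

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

The public interface matches `ResourcePool`, so it could be tried in the reproduction script by swapping the constant, e.g. `POOL = FairResourcePool.new(pool_size: 1, timeout: 0.1)`. The trade-off is that a resource may sit idle briefly between hand-off and the waiter actually being resumed.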