Class: Concurrent::Throttle

Inherits:
Synchronization::Object
  • Object
show all
Includes:
Promises::FactoryMethods
Defined in:
lib/concurrent-ruby-edge/concurrent/edge/throttle.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

A tool managing concurrency level of tasks. The maximum capacity is set in constructor. Each acquire will lower the available capacity and release will increase it. When there is no available capacity the current thread may either be blocked or an event is returned which will be resolved when capacity becomes available.

The more common usage of the Throttle is with a proxy executor a_throttle.on(Concurrent.global_io_executor). Anything executed on the proxy executor will be throttled and execute on the given executor. There can be more than one proxy executors. All abstractions which execute tasks have option to specify executor, therefore the proxy executor can be injected to any abstraction throttling its concurrency level.

Examples

Limiting concurrency level of a concurrently executed block to two

max_two = Concurrent::Throttle.new 2 # => #  # used to track concurrency level concurrency_level = Concurrent::AtomicFixnum.new # => # job = -> do # increase the current level at the beginning of the throttled block  concurrency_level.increment # work, takes some time  do_stuff # read the current concurrency level  current_concurrency_level = concurrency_level.value # decrement the concurrency level back at the end of the block  concurrency_level.decrement # return the observed concurrency level  current_concurrency_level end # create 10 threads running concurrently the jobs Array.new(10) do Thread.new do max_two.acquire(&job) end # wait for all the threads to finish and read the observed # concurrency level in each of them end.map(&:value) # => [2, 2, 1, 1, 1, 2, 2, 2, 2, 1] 

Notice that the returned array has no number bigger than 2 therefore the concurrency level of the block with the do_stuff was never bigger than 2.

# runs a block, and returns the observed concurrency level during the execution def monitor_concurrency_level(concurrency_level, &block) concurrency_level.increment block.call current_concurrency_level = concurrency_level.value concurrency_level.decrement # return the observed concurrency level  return current_concurrency_level end throttle = Concurrent::Throttle.new 3 # => # concurrency_level = Concurrent::AtomicFixnum.new # => #  Array.new(10) do |i| # create throttled future  throttle.future(i) do |arg| monitor_concurrency_level(concurrency_level) { do_stuff arg } # fulfill with the observed concurrency level  end # collect observed concurrency levels end.map(&:value!) # => [3, 2, 1, 2, 1, 3, 3, 1, 2, 1] 

The concurrency level does not rise above 3.

It works by setting the executor of the future created from the throttle. The executor is a proxy executor for the Concurrent::Promises.default_executor which can be obtained using #on method. Therefore the above example could be instead more explicitly written as follows

# ... Array.new(10) do |i| # create throttled future  Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor)) do # ...  end end.map(&:value!) 

Anything executed on the proxy executor is throttled. A throttle can have more proxy executors for different executors, all jobs share the same capacity provided by the throttle.

Since the proxy executor becomes the executor of the future, any chained futures will also be throttled. It can be changed by using different executor. It the following example the first 2 futures in the chain are throttled, the last is not.

concurrency_level_throttled = Concurrent::AtomicFixnum.new concurrency_level_unthrottled = Concurrent::AtomicFixnum.new Array.new(10) do |i| throttle.future(i) do monitor_concurrency_level(concurrency_level_throttled) { do_stuff } end.then do |v| [v, monitor_concurrency_level(concurrency_level_throttled) { do_stuff }] end.then_on(:io) do |l1, l2| [l1, l2, monitor_concurrency_level(concurrency_level_unthrottled) { 5.times { do_stuff } }] end end.map(&:value!) # => [[3, 3, 7], # [3, 2, 9], # [3, 3, 10], # [3, 3, 6], # [3, 3, 5], # [3, 3, 8], # [3, 3, 3], # [3, 3, 4], # [3, 2, 2], # [3, 1, 1]] 

In the output you can see that the first 2 columns do not cross the 3 capacity limit and the last column which is untroubled does.

TODO (pitr-ch 20-Dec-2018): example with virtual throttled executor, throttling only part of promises chain.

Other abstraction

The proxy executor created with throttle can be used with other abstractions as well and combined.

concurrency_level = Concurrent::AtomicFixnum.new futures = Array.new(5) do |i| # create throttled future  throttle.future(i) do |arg| monitor_concurrency_level(concurrency_level) { do_stuff arg } # fulfill with the observed concurrency level  end end agents = Array.new(5) do |i| agent = Concurrent::Agent.new 0 # execute agent update on throttled executor  agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level_throttled) { do_stuff } } agent end futures.map(&:value!) # => [3, 3, 3, 2, 1] agents.each { |a| a.await }.map(&:value) # => [3, 2, 3, 3, 1] 

There is no observed concurrency level above 3.

Instance Method Summary collapse

Constructor Details

#initialize(capacity) ⇒ Throttle

Create throttle.

Parameters:

  • capacity (Integer)

    How many tasks using this throttle can run at the same time.

 37 38 39 40 41 42 43
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 37 def initialize(capacity) super() @MaxCapacity = capacity @Queue = LockFreeQueue.new @executor_cache = [nil, nil] self.capacity = capacity end

Instance Method Details

#acquire(timeout = nil) { ... } ⇒ Object, self, true, false

Blocks current thread until there is capacity available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If block is passed then the block is called after the capacity is acquired and it is automatically released after the block is executed.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Yields:

  • [] block to execute after the capacity is acquired

Returns:

  • (Object, self, true, false)
    • When no timeout and no block it returns self
    • When no timeout and with block it returns the result of the block
    • When with timeout and no block it returns true when acquired and false when timed out
    • When with timeout and with block it returns the result of the block of nil on timing out

See Also:

 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 63 def acquire(timeout = nil, &block) event = acquire_or_event if event within_timeout = event.wait(timeout) # release immediately when acquired later after the timeout since it is unused  event.on_resolution!(self, &:release) unless within_timeout else within_timeout = true end called = false if timeout if block if within_timeout called = true block.call else nil end else within_timeout end else if block called = true block.call else self end end ensure release if called end

#available_capacityInteger

Returns The available capacity.

Returns:

  • (Integer)

    The available capacity.

 30 31 32 33
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 30 def available_capacity current_capacity = capacity current_capacity >= 0 ? current_capacity : 0 end

#default_executorExecutorService

Uses executor provided by #on therefore all events and futures created using factory methods on this object will be throttled. Overrides Promises::FactoryMethods::Configuration#default_executor.

Returns:

  • (ExecutorService)

See Also:

 183 184 185
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 183 def default_executor on(super) end

#max_capacityInteger

Returns The maximum capacity.

Returns:

  • (Integer)

    The maximum capacity.

 46 47 48
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 46 def max_capacity @MaxCapacity end

#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService

Returns An executor which wraps given executor and allows to post tasks only as available capacity in the throttle allows.

Examples:

throttling future

a_future.then_on(a_throttle.on(:io)) { a_throttled_task }

Parameters:

  • executor (ExecutorService) (defaults to: Promises::FactoryMethods.default_executor)

Returns:

  • (ExecutorService)

    An executor which wraps given executor and allows to post tasks only as available capacity in the throttle allows.

 162 163 164 165 166 167 168 169 170 171 172 173 174 175
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 162 def on(executor = Promises::FactoryMethods.default_executor) current_executor, current_cache = @executor_cache return current_cache if current_executor == executor && current_cache if current_executor.nil? # cache first proxy  proxy_executor = ProxyExecutor.new(self, Concurrent.executor(executor)) @executor_cache = [executor, proxy_executor] return proxy_executor else # do not cache more than 1 executor  ProxyExecutor.new(self, Concurrent.executor(executor)) end end

#releaseself

Releases previously acquired capacity back to Throttle. Has to be called exactly once for each acquired capacity.

Returns:

  • (self)

See Also:

  • #acquire, #try_acquire
 118 119 120 121 122 123 124 125 126 127 128 129 130
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 118 def release while true current_capacity = capacity if compare_and_set_capacity current_capacity, current_capacity + 1 if current_capacity < 0 # release called after trigger which pushed a trigger, busy wait is ok  Thread.pass until (trigger = @Queue.pop) trigger.resolve end return self end end end

#to_sString Also known as: inspect

Returns Short string representation.

Returns:

  • (String)

    Short string representation.

 133 134 135
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 133 def to_s format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity end

#try_acquiretrue, false

Tries to acquire capacity from the throttle. Returns true when there is capacity available. The acquired capacity has to be returned to the throttle by calling #release.

Returns:

  • (true, false)

See Also:

 102 103 104 105 106 107 108 109 110 111 112
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 102 def try_acquire while true current_capacity = capacity if current_capacity > 0 return true if compare_and_set_capacity( current_capacity, current_capacity - 1) else return false end end end

#any_event(*futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Shortcut of #any_event_on with default :io executor supplied.

Returns:

See Also:

#any_event_on(default_executor, *futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates a new event which becomes resolved after the first futures_and_or_events resolves. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more.

Parameters:

Returns:

#any_fulfilled_future(*futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Shortcut of #any_fulfilled_future_on with default :io executor supplied.

Returns:

See Also:

#any_fulfilled_future_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a new future which is resolved after the first futures_and_or_events is fulfilled. Its result equals the result of the first resolved future or if all futures_and_or_events reject, it has reason of the last rejected future. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

Parameters:

Returns:

#any_resolved_future(*futures_and_or_events) ⇒ Future Also known as: any Originally defined in module Promises::FactoryMethods

Shortcut of #any_resolved_future_on with default :io executor supplied.

Returns:

See Also:

#any_resolved_future_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a new future which is resolved after the first futures_and_or_events is resolved. Its result equals the result of the first resolved future. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

Parameters:

Returns:

#delay(*args, &task) ⇒ Future, Event Originally defined in module Promises::FactoryMethods

Shortcut of #delay_on with default :io executor supplied.

Returns:

See Also:

#delay_on(default_executor, *args) {|*args| ... } ⇒ Future #delay_on(default_executor) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates a new event or future which is resolved only after it is touched, see AbstractEventFuture#touch.

Overloads:

  • #delay_on(default_executor, *args) {|*args| ... } ⇒ Future

    If task is provided it returns a Concurrent::Promises::Future representing the result of the task.

    Parameters:

    • args (Object)

      arguments which are passed to the task when it's executed. (It might be prepended with other arguments, see the @yield section).

    Yields:

    • (*args)

      to the task.

    Yield Returns:

    Returns:

  • #delay_on(default_executor) ⇒ Event

    If no task is provided, it returns an Event

    Returns:

Parameters:

  • default_executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

#fulfilled_future(value, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a resolved future which will be fulfilled with the given value.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • value (Object)

Returns:

#future(*args, &task) ⇒ Future Originally defined in module Promises::FactoryMethods

Shortcut of #future_on with default :io executor supplied.

Returns:

See Also:

#future_on(default_executor, *args) {|*args| ... } ⇒ Future Originally defined in module Promises::FactoryMethods

Constructs a new Future which will be resolved after block is evaluated on default executor. Evaluation begins immediately.

Parameters:

  • default_executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • args (Object)

    arguments which are passed to the task when it's executed. (It might be prepended with other arguments, see the @yield section).

Yields:

  • (*args)

    to the task.

Yield Returns:

Returns:

#make_future(nil, default_executor = self.default_executor) ⇒ Event #make_future(a_future, default_executor = self.default_executor) ⇒ Future #make_future(an_event, default_executor = self.default_executor) ⇒ Event #make_future(exception, default_executor = self.default_executor) ⇒ Future #make_future(value, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

General constructor. Behaves differently based on the argument's type. It's provided for convenience but it's better to be explicit.

Overloads:

  • #make_future(nil, default_executor = self.default_executor) ⇒ Event

    Returns resolved event.

    Parameters:

    • nil (nil)

    Returns:

    • (Event)

      resolved event.

  • #make_future(a_future, default_executor = self.default_executor) ⇒ Future

    Returns a future which will be resolved when a_future is.

    Parameters:

    Returns:

    • (Future)

      a future which will be resolved when a_future is.

  • #make_future(an_event, default_executor = self.default_executor) ⇒ Event

    Returns an event which will be resolved when an_event is.

    Parameters:

    Returns:

    • (Event)

      an event which will be resolved when an_event is.

  • #make_future(exception, default_executor = self.default_executor) ⇒ Future

    Returns a rejected future with the exception as its reason.

    Parameters:

    • exception (Exception)

    Returns:

    • (Future)

      a rejected future with the exception as its reason.

  • #make_future(value, default_executor = self.default_executor) ⇒ Future

    Returns a fulfilled future with the value.

    Parameters:

    • value (Object)

      when none of the above overloads fits

    Returns:

    • (Future)

      a fulfilled future with the value.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

See Also:

  • resolved_event, fulfilled_future

#rejected_future(reason, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a resolved future which will be rejected with the given reason.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • reason (Object)

Returns:

#resolvable_eventResolvableEvent Originally defined in module Promises::FactoryMethods

Shortcut of #resolvable_event_on with default :io executor supplied.

Returns:

See Also:

#resolvable_event_on(default_executor = self.default_executor) ⇒ ResolvableEvent Originally defined in module Promises::FactoryMethods

Creates a resolvable event, user is responsible for resolving the event once by calling ResolvableEvent#resolve.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

#resolvable_futureResolvableFuture Originally defined in module Promises::FactoryMethods

Shortcut of #resolvable_future_on with default :io executor supplied.

#resolvable_future_on(default_executor = self.default_executor) ⇒ ResolvableFuture Originally defined in module Promises::FactoryMethods

Creates resolvable future, user is responsible for resolving the future once by ResolvableFuture#resolve, ResolvableFuture#fulfill, or ResolvableFuture#reject

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

#resolved_event(default_executor = self.default_executor) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates resolved event.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

#resolved_future(fulfilled, value, reason, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a resolved future with will be either fulfilled with the given value or rejected with the given reason.

Parameters:

  • fulfilled (true, false)
  • value (Object)
  • reason (Object)
  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

#schedule(intended_time, *args, &task) ⇒ Future, Event Originally defined in module Promises::FactoryMethods

Shortcut of #schedule_on with default :io executor supplied.

Returns:

See Also:

#schedule_on(default_executor, intended_time, *args) {|*args| ... } ⇒ Future #schedule_on(default_executor, intended_time) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates a new event or future which is resolved in intended_time.

Overloads:

  • #schedule_on(default_executor, intended_time, *args) {|*args| ... } ⇒ Future

    If task is provided it returns a Concurrent::Promises::Future representing the result of the task.

    Parameters:

    • args (Object)

      arguments which are passed to the task when it's executed. (It might be prepended with other arguments, see the @yield section).

    Yields:

    • (*args)

      to the task.

    Yield Returns:

    Returns:

  • #schedule_on(default_executor, intended_time) ⇒ Event

    If no task is provided, it returns an Event

    Returns:

Parameters:

  • default_executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • intended_time (Numeric, Time)

    Numeric means to run in intended_time seconds. Time means to run on intended_time.

#zip_events(*futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Shortcut of #zip_events_on with default :io executor supplied.

Returns:

See Also:

#zip_events_on(default_executor, *futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates a new event which is resolved after all futures_and_or_events are resolved. (Future is resolved when fulfilled or rejected.)

Parameters:

Returns:

#zip_futures(*futures_and_or_events) ⇒ Future Also known as: zip Originally defined in module Promises::FactoryMethods

Shortcut of #zip_futures_on with default :io executor supplied.

Returns:

See Also:

#zip_futures_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a new future which is resolved after all futures_and_or_events are resolved. Its value is an array of zipped future values. Its reason is an array of reasons for rejection. If there is an error it rejects. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

Parameters:

Returns:

#zip_futures_over(enumerable, &future_factory) ⇒ Future Originally defined in module Promises::FactoryMethods

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

Shortcut of #zip_futures_over_on with default :io executor supplied.

Returns:

See Also:

#zip_futures_over_on(default_executor, enumerable) {|element| ... } ⇒ Future Originally defined in module Promises::FactoryMethods

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

Creates new future which is resolved after all the futures created by future_factory from enumerable elements are resolved. Simplified it does: zip(*enumerable.map { |e| future e, &future_factory })

Examples:

# `#succ` calls are executed in parallel zip_futures_over_on(:io, [1, 2], &:succ).value! # => [2, 3]

Parameters:

  • default_executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • enumerable (Enumerable)

Yields:

  • a task to be executed in future

Yield Parameters:

  • element (Object)

    from enumerable

Yield Returns:

  • (Object)

    a value of the future

Returns: