Module: Concurrent

Defined in:
lib/concurrent-ruby/concurrent.rb,
lib/concurrent-ruby/concurrent/map.rb,
lib/concurrent-ruby/concurrent/set.rb,
lib/concurrent-ruby/concurrent/atom.rb,
lib/concurrent-ruby/concurrent/hash.rb,
lib/concurrent-ruby/concurrent/ivar.rb,
lib/concurrent-ruby/concurrent/mvar.rb,
lib/concurrent-ruby/concurrent/tvar.rb,
lib/concurrent-ruby/concurrent/agent.rb,
lib/concurrent-ruby/concurrent/array.rb,
lib/concurrent-ruby/concurrent/async.rb,
lib/concurrent-ruby/concurrent/delay.rb,
lib/concurrent-ruby/concurrent/maybe.rb,
lib/concurrent-ruby/concurrent/tuple.rb,
lib/concurrent-ruby/concurrent/errors.rb,
lib/concurrent-ruby/concurrent/future.rb,
lib/concurrent-ruby/concurrent/options.rb,
lib/concurrent-ruby/concurrent/promise.rb,
lib/concurrent-ruby/concurrent/version.rb,
lib/concurrent-ruby/concurrent/dataflow.rb,
lib/concurrent-ruby/concurrent/promises.rb,
lib/concurrent-ruby/concurrent/constants.rb,
lib/concurrent-ruby/concurrent/exchanger.rb,
lib/concurrent-ruby/concurrent/re_include.rb,
lib/concurrent-ruby/concurrent/timer_task.rb,
lib/concurrent-ruby/concurrent/atomic/event.rb,
lib/concurrent-ruby/concurrent/atomic/locals.rb,
lib/concurrent-ruby/concurrent/configuration.rb,
lib/concurrent-ruby/concurrent/mutable_struct.rb,
lib/concurrent-ruby/concurrent/scheduled_task.rb,
lib/concurrent-ruby/concurrent/utility/engine.rb,
lib/concurrent-ruby/concurrent/concern/logging.rb,
lib/concurrent-ruby/concurrent/concern/logging.rb,
lib/concurrent-ruby/concurrent/settable_struct.rb,
lib/concurrent-ruby/concurrent/synchronization.rb,
lib/concurrent-ruby/concurrent/atomic/semaphore.rb,
lib/concurrent-ruby/concurrent/immutable_struct.rb,
lib/concurrent-ruby/concurrent/thread_safe/util.rb,
lib/concurrent-ruby/concurrent/concern/obligation.rb,
lib/concurrent-ruby/concurrent/concern/observable.rb,
lib/concurrent-ruby/concurrent/executor/timer_set.rb,
lib/concurrent-ruby/concurrent/concern/deprecation.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/synchronization/lock.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_boolean.rb,
lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_semaphore.rb,
lib/concurrent-ruby/concurrent/atomic/read_write_lock.rb,
lib/concurrent-ruby/concurrent/synchronization/object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb,
lib/concurrent-ruby/concurrent/utility/monotonic_time.rb,
lib/concurrent-ruby/concurrent/utility/native_integer.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_reference.rb,
lib/concurrent-ruby/concurrent/atomic/count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb,
lib/concurrent-ruby/concurrent/concern/dereferenceable.rb,
lib/concurrent-ruby/concurrent/synchronization/volatile.rb,
lib/concurrent-ruby/concurrent/executor/executor_service.rb,
lib/concurrent-ruby/concurrent/synchronization/condition.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/volatile.rb,
lib/concurrent-ruby/concurrent/utility/processor_counter.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/collection/lock_free_stack.rb,
lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/striped64.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb,
lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb,
lib/concurrent-ruby/concurrent/executor/immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/safe_task_executor.rb,
lib/concurrent-ruby/concurrent/atomic/java_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution.rb,
lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/collection/map/mri_map_backend.rb,
lib/concurrent-ruby/concurrent/executor/java_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_object.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb,
lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb,
lib/concurrent-ruby/concurrent/utility/native_extension_loader.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_markable_reference.rb,
lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb,
lib/concurrent-ruby/concurrent/executor/serial_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/simple_executor_service.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb,
lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/thread_safe/synchronized_delegator.rb,
lib/concurrent-ruby/concurrent/synchronization/full_memory_barrier.rb,
lib/concurrent-ruby/concurrent/synchronization/safe_initialization.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/power_of_two_tuple.rb,
lib/concurrent-ruby/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent-ruby/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/atomic_reference/atomic_direct_update.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_write_observer_set.rb,
lib/concurrent-ruby/concurrent/synchronization/jruby_lockable_object.rb,
lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_notify_observer_set.rb,
lib/concurrent-ruby/concurrent/collection/map/truffleruby_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution_delegator.rb,
lib/concurrent-ruby/concurrent/collection/non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_lockable_object.rb,
lib/concurrent-ruby/concurrent/collection/map/non_concurrent_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/java_non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb,
lib/concurrent-ruby-edge/concurrent/edge.rb,
lib/concurrent-ruby-edge/concurrent/actor.rb,
lib/concurrent-ruby-edge/concurrent/channel.rb,
lib/concurrent-ruby-edge/concurrent/actor/core.rb,
lib/concurrent-ruby-edge/concurrent/actor/root.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils.rb,
lib/concurrent-ruby-edge/concurrent/actor/errors.rb,
lib/concurrent-ruby-edge/concurrent/channel/tick.rb,
lib/concurrent-ruby-edge/concurrent/edge/channel.rb,
lib/concurrent-ruby-edge/concurrent/edge/version.rb,
lib/concurrent-ruby-edge/concurrent/actor/context.rb,
lib/concurrent-ruby-edge/concurrent/edge/promises.rb,
lib/concurrent-ruby-edge/concurrent/edge/throttle.rb,
lib/concurrent-ruby-edge/concurrent/lazy_register.rb,
lib/concurrent-ruby-edge/concurrent/actor/envelope.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb,
lib/concurrent-ruby-edge/concurrent/actor/reference.rb,
lib/concurrent-ruby-edge/concurrent/actor/type_check.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/pool.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector.rb,
lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb,
lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/timer.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_queue.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/ticker.rb,
lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/awaits.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/sliding.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/linking.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/pausing.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/dropping.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/abstract.rb,
lib/concurrent-ruby-edge/concurrent/actor/public_delegations.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set.rb,
lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb,
lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/supervising.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/put_clause.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/take_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/after_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/error_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/default_clause.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/window.rb,
lib/concurrent-ruby-edge/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/errors_on_unknown_message.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

Defined Under Namespace

Modules: Actor, Async, Concern, Edge, ErlangActor, ImmutableStruct, MutableStruct, Promises, SettableStruct Classes: Agent, Array, Atom, AtomicBoolean, AtomicFixnum, AtomicMarkableReference, AtomicReference, CachedThreadPool, Cancellation, Channel, ConcurrentUpdateError, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FiberLocalVar, FixedThreadPool, Future, Hash, IVar, ImmediateExecutor, IndirectImmediateExecutor, LazyRegister, LockFreeStack, LockLocalVar, MVar, Map, Maybe, MultipleAssignmentError, MultipleErrors, ProcessingActor, Promise, ReadWriteLock, ReentrantReadWriteLock, SafeTaskExecutor, ScheduledTask, Semaphore, SerializedExecution, SerializedExecutionDelegator, Set, SimpleExecutorService, SingleThreadExecutor, TVar, ThreadLocalVar, ThreadPoolExecutor, Throttle, TimerSet, TimerTask, Tuple, WrappingExecutor

Constant Summary collapse

Error =
Class.new(StandardError)
ConfigurationError =

Raised when errors occur during configuration.

Class.new(Error)
CancelledOperationError =

Raised when an asynchronous operation is cancelled before execution.

Class.new(Error)
LifecycleError =

Raised when a lifecycle method (such as stop) is called in an improper sequence or when the object is in an inappropriate state.

Class.new(Error)
ImmutabilityError =

Raised when an attempt is made to violate an immutability guarantee.

Class.new(Error)
IllegalOperationError =

Raised when an operation is attempted which is not legal given the receiver's current state

Class.new(Error)
InitializationError =

Raised when an object's methods are called when it has not been properly initialized.

Class.new(Error)
MaxRestartFrequencyError =

Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.

Class.new(Error)
RejectedExecutionError =

Raised by an Executor when it is unable to process a given task, possibly because of a reject policy or other internal error.

Class.new(Error)
ResourceLimitError =

Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.

Class.new(Error)
TimeoutError =

Raised when an operation times out.

Class.new(Error)
PromiseExecutionError =
Class.new(StandardError)
VERSION =
'1.3.5'
NULL_LOGGER =

Suppresses all output when used for logging.

lambda { |level, progname, message = nil, &block| }
EDGE_VERSION =
'0.7.2'

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.abort_transactionundocumented

Abort a currently running transaction - see Concurrent::atomically.

 139 140 141
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 139 def abort_transaction raise Transaction::AbortError.new end

.atomicallyundocumented

Run a block that reads and writes TVars as a single atomic transaction. With respect to the value of TVar objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the TVar objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed more than once. In most cases your code should be free of side-effects, except for via TVar.

  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically, it will not be part of the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

Examples:

a = new TVar(100_000) b = new TVar(100) Concurrent::atomically do a.value -= 10 b.value += 10 end

Raises:

  • (ArgumentError)
 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 82 def atomically raise ArgumentError.new('no block given') unless block_given? # Get the current transaction  transaction = Transaction::current # Are we not already in a transaction (not nested)?  if transaction.nil? # New transaction  begin # Retry loop  loop do # Create a new transaction  transaction = Transaction.new Transaction::current = transaction # Run the block, aborting on exceptions  begin result = yield rescue Transaction::AbortError => e transaction.abort result = Transaction::ABORTED rescue Transaction::LeaveError => e transaction.abort break result rescue => e transaction.abort raise e end # If we can commit, break out of the loop  if result != Transaction::ABORTED if transaction.commit break result end end end ensure # Clear the current transaction  Transaction::current = nil end else # Nested transaction - flatten it and just run the block  yield end end

.available_processor_countFloat

Number of processors cores available for process scheduling. This method takes in account the CPU quota if the process is inside a cgroup with a dedicated CPU quota (typically Docker). Otherwise it returns the same value as #processor_count but as a Float.

For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (Float)

    number of available processors

 194 195 196
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 194 def self.available_processor_count processor_counter.available_processor_count end

.call_dataflow(method, executor, *inputs, &block) ⇒ undocumented

Raises:

  • (ArgumentError)
 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 56 def call_dataflow(method, executor, *inputs, &block) raise ArgumentError.new('an executor must be provided') if executor.nil? raise ArgumentError.new('no block given') unless block_given? unless inputs.all? { |input| input.is_a? IVar } raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }") end result = Future.new(executor: executor) do values = inputs.map { |input| input.send(method) } block.call(*values) end if inputs.empty? result.execute else counter = DependencyCounter.new(inputs.size) { result.execute } inputs.each do |input| input.add_observer counter end end result end

.cpu_quotanil, Float

The maximum number of processors cores available for process scheduling. Returns nil if there is no enforced limit, or a Float if the process is inside a cgroup with a dedicated CPU quota (typically Docker).

Note that nothing prevents setting a CPU quota higher than the actual number of cores on the system.

For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (nil, Float)

    Maximum number of available processors as set by a cgroup CPU quota, or nil if none set

 209 210 211
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 209 def self.cpu_quota processor_counter.cpu_quota end

.cpu_sharesFloat, nil

The CPU shares requested by the process. For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (Float, nil)

    CPU shares requested by the process, or nil if not set

 217 218 219
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 217 def self.cpu_shares processor_counter.cpu_shares end

.create_simple_logger(level = :FATAL, output = $stderr) ⇒ undocumented

Create a simple logger with provided level and output.

 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 38 def self.create_simple_logger(level = :FATAL, output = $stderr) level = Concern::Logging.const_get(level) unless level.is_a?(Integer) # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking  lambda do |severity, progname, message = nil, &block| return false if severity < level message = block ? block.call : message formatted_message = case message when String message when Exception format "%s (%s)\n%s", message.message, message.class, (message.backtrace || []).join("\n") else message.inspect end output.print format "[%s] %5s -- %s: %s\n", Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'), Concern::Logging::SEV_LABEL[severity], progname, formatted_message true end end

.create_stdlib_logger(level = :FATAL, output = $stderr) ⇒ undocumented

Deprecated.

Create a stdlib logger with provided level and output. If you use this deprecated method you might need to add logger to your Gemfile to avoid warnings from Ruby 3.3.5+.

 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 73 def self.create_stdlib_logger(level = :FATAL, output = $stderr) require 'logger' logger = Logger.new(output) logger.level = level logger.formatter = lambda do |severity, datetime, progname, msg| formatted_message = case msg when String msg when Exception format "%s (%s)\n%s", msg.message, msg.class, (msg.backtrace || []).join("\n") else msg.inspect end format "[%s] %5s -- %s: %s\n", datetime.strftime('%Y-%m-%d %H:%M:%S.%L'), severity, progname, formatted_message end lambda do |loglevel, progname, message = nil, &block| logger.add loglevel, message, progname, &block end end

.dataflow(*inputs) {|inputs| ... } ⇒ Object

Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are Future values. The dataflow task itself is also a Future value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.

Our syntax is somewhat related to that of Akka's flow and Habanero Java's DataDrivenFuture. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.

The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.

Example

A dataflow task is created with the dataflow method, passing in a block.

task = Concurrent::dataflow { 14 } 

This produces a simple Future value. The task will run immediately, as it has no dependencies. We can also specify Future values that must be available before a task will run. When we do this we get the value of those futures passed to our block.

a = Concurrent::dataflow { 1 } b = Concurrent::dataflow { 2 } c = Concurrent::dataflow(a, b) { |av, bv| av + bv } 

Using the dataflow method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.

Derivation

This section describes how we could derive dataflow from other primitives in this library.

Consider a naive fibonacci calculator.

def fib(n) if n < 2 n else fib(n - 1) + fib(n - 2) end end puts fib(14) #=> 377 

We could modify this to use futures.

def fib(n) if n < 2 Concurrent::Future.new { n } else n1 = fib(n - 1).execute n2 = fib(n - 2).execute Concurrent::Future.new { n1.value + n2.value } end end f = fib(14) #=> #f.execute #=> # sleep(0.5) puts f.value #=> 377 

One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.

To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.

class CountingObserver def initialize(count, &block) @count = count @block = block end def update(time, value, reason) @count -= 1 if @count <= 0 @block.call() end end end def fib(n) if n < 2 Concurrent::Future.new { n }.execute else n1 = fib(n - 1) n2 = fib(n - 2) result = Concurrent::Future.new { n1.value + n2.value } barrier = CountingObserver.new(2) { result.execute } n1.add_observer barrier n2.add_observer barrier n1.execute n2.execute result end end 

We can wrap this up in a dataflow utility.

f = fib(14) #=> #sleep(0.5) puts f.value #=> 377  def dataflow(*inputs, &block) result = Concurrent::Future.new(&block) if inputs.empty? result.execute else barrier = CountingObserver.new(inputs.size) { result.execute } inputs.each do |input| input.add_observer barrier end end result end def fib(n) if n < 2 dataflow { n } else n1 = fib(n - 1) n2 = fib(n - 2) dataflow(n1, n2) { n1.value + n2.value } end end f = fib(14) #=> #sleep(0.5) puts f.value #=> 377 

Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.

def dataflow(*inputs, &block) result = Concurrent::Future.new do values = inputs.map { |input| input.value } block.call(*values) end if inputs.empty? result.execute else barrier = CountingObserver.new(inputs.size) { result.execute } inputs.each do |input| input.add_observer barrier end end result end def fib(n) if n < 2 Concurrent::dataflow { n } else n1 = fib(n - 1) n2 = fib(n - 2) Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 } end end f = fib(14) #=> #sleep(0.5) puts f.value #=> 377 

Parameters:

  • inputs (Future)

    zero or more Future operations that this dataflow depends upon

Yields:

  • The operation to perform once all the dependencies are met

Yield Parameters:

  • inputs (Future)

    each of the Future inputs to the dataflow

Yield Returns:

  • (Object)

    the result of the block operation

Returns:

  • (Object)

    the result of all the operations

Raises:

  • (ArgumentError)

    if no block is given

  • (ArgumentError)

    if any of the inputs are not IVars

 34 35 36
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 34 def dataflow(*inputs, &block) dataflow_with(Concurrent.global_io_executor, *inputs, &block) end

.dataflow!(*inputs, &block) ⇒ undocumented

 44 45 46
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 44 def dataflow!(*inputs, &block) dataflow_with!(Concurrent.global_io_executor, *inputs, &block) end

.dataflow_with(executor, *inputs, &block) ⇒ undocumented

 39 40 41
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 39 def dataflow_with(executor, *inputs, &block) call_dataflow(:value, executor, *inputs, &block) end

.dataflow_with!(executor, *inputs, &block) ⇒ undocumented

 49 50 51
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 49 def dataflow_with!(executor, *inputs, &block) call_dataflow(:value!, executor, *inputs, &block) end

.disable_at_exit_handlers!undocumented

Deprecated.

Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.

Note:

this option should be needed only because of at_exit ordering issues which may arise when running some of the testing frameworks. E.g. Minitest's test-suite runs itself in at_exit callback which executes after the pools are already terminated. Then auto termination needs to be disabled and called manually after test-suite ends.

Note:

This method should never be called from within a gem. It should only be used from within the main application and even then it should be used only when necessary.

Disables AtExit handlers including pool auto-termination handlers. When disabled it will be the application programmer's responsibility to ensure that the handlers are shutdown properly prior to application exit by calling AtExit.run method.

 48 49 50
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 48 def self.disable_at_exit_handlers! deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841." end

.executor(executor_identifier) ⇒ Executor

General access point to global executors.

Parameters:

Returns:

  • (Executor)
 83 84 85
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 83 def self.executor(executor_identifier) Options.executor(executor_identifier) end

.global_fast_executorThreadPoolExecutor

Global thread pool optimized for short, fast operations.

Returns:

 55 56 57
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 55 def self.global_fast_executor GLOBAL_FAST_EXECUTOR.value! end

.global_immediate_executorundocumented

 66 67 68
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 66 def self.global_immediate_executor GLOBAL_IMMEDIATE_EXECUTOR end

.global_io_executorThreadPoolExecutor

Global thread pool optimized for long, blocking (IO) tasks.

Returns:

 62 63 64
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 62 def self.global_io_executor GLOBAL_IO_EXECUTOR.value! end

.global_loggerundocumented

 114 115 116
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 114 def self.global_logger GLOBAL_LOGGER.value end

.global_logger=(value) ⇒ undocumented

 118 119 120
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 118 def self.global_logger=(value) GLOBAL_LOGGER.value = value end

.global_timer_setConcurrent::TimerSet

Global thread pool user for global timers.

Returns:

 73 74 75
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 73 def self.global_timer_set GLOBAL_TIMER_SET.value! end

.leave_transactionundocumented

Leave a transaction without committing or aborting - see Concurrent::atomically.

 144 145 146
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 144 def leave_transaction raise Transaction::LeaveError.new end

.monotonic_time(unit = :float_second) ⇒ Float

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).

Returns the current time as tracked by the application monotonic clock.

Parameters:

  • unit (Symbol) (defaults to: :float_second)

    the time unit to be returned, can be either :float_second, :float_millisecond, :float_microsecond, :second, :millisecond, :microsecond, or :nanosecond default to :float_second.

Returns:

  • (Float)

    The current monotonic time since some unspecified starting point

See Also:

 15 16 17
# File 'lib/concurrent-ruby/concurrent/utility/monotonic_time.rb', line 15 def monotonic_time(unit = :float_second) Process.clock_gettime(Process::CLOCK_MONOTONIC, unit) end

.new_fast_executor(opts = {}) ⇒ undocumented

 87 88 89 90 91 92 93 94 95 96
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 87 def self.new_fast_executor(opts = {}) FixedThreadPool.new( [2, Concurrent.processor_count].max, auto_terminate: opts.fetch(:auto_terminate, true), idletime: 60, # 1 minute  max_queue: 0, # unlimited  fallback_policy: :abort, # shouldn't matter -- 0 max queue  name: "fast" ) end

.new_io_executor(opts = {}) ⇒ undocumented

 98 99 100 101 102 103 104
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 98 def self.new_io_executor(opts = {}) CachedThreadPool.new( auto_terminate: opts.fetch(:auto_terminate, true), fallback_policy: :abort, # shouldn't matter -- 0 max queue  name: "io" ) end

.physical_processor_countInteger

Number of physical processor cores on the current system. For performance reasons the calculated value will be memoized on the first call.

On Windows the Win32 API will be queried for the NumberOfCores from Win32_Processor. This will return the total number "of cores for the current instance of the processor." On Unix-like operating systems either the hwprefs or sysctl utility will be called in a subshell and the returned value will be used. In the rare case where none of these methods work or an exception is raised the function will simply return 1.

 181 182 183
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 181 def self.physical_processor_count processor_counter.physical_processor_count end

.processor_countInteger

Number of processors seen by the OS and used for process scheduling. For performance reasons the calculated value will be memoized on the first call.

When running under JRuby the Java runtime call java.lang.Runtime.getRuntime.availableProcessors will be used. According to the Java documentation this "value may change during a particular invocation of the virtual machine... [applications] should therefore occasionally poll this property." We still memoize this value once under JRuby.

Otherwise Ruby's Etc.nprocessors will be used.

Returns:

  • (Integer)

    number of processors seen by the OS or Java runtime

See Also:

 160 161 162
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 160 def self.processor_count processor_counter.processor_count end

.use_simple_logger(level = :FATAL, output = $stderr) ⇒ undocumented

Use logger created by #create_simple_logger to log concurrent-ruby messages.

 66 67 68
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 66 def self.use_simple_logger(level = :FATAL, output = $stderr) Concurrent.global_logger = create_simple_logger level, output end

.use_stdlib_logger(level = :FATAL, output = $stderr) ⇒ undocumented

Deprecated.

Use logger created by #create_stdlib_logger to log concurrent-ruby messages.

 101 102 103
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 101 def self.use_stdlib_logger(level = :FATAL, output = $stderr) Concurrent.global_logger = create_stdlib_logger level, output end

Instance Method Details

#exchange(value, timeout = nil) ⇒ Object

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

In some edge cases when a timeout is given a return value of nil may be ambiguous. Specifically, if nil is a valid value in the exchange it will be impossible to tell whether nil is the actual return value or if it signifies timeout. When nil is a valid value in the exchange consider using #exchange! or #try_exchange instead.

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object)

    the value exchanged by the other thread or nil on timeout

 
# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 341 

#exchange!(value, timeout = nil) ⇒ Object

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

On timeout a TimeoutError exception will be raised.

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object)

    the value exchanged by the other thread

Raises:

 
# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 345 

#initialize(opts = {}) ⇒ undocumented

Create a new thread pool.

Options Hash (opts):

  • :fallback_policy (Symbol) — default: :discard

    the policy for handling new tasks that are received when the queue size has reached max_queue or the executor has shut down

Raises:

  • (ArgumentError)

    if :fallback_policy is not one of the values specified in FALLBACK_POLICIES

See Also:

 
# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 338 

#try_exchange(value, timeout = nil) ⇒ Concurrent::Maybe

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

The return value will be a Maybe set to Just on success or Nothing on timeout.

Examples:

 exchanger = Concurrent::Exchanger.new result = exchanger.exchange(:foo, 0.5) if result.just? puts result.value #=> :bar else puts 'timeout' end

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Concurrent::Maybe)

    on success a Just maybe will be returned with the item exchanged by the other thread as #value; on timeout a Nothing maybe will be returned with TimeoutError as #reason

 
# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 349