Class: Concurrent::Promises::Channel

Inherits:
Synchronization::Object
  • Object
Defined in:
lib/concurrent-ruby-edge/concurrent/edge/channel.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

A first-in, first-out channel that accepts messages with the push family of methods and returns messages with the pop family of methods. Pop and push operations can be represented as futures, see #pop_op and #push_op. The capacity of the channel can be limited to support back pressure; use the capacity option in #initialize. If there is no message in the channel, #pop blocks and #pop_op returns a pending future. If the capacity is limited and the channel is full, #push blocks and #push_op returns a pending future.

Examples

Let's start by creating a channel with a capacity of 2 messages.

ch = Concurrent::Promises::Channel.new 2
# => #<Concurrent::Promises::Channel:0x... capacity taken 0 of 2>

We push 3 messages; the last pushing thread can then be observed sleeping, since the channel is full.

threads = Array.new(3) { |i| Thread.new { ch.push message: i } }
sleep 0.01 # let the threads run
threads # two threads have finished, one is sleeping (blocked in push)

When a message is popped, the last thread continues and finishes as well.

ch.pop # => {:message=>1}
threads.map(&:join) # all three threads have finished

The same principle applies to popping. There are now 2 messages in the channel. Let's create 3 threads trying to pop a message; one will be blocked until a new message is pushed.

threads = Array.new(3) { |i| Thread.new { ch.pop } }
sleep 0.01 # let the threads run
threads # two threads have finished, one is sleeping (blocked in pop)
ch.push message: 3
threads.map(&:value) # => [{:message=>0}, {:message=>2}, {:message=>3}]

Promises integration

However, this channel is implemented to integrate with promises, so all operations can be represented as futures.

ch = Concurrent::Promises::Channel.new 2
push_operations = Array.new(3) { |i| ch.push_op message: i }
# => two fulfilled futures and one pending future

We do not have to sleep here to let the futures execute, as we did with Threads. Since there is capacity for 2 messages, the first two futures are resolved immediately without ever allocating a Thread to execute them. The push_op and pop_op operations are therefore often more efficient than their blocking counterparts. The remaining pending push operation will never require another thread either; it resolves when a message is popped from the channel, making space for a new message.

ch.pop_op.value! # => {:message=>0}
push_operations.map(&:value!) # all three push operations are now fulfilled

pop_operations = Array.new(3) { |i| ch.pop_op }
# => two futures fulfilled with {:message=>1} and {:message=>2}, and one pending future
ch.push message: 3 # (push|pop) can be freely mixed with (push_op|pop_op)
pop_operations.map(&:value) # => [{:message=>1}, {:message=>2}, {:message=>3}]

Selecting over channels

A selection over channels can be created with the .select_op factory method. The resulting future is fulfilled with the first message available in any of the channels. Its value is a [channel, message] pair, so the channel that provided the message can be identified. The blocking .select and #select variants return the same pair directly.

ch1 = Concurrent::Promises::Channel.new 2
ch2 = Concurrent::Promises::Channel.new 2
ch1.push 1
ch2.push 2

Concurrent::Promises::Channel.select([ch1, ch2]) # => [ch1, 1]
ch1.select(ch2) # => [ch2, 2]

Concurrent::Promises.future { 3 + 4 }.then_channel_push(ch1)
Concurrent::Promises::Channel.
    # or `ch1.select_op(ch2)` would be equivalent
    select_op([ch1, ch2]).
    then('got number %03d from ch%d') { |(channel, value), format|
      format format, value, [ch1, ch2].index(channel).succ
    }.value! # => "got number 007 from ch1"

try_ variants

All blocking operations (#pop, #push, #select) have a non-blocking variant with the try_ prefix. These always return immediately and indicate either success or failure.

ch # => #<Concurrent::Promises::Channel:0x... capacity taken 0 of 2>
ch.try_push 1 # => true
ch.try_push 2 # => true
ch.try_push 3 # => false
ch.try_pop # => 1
ch.try_pop # => 2
ch.try_pop # => nil

Timeouts

All blocking operations (#pop, #push, #select) accept a timeout option. When it is used, the operation indicates, like the try_ variants, whether it succeeded or timed out.

ch # => #<Concurrent::Promises::Channel:0x... capacity taken 0 of 2>
ch.push 1, 0.01 # => true
ch.push 2, 0.01 # => true
ch.push 3, 0.01 # => false
ch.pop 0.01 # => 1
ch.pop 0.01 # => 2
ch.pop 0.01 # => nil

Backpressure

Most importantly, the channel can be used to create systems with backpressure: self-adjusting systems in which the producers slow down if the consumers are not keeping up.

channel = Concurrent::Promises::Channel.new 2
log = Concurrent::Array.new # => []

producers = Array.new 2 do |i|
  Thread.new(i) do |i|
    4.times do |j|
      log.push format "producer %d pushing %d", i, j
      channel.push [i, j]
    end
  end
end

consumers = Array.new 4 do |i|
  Thread.new(i) do |consumer|
    2.times do |j|
      from, message = channel.pop
      log.push format "consumer %d got %d. payload %d from producer %d", consumer, j, message, from
      do_stuff
    end
  end
end

# wait for all to finish
producers.map(&:join)
consumers.map(&:join)
# investigate log
log
# => ["producer 0 pushing 0",
#     "producer 0 pushing 1",
#     "producer 0 pushing 2",
#     "producer 1 pushing 0",
#     "consumer 0 got 0. payload 0 from producer 0",
#     "producer 0 pushing 3",
#     "consumer 1 got 0. payload 1 from producer 0",
#     "consumer 2 got 0. payload 2 from producer 0",
#     "consumer 3 got 0. payload 0 from producer 1",
#     "producer 1 pushing 1",
#     "producer 1 pushing 2",
#     "consumer 1 got 1. payload 3 from producer 0",
#     "producer 1 pushing 3",
#     "consumer 2 got 1. payload 1 from producer 1",
#     "consumer 3 got 1. payload 2 from producer 1",
#     "consumer 0 got 1. payload 3 from producer 1"]

The producers are much faster than the consumers (the consumers call do_stuff, which takes some time), but as the log shows, they fill the channel and then slow down until space becomes available in the channel.
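The same backpressure principle can be tried without the edge gem. The following is only a rough sketch that uses Ruby's standard-library SizedQueue as a stand-in for a channel with capacity 2; it is not the Channel implementation, and the names max_seen and consumed are introduced purely for this illustration.

```ruby
# Stand-in sketch: Ruby's stdlib SizedQueue, NOT Concurrent::Promises::Channel.
queue    = SizedQueue.new(2) # bounded buffer with room for 2 messages
max_seen = 0                 # largest queue size the producer ever observed
consumed = []

producer = Thread.new do
  8.times do |i|
    queue.push(i)            # blocks while the queue is full
    max_seen = [max_seen, queue.size].max
  end
end

consumer = Thread.new do
  8.times do
    consumed << queue.pop    # blocks while the queue is empty
    sleep 0.001              # simulate a slow consumer
  end
end

[producer, consumer].each(&:join)

consumed # => [0, 1, 2, 3, 4, 5, 6, 7]
max_seen # never greater than 2, the producer had to wait for the consumer
```

The producer cannot run ahead of the consumer by more than the buffer size, which is the self-adjusting behaviour visible in the log above.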

If permanent allocation of threads to the producers and consumers has to be avoided, the threads can be replaced with promises that run on a thread pool.

channel = Concurrent::Promises::Channel.new 2
log = Concurrent::Array.new # => []

def produce(channel, log, producer, i)
  log.push format "producer %d pushing %d", producer, i
  channel.push_op([producer, i]).then do
    i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done
  end
end # => :produce

def consume(channel, log, consumer, i)
  channel.pop_op.then(consumer, i) do |(from, message), consumer, i|
    log.push format "consumer %d got %d. payload %d from producer %d", consumer, i, message, from
    do_stuff
    i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done
  end
end # => :consume

producers = Array.new 2 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| produce *args, 0 }.run
end

consumers = Array.new 4 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| consume *args, 0 }.run
end

# wait for all to finish
producers.map(&:value!) # => [:done, :done]
consumers.map(&:value!) # => [:done, :done, :done, :done]
# investigate log
log
# => ["producer 0 pushing 0",
#     "producer 1 pushing 0",
#     "producer 1 pushing 1",
#     "consumer 1 got 0. payload 0 from producer 1",
#     "consumer 2 got 0. payload 1 from producer 1",
#     "producer 0 pushing 1",
#     "producer 0 pushing 2",
#     "producer 0 pushing 3",
#     "producer 1 pushing 2",
#     "consumer 0 got 0. payload 0 from producer 0",
#     "consumer 3 got 0. payload 1 from producer 0",
#     "producer 1 pushing 3",
#     "consumer 2 got 1. payload 2 from producer 0",
#     "consumer 1 got 1. payload 3 from producer 0",
#     "consumer 3 got 1. payload 3 from producer 1",
#     "consumer 0 got 1. payload 2 from producer 1"]

Synchronization of workers by passing a value

If the capacity of the channel is zero then any push operation will succeed only when there is a matching pop operation which can take the message. The operations have to be paired to succeed.

channel = Concurrent::Promises::Channel.new 0
thread = Thread.new { channel.pop }; sleep 0.01
# allow the thread to go to sleep
thread # the thread is sleeping (blocked in pop)
# succeeds because there is matching pop operation waiting in the thread
channel.try_push(:v1) # => true
# remains pending, since there is no matching operation
push = channel.push_op(:v2) # => a pending future
thread.value # => :v1
# the push operation resolves as a pairing pop is called
channel.pop # => :v2
push # => the future is now fulfilled

Constant Summary

UNLIMITED_CAPACITY =

Default capacity of the Channel, makes it accept unlimited number of messages.

::Object.new

ANY =

An object which matches anything (with #===)

Object.new.tap do |any|
  def any.===(other)
    true
  end

  def any.to_s
    'ANY'
  end
end
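To make the matcher contract concrete, here is a small sketch in plain Ruby (no channel involved) of how a matcher === a_message test selects messages. The ANY object is rebuilt locally from the definition above; the messages array and the first_match helper are hypothetical and exist only for this illustration.

```ruby
# Local copy of the ANY definition shown above: it matches every message.
ANY = Object.new.tap do |any|
  def any.===(other)
    true
  end

  def any.to_s
    'ANY'
  end
end

# Hypothetical helper: returns the first message accepted by the matcher,
# using the same `matcher === a_message` test as the *_matching methods.
def first_match(matcher, messages)
  messages.find { |message| matcher === message }
end

messages = [1, :ping, 'hello', 42]

first_match(ANY, messages)      # => 1
first_match(Symbol, messages)   # => :ping
first_match(/ell/, messages)    # => "hello"
first_match(10..100, messages)  # => 42
first_match(Float, messages)    # => nil
```

Anything that responds to #=== can serve as a matcher: classes, regular expressions, ranges, or a custom object such as ANY.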

Constructor Details

#initialize(capacity = UNLIMITED_CAPACITY) ⇒ Channel

Create channel.

Parameters:

  • capacity (Integer, UNLIMITED_CAPACITY) (defaults to: UNLIMITED_CAPACITY)

    the maximum number of messages which can be stored in the channel.

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 64

def initialize(capacity = UNLIMITED_CAPACITY)
  super()
  @Capacity = capacity
  @Mutex = Mutex.new
  # TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle
  @Probes = []
  @Messages = []
  @PendingPush = []
end

Class Method Details

.select(channels, timeout = nil) ⇒ ::Array(Channel, Object), nil

Returns:

  • (::Array(Channel, Object), nil)

See Also:

  • #select

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 322

def select(channels, timeout = nil)
  channels.first.select(channels[1..-1], timeout)
end

.select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object), nil

Returns:

  • (::Array(Channel, Object), nil)

See Also:

  • #select_matching

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 340

def select_matching(matcher, channels, timeout = nil)
  channels.first.select_matching(matcher, channels[1..-1], timeout)
end

.select_op(channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))

Returns:

  • (Future(::Array(Channel, Object)))

See Also:

  • #select_op

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 316

def select_op(channels, probe = Promises.resolvable_future)
  channels.first.select_op(channels[1..-1], probe)
end

.select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))

Returns:

  • (Future(::Array(Channel, Object)))

See Also:

  • #select_op_matching

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 334

def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  channels.first.select_op_matching(matcher, channels[1..-1], probe)
end

.try_select(channels) ⇒ ::Array(Channel, Object)

Returns:

  • (::Array(Channel, Object))

See Also:

  • #try_select

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 310

def try_select(channels)
  channels.first.try_select(channels[1..-1])
end

.try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)

Returns:

  • (::Array(Channel, Object))

See Also:

  • #try_select_matching

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 328

def try_select_matching(matcher, channels)
  channels.first.try_select_matching(matcher, channels[1..-1])
end

Instance Method Details

#capacity ⇒ Integer

Returns Maximum capacity of the Channel.

Returns:

  • (Integer)

    Maximum capacity of the Channel.

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 295

def capacity
  @Capacity
end

#peek(no_value = nil) ⇒ Object, no_value

Behaves as #try_pop but it does not remove the message from the channel

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

Returns:

  • (Object, no_value)

    message or no_value (nil by default) when there is no message

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 209

def peek(no_value = nil)
  peek_matching ANY, no_value
end

#peek_matching(matcher, no_value = nil) ⇒ Object, no_value

Behaves as #try_pop but it does not remove the message from the channel

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (Object, no_value)

    message or no_value (nil by default) when there is no message

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 215

def peek_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher, false
    return message if message != NOTHING
    message = ns_consume_pending_push matcher, false
    return message != NOTHING ? message : no_value
  end
end

#pop(timeout = nil, timeout_value = nil) ⇒ Object, nil

Note:

This function potentially blocks the current thread until it can continue. Be careful, it can deadlock.

Blocks the current thread until a message is available in the channel for popping.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

Returns:

  • (Object, nil)

    message or timeout_value (nil by default) when timed out

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 177

def pop(timeout = nil, timeout_value = nil)
  pop_matching ANY, timeout, timeout_value
end

#pop_matching(matcher, timeout = nil, timeout_value = nil) ⇒ Object, nil

Note:

This function potentially blocks the current thread until it can continue. Be careful, it can deadlock.

Blocks the current thread until a matching message is available in the channel for popping.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (Object, nil)

    message or timeout_value (nil by default) when timed out

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 183

def pop_matching(matcher, timeout = nil, timeout_value = nil)
  # TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer? Maybe only if the size is zero. It could be surprising if it's used as a throttle it might be expected that it will not pop if buffer is full of messages which di not match, it might it expected it will block until the message is added to the buffer
  # that it returns even if the buffer is full. User might expect that it has to be in the buffer first.
  probe = @Mutex.synchronize do
    message = ns_shift_message matcher
    if message == NOTHING
      message = ns_consume_pending_push matcher
      return message if message != NOTHING
    else
      new_message = ns_consume_pending_push ANY
      @Messages.push new_message unless new_message == NOTHING
      return message
    end

    probe = Promises.resolvable_future
    @Probes.push probe, false, matcher
    probe
  end

  probe.value!(timeout, timeout_value, [true, timeout_value, nil])
end

#pop_op(probe = Promises.resolvable_future) ⇒ Future(Object)

Returns a future which will become fulfilled with a value from the channel when one is available. If the operation is later waited on with a timeout, e.g. channel.pop_op.wait(1), the timeout does not prevent the channel from fulfilling the operation later. The operation either has to be processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or it can be prevented from completing after the timeout by using channel.pop_op.wait(1, [true, nil, nil]). This fulfills the operation on timeout, which prevents the channel from performing the operation, e.g. popping a message.

Parameters:

  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with a channel value

Returns:

  • (Future(Object))

    the probe, its value will be the message when available.

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 160

def pop_op(probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(ANY, probe, false) }
end

#pop_op_matching(matcher, probe = Promises.resolvable_future) ⇒ Future(Object)

Returns a future which will become fulfilled with a matching value from the channel when one is available. If the operation is later waited on with a timeout, e.g. channel.pop_op.wait(1), the timeout does not prevent the channel from fulfilling the operation later. The operation either has to be processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or it can be prevented from completing after the timeout by using channel.pop_op.wait(1, [true, nil, nil]). This fulfills the operation on timeout, which prevents the channel from performing the operation, e.g. popping a message.

Parameters:

  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with a channel value

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (Future(Object))

    the probe, its value will be the message when available.

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 166

def pop_op_matching(matcher, probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(matcher, probe, false) }
end

#push(message, timeout = nil) ⇒ self, true, false

Note:

This function potentially blocks the current thread until it can continue. Be careful, it can deadlock.

Blocks the current thread until the message is pushed into the channel.

Parameters:

  • message (Object)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (self, true, false)

    self when no timeout was given, true when a timeout was given and the message was pushed, false when the message was not pushed within the timeout.

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 120

def push(message, timeout = nil)
  pushed_op = @Mutex.synchronize do
    return timeout ? true : self if ns_try_push(message)

    pushed = Promises.resolvable_future
    # TODO (pitr-ch 06-Jan-2019): clear timed out pushes in @PendingPush, null messages
    @PendingPush.push message, pushed
    pushed
  end

  result = pushed_op.wait!(timeout, [true, self, nil])
  result == pushed_op ? self : result
end

#push_op(message) ⇒ ResolvableFuture(self)

Returns a future which will be fulfilled when the message is pushed to the channel. If the operation is later waited on with a timeout, e.g. channel.pop_op.wait(1), the timeout does not prevent the channel from fulfilling the operation later. The operation either has to be processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or it can be prevented from completing after the timeout by using channel.pop_op.wait(1, [true, nil, nil]). This fulfills the operation on timeout, which prevents the channel from performing the operation, e.g. popping a message.

Parameters:

  • message (Object)

Returns:

  • (ResolvableFuture(self))

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 101

def push_op(message)
  @Mutex.synchronize do
    if ns_try_push(message)
      Promises.fulfilled_future self
    else
      pushed = Promises.resolvable_future
      @PendingPush.push message, pushed
      return pushed
    end
  end
end

#select(channels, timeout = nil) ⇒ ::Array(Channel, Object), nil

Note:

This function potentially blocks the current thread until it can continue. Be careful, it can deadlock.

As #select_op, but it does not return a future; instead it blocks the current thread until there is a message available in the receiver or in any of the channels.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message], or nil when timed out

See Also:

  • #select_op

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 278

def select(channels, timeout = nil)
  select_matching ANY, channels, timeout
end

#select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object), nil

Note:

This function potentially blocks the current thread until it can continue. Be careful, it can deadlock.

As #select_op, but it does not return a future; instead it blocks the current thread until there is a matching message available in the receiver or in any of the channels.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message], or nil when timed out

See Also:

  • #select_op

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 284

def select_matching(matcher, channels, timeout = nil)
  probe = select_op_matching(matcher, channels)
  probe.value!(timeout, nil, [true, nil, nil])
end

#select_op(channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))

When a message is available in the receiver or any of the provided channels, the future is fulfilled with a [channel, message] pair. The returned channel is the origin of the message. If the operation is later waited on with a timeout, e.g. channel.pop_op.wait(1), the timeout does not prevent the channel from fulfilling the operation later. The operation either has to be processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or it can be prevented from completing after the timeout by using channel.pop_op.wait(1, [true, nil, nil]). This fulfills the operation on timeout, which prevents the channel from performing the operation, e.g. popping a message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with the message

Returns:

  • (ResolvableFuture(::Array(Channel, Object)))

    a future which is fulfilled with pair [channel, message] when one of the channels is available for reading

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 257

def select_op(channels, probe = Promises.resolvable_future)
  select_op_matching ANY, channels, probe
end

#select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))

When a matching message is available in the receiver or any of the provided channels, the future is fulfilled with a [channel, message] pair. The returned channel is the origin of the message. If the operation is later waited on with a timeout, e.g. channel.pop_op.wait(1), the timeout does not prevent the channel from fulfilling the operation later. The operation either has to be processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or it can be prevented from completing after the timeout by using channel.pop_op.wait(1, [true, nil, nil]). This fulfills the operation on timeout, which prevents the channel from performing the operation, e.g. popping a message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with the message

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (ResolvableFuture(::Array(Channel, Object)))

    a future which is fulfilled with pair [channel, message] when one of the channels is available for reading

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 263

def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  [self, *channels].each { |ch| ch.partial_select_op matcher, probe }
  probe
end

#size ⇒ Integer

Returns The number of messages currently stored in the channel.

Returns:

  • (Integer)

    The number of messages currently stored in the channel.

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 290

def size
  @Mutex.synchronize { @Messages.size }
end

#to_s ⇒ String Also known as: inspect

Returns Short string representation.

Returns:

  • (String)

    Short string representation.

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 300

def to_s
  format '%s capacity taken %s of %s>', super[0..-2], size, @Capacity
end
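The string-building technique in the listing above can be tried in isolation. The sketch below uses a hypothetical ToyBox class, not the Channel, and assumes the inherited to_s has the usual "#<Class:0x...>" shape of Object#to_s; super[0..-2] drops the closing ">" so that the capacity details can be appended before it.

```ruby
# Hypothetical stand-in class; only the to_s technique mirrors the listing above.
class ToyBox
  def initialize(capacity)
    @capacity = capacity
    @items    = []
  end

  def push(item)
    @items << item
    self
  end

  def to_s
    # super is Object#to_s here, e.g. "#<ToyBox:0x...>";
    # [0..-2] removes its trailing ">" before the details are appended.
    format '%s capacity taken %s of %s>', super[0..-2], @items.size, @capacity
  end
end

box = ToyBox.new(2).push(:a)
description = box.to_s
puts description # prints a string of the form "#<ToyBox:0x... capacity taken 1 of 2>"
```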

#try_pop(no_value = nil) ⇒ Object, no_value

Pop a message from the channel if there is one available.

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

Returns:

  • (Object, no_value)

    message or no_value (nil by default) when there is no message

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 138

def try_pop(no_value = nil)
  try_pop_matching ANY, no_value
end

#try_pop_matching(matcher, no_value = nil) ⇒ Object, no_value

Pop a message from the channel if there is one available.

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (Object, no_value)

    message or no_value (nil by default) when there is no message

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 145

def try_pop_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher
    return message if message != NOTHING
    message = ns_consume_pending_push matcher
    return message != NOTHING ? message : no_value
  end
end

#try_push(message) ⇒ true, false

Push the message into the channel if there is space available.

Parameters:

  • message (Object)

Returns:

  • (true, false)

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 77

def try_push(message)
  @Mutex.synchronize { ns_try_push(message) }
end

#try_select(channels) ⇒ ::Array(Channel, Object), nil

If a message is available in the receiver or any of the provided channels, the [channel, message] pair is returned. If there is no message, nil is returned. The returned channel is the origin of the message.

Parameters:

  • channels (Channel, ::Array<Channel>)

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message] if one of the channels is available for reading

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 232

def try_select(channels)
  try_select_matching ANY, channels
end

#try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object), nil

If a matching message is available in the receiver or any of the provided channels, the [channel, message] pair is returned. If there is no such message, nil is returned. The returned channel is the origin of the message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • matcher (#===)

    only consider message which matches matcher === a_message

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message] if one of the channels is available for reading

# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 238

def try_select_matching(matcher, channels)
  message = nil
  channel = [self, *channels].find do |ch|
    message = ch.try_pop_matching(matcher, NOTHING)
    message != NOTHING
  end
  channel ? [channel, message] : nil
end
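The scan in the listing above can be exercised without the gem. The following sketch is a toy model only: ToyChannel is a hypothetical, single-threaded, in-memory stand-in (no mutex, no pending pushes), and the local NOTHING object merely plays the role of the sentinel used in the listing. It shows why a sentinel is needed: nil can be a legitimate message.

```ruby
# Sentinel distinguishing "no message" from a legitimate nil message.
NOTHING = Object.new

# Hypothetical in-memory stand-in, NOT Concurrent::Promises::Channel.
class ToyChannel
  def initialize(*messages)
    @messages = messages
  end

  # Removes and returns the first message accepted by matcher, else no_value.
  def try_pop_matching(matcher, no_value = nil)
    index = @messages.index { |message| matcher === message }
    index ? @messages.delete_at(index) : no_value
  end

  # Same scan as the listing above: the first channel that yields a message wins.
  def try_select_matching(matcher, channels)
    message = nil
    channel = [self, *channels].find do |ch|
      message = ch.try_pop_matching(matcher, NOTHING)
      message != NOTHING
    end
    channel ? [channel, message] : nil
  end
end

empty   = ToyChannel.new
numbers = ToyChannel.new(1, 2)
words   = ToyChannel.new('a', nil)

by_string  = empty.try_select_matching(String, [numbers, words])   # => [words, "a"]
by_integer = empty.try_select_matching(Integer, [numbers, words])  # => [numbers, 1]
by_nil     = empty.try_select_matching(NilClass, [numbers, words]) # => [words, nil]
by_float   = empty.try_select_matching(Float, [numbers, words])    # => nil
```

The receiver is always tried first, followed by the given channels in order, so earlier channels take priority when several of them hold a matching message.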