Class: Concurrent::MVar

Inherits:
Synchronization::Object
  • Object
show all
Includes:
Concern::Dereferenceable
Defined in:
lib/concurrent-ruby/concurrent/mvar.rb

Overview

An MVar is a synchronized single element container. They are empty or contain one item. Taking a value from an empty MVar blocks, as does putting a value into a full one. You can either think of them as blocking queue of length one, or a special kind of mutable variable.

On top of the fundamental #put and #take operations, we also provide a #modify that is atomic with respect to operations on the same instance. These operations all support timeouts.

We also support non-blocking operations #try_put! and #try_take!, a #set! that ignores existing values, a #value that returns the value without removing it or returns MVar::EMPTY, and a #modify! that yields MVar::EMPTY if the MVar is empty and can be used to set MVar::EMPTY. You shouldn't use these operations in the first instance.

MVar is a Dereferenceable.

MVar is related to M-structures in Id, MVar in Haskell and SyncVar in Scala.

Note that unlike the original Haskell paper, our #take is blocking. This is how Haskell and Scala do it today.

Copy Options

Object references in Ruby are mutable. This can lead to serious problems when the Concern::Dereferenceable#value of an object is a mutable reference. Which is always the case unless the value is a Fixnum, Symbol, or similar "primitive" data type. Each instance can be configured with a few options that can help protect the program from potentially dangerous operations. Each of these options can be optionally set when the object instance is created:

  • :dup_on_deref When true the object will call the #dup method on the value object every time the #value method is called (default: false)
  • :freeze_on_deref When true the object will call the #freeze method on the value object every time the #value method is called (default: false)
  • :copy_on_deref When given a Proc object the Proc will be run every time the #value method is called. The Proc will be given the current value as its only argument and the result returned by the block will be the return value of the #value call. When nil this option will be ignored (default: nil)

When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:

  • :copy_on_deref
  • :dup_on_deref
  • :freeze_on_deref

Because of this ordering there is no need to #freeze an object created by a provided :copy_on_deref block. Simply set :freeze_on_deref to true. Setting both :dup_on_deref to true and :freeze_on_deref to true is as close to the behavior of a "pure" functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.

See Also

  1. P. Barth, R. Nikhil, and Arvind. M-Structures: Extending a parallel, non- strict, functional language with state. In Proceedings of the 5th ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991.

  2. S. Peyton Jones, A. Gordon, and S. Finne. Concurrent Haskell. In Proceedings of the 23rd Symposium on Principles of Programming Languages (PoPL), 1996.

Constant Summary collapse

EMPTY =

Unique value that represents that an MVar was empty

::Object.new
TIMEOUT =

Unique value that represents that an MVar timed out before it was able to produce a value.

::Object.new

Instance Method Summary collapse

Constructor Details

#initialize(value = EMPTY, opts = {}) ⇒ MVar

Create a new MVar, either empty or with an initial value.

Parameters:

  • opts (Hash) (defaults to: {})

    the options controlling how the future will be processed

Options Hash (opts):

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from Concern::Dereferenceable#value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from Concern::Dereferenceable#value

  • :copy_on_deref (Proc) — default: nil

    When calling the Concern::Dereferenceable#value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

 54 55 56 57 58 59 60
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 54 def initialize(value = EMPTY, opts = {}) @value = value @mutex = Mutex.new @empty_condition = ConditionVariable.new @full_condition = ConditionVariable.new set_deref_options(opts) end

Instance Method Details

#borrow(timeout = nil) ⇒ Object

acquires lock on the from an MVAR, yields the value to provided block, and release lock. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.

Returns:

  • (Object)

    the value returned by the block, or TIMEOUT

 86 87 88 89 90 91 92 93 94 95 96 97
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 86 def borrow(timeout = nil) @mutex.synchronize do wait_for_full(timeout) # If we timed out we'll still be empty  if unlocked_full? yield @value else TIMEOUT end end end

#empty?Boolean

Returns if the MVar is currently empty.

Returns:

  • (Boolean)
 195 196 197
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 195 def empty? @mutex.synchronize { @value == EMPTY } end

#full?Boolean

Returns if the MVar currently contains a value.

Returns:

  • (Boolean)
 200 201 202
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 200 def full? !empty? end

#modify(timeout = nil) ⇒ Object

Atomically take, yield the value to a block for transformation, and then put the transformed value. Returns the pre-transform value. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.

Returns:

  • (Object)

    the pre-transform value, or TIMEOUT

Raises:

  • (ArgumentError)
 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 123 def modify(timeout = nil) raise ArgumentError.new('no block given') unless block_given? @mutex.synchronize do wait_for_full(timeout) # If we timed out we'll still be empty  if unlocked_full? value = @value @value = yield value @full_condition.signal apply_deref_options(value) else TIMEOUT end end end

#modify!undocumented

Non-blocking version of modify that will yield with EMPTY if there is no value yet.

Raises:

  • (ArgumentError)
 179 180 181 182 183 184 185 186 187 188 189 190 191 192
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 179 def modify! raise ArgumentError.new('no block given') unless block_given? @mutex.synchronize do value = @value @value = yield value if unlocked_empty? @empty_condition.signal else @full_condition.signal end apply_deref_options(value) end end

#put(value, timeout = nil) ⇒ Object

Put a value into an MVar, blocking if there is already a value until it is empty. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.

Returns:

  • (Object)

    the value that was put, or TIMEOUT

 103 104 105 106 107 108 109 110 111 112 113 114 115 116
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 103 def put(value, timeout = nil) @mutex.synchronize do wait_for_empty(timeout) # If we timed out we won't be empty  if unlocked_empty? @value = value @full_condition.signal apply_deref_options(value) else TIMEOUT end end end

#set!(value) ⇒ undocumented

Non-blocking version of put that will overwrite an existing value.

 169 170 171 172 173 174 175 176
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 169 def set!(value) @mutex.synchronize do old_value = @value @value = value @full_condition.signal apply_deref_options(old_value) end end

#take(timeout = nil) ⇒ Object

Remove the value from an MVar, leaving it empty, and blocking if there isn't a value. A timeout can be set to limit the time spent blocked, in which case it returns TIMEOUT if the time is exceeded.

Returns:

  • (Object)

    the value that was taken, or TIMEOUT

 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 66 def take(timeout = nil) @mutex.synchronize do wait_for_full(timeout) # If we timed out we'll still be empty  if unlocked_full? value = @value @value = EMPTY @empty_condition.signal apply_deref_options(value) else TIMEOUT end end end

#try_put!(value) ⇒ undocumented

Non-blocking version of put, that returns whether or not it was successful.

 156 157 158 159 160 161 162 163 164 165 166
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 156 def try_put!(value) @mutex.synchronize do if unlocked_empty? @value = value @full_condition.signal true else false end end end

#try_take!undocumented

Non-blocking version of take, that returns EMPTY instead of blocking.

 142 143 144 145 146 147 148 149 150 151 152 153
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 142 def try_take! @mutex.synchronize do if unlocked_full? value = @value @value = EMPTY @empty_condition.signal apply_deref_options(value) else EMPTY end end end

#valueObject Also known as: deref Originally defined in module Concern::Dereferenceable

Return the value this object represents after applying the options specified by the #set_deref_options method.

Returns:

  • (Object)

    the current value of the object