We're back at it with another edition of Multithreaded Ruby where we'll continue to dive into concurrency using our beloved language!
Today, I'm going to introduce you to a famous multi-process synchronization problem called the Producer-Consumer problem, and we're going to look at Ruby's ConditionVariable class.
Back to Deadlock
A paragraph into the new article and we're at deadlocks again? Well yes, they're pretty prevalent and we did not actually touch on a solution to the problem last time.
Let's bring back the deadlock example we used in Part I, modified just a tiny bit.
    require 'thwait' # ships with Ruby 2.4 (used here); newer Rubies need the separate thwait gem

    item_accessories = {}
    item = {}
    item_acc_lock = Mutex.new
    item_lock = Mutex.new

    a = Thread.new {
      item_acc_lock.synchronize {
        sleep 1 # pretend to work on item_accessories
        item_lock.synchronize {
          # pretend to work on item
          sleep 1
          puts 'Worked on accessories, then on item'
        }
      }
    }

    b = Thread.new {
      item_lock.synchronize {
        sleep 1 # pretend to work on item
        item_acc_lock.synchronize {
          # pretend to work on item_accessories
          sleep 1
          puts 'Worked on item, then on accessories'
        }
      }
    }

    ThWait.all_waits(a, b)
    > enether$ ruby item_worker.rb
    /Users/enether/.rvm/rubies/ruby-2.4.1/lib/ruby/2.4.0/thwait.rb:112:in `pop': No live threads left. Deadlock? (fatal)
No surprise here: thread a takes hold of item_acc_lock, thread b takes hold of item_lock, and each of them then blocks forever waiting for the lock the other one holds. So how could we avoid this?
What if we had a way to temporarily release one of the locks at a specific point in our program where we could afford doing so? That way, the other thread could take the lock, do its thing and return it back for the original one to finish its work.
Enter ConditionVariable
ConditionVariable is a Ruby class which lets you block a thread until another thread signals that it is OK to continue. It is a way to say: "I'm waiting for a lock and I can give up mine at this exact time". It is an ideal way to synchronize our a and b threads here:
    require 'thwait'

    item_accessories = {}
    item = {}
    item_acc_lock = Mutex.new
    item_lock = Mutex.new
    cv = ConditionVariable.new

    a = Thread.new {
      item_acc_lock.synchronize {
        sleep 1 # pretend to work on item_accessories
        # At this point, we've just finished work on item_accessories and we're at a window where we
        # might not care if item_accessories changes. So: let somebody else take it and give it back
        cv.wait(item_acc_lock) # Temporarily sleeps the thread and releases the lock
        puts 'Gained back access to item_acc_lock' # on this line, item_acc_lock is re-acquired
        item_lock.synchronize {
          # pretend to work on item
          sleep 1
          puts 'Worked on accessories, then on item'
        }
      }
    }

    b = Thread.new {
      item_lock.synchronize {
        sleep 1 # pretend to work on item
        item_acc_lock.synchronize {
          # pretend to work on item_accessories
          sleep 1
          puts 'Worked on item, then on accessories'
        }
        cv.signal
        puts "I'm still working, but I'm finished with item_acc_lock"
      }
    }

    ThWait.all_waits(a, b)
    > enether$ ruby synchronized_item_worker.rb
    Worked on item, then on accessories
    I'm still working, but I'm finished with item_acc_lock
    Gained back access to item_acc_lock
    Worked on accessories, then on item
What we achieved here is a sort of synchronization: we can now be sure that thread b reaches its cv.signal line, having finished with item_accessories, before thread a moves past its cv.wait line and starts working on item.
[Figure: diagram visualizing the process above]
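The ordering above depends on thread a already being inside cv.wait when thread b calls cv.signal, because a signal sent while nobody is waiting is simply lost. Here is a minimal sketch of one common way to guard against that, assuming a shared boolean flag. The names run_handoff and ready are made up for this illustration and are not part of the example above, and plain Thread#join is used instead of thwait.

```ruby
# Sketch: guard the wait with a flag so a signal sent "too early" is not lost.
# `run_handoff` and `ready` are hypothetical names used only for this example.
def run_handoff
  lock  = Mutex.new
  cv    = ConditionVariable.new
  ready = false
  log   = []

  waiter = Thread.new do
    lock.synchronize do
      # Check the flag before (and after) every wait: if the signal already
      # happened we never sleep, and if we wake up early we go back to sleep.
      cv.wait(lock) until ready
      log << :waiter_resumed
    end
  end

  signaler = Thread.new do
    lock.synchronize do
      log << :signaler_done
      ready = true # record the event while holding the lock...
      cv.signal    # ...then wake one waiting thread, if there is one
    end
  end

  [waiter, signaler].each(&:join)
  log
end

p run_handoff # => [:signaler_done, :waiter_resumed]
```

Whichever thread grabs the lock first, the waiter only logs after the flag has been set, so the order of the two log entries does not depend on sleep timing.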
It is worth noting that the ConditionVariable#signal method will only wake up one thread which is waiting on the variable. This means that if we have two threads waiting on a ConditionVariable and its signal method is called only once, the thread that does not get woken up will end up waiting forever, resulting in a deadlock:
    require 'thwait'

    lock = Mutex.new
    cv = ConditionVariable.new
    threads = []

    2.times do
      threads << Thread.new {
        lock.synchronize {
          cv.wait(lock)
        }
      }
    end

    threads << Thread.new {
      lock.synchronize {
        sleep 1
      }
      cv.signal
    }

    ThWait.all_waits(*threads)

    > enether$ ruby cv_pitfall.rb
    /Users/enether/.rvm/rubies/ruby-2.4.1/lib/ruby/2.4.0/thwait.rb:112:in `pop': No live threads left. Deadlock? (fatal)
In such a scenario, you need to either call signal as many times as there are waiting threads or use another method, ConditionVariable#broadcast, which will wake up every thread that is waiting on the condition variable.
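To make the difference concrete, here is a small sketch of the broadcast variant. The method name release_all_waiters and the go flag are assumptions introduced for this illustration, and Thread#join stands in for thwait.

```ruby
# Sketch: wake every waiter with ConditionVariable#broadcast.
# `release_all_waiters` and `go` are hypothetical names for this example.
def release_all_waiters(waiter_count)
  lock  = Mutex.new
  cv    = ConditionVariable.new
  go    = false
  woken = 0

  waiters = Array.new(waiter_count) do
    Thread.new do
      lock.synchronize do
        cv.wait(lock) until go # the flag also covers a broadcast sent early
        woken += 1             # still holding the lock, so this is safe
      end
    end
  end

  lock.synchronize do
    go = true
    cv.broadcast # a single cv.signal would wake at most one waiter
  end

  waiters.each(&:join)
  woken
end

puts release_all_waiters(2) # => 2
```

Replacing cv.broadcast with one cv.signal here could leave a thread that was already waiting asleep forever, which is the pitfall shown above.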
Producer-Consumer
The producer-consumer problem consists of at minimum two threads, one representing a producer and one representing a consumer.
- Producer - Sole job is to create an item and put it into the buffer
- Consumer - Sole job is to take the item from the buffer and process it
Here is a sample implementation in Ruby, where the tasks array acts as the buffer with a fictional limitation of having at most 2 items in it at once:
    require 'thwait'

    threads = []
    tasks = []
    mutex = Mutex.new

    # producer
    2.times do
      threads << Thread.new do
        loop do
          mutex.synchronize do
            if tasks.length < 2
              tasks << "Task :)"
            end
          end
        end
      end
    end

    # consumer
    5.times do
      threads << Thread.new do
        loop do
          task = nil
          mutex.synchronize do
            if tasks.length != 0
              task = tasks.shift
            end
          end
          unless task.nil?
            100000.times do
              # Simulating task execution's CPU work
              # also doing it outside the mutex so we don't block the tasks array (another consumer might want to take a task as well)
            end
          end
        end
      end
    end

    ThWait.all_waits(threads)
Producer-Consumer problem
As the name implies, something is not quite right with the code above. Finding problems in concurrent code is hard, so I'm going to give you a couple of minutes to figure out what is wrong.
...
...
...
Okay, if you managed to figure it out - great, if not - don't fret, multithreaded programming is unintuitive.
The problem with the code above is that it wastes time. You see, as we can't control when the OS switches threads or which thread it switches to, there is the possibility that we leave our consumer thread and enter the producer's when there is no reason to.
Imagine all our consumer threads are currently executing a task and our tasks array is full (has two elements). If the OS decides to do a context switch and gives control to a producer thread, it would only waste time. Since the tasks array would be full, the producer would loop only to check that the array is full and not do anything else, only managing to take precious CPU time from our consumer threads.
Let's track exactly how many useless iterations this thing does:
    require 'thwait'

    threads = []
    tasks = []
    mutex = Mutex.new
    to_exit = false
    times_tasks_added = 0
    times_time_wasted = 0
    executed_tasks = 0

    # producer
    2.times do
      threads << Thread.new do
        loop do
          Thread.kill(Thread.current) if to_exit # a way to stop execution
          mutex.synchronize do
            if tasks.length < 2
              tasks << "Task :)"
              times_tasks_added += 1
            else
              # time here is absolutely wasted
              times_time_wasted += 1
            end
          end
        end
      end
    end

    # consumer
    5.times do
      threads << Thread.new do
        loop do
          Thread.kill(Thread.current) if to_exit # a way to stop execution
          task = nil
          mutex.synchronize do
            if tasks.length != 0
              task = tasks.shift
            end
          end
          unless task.nil?
            100000.times do
              # Simulating CPU work
              # also doing it outside the mutex so we don't block the tasks array (another consumer might want to take a task as well)
            end
            executed_tasks += 1
            if executed_tasks >= 100 # don't loop forever
              to_exit = true
            end
          end
        end
      end
    end

    ThWait.all_waits(threads)
    puts "Total tasks added: #{times_tasks_added}"
    puts "Total times we branched out into the useless else statement: #{times_time_wasted}"
And here are the results:
    > enether$ ruby squander_of_time.rb
    Total tasks added: 102
    Total times we branched out into the useless else statement: 1633
    > enether$ ruby squander_of_time.rb
    Total tasks added: 102
    Total times we branched out into the useless else statement: 848282
    > enether$ ruby squander_of_time.rb
    Total tasks added: 102
    Total times we branched out into the useless else statement: 356418
We know that thread context-switching is non-deterministic, and these results show it. Sometimes we do as few as 1633 useless executions of the else branch and sometimes as many as 848k, about 8316 times the 102 tasks we actually added!
This is bad because it will cause your program to run slower at some times and behave seemingly normally at others.
It's good to note that I personally found it hard to figure out the problem in this code even though it's specifically made to illustrate it. Imagine how hard it would be to spot such a thing in an established codebase!
This slow-down is not acceptable, so let's fix it. Thinking it through, the problem seems to boil down to a producer thread waking up when it doesn't make sense.
What would be perfect is if we had the ability to somehow control when we resume the producer thread, specifically when a task is removed from the tasks buffer, so we're sure there's room to add another one.
ConditionVariable to the rescue!
The fix is simple: we're going to add a condition variable and have the producer wait on it, giving up the mutex lock, whenever we detect that it does not need to continue looping. We're also going to need to tell the producer it can resume when we have an empty spot in the tasks buffer.
    require 'thwait'

    threads = []
    tasks = []
    mutex = Mutex.new
    to_exit = false
    times_tasks_added = 0
    times_time_wasted = 0
    executed_tasks = 0
    cv = ConditionVariable.new

    # producer
    2.times do
      threads << Thread.new do
        loop do
          Thread.kill(Thread.current) if to_exit # a way to stop execution
          mutex.synchronize do
            if tasks.length < 2
              tasks << "Task :)"
              times_tasks_added += 1
            else
              times_time_wasted += 1
              cv.wait(mutex) # no need to continue looping in such a case, only continue after it makes sense
            end
          end
        end
      end
    end

    # consumer
    5.times do
      threads << Thread.new do
        loop do
          Thread.kill(Thread.current) if to_exit # a way to stop execution
          task = nil
          mutex.synchronize do
            if tasks.length != 0
              task = tasks.shift
              cv.signal # one new task can now be added
            end
          end
          unless task.nil?
            100000.times do
              # Simulating CPU work
              # also doing it outside the mutex so we don't block the tasks array (another consumer might want to take a task as well)
            end
            executed_tasks += 1
            if executed_tasks >= 100 # don't loop forever
              to_exit = true
            end
          end
        end
      end
    end

    ThWait.all_waits(threads)
    puts "Total tasks added: #{times_tasks_added}"
    puts "Total times we branched out into the useless else statement: #{times_time_wasted}"
    > enether$ ruby saver_of_time.rb
    Total tasks added: 101
    Total times we branched out into the useless else statement: 50
    > enether$ ruby saver_of_time.rb
    Total tasks added: 100
    Total times we branched out into the useless else statement: 45
    > enether$ ruby saver_of_time.rb
    Total tasks added: 100
    Total times we branched out into the useless else statement: 42
Woo, performance!
In reality, it's worth noting that the previous example and this one actually run in about the same time. 400k useless iterations sound like a lot but our computers are fast enough to not let us notice this inefficiency. Regardless, I hope this example managed to clearly illustrate the problem.
Further optimization
Do we even need to enter the else branch at all? Could we not put cv.wait inside the block which adds the tasks and have it call wait when the buffer is full? We can, and that way it should never enter the else block, as it would only be resumed to add a task and would go back to sleep if the buffer is full.
    # ...
    loop do
      Thread.kill(Thread.current) if to_exit # a way to stop execution
      mutex.synchronize do
        if tasks.length < 2
          tasks << "Task :)"
          times_tasks_added += 1
          if tasks.length >= 2
            cv.wait(mutex) # no need to continue looping in such a case, only continue after it makes sense
          end
        else
          times_time_wasted += 1
        end
      end
    end
    # ...
    > enether$ ruby no_time_wasted.rb
    Total tasks added: 100.
    Total times we branched out into the useless else statement: 16
    > enether$ ruby no_time_wasted.rb
    Total tasks added: 102
    Total times we branched out into the useless else statement: 13
    > enether$ ruby no_time_wasted.rb
    Total tasks added: 101
    Total times we branched out into the useless else statement: 372021
What the hell? We still entered the else block, and we even got back to the previous levels of needless execution!
Concurrent programming is hard. We're using two producer threads here, and when one fills up the tasks array it calls cv.wait and frees the mutex. The other producer thread then seems to get resumed (remember, we free the mutex only when tasks is full) and enters the else branch, as the tasks buffer is full.
[Figure: diagram visualizing the scenario described above]
Okay, well the simplest thing to do is put a wait back where we had one. This should limit the useless calls as much as possible:
    # ...
    loop do
      Thread.kill(Thread.current) if to_exit # a way to stop execution
      mutex.synchronize do
        if tasks.length < 2
          tasks << "Task :)"
          times_tasks_added += 1
          if tasks.length >= 2
            cv.wait(mutex)
          end
        else
          times_time_wasted += 1
          cv.wait(mutex)
        end
      end
    end
    # ...
    > enether$ ruby no_time_wasted_fixed.rb
    Total tasks added: 101
    Total times we branched out into the useless else statement: 5
    > enether$ ruby no_time_wasted_fixed.rb
    Total tasks added: 101
    Total times we branched out into the useless else statement: 8
    > enether$ ruby no_time_wasted_fixed.rb
    Total tasks added: 101
    Total times we branched out into the useless else statement: 3
Well, I'm afraid this is as much as we can do with the ConditionVariable. The reason we still enter the else block a couple of times is most likely the one depicted in the image above.
Although there is one other possibility:
Spurious Wakeups
A spurious wakeup is when a thread waiting on a ConditionVariable wakes up without having been signaled. This might sound stupid, but it makes sense, since allowing it seems to boost performance in some cases. According to David R. Butenhof's Programming with POSIX Threads (ISBN 0-201-63392-2): "Spurious wakeups may sound strange, but on some multiprocessor systems, making condition wakeup completely predictable might substantially slow all condition variable operations."
They also encourage robust multithreaded code, essentially forcing you to take care of such cases. This is why it is strongly recommended that you always put your ConditionVariable waits inside a loop which re-checks the appropriate condition (as we do with if tasks.length < 2).
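As a rough sketch of that recommendation, the buffer below wraps every wait in a while loop, so a thread that wakes up spuriously, or loses the race to another thread, just re-checks the condition and goes back to sleep. This is not the code from the examples above: the BoundedBuffer class, the use of two condition variables (not_full and not_empty), the :stop sentinel and run_bounded_buffer are all assumptions made for this illustration.

```ruby
# Sketch: a bounded buffer whose waits always sit inside a loop.
# BoundedBuffer, @not_full and @not_empty are hypothetical names.
class BoundedBuffer
  def initialize(capacity)
    @capacity  = capacity
    @items     = []
    @mutex     = Mutex.new
    @not_full  = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      # Re-check after every wakeup; only proceed when there really is room.
      @not_full.wait(@mutex) while @items.length >= @capacity
      @items << item
      @not_empty.signal
    end
  end

  def pop
    @mutex.synchronize do
      # Re-check after every wakeup; only proceed when there really is an item.
      @not_empty.wait(@mutex) while @items.empty?
      item = @items.shift
      @not_full.signal
      item
    end
  end
end

def run_bounded_buffer(total_tasks: 100, producers: 2, consumers: 5)
  buffer   = BoundedBuffer.new(2)
  executed = Queue.new # thread-safe collector for finished tasks

  producer_threads = Array.new(producers) do |i|
    Thread.new do
      # Assumes total_tasks divides evenly among the producers.
      (total_tasks / producers).times { |n| buffer.push("task #{i}-#{n}") }
    end
  end

  consumer_threads = Array.new(consumers) do
    Thread.new do
      while (task = buffer.pop) != :stop
        executed << task
      end
    end
  end

  producer_threads.each(&:join)
  consumers.times { buffer.push(:stop) } # one sentinel per consumer
  consumer_threads.each(&:join)
  executed.size
end

puts run_bounded_buffer # => 100
```

With the condition re-checked on every wakeup, there is no else branch to fall into: a thread either has work it can do or it is asleep.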
Here is an interesting discussion on the topic: comp.programming.threads
I personally could not identify when a producer thread was woken up spuriously or simply got scheduled when a previous producer went to sleep. I did dig through the MRI code to verify that cv.wait is vulnerable to spurious wakeups.
Here is the way it gets called - rb_condvar_wait -> do_sleep -> mutex_sleep -> rb_mutex_sleep -> rb_mutex_sleep_forever -> rb_thread_sleep_deadly_allow_spurious_wakeup -> sleep_forever
It seems to boil down to calling the sleep_forever function, which calls native_sleep in a loop. After the code exits from the sleep, Ruby checks if the thread was woken up on purpose (in RUBY_VM_CHECK_INTS_BLOCKING(th)) and schedules it to be interrupted if so. Since ours isn't, it likely enters the if (!spurious_check) block and breaks out of the loop, effectively stopping the sleep.
Summary
We touched on a couple of important topics in multithreaded programming.
We learned about the precious ConditionVariable class and more specifically how it allows you to pause threads at will and schedule a resume when you decide to.
- ConditionVariable#wait(mutex) - puts the current thread to sleep, releases the given mutex for the time being, and resumes after a signal (or, occasionally, a spurious wakeup)
- ConditionVariable#signal - allows one thread that is waiting on the given condition variable to resume
- ConditionVariable#broadcast - allows all threads that are waiting on the given condition variable to resume
We dabbled in the producer-consumer problem, tried to optimize it on our own, explored further concurrency problems, and in the end learned about spurious wakeups.
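As an aside that goes beyond what this article covers: Ruby also ships a SizedQueue class, a thread-safe queue with a maximum size whose push blocks while the queue is full and whose pop blocks while it is empty. A rough sketch of the same two-slot buffer built on it might look like the following, where run_with_sized_queue and its parameters are hypothetical names and a single producer is assumed to keep the shutdown simple.

```ruby
# Sketch: the two-slot task buffer expressed with Ruby's built-in SizedQueue.
# `run_with_sized_queue` is a hypothetical helper for this example.
def run_with_sized_queue(total_tasks: 100, consumers: 5)
  tasks    = SizedQueue.new(2) # push blocks while two tasks are queued
  executed = Queue.new         # thread-safe collector for finished tasks

  producer = Thread.new do
    total_tasks.times { |n| tasks.push("task #{n}") }
    tasks.close # once the queue is closed and drained, pop returns nil
  end

  workers = Array.new(consumers) do
    Thread.new do
      while (task = tasks.pop) # nil ends the loop after close
        executed << task
      end
    end
  end

  ([producer] + workers).each(&:join)
  executed.size
end

puts run_with_sized_queue # => 100
```

Here the waiting and signaling is left to the queue itself, so there is no explicit mutex or condition variable in the calling code.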