I have a Sidekiq worker class that runs for a duration that can sometimes be quite long.
Jobs and job states are persisted in database records.
class Worker
  include Sidekiq::Worker

  def perform
    prepare_job_record
    do_work_that_might_be_long
    mark_job_completed
  end
end
Sometimes worker processes are restarted due to a code release.
The job record has an attribute for the job state, and it gets "stuck" on "started" when the job is interrupted because the worker process is terminated.
1st attempt
def perform
  # work...
rescue Sidekiq::Shutdown
  mark_job_interrupted
end
It failed.
2nd attempt
I read the source code of Sidekiq to see if I can patch it.
But after reading it I think that would be too complex and dangerous, so I didn't even try.
But if you are interested, you can take a look at how it shuts down the running threads:
- Sidekiq::Manager#stop (https://github.com/mperham/sidekiq/blob/v6.1.2/lib/sidekiq/manager.rb#L61): does some work to stop processors from processing new jobs, runs callbacks...
- Pausing to allow workers to finish... (The waiting time can be controlled, as described in https://github.com/mperham/sidekiq/wiki/Deployment#overview)
- Sidekiq::Processor#kill (https://github.com/mperham/sidekiq/blob/v6.1.2/lib/sidekiq/processor.rb#L49)
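The last step is the one that actually interrupts a running job: as far as I can tell from the linked v6.1.2 source, `Sidekiq::Processor#kill` raises `Sidekiq::Shutdown` into the processor's thread. Here is a minimal stdlib-only sketch of that mechanism. `FakeShutdown` and the sleeping "job" are made up for illustration and are not Sidekiq code:

```ruby
# Hypothetical stand-in for Sidekiq::Shutdown (which also inherits from Interrupt).
class FakeShutdown < Interrupt; end

events = Queue.new

# A stand-in for a processor thread running a long job.
worker = Thread.new do
  begin
    events << :started
    sleep 10 # pretend this is the long-running work
    events << :completed
  rescue FakeShutdown
    # Thread#raise delivers the exception at whatever point the thread is executing.
    events << :interrupted
  end
end

# Wait until the job has really started, then interrupt it from another thread.
started = events.pop
worker.raise(FakeShutdown)
worker.join

outcome = events.pop
puts "#{started} -> #{outcome}"
```

The same trick (raising into a specific thread from outside) is what the solution further down relies on, just triggered earlier than Sidekiq's own kill step.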
The Solution
Let's look at Sidekiq's lifecycle events: https://github.com/mperham/sidekiq/wiki/Deployment#events
This also works on the free version, but the fetch is delayed (something like 5s? Not sure).
Sidekiq Pro users have super_fetch, and the callback seems to be triggered much quicker.
So I wrote a singleton class in an initializer:
require "singleton"

module Sidekiq
  # Too lazy to think of better name...
  class LifecycleCenter
    include Singleton

    def initialize
      super
      @quiet = false
    end

    def quiet!
      @quiet = true
    end

    def quiet?
      @quiet
    end
  end
end

Sidekiq.configure_server do |config|
  # Other config
  config.on(:quiet) do
    Sidekiq::LifecycleCenter.instance.quiet!
  end
end
Nothing special. A singleton class instead of a global variable.
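To see the flag in isolation, here is a stdlib-only sketch. `QuietFlag` is a hypothetical name (used so the example doesn't touch the `Sidekiq` namespace), and the direct call to `quiet!` stands in for the `config.on(:quiet)` callback:

```ruby
require "singleton"

# Hypothetical equivalent of the LifecycleCenter class, outside of Sidekiq.
class QuietFlag
  include Singleton

  def initialize
    super
    @quiet = false
  end

  def quiet!
    @quiet = true
  end

  def quiet?
    @quiet
  end
end

before = QuietFlag.instance.quiet?

# In the real initializer this call lives inside `config.on(:quiet) { ... }`.
# It runs on another thread here to show that every thread shares one instance.
Thread.new { QuietFlag.instance.quiet! }.join

after = QuietFlag.instance.quiet?
puts "before=#{before} after=#{after}"
```

Because `Singleton` hands every thread the same instance, a worker thread can read the flag that the event callback set.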
What took time to figure out was the next part: raising a specific error in the worker after Sidekiq fires the quiet event.
Since I read the Sidekiq source code earlier, I think I should use a thread.
But I think I am a noob in threading and should try to find a gem that does the threading for me, so I found concurrent-ruby.
In JS there is setInterval; in concurrent-ruby there is Concurrent::TimerTask.
Note that any error raised inside the block passed to the task object is handled by the gem and will not propagate to the caller.
So this will NOT raise an error:
sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
  execution_interval: 1,
) do |task|
  task.shutdown if true
  raise CustomError if true
end
sidekiq_state_checking_task.execute
Feel free to try it in ruby console.
I studied the docs for Concurrent::TimerTask, tried several solutions, and finally got this:
class Worker
  include Sidekiq::Worker

  # Not sure if inheriting from `Interrupt` is necessary
  # That's what Sidekiq does on `Sidekiq::Shutdown` though
  class CustomShutdown < Interrupt; end

  def perform
    prepare_job_record

    main_thread = ::Thread.current
    sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
      execution_interval: 1,
    ) do |task|
      ::Sidekiq::LifecycleCenter.instance.quiet?.tap do |quiet|
        # If you don't shutdown the task
        # it might keep running
        task.shutdown if quiet
      end
    end
    sidekiq_state_checking_task.with_observer do |_time, quiet|
      next unless quiet

      main_thread.raise CustomShutdown
    end.execute

    do_work_that_might_be_long
    mark_job_completed
  rescue CustomShutdown => e
    mark_job_aborted
    reenqueue_the_job_sometimes
  ensure
    # Also stop the task when the job finishes normally,
    # so it cannot raise into this thread during a later job
    sidekiq_state_checking_task&.shutdown
  end
end
The point is to use an observer to handle the result from the block (the task). Remember to shut down the task, or it might keep running (even after an error is raised in the main thread).
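If you want to play with the idea without Sidekiq or concurrent-ruby installed, here is a stdlib-only sketch of the same pattern. It swaps `Concurrent::TimerTask` for a plain polling `Thread`, the `quiet` local stands in for `Sidekiq::LifecycleCenter.instance.quiet?`, and the `trigger` thread is a made-up stand-in for Sidekiq firing the quiet event:

```ruby
# Hypothetical names throughout; this is a simplified analogue, not the worker code above.
class CustomShutdown < Interrupt; end

quiet = false # stands in for Sidekiq::LifecycleCenter.instance.quiet?
main_thread = Thread.current
result = nil

# Poll the flag every 50 ms and interrupt the main thread once it flips.
watcher = Thread.new do
  sleep 0.05 until quiet
  main_thread.raise(CustomShutdown)
end

# Simulate Sidekiq firing the quiet event shortly after the job starts.
trigger = Thread.new do
  sleep 0.2
  quiet = true
end

begin
  sleep 5 # pretend this is do_work_that_might_be_long
  result = :completed
rescue CustomShutdown
  result = :aborted # this is where mark_job_aborted would go
ensure
  # Always stop the watcher so it cannot interrupt whatever runs next.
  watcher.kill
  trigger.join
end

puts result
```

The `ensure` block plays the role of shutting down the task: without it, a watcher left over from a finished job could still raise into the thread later.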
TL;DR
Put the following code in an initializer:
require "singleton"

module Sidekiq
  # Too lazy to think of better name...
  class LifecycleCenter
    include Singleton

    def initialize
      super
      @quiet = false
    end

    def quiet!
      @quiet = true
    end

    def quiet?
      @quiet
    end
  end
end

Sidekiq.configure_server do |config|
  # Other config
  config.on(:quiet) do
    Sidekiq::LifecycleCenter.instance.quiet!
  end
end
Put the following code in your worker:
class Worker
  include Sidekiq::Worker

  # Not sure if inheriting from `Interrupt` is necessary
  # That's what Sidekiq does on `Sidekiq::Shutdown` though
  class CustomShutdown < Interrupt; end

  def perform
    prepare_job_record

    main_thread = ::Thread.current
    sidekiq_state_checking_task = ::Concurrent::TimerTask.new(
      execution_interval: 1,
    ) do |task|
      ::Sidekiq::LifecycleCenter.instance.quiet?.tap do |quiet|
        # If you don't shutdown the task
        # it might keep running
        task.shutdown if quiet
      end
    end
    sidekiq_state_checking_task.with_observer do |_time, quiet|
      next unless quiet

      main_thread.raise CustomShutdown
    end.execute

    do_work_that_might_be_long
    mark_job_completed
  rescue CustomShutdown => e
    mark_job_aborted
    reenqueue_the_job_sometimes
  ensure
    # Also stop the task when the job finishes normally,
    # so it cannot raise into this thread during a later job
    sidekiq_state_checking_task&.shutdown
  end
end
Feel free to let me know what you think about this method.
Or write an article about your method and I will be happy to add a link to your article!
Top comments (1)
Good one, we just faced this and understood it the hard way how the worker starts the loop again when sidekiq server is restarted. Does your solution still work in 2024?