TL;DR: In this post I explain how you can have persistent connections to Redis using Async::Redis client inside Sidekiq and why it doesn't work by default. There are some basic explanations of how Async and Sidekiq work and some source code.
All example code can be found here. Note that we don't actually need Rails, but I used it for examples because Sidekiq is usually used with Rails.
Async is a composable asynchronous I/O framework for Ruby. It allows you to do things concurrently using Fibers. Since 3.0, Ruby has a fiber scheduler and Ruby core supports it. This means you can have non-blocking I/O without much effort, for example, when using Net::HTTP. If you perform a blocking operation, such as an HTTP call, inside a fiber, it will immediately yield so that another fiber can become active and do some useful work instead of blocking and waiting for the HTTP call to complete.
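To make the "yield instead of block" idea concrete, here is a minimal sketch that uses only plain Ruby fibers. It is not how Async is implemented: the explicit Fiber.yield stands in for the point where a real fiber scheduler would suspend a fiber that is waiting on the network, and the small loop at the bottom is a toy round-robin scheduler I made up for illustration.

```ruby
# Two fibers that hand control back whenever they would "block".
log = []

make_worker = lambda do |name|
  Fiber.new do
    2.times do |step|
      log << "#{name} starts request #{step}"
      Fiber.yield # stand-in for waiting on the network
      log << "#{name} got response #{step}"
    end
  end
end

fibers = [make_worker.call("A"), make_worker.call("B")]

# Toy scheduler: keep resuming every fiber that still has work to do.
until fibers.none?(&:alive?)
  fibers.select(&:alive?).each(&:resume)
end

puts log
```

While A is "waiting", B gets to start its own request, so the log entries of the two workers interleave instead of running one worker to completion first.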
Using Ruby Async::Redis gem
Besides Ruby's support for the fiber scheduler, for some I/O operations, you might use specific gems, like Async::Redis. You can still use other Ruby gems that use native Ruby I/O with Async and they will give you non-blocking I/O as well, but there are two reasons to prefer Async::Redis:
- Its author, Samuel Williams, worked hard to ensure the interface is more consistent and easier to use. This is especially true for support of pipelines, transactions and subscriptions.
- It uses Async::Pool, so you don't need to build a connection pool yourself. Async::Pool has persistent connections out of the box, and connections are created lazily: a new one is created only if there are no free connections (and the specified connection limit hasn't been reached). This can be extremely handy if you use Async inside Sidekiq and can't calculate how many connections you will need.
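The lazy behaviour from the second point can be sketched in a few lines of plain Ruby. LazyPool below is a hypothetical class written for this illustration, not the Async::Pool implementation; in particular, it raises when the limit is reached, which is a simplification I chose to keep the sketch short.

```ruby
# Hypothetical LazyPool: only the idea of lazy creation, NOT Async::Pool itself.
class LazyPool
  attr_reader :created

  def initialize(limit:, &constructor)
    @limit = limit
    @constructor = constructor
    @free = []
    @created = 0
  end

  # Hand out a free connection if there is one, otherwise create a new one.
  def acquire
    return @free.pop unless @free.empty?
    # Simplification for this sketch: fail loudly once the limit is hit.
    raise "pool limit of #{@limit} reached" if @created >= @limit

    @created += 1
    @constructor.call
  end

  def release(connection)
    @free.push(connection)
  end

  def with
    connection = acquire
    yield connection
  ensure
    release(connection) if connection
  end
end

pool = LazyPool.new(limit: 10) { Object.new } # Object.new stands in for a socket

3.times { pool.with { |conn| conn } } # sequential use: one connection is reused
sequential = pool.created

a = pool.acquire # two connections held at the same time
b = pool.acquire
overlapping = pool.created
pool.release(a)
pool.release(b)

puts "sequential: #{sequential}, overlapping: #{overlapping}"
```

Sequential callers keep reusing a single connection; a second one is created only when two are needed at the same time.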
The usage is simple:
First, add the gem to your Gemfile:
gem 'async-redis'
Then create a client:
endpoint = Async::Redis.local_endpoint # localhost:6379
@client = Async::Redis::Client.new(endpoint, limit: 10)
The limit parameter is passed to Async::Pool to cap the number of acquired connections. The pool's short documentation also covers another parameter you can pass, concurrency.
# source code of Async::Pool
module Async
  module Pool
    class Controller
      def self.wrap(**options, &block)
        self.new(block, **options)
      end

      def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil)
Async::Redis.local_endpoint is just an IO::Endpoint.tcp with the default local Redis host and port, as we can see in the gem's source code:
def self.local_endpoint(port: 6379)
  ::IO::Endpoint.tcp('localhost', port)
end
Now we can call the client:
Sync do
  @client.set('key', 'value')
  puts @client.get('key') # => "value"
ensure
  @client.close
end
The resulting code:
# app/lib/redis_example.rb
require 'async/redis'

endpoint = Async::Redis.local_endpoint
@client = Async::Redis::Client.new(endpoint)

Sync do
  @client.set('key', 'value')
  puts @client.get('key') # => "value"
ensure
  @client.close
end
Why do we need a Sync do block? We'll come back to this a little later.
Using Async::Redis gem with Ruby on Rails
This setup is pretty simple. However, most of the time we don't write pure Ruby programs; usually we have Rails. For Rails, it would be handy to have a persistent client that we can call anywhere. Let's create one:
# app/lib/async_redis_client.rb
class AsyncRedisClient
  def self.instance
    Thread.current.thread_variable_get(:redis_client) ||
      Thread.current.thread_variable_set(:redis_client, new)
  end

  def initialize
    @client = Async::Redis::Client.new(Async::Redis.local_endpoint, limit: 10)
  end

  def with
    yield @client
  end
end
We use Thread.current.thread_variable_get and Thread.current.thread_variable_set to make our client thread-safe. If you don't do this, you will get a "fiber called across threads" error pretty soon.
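Here is a small, self-contained sketch of what this memoization gives us. FakeClient and the :demo_client key are made up for the illustration; the point is that thread variables are shared by all fibers of one thread but are separate for each thread.

```ruby
class FakeClient; end # hypothetical stand-in for AsyncRedisClient

# Same memoization pattern as AsyncRedisClient.instance.
def client_for_current_thread
  Thread.current.thread_variable_get(:demo_client) ||
    Thread.current.thread_variable_set(:demo_client, FakeClient.new)
end

main_first  = client_for_current_thread
main_second = client_for_current_thread
from_fiber  = Fiber.new { client_for_current_thread }.resume
from_thread = Thread.new { client_for_current_thread }.value

same_in_thread      = main_first.equal?(main_second)
same_in_fiber       = main_first.equal?(from_fiber)
same_across_threads = main_first.equal?(from_thread)

puts "same thread: #{same_in_thread}"
puts "same thread, other fiber: #{same_in_fiber}"
puts "other thread: #{same_across_threads}"
```

Every fiber running in a given thread sees the same client, while each thread gets its own, so a client (and the fibers it owns) is never shared between threads.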
Now we have a persistent client that should have persistent connections. We can use it in Sidekiq, right?
# app/workers/redis_worker.rb
class RedisWorker
  include Sidekiq::Job

  def perform
    AsyncRedisClient.instance.with do |redis|
      redis.set('key', 'value')
    end
  end
end
No. Remember using Sync do in the beginning? Without it, we will receive this error:
No async task available! (RuntimeError)
A quick search shows that the Async Redis client needs to be called within an existing reactor. You can achieve this by running the code inside an Async do ... or Sync do ... block. Let's have a look at the Sync source code:
module Kernel
  # Run the given block of code synchronously, but within a reactor if not already in one.
  #
  # @yields {|task| ...} The block that will execute asynchronously.
  # @parameter task [Async::Task] The task that is executing the given block.
  #
  # @public Since `stable-v1`.
  # @asynchronous Will block until given block completes executing.
  def Sync(&block)
    if task = ::Async::Task.current?
      yield task
    else
      # This calls Fiber.set_scheduler(self):
      reactor = Async::Reactor.new

      begin
        return reactor.run(finished: ::Async::Condition.new, &block).wait
      ensure
        Fiber.set_scheduler(nil)
      end
    end
  end
end
Pretty straightforward: it creates a reactor if we are not already inside one. A typical pure Ruby program with Async looks like this:
Sync do
  # blabla
  Async do
    # some async stuff
  end

  Async do
    # some more async stuff
  end
  # some more blabla
end # will wait for the finish of all async tasks inside
But we are using Sidekiq, so let's just do what it asks and wrap the call in a Sync block. You could also wrap it in Async; in this example it doesn't matter much, because both will create a Reactor and wait for the task to complete before closing the Reactor. But if you already have a Reactor (as we will at the end of the article), using Async will let the Sidekiq job complete before our async task does. In that case you can also use Async do ... end.wait.
# app/workers/redis_worker_with_sync.rb
class RedisWorkerWithSync
  include Sidekiq::Job

  def perform
    Sync do
      AsyncRedisClient.instance.with do |redis|
        redis.set('key', 'value')
      end
    end
  end
end
That works! You can also refactor it, for example by moving the Sync block to a Sidekiq middleware or into the with method of our client.
The only thing left is to check that it works as we want: with persistent connections to Redis.
Monitoring Redis connections with Ruby and Async
We could just use redis-cli for this, but we can also write a simple monitor using Async. Redis has an info command that shows total_connections_received. We can use it to monitor newly received connections after the monitor has started:
# app/lib/redis_monitor.rb
require 'async'
require 'async/redis'
require 'tty-sparkline'
require 'tty-screen'

endpoint = Async::Redis.local_endpoint
redis = Async::Redis::Client.new(endpoint)

connection_counts = []
interval = 1
max_width = TTY::Screen.width - 20

Sync do
  initial_connections_count = redis.info[:total_connections_received].to_i

  Async do |task|
    loop do
      new_connections = redis.info[:total_connections_received].to_i - initial_connections_count
      connection_counts << new_connections

      if connection_counts.size > max_width
        connection_counts.shift
      end

      task.sleep(interval)
    end
  end

  Async do |task|
    loop do
      print "\e[H\e[2J" # Clear the screen
      puts "Redis Connection Monitoring"
      puts "Total created connections: #{connection_counts.last}"

      sparkline = TTY::Sparkline.new(connection_counts, width: max_width, height: 10)
      puts sparkline.render

      task.sleep(interval)
    end
  end
end
Here we have two tasks. The first one collects total_connections_received and writes it to connection_counts, which will be used for our chart. The second task uses the tty-sparkline gem to output the chart to the console.
Let's also add a Rake task to fill our Sidekiq queue:
# /lib/tasks/job_pusher.rb
namespace :jobs do
  desc "Push jobs to the queue continuously"
  task push: :environment do
    loop do
      puts 'Pushing job to the queue'
      RedisWorkerWithSync.perform_async
      sleep 1
    end
  end
end
Now let's run all we need for our experiment:
- bundle exec sidekiq
- bundle exec rake jobs:push
- ruby app/lib/redis_monitor.rb
And let's have a look at the monitoring result:
The connection count starts at 6 (Sidekiq creates and manages its own connection pool to Redis; actually, a 7th connection is also created by Sidekiq when it starts processing jobs) and increases constantly, adding one new connection every second (because we push a new job every second). Why does this happen? Didn't I mention at the beginning that Async::Redis has persistent connections?
Well, it does, but only within a reactor, because the existence of a Reactor defines the "lifetime" of a Ruby program with Async. After our top-level Sync or Async block finishes, all connections should be closed: why would the program need them anymore?
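The effect can be shown with a toy model in plain Ruby. ToyReactor is a hypothetical class written for this illustration and has nothing to do with the real async gem; it only mimics the behaviour described above, where a connection lives as long as the scope that owns it.

```ruby
# Hypothetical toy model, not the async gem: a scope that owns its connection.
class ToyReactor
  class << self
    attr_accessor :opened_total
  end
  self.opened_total = 0

  # Run the block inside a fresh "reactor" and close it afterwards.
  def self.run
    reactor = new
    yield reactor
  ensure
    reactor.close
  end

  def initialize
    @connection = nil
  end

  # Reuse the connection while this reactor is alive, open one otherwise.
  def connection
    @connection ||= begin
      self.class.opened_total += 1
      Object.new # stands in for a socket
    end
  end

  def close
    @connection = nil
  end
end

jobs = 5

# One reactor per job, like wrapping each perform in Sync.
ToyReactor.opened_total = 0
jobs.times { ToyReactor.run { |reactor| reactor.connection } }
per_job = ToyReactor.opened_total

# One reactor around the whole processing loop.
ToyReactor.opened_total = 0
ToyReactor.run { |reactor| jobs.times { reactor.connection } }
per_loop = ToyReactor.opened_total

puts "reactor per job: #{per_job}, reactor per loop: #{per_loop}"
```

With a reactor per job, every job pays for a new connection; with one long-lived reactor, the same connection serves all of them.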
How a Sidekiq process runs
Okay, back to our Rails app. In our app, the lifetime should be the whole Sidekiq process. Luckily, Sidekiq has internal documentation on how it runs. I won't copy the entire documentation here, just the part we are interested in:
Sidekiq::Manager manages the resources for a given Sidekiq::Capsule. It creates and starts N Sidekiq::Processor instances. ... Each Sidekiq::Processor instance is a separate thread.
That's exactly what we need, because we need to have a separate reactor for each thread.
In the Sidekiq 7.3 source code, the run method of Processor looks like this:
# frozen_string_literal: true
module Sidekiq
  class Processor
    def run
      # By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+
      # instead of the global pool in +Sidekiq::Config#redis_pool+.
      Thread.current[:sidekiq_capsule] = @capsule

      process_one until @done
      @callback.call(self)
    rescue Sidekiq::Shutdown
      @callback.call(self)
    rescue Exception => ex
      @callback.call(self, ex)
    end
  end
end
It uses a loop for processing jobs, process_one until @done, and it's easy to wrap:
Sync do
  process_one until @done
end
We can put a file with our monkey-patch in app/lib/sidekiq/processor.rb:
# app/lib/sidekiq/processor.rb
# frozen_string_literal: true
module Sidekiq
  class Processor
    def run
      # By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+
      # instead of the global pool in +Sidekiq::Config#redis_pool+.
      Thread.current[:sidekiq_capsule] = @capsule

      Sync do
        process_one until @done
      end

      @callback.call(self)
    rescue Sidekiq::Shutdown
      @callback.call(self)
    rescue Exception => ex
      @callback.call(self, ex)
    end
  end
end
And we need to require it in an initializer so that our code is loaded instead of the original:
# config/initializers/sidekiq.rb
require_relative '../../app/lib/sidekiq/processor'
That's it! The processor will run inside the Sync block until we shut down Sidekiq. Now we can remove the Sync block from our worker and everything works perfectly with persistent connections.
class RedisWorker
  include Sidekiq::Job

  def perform
    AsyncRedisClient.instance.with do |redis|
      redis.set('key', 'value')
    end
  end
end
Now that our Sidekiq is wrapped, we can change the job pusher to use RedisWorker instead of RedisWorkerWithSync and run it. Let's have a look at the monitor:
We see that the number of connections rises slowly until it reaches 12. That's because Sidekiq has a default concurrency of 5, which means we have 5 separate threads, and when a job lands on a thread that wasn't used before, that thread has to create its first connection. Let's change our rake task to push more jobs:
namespace :jobs do
  desc "Push jobs to the queue continuously"
  task push: :environment do
    loop do
      puts 'Pushing job to the queue'
      20.times do
        RedisWorker.perform_async
      end
      sleep 1
    end
  end
end
It reaches 12 instantly and stops at this number as we wanted.
Now, let's use Async inside Sidekiq. Currently, each worker performs only one Redis operation, which is why the same connection from the pool is used each time. Imagine that each worker needs to do 100 different Redis operations; with Async we could do them concurrently and more efficiently:
class RedisWorker
  include Sidekiq::Job

  def perform
    100.times do
      Async do
        rand = SecureRandom.hex
        AsyncRedisClient.instance.with do |redis|
          redis.set(rand, rand)
        end
      end
    end
  end
end
The math is simple: Sidekiq uses 7 connections itself, and we have 5 Sidekiq threads, each with its own Redis connection pool limited to 10. Eventually we reach 57 connections.
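The same arithmetic, written out as a tiny Ruby script. The numbers are the ones from this experiment (7 connections used by Sidekiq itself, 5 threads, a pool limit of 10); the variable names are mine.

```ruby
sidekiq_own_connections = 7  # connections Sidekiq opens for itself (seen in the monitor)
sidekiq_threads         = 5  # Sidekiq concurrency in this experiment
pool_limit              = 10 # limit: 10 passed to Async::Redis::Client

# One Redis operation per job: each thread needs a single connection.
one_operation_per_job = sidekiq_own_connections + sidekiq_threads * 1

# Many concurrent operations per job: each thread can fill its whole pool.
concurrent_operations = sidekiq_own_connections + sidekiq_threads * pool_limit

puts "one operation per job: #{one_operation_per_job}"
puts "concurrent operations: #{concurrent_operations}"
```

This gives the 12 connections from the earlier run and the 57 connections from this one.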
Note that the code above finishes the Sidekiq job as soon as all tasks are scheduled, not when they are completed. We can change the code to wait for completion, but then we'll need many more tasks to use all connections:
class RedisWorker
  include Sidekiq::Job

  def perform
    tasks = 10000.times.map do
      Async do
        rand = SecureRandom.hex
        AsyncRedisClient.instance.with do |redis|
          redis.set(rand.to_s, rand.to_s)
        end
      end
    end

    tasks.map(&:wait)
  end
end
And this is what would happen if we rolled back our monkey-patch:
# config/initializers/sidekiq.rb
# require_relative '../../app/lib/sidekiq/processor'
class RedisWorker
  include Sidekiq::Job

  def perform
    100.times do
      Async do
        rand = SecureRandom.hex
        AsyncRedisClient.instance.with do |redis|
          redis.set(rand, rand)
        end
      end
    end
  end
end
Finally, we can wrap the whole worker in Sync as before to see what happens:
class RedisWorker
  include Sidekiq::Job

  def perform
    Sync do
      100.times do
        Async do
          rand = SecureRandom.hex
          AsyncRedisClient.instance.with do |redis|
            redis.set(rand, rand)
          end
        end
      end
    end
  end
end
A little better, because previously each Async created its own Reactor with a new connection pool, and now we have a Sync that owns one Reactor with a pool for all the Async tasks inside. But it is still much worse than with the patched Sidekiq.
This example is built around Redis, but the same would be true for other Async I/O gems that need a Reactor, for example Async::HTTP.
Using Async can be tricky, but I encourage you to try it. If you are not familiar with it (and for some reason read this article to the end), I recommend watching this video from the latest RailsConf 2024.
Top comments (2)
How come Sidekiq has a pool of Redis connections if they're not persistent? Surely Sidekiq doesn't create a new Redis connection for each Redis call. So, I'm not sure what async-redis brings to the table exactly.
Hi! Sidekiq indeed has its own pool of persistent connections to Redis. It uses the connection_pool gem. We could also use it to build our own pool for the redis gem and achieve a similar result. Here is a small discussion about the difference between async-redis and other Redis clients.
However, this article is more about how one could integrate async-io-based gems that use async-pool into Sidekiq. The same principles would apply, for example, to the async-http gem, as it also needs a persistent Reactor to maintain persistent connections.