In certain scenarios, a Sidekiq worker requires exclusive operation or access to specific data. While it is ideal for a worker to be designed as idempotent, achieving this can be challenging, particularly in a distributed system. For instance, consider a worker responsible for sending out emails. If two instances of this worker are running simultaneously, there is a high likelihood that recipients will receive duplicate emails. Even worse, these emails might contain slightly different content, leading to user confusion. To address this issue, Sidekiq Enterprise offers mechanisms that facilitate exclusivity (and rate limiting).
Note: I wrote this to help a client focus on a few methods of worker exclusivity that avoid race conditions.
There are three common exclusivity needs:
1. Exclusivity irrespective of the worker’s parameters.
2. Exclusivity with respect to all of the worker’s parameters.
3. Exclusivity with respect to some of the worker’s parameters.
There is a fourth exclusivity need, but handling this need is outside the scope of this post.
- Exclusivity with respect to model data.
Using Sidekiq options
The simplest way to achieve exclusivity #2 is Sidekiq’s sidekiq_options unique_for: duration. For example,
    class UniqueExampleWorker
      include Sidekiq::Worker
      sidekiq_options unique_for: 1.minute

      def perform(params = {})
        # do the work
      end
    end
The option’s duration represents an estimate of how long the worker is expected to run. This exclusivity will only be maintained for the specified duration. If the worker exceeds this time, Sidekiq will not prevent a second worker from running concurrently. When utilizing this mechanism, it's advisable to adopt a conservative approach regarding the duration; in this case, opting for a longer duration is preferable to aiming for precision.
If the worker finishes before the unique_for duration elapses, the lock is released and another worker can be queued.
An additional benefit of this option is that if a worker is already queued or currently running, attempting to queue another worker (e.g., UniqueExampleWorker.perform_async) will return nil instead of the job id.
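A caller can use that nil return to tell a fresh enqueue from a suppressed duplicate. The following is a minimal sketch: enqueue_once is a hypothetical helper name, not part of Sidekiq, and it assumes only that the worker class responds to perform_async as described above.

```ruby
# Hypothetical helper (not part of Sidekiq): wraps perform_async and reports
# whether the job was enqueued or suppressed as a duplicate.
def enqueue_once(worker_class, *args)
  jid = worker_class.perform_async(*args)
  if jid.nil?
    # nil means an equivalent job is already queued or running
    [:duplicate, nil]
  else
    [:enqueued, jid]
  end
end
```

For example, status, jid = enqueue_once(UniqueExampleWorker, { "id" => 42 }) lets the caller log or ignore the duplicate case explicitly rather than silently dropping it.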
Using Sidekiq Limiter
A more adaptable approach to achieving exclusivity for #1, #2, and #3 is to utilize Sidekiq::Limiter.concurrent. Unlike the standard Sidekiq options, this approach is implemented within the worker's perform method. The worker operates as usual, while the limiter allows for a swift exit when necessary. For example, to configure and implement exclusivity #1:
    class UniqueClassExampleWorker
      include Sidekiq::Worker

      LIMITER = Sidekiq::Limiter.concurrent(
        "unique_class_example_worker_limiter", # name
        1,                                     # count, ie expect exclusivity
        lock_timeout: 1.minute,                # expected duration of worker
        wait_timeout: 0,                       # don't wait around for running worker to finish
        policy: :ignore,                       # don't reschedule the worker for later
      )

      def perform(params = {})
        LIMITER.within_limit do
          # do the work
        end
      end
    end
The key to exclusivity is the limiter name and the count. Here the name is based on the class name. Any concurrent use of this worker, irrespective of parameters, will not enter the within_limit block.
The limiter’s lock_timeout represents an estimate of how long the worker is expected to run. This exclusivity will only be maintained for the specified duration. If the worker exceeds this time, Sidekiq will not prevent a second worker from running concurrently. It is advisable to adopt a conservative approach regarding the duration; in this case, opting for a longer duration is preferable to aiming for precision.
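To make the lock_timeout behaviour concrete, here is a toy, in-process model of a count-1 limiter that skips its block when over the limit. It is only an illustration of the semantics described above, not Sidekiq's Redis-backed implementation; the class name ToyExclusiveLock and its injectable clock are assumptions made for this sketch.

```ruby
# Toy, in-process stand-in for a count-1 concurrent limiter. It is NOT
# Sidekiq's implementation; it only models lock expiry when over-limit
# calls are ignored.
class ToyExclusiveLock
  def initialize(lock_timeout:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @lock_timeout = lock_timeout
    @clock = clock
    @expires_at = nil
    @mutex = Mutex.new
  end

  # Runs the block only if the lock is free or the previous holder's
  # lock_timeout has elapsed. Returns false when the block is skipped.
  def within_limit
    token = acquire
    return false unless token

    begin
      yield
      true
    ensure
      release(token)
    end
  end

  private

  # Takes the lock and returns its expiry time as a token, or nil if held.
  def acquire
    @mutex.synchronize do
      now = @clock.call
      return nil if @expires_at && now < @expires_at

      @expires_at = now + @lock_timeout
    end
  end

  # Releases the lock only if it still belongs to this holder.
  def release(token)
    @mutex.synchronize { @expires_at = nil if @expires_at == token }
  end
end
```

With lock_timeout: 60, a second within_limit call made 30 seconds into a long-running block is skipped, while one made after 61 seconds runs alongside the first. That overlap is why a generous duration is the safer choice.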
Unlike when using sidekiq_options, there is no easy way to know if there is an existing worker queued or running. It is possible to examine the Sidekiq queues for the worker, but without careful use of mutual exclusion this will result in a race-condition.
Testing
When testing a worker that uses this approach, you will need to replace the concurrent limiter to avoid its interference.
    require "spec_helper"
    require "sidekiq/testing"

    RSpec.describe UniqueClassExampleWorker do
      # ...
      before do
        stub_const("#{described_class.name}::LIMITER", Sidekiq::Limiter.unlimited)
      end
      # ...
    end
Exclusivity with regard to parameters
If the worker's exclusivity must be restricted by one or more parameters (or even derived data), then utilize those values to create a unique limiter name. For example, to exclusively run one worker per id parameter:

    class UniqueParamsExampleWorker
      include Sidekiq::Worker

      def perform(params = {})
        limiter = Sidekiq::Limiter.concurrent(
          "unique_params_example_worker_limiter_#{params['id']}",
          1,
          lock_timeout: 1.minute,
          wait_timeout: 0,
          policy: :ignore,
        )
        limiter.within_limit do
          # do the work
        end
      end
    end
This example creates the limiter within the perform method. The cost of using a limiter is not in its creation (it is just a plain-old-ruby-object), but in the execution of within_limit. Only then does the limiter's Redis-backed work take place.
A more general implementation is:

    require "digest"

    class UniqueParamsExampleWorker
      include Sidekiq::Worker

      LIMITER_DEFAULT_OPTIONS = {
        lock_timeout: 1.minute,
        wait_timeout: 0,
        policy: :ignore,
      }.freeze

      # Builds a limiter whose name is a digest of the given ids and the class name.
      def limiter(*ids, count: 1, **options)
        digest = ids.
          append(self.class.name).
          map(&:to_s).
          reduce(Digest::MD5.new, :update).
          hexdigest
        Sidekiq::Limiter.concurrent(
          "limiter_#{digest}",
          count,
          LIMITER_DEFAULT_OPTIONS.merge(options),
        )
      end

      def perform(params = {})
        limiter(params['id']).within_limit do
          # do the work
        end
      end
    end
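One detail to watch when passing several ids: the limiter method above feeds each value into the digest with no separator, so the id pairs (1, 23) and (12, 3) hash the same bytes and would share a limiter. A minimal sketch of a name derivation that avoids this is shown below. The limiter_name helper is hypothetical, and it assumes ids never contain the NUL separator.

```ruby
require "digest"

# Hypothetical helper: derives a limiter name from a worker class name and
# the ids that define exclusivity. Joining with a separator keeps
# (1, 23) and (12, 3) from producing the same digest.
def limiter_name(class_name, *ids)
  payload = [class_name, *ids].map(&:to_s).join("\u0000")
  "limiter_#{Digest::MD5.hexdigest(payload)}"
end
```

The result can be passed as the first argument to Sidekiq::Limiter.concurrent in place of the inline digest.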
Testing
When testing a worker that uses this approach, you will need to replace the concurrent limiter to avoid its interference.
    require "spec_helper"
    require "sidekiq/testing"

    RSpec.describe UniqueParamsExampleWorker do
      # ...
      before do
        allow_any_instance_of(described_class).
          to receive(:limiter).and_return(Sidekiq::Limiter.unlimited)
      end
      # ...
    end
Handling non-exclusive uses
The Sidekiq::Limiter.concurrent examples all utilize the policy: :ignore option. This option instructs Sidekiq to disregard non-exclusive uses. It overrides the default policy: :raise, which triggers the Sidekiq::Limiter::OverLimit exception. When this exception is raised, Sidekiq will reschedule the worker for a later time. There is a corresponding backoff policy and a maximum rescheduling count associated with this rescheduling process.
However, you can also rescue this exception yourself to implement a handler for non-exclusive uses. For example,
    def perform(params = {})
      limiter(params['id'], policy: :raise).within_limit do
        # do the work
      end
    rescue Sidekiq::Limiter::OverLimit
      # handle the over-limit
    end
Just don’t re-raise the exception, or Sidekiq will reschedule the worker as described above!