Sidekiq and exclusive workers

In certain scenarios, a Sidekiq worker requires exclusive operation or access to specific data. While it is ideal for a worker to be designed as idempotent, achieving this can be challenging, particularly in a distributed system. For instance, consider a worker responsible for sending out emails. If two instances of this worker are running simultaneously, there is a high likelihood that recipients will receive duplicate emails. Even worse, these emails might contain slightly different content, leading to user confusion. To address this issue, Sidekiq Enterprise offers mechanisms that facilitate exclusivity (and rate limiting).

Note: I wrote this to help a client focus on a few methods of worker exclusivity that avoid race conditions.

There are three common exclusivity needs:

  1. Exclusivity irrespective of the worker’s parameters.
  2. Exclusivity with respect to all of the worker’s parameters.
  3. Exclusivity with respect to some of the worker’s parameters.

There is a fourth exclusivity need, but handling it is outside the scope of this post:

  4. Exclusivity with respect to model data.
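The first three needs can be read as three choices of lock key, that is, of what counts as "the same work". The plain-Ruby sketch below is illustrative only and independent of Sidekiq. exclusivity_key and its mode:/keys: arguments are hypothetical, and MailWorker stands in for the email worker from the introduction.

```ruby
require "json"

# Hypothetical helper: the string that identifies "the same work" under each
# of the first three exclusivity needs. Keys are sorted so that hash order
# does not change the result.
def exclusivity_key(worker_class, params, mode:, keys: [])
  case mode
  when :class # 1. irrespective of parameters
    worker_class
  when :all_params # 2. all parameters
    "#{worker_class}:#{JSON.generate(params.sort.to_h)}"
  when :some_params # 3. only the selected parameters
    "#{worker_class}:#{JSON.generate(params.slice(*keys).sort.to_h)}"
  else
    raise ArgumentError, "unknown mode: #{mode.inspect}"
  end
end

a = { "id" => 1, "subject" => "Hello" }
b = { "id" => 1, "subject" => "Hi" }
same = ->(mode, keys = []) do
  exclusivity_key("MailWorker", a, mode: mode, keys: keys) ==
    exclusivity_key("MailWorker", b, mode: mode, keys: keys)
end

p same.(:class)               # prints true: any two jobs collide
p same.(:all_params)          # prints false: subjects differ, so both may run
p same.(:some_params, ["id"]) # prints true: same id, so only one may run
```

The broader the key, the more jobs exclude each other. Need #1 collapses every job to one key, while need #3 keys only on the parameters that matter.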

Using Sidekiq options

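The simplest way to achieve exclusivity #2 is Sidekiq Enterprise's unique jobs feature, enabled per worker with sidekiq_options unique_for: duration. Uniqueness is determined by the job's class, arguments, and queue. For example: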

class UniqueExampleWorker
  include Sidekiq::Worker
 
  sidekiq_options unique_for: 1.minute
 
  def perform(params = {})
    # do the work
  end
end

The unique_for duration is an estimate of how long the job is expected to run, and exclusivity is only maintained for that duration. If the job runs longer, Sidekiq will not prevent a second, identical job from running concurrently. Be conservative here: a generous duration is preferable to a precise one.

If the job finishes before the unique_for duration elapses, the lock is released and another identical job can be queued.

An additional benefit of this option is that if an identical job is already queued or running, attempting to queue another one (e.g. UniqueExampleWorker.perform_async) returns nil instead of a job id.
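To make that lock lifecycle concrete, here is a toy in-memory model of the behaviour described above. A lock keyed on class and arguments is taken when a job is pushed, released when the job finishes, and otherwise expires after unique_for seconds. It is plain Ruby and deliberately ignores queues, retries, and Redis. ToyUniqueJobs and its methods are hypothetical and are not Sidekiq Enterprise's implementation.

```ruby
require "securerandom"

# Toy, in-memory model of the unique_for lifecycle described above.
# NOT Sidekiq Enterprise's implementation: no Redis, queues, or retries,
# and the current time is passed in explicitly.
class ToyUniqueJobs
  def initialize
    @locks = {} # [class name, args] => lock expiry time, in seconds
  end

  # Returns a new job id, or nil while an identical job holds the lock.
  def perform_async(klass, args, unique_for:, now:)
    key = [klass, args]
    expiry = @locks[key]
    return nil if expiry && now < expiry

    @locks[key] = now + unique_for
    SecureRandom.hex(12)
  end

  # Called when the job finishes: releases the lock before it expires.
  def finished(klass, args)
    @locks.delete([klass, args])
  end
end

jobs = ToyUniqueJobs.new
args = [{ "id" => 1 }]

first  = jobs.perform_async("UniqueExampleWorker", args, unique_for: 60, now: 0)  # a job id
second = jobs.perform_async("UniqueExampleWorker", args, unique_for: 60, now: 10) # nil: still locked
jobs.finished("UniqueExampleWorker", args)                                        # first job completes
third  = jobs.perform_async("UniqueExampleWorker", args, unique_for: 60, now: 20) # a job id again
```

In this model the lock lives for the shorter of unique_for and the job's actual run time. That is why an underestimated duration weakens exclusivity while an overestimated one costs little.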

Using Sidekiq Limiter

A more adaptable approach, which covers #1, #2, and #3, is Sidekiq::Limiter.concurrent. Unlike sidekiq_options, the limiter is applied inside the worker's perform method: the job runs as usual, and the limiter lets it exit quickly when another job already holds the lock. For example, to implement exclusivity #1:

class UniqueClassExampleWorker
  include Sidekiq::Worker
 
  LIMITER = Sidekiq::Limiter.concurrent(
    "unique_class_example_worker_limiter", # name
    1, # count, i.e. at most one job inside the block at a time
    lock_timeout: 1.minute, # expected duration of worker
    wait_timeout: 0, # don't wait around for running worker to finish
    policy: :ignore, # don't reschedule the worker for later
  )
 
  def perform(params = {})
    LIMITER.within_limit do
      # do the work
    end
  end
end

The key to exclusivity is the combination of the limiter name and a count of 1. Here the name is a fixed string derived from the class name, so any concurrent run of this worker, irrespective of its parameters, will not enter the within_limit block.

The limiter's lock_timeout is an estimate of how long the work inside within_limit is expected to take. The lock is released when the block finishes, but if the block runs longer than lock_timeout the lock expires and Sidekiq will not prevent a second job from entering concurrently. As with unique_for, a generous value is preferable to a precise one.
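The interaction of count, lock_timeout, and policy can be modelled in plain Ruby. The sketch below is a single-process toy, not Sidekiq::Limiter. ToyConcurrentLimiter and ToyOverLimit are hypothetical, there is no Redis, the current time is passed in explicitly, and wait_timeout is effectively 0 (an over-limit caller never waits). Nested calls stand in for a second job running concurrently.

```ruby
# Raised by the toy limiter under policy: :raise (stands in for Sidekiq::Limiter::OverLimit).
class ToyOverLimit < StandardError; end

# Toy, single-process model of a concurrent limiter with wait_timeout: 0.
# NOT Sidekiq::Limiter: no Redis, no waiting, and time is passed in explicitly.
class ToyConcurrentLimiter
  def initialize(count, lock_timeout:, policy: :raise)
    @count = count
    @lock_timeout = lock_timeout
    @policy = policy
    @locks = {} # token => expiry time of each lock currently held
  end

  # Runs the block if fewer than `count` unexpired locks are held at `now`.
  def within_limit(now:)
    @locks.reject! { |_token, expiry| expiry <= now } # locks past lock_timeout lapse
    if @locks.size >= @count
      raise ToyOverLimit, "limit of #{@count} reached" if @policy == :raise
      return nil # policy: :ignore skips the block
    end

    token = Object.new
    @locks[token] = now + @lock_timeout
    begin
      yield
    ensure
      @locks.delete(token) # release as soon as the block finishes
    end
  end
end

limiter = ToyConcurrentLimiter.new(1, lock_timeout: 60, policy: :ignore)
log = []
limiter.within_limit(now: 0) do
  log << :first
  # A second job arriving while the first still holds the lock is skipped.
  limiter.within_limit(now: 5) { log << :second }
  # A job arriving after lock_timeout has passed gets in: the first lock lapsed.
  limiter.within_limit(now: 61) { log << :late }
end
p log # prints [:first, :late]
```

The :late entry is the failure mode described above: once the first job overruns lock_timeout, nothing stops a second one from running alongside it.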

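Unlike sidekiq_options unique_for, the limiter does not stop a job from being queued: perform_async enqueues the job as usual, and there is no easy way to know whether a matching job is already queued or running. You could examine the Sidekiq queues for the worker before enqueueing, but without careful mutual exclusion this introduces a race condition: two callers can both see no matching job and both enqueue one.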
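The race is a classic check-then-act problem. The plain-Ruby replay below interleaves two callers by hand: both check before either enqueues, so the job is enqueued twice. Claiming in a single atomic step avoids this. Here Set#add? merely stands in for an atomic store operation such as Redis SET with the NX option. That analogy is an assumption of the sketch, which does not describe any Sidekiq internals.

```ruby
require "set"

# Replay, step by step, two callers that check the queue before enqueueing.
queue = [] # stands in for a Sidekiq queue
job = "UniqueClassExampleWorker"

a_sees_job = queue.include?(job) # caller A checks: nothing queued
b_sees_job = queue.include?(job) # caller B checks before A has enqueued
queue << job unless a_sees_job   # A enqueues
queue << job unless b_sees_job   # B enqueues too
p queue.size # prints 2: the job was enqueued twice

# Claiming atomically (check and claim in one step) lets only one caller win.
# Set#add? returns nil when the element is already present.
claims = Set.new
a_won = !claims.add?(job).nil?
b_won = !claims.add?(job).nil?
p [a_won, b_won] # prints [true, false]
```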

Testing

When testing a worker that uses this approach, you will need to replace the concurrent limiter to avoid its interference. Stubbing the constant with Sidekiq::Limiter.unlimited does this:

require "spec_helper"
require "sidekiq/testing"
 
RSpec.describe UniqueClassExampleWorker do
  # ...
  before do
    stub_const("#{described_class.name}::LIMITER", Sidekiq::Limiter.unlimited)
  end
  # ...
end

Exclusivity with regard to parameters

If the worker's exclusivity must be scoped by one or more parameters (or data derived from them), use those values to build a unique limiter name. For example, to run at most one job per id parameter:

class UniqueParamsExampleWorker
  include Sidekiq::Worker
 
  def perform(params = {})
    limiter = Sidekiq::Limiter.concurrent(
      "unique_params_example_worker_limiter_#{params['id']}", 
      1, 
      lock_timeout: 1.minute,
      wait_timeout: 0,
      policy: :ignore,
    )
    limiter.within_limit do
      # do the work
    end
  end
end

This example creates the limiter inside the perform method. That is inexpensive: creating a limiter only builds a plain-old-Ruby object, and Redis is not touched until within_limit is called.

A more general implementation hashes the identifying values, together with the class name, into the limiter name:

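require "digest/md5"

class UniqueParamsExampleWorker
  include Sidekiq::Worker

  LIMITER_DEFAULT_OPTIONS = {
    lock_timeout: 1.minute,
    wait_timeout: 0,
    policy: :ignore,
  }.freeze

  # Builds a concurrent limiter whose name is an MD5 digest of the given
  # ids plus the class name, so each combination of ids gets its own lock.
  def limiter(*ids, count: 1, **options)
    digest = ids.
      append(self.class.name).
      map(&:to_s).
      reduce(Digest::MD5.new, :update).
      hexdigest
    Sidekiq::Limiter.concurrent(
      "limiter_#{digest}",
      count,
      # Double splat: works on Ruby 3 whether concurrent takes keyword
      # arguments or a trailing options hash.
      **LIMITER_DEFAULT_OPTIONS.merge(options)
    )
  end

  def perform(params = {})
    limiter(params['id']).within_limit do
      # do the work
    end
  end
end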
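One caveat with feeding the values to the digest one after another is that there is no separator. limiter("ab") and limiter("a", "b") therefore hash the same bytes and share a limiter. If a worker passes several ids, length-prefixing each value before hashing removes the ambiguity. A plain-Ruby sketch follows. limiter_name and naive_limiter_name are hypothetical helpers, and the latter mirrors the reduce-based digest for comparison.

```ruby
require "digest/md5"

# Hypothetical helper: derive a limiter name from a class name and ids.
# Each value is length-prefixed ("2:ab"), so ("a", "b") and ("ab") differ.
def limiter_name(class_name, *ids)
  digest = Digest::MD5.new
  [class_name, *ids].each do |value|
    s = value.to_s
    digest.update("#{s.bytesize}:#{s}")
  end
  "limiter_#{digest.hexdigest}"
end

# Hypothetical helper mirroring the reduce-based digest, for comparison.
def naive_limiter_name(class_name, *ids)
  digest = ids.append(class_name).map(&:to_s).reduce(Digest::MD5.new, :update).hexdigest
  "limiter_#{digest}"
end

p naive_limiter_name("Worker", "ab") == naive_limiter_name("Worker", "a", "b") # prints true: collision
p limiter_name("Worker", "ab") == limiter_name("Worker", "a", "b")             # prints false
p limiter_name("Worker", 42) == limiter_name("Worker", "42")                   # prints true: ids compared as strings
```

With a single id, as in perform above, the two approaches behave the same. The difference only matters once several values feed one name.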

Testing

When testing a worker that uses this approach, you will need to replace the concurrent limiter to avoid its interference. Here the limiter is built by an instance method, so stub that method instead:

require "spec_helper"
require "sidekiq/testing"
 
RSpec.describe UniqueParamsExampleWorker do
  # ...
  before do
    allow_any_instance_of(described_class).
      to receive(:limiter).and_return(Sidekiq::Limiter.unlimited)
  end
  # ...
end

Handling non-exclusive uses

The Sidekiq::Limiter.concurrent examples above all use the policy: :ignore option, which tells the limiter to silently skip the within_limit block when the limit is reached. It overrides the default, policy: :raise, which raises Sidekiq::Limiter::OverLimit instead. When that exception is raised, Sidekiq reschedules the job for a later time, subject to a backoff policy and a maximum number of reschedules.

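Alternatively, you can request policy: :raise and rescue the exception yourself to handle the over-limit case explicitly. For example, using the limiter method defined earlier: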

def perform(params = {})
  limiter(params['id'], policy: :raise).within_limit do
    # do the work
  end
rescue Sidekiq::Limiter::OverLimit
  # handle the over-limit
end

Just don't re-raise the exception, or Sidekiq will reschedule the job as described above.