Sidekiq-ORM Project
A fairly common Sidekiq::Worker
pattern, is a parent worker which goes over some model collection and enqueues a child worker for each model in the collection.
lets checkout an example:
# Parent Worker
class UserSyncer
include Sidekiq::Worker
def perform
User.active.each do |user|
ChildWorker.perform_async(user.id)
end
end
end
# Child Worker
class UserTaskWorker
include Sidekiq::Worker
def perform(user_id)
# task logic
end
end
Sidekiq::ActiveRecord::ManagerWorker
wraps this boilerplate with a clean DSL:
# Parent Worker
class UserSyncer < Sidekiq::ActiveRecord::ManagerWorker
sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker
end
Then, just call the worker with the model collection:
UserSyncer.perform_query_async(User.active)
Let's start with an example first:
class UserController < ApplicationController
def update
if current_user.update(update_params)
FriendRequestWorker.perform_query_async(current_user.friends)
UserSyncerWorker.perform_query_async(User.active)
end
respond_with current_user
end
end
In the above example, FriendRequestWorker
will go over all the user's friends, which we can assume is a fairly medium size collection.
After that, UserSyncerWorker
will go over all of the User.active
collection, which can be a huge collection, in large scale systems.
In such cases, when you don't to be blocking, it's recommended to delay the query execution as well.
The way this can be done, is to set the default_models_query
in the ManagerWorker and calling perform_async
, which will run the specified query asynchronously:
class UserSyncerWorker < Sidekiq::ActiveRecord::ManagerWorker
sidekiq_delegate_task_to UserTaskWorker
default_models_query -> { User.active }
end
override_options = { :batch_size => 300 }
UserSyncerWorker.perform_async(override_options)
Specifies the size of each batch to push in bulk.
This is also the number of models to fetch in each find_in_batches query.
Default batch size is 1000
sidekiq_manager_options :batch_size => 500
Default identifier_key is :id
sidekiq_manager_options :identifier_key => :email
Other arguments to enqueue aside from the identifier
sidekiq_manager_options :additional_keys => [:status, :username]
Just pass the options to the manager:
override_options = {
:batch_size => 500,
:identifier_key => :user_token,
:additional_keys => [:status]
}
UserSyncer.perform_query_async(User.active, override_options)