ActiveJob is a framework that ensures standardized job infrastructure for all Rails applications, which lets you define and run jobs regardless of selected queuing backend. The API differences between Delayed Job, Resque, Sidekiq, and others are hidden by ActiveJob and make switching between them seamless.
Using the ActiveJob framework, we recently helped a client standardize all jobs across their system. This allowed them to switch from Delayed Job to Sidekiq easily. This post will guide you through each step we followed to accomplish this without any headaches.
Challenges With Job Statistics
After successfully converting to ActiveJob with Sidekiq adapter, we encountered a scenario where we needed to know if a specific job was still running for target logic. We realized that with ActiveJob and Sidekiq queue adaptor the granular stats such as checking if a job is still running are non-existent. Sidekiq captures this information outside of Rails scope.
Delayed Job information such as last_error, run_at, locked_at, failed_at, locked_at are stored on the Delayed::Job ActiveRecord model and are readily available through simple query lookup.
Delayed::Job.find(job_id).running?
But Sidekiq only provides general statistics for entire queues. For example, with Sidekiq you can fetch size for each status via Sidekiq::Stats:
stats = Sidekiq::Stats.new
processed = stats.processed
failed = stats.failed
enqueued = stats.enqueued
Or, fetch information for each queue via Sidekiq::Queue:
queue = Sidekiq::Queue.new(“pdf”)
queue.size
queue.latency
Sidekiq stats information is not granular enough for a single job. In addition, we didn’t want lookup logic to be adapter specific if the queue adapter was switched again in the future. One way to solve this issue could be through an update of flag or timestamp on an ActiveRecord model via ActiveJob callbacks.
before_enqueue
around_enqueue
after_enqueue
before_perform
around_perform
after_perform
For example:
after_perform { …some_record.update(job_ran_at: Time.zone.now) }
But in our case this was not sufficient enough if we needed to track job information for additional jobs. The closest solution came in a form of a `active_job_status` gem. By including `ActiveJobStatus::Hooks` in the job definition, we can now access the job statistics for 72 hours (setting can be altered for a longer duration). With the gem we can access all common information about a single job without being concerned with the queue adaptor:
my_job = MyJob.perform_later
job_status = ActiveJobStatus.fetch(my_job.job_id)
job_status.queued?
job_status.working?
job_status.completed?
job_status.status
# => :queued, :working, :completed, nil
If job status information is not an issue, then continue reading.
Step 1: Configuration
First step is a minor configuration update in config/application.rb to enable ActiveJob.
config.active_job.queue_adapter = :delayed_job
Step 2: Ensure job definitions are placed in ‘/jobs’ directory
If you are defining a new job, this Rails generator can be used to create it.
bin/rails generate job new_service
For a specific queue, use --queue:
bin/rails generate job new_service --queue target_queue
In our client's case, we had an existing delayed job that needed to be moved from services to jobs directory. File pdftk_merge.rb was moved from app/services/pdf to app/jobs/pdf.
Step 3: Ensure correct file name
Ensure the file name ends with job.
Rename Job file from pdftk_merge.rb to pdftk_merge_job.rb.
Step 4: Update Job definition
Our client had some custom definitions, which required some minor updates to convert to the ActiveJob structure. For example, the client used 'call' instead of the traditional 'perform' because their BaseJob definition had extensive logic and custom helper methods. Nevertheless, we were able to convert successfully by following the steps below.
Checklist:
1. Update Job name to include Job from PDF::PdftkMerge to PDF::PdftkMergeJob.
2. Ensure PDF::PdftkMergeJob inherits from ApplicationJob. If Rails generator was not used, you will need to define class manually in jobs root directory.
3. Ensure the perform method is defined. In the case of this existing definition, let's move initialize() logic to call() and rename call() method to perform().
4. Replace self.queue_name with ActiveJob helper method queue_as.
Before
class PDF::PdftkMerge < BaseJob
attr_accessor :input_files, :output_file
def self.queue_name
"pdf"
end
def initialize(opts = {})
@input_files = opts[:input_files]
@output_file = opts[:output_file]
end
def call
return nil if @input_files.empty? || @output_file.blank?
pdftk_output
rescue StandardError => err
Rollbar.error(err)
raise err
end
def pdftk_output
# pdftk_output logic
end
end
After
class PDF::PdftkMergeJob < ApplicationJob
attr_accessor :input_files, :output_file
queue_as :pdf
def perform(opts = {})
@input_files = opts[:input_files]
@output_file = opts[:output_file]
return nil if @input_files.empty? || @output_file.blank?
pdftk_output
rescue StandardError => err
Rollbar.error(err)
raise err
end
def pdftk_output
# pdftk_output logic
end
end
Step 5: Enqueue Job, ensure Delayed Job is enqueued
Run Job and ensure it is enqueued in Delayed Job. Make sure it runs as expected, as well. To perform after the queuing system is free, use perform_later.
PDF::PdftkMergeJob.perform_later([input_files], output_filepath)
To perform after one hour, use perform_later with set.
PDF::PdftkMergeJob.set(wait: 1.hour).perform_later([input_files], output_filepath)
To perform for a specific queue, use perform_later with set.
PDF::PdftkMergeJob.set(queue: :another_queue).perform_later([input_files], output_filepath)
Step 6: Write RSpec test
Write an RSpec for the Job in spec/jobs/pdf/pdftk_merge_job_spec.rb or update an existing one. For example:
require "rails_helper"
RSpec.describe PDF::PdftkMergeJob, type: :job do
subject(:job) { described_class.perform_later([input_files], output_filepath) }
it "queues the job" do
expect { job }.to change(ActiveJob::Base.queue_adapter.enqueued_jobs, :size).by(1)
end
it "is in pdf queue" do
expect(PDF::PdftkMergeJob.new.queue_name).to eq("pdf")
end
it "executes perform" do
perform_enqueued_jobs { job }
# expect pdftk_output logic
end
it "handles standard error" do
allow_any_instance_of(PDF::PdftkMergeJob).to receive(:call).and_raise(StandardError)
expect { PDF::PdftkMergeJob.perform_later([input_files], output_filepath) }
.to raise_error(StandardError)
end
end
end
Tip: Retry on error
Instead of throwing StandardError, we might want to retry Job with rescue_from block and retry_job ActiveJob helper method.
class PDF::PdftkMergeJob < ApplicationJob
rescue_from(StandardError) do
retry_job wait: 5.minutes, queue: :pdf_low_priority
end
Step 7: Update to Sidekiq queue
Finally, after verifying that the Job definition update to the ActiveJob structure was successful, update to the Sidekiq queue in the configuration.
config.active_job.queue_adapter = :sidekiq # previously delayed_job
Since the Job definition follows the correct pattern, switching to the Sidekiq queue should not require any changes to the existing Job structure.
Conclusion
The conversion to ActiveJob from another queue implementation (in our case Delayed Job) was straightforward and we didn’t need to make any major changes to the existing job definitions. Since switching from Delayed Job to Sidekiq queue we have more transparency into the progress of these jobs once they are enqueued up. Sidekiq provides a very detailed browser user interface to quickly view progress and ability to manage enqueued jobs. We avoided using Sidekiq specific logic by utilizing ActiveJob framework for any lookup or job statistics, so that any future changes to the queue adapter will be seamless and will not require any logic updates.