Rails Multi-Threaded Integration Tests Using RSpec

When a feature has to deal with concurrent threads or processes, unit tests alone are not enough to verify that our code is correct.

In this post, you will find a short but effective way to write an integration test that runs more than one process at the same time and verifies that each one gets the expected response.

The situation

We had a table to manage a few important configurations. Modifying that table would trigger file-related transactions that take some time to complete. So, if somebody else tried to modify the configs before the files were updated, it would create a big problem for the application.

The first solution was row-level locking inside an inner transaction that used the read-committed isolation level. That could not be tested correctly because of the surrounding transactions created by DatabaseCleaner. So, we used a "hack" to avoid the problem in testing, but it meant not testing the code that is actually used in production.

def set_my_row_status(status, override: false)
  return false if !override && locked_by_another?

  new_uuid = status == MY_ROW_SETUP_PENDING ? my_uuid : nil
  DbConnection.using_writer do
    MyModelConfig.transaction(isolation: sql_isolation) do
      lock!
      old_uuid = override ? locked_by : nil
      MyModelConfig.
        find_by(company_id: company_id, locked_by: [my_uuid, old_uuid, nil])&.
        update(my_row_status: status, locked_by: new_uuid)
    end
  end || false
end

# The "hack": no explicit isolation level when running under test.
def sql_isolation
  Rails.env.test? ? nil : :read_committed
end

To get better code that does not require the read-committed isolation level, we decided to move the locking from PG to Redis. This would lower the PG load a bit and also improve the test coverage and quality of our code.

The implementation

The first point to tackle was to move the attribute (a boolean) used to lock the current record from a column in the database to an attribute stored in Redis. This new attribute is saved under a key created by concatenating the prefix MY_MODEL_CONFIG_LOCKER and the record ID.

The method was modified so that we first check for this attribute in Redis and decide whether it should be modified or not. If so, we would call the method that modifies or deletes the Redis attribute.

The value stored in the Redis attribute must be the UUID of the current thread, so we can tell whether the locker of the model is our current thread (meaning that we can modify or unlock the record).

def set_my_row_status(lock, override: false)
  return false unless lockable_or_override(override)

  new_uuid = lock ? my_uuid : nil
  old_uuid = override ? cs_locked_by : nil
  return false unless [my_uuid, old_uuid, nil].include?(cs_locked_by)

  modify_my_row_status(new_uuid)
end

def cs_locked_by
  redis.get(config_name)
end

def modify_my_row_status(locked_by)
  redis.set(config_name, locked_by) if locked_by
  redis.del(config_name) unless locked_by
  true
end
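To make the flow above easier to try out without a Redis server, here is a minimal, self-contained sketch. Everything in it is an assumption made for illustration: FakeRedis is an in-memory stand-in that models only get, set, and del; ConfigLock, set_status, and locked_by are hypothetical names; the underscore in the key and the exact rule inside lockable_or_override are guesses, since the original helpers are not shown.

```ruby
require 'securerandom'

# In-memory stand-in for the Redis client (illustration only).
class FakeRedis
  def initialize
    @data = {}
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
    'OK'
  end

  def del(key)
    @data.delete(key) ? 1 : 0
  end
end

# Hypothetical, stripped-down version of the locking part of the model.
class ConfigLock
  KEY_PREFIX = 'MY_MODEL_CONFIG_LOCKER' # prefix named in the text

  attr_reader :my_uuid

  def initialize(redis, record_id)
    @redis = redis
    @record_id = record_id
    @my_uuid = SecureRandom.uuid # assumption: one UUID per client
  end

  # Prefix plus record ID; the underscore separator is a guess.
  def config_name
    "#{KEY_PREFIX}_#{@record_id}"
  end

  def locked_by
    @redis.get(config_name)
  end

  # Assumed rule: the lock is free, already ours, or we are forcing it.
  def lockable_or_override(override)
    override || locked_by.nil? || locked_by == my_uuid
  end

  def set_status(lock, override: false)
    return false unless lockable_or_override(override)

    if lock
      @redis.set(config_name, my_uuid)
    else
      @redis.del(config_name)
    end
    true
  end
end

redis = FakeRedis.new
first = ConfigLock.new(redis, 42)
second = ConfigLock.new(redis, 42)

puts first.set_status(true)                  # first client takes the lock
puts second.set_status(true)                 # refused while first holds it
puts second.set_status(true, override: true) # forced takeover
```

The two ConfigLock objects play the role of two clients sharing one store, which is the situation the integration test later reproduces with real processes.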

At this point, our methods should do what we need them to do, but unit tests can only assure us that they work when a single process tries to modify the record. That is the real problem to solve, because this feature was built to work with multiple clients requesting the service at the same time.
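One reason single-process tests say little here: cs_locked_by and modify_my_row_status are separate Redis calls, so in principle two clients could both pass the check before either one writes. A common way to close that gap is a set-if-absent write, which redis-rb exposes as the nx: true option of set. The sketch below is not what the code above does; it only illustrates the idea with an in-memory stand-in (FakeRedis and try_lock are hypothetical names) and threads instead of real clients.

```ruby
# In-memory stand-in that models only SET with NX, and GET.
class FakeRedis
  def initialize
    @data = {}
    @mutex = Mutex.new
  end

  # With nx: true the value is written only if the key is absent.
  # The check and the write happen as one atomic step.
  def set(key, value, nx: false)
    @mutex.synchronize do
      next false if nx && @data.key?(key)

      @data[key] = value
      true
    end
  end

  def get(key)
    @mutex.synchronize { @data[key] }
  end
end

# Returns true only for the caller that created the key.
def try_lock(redis, key, uuid)
  redis.set(key, uuid, nx: true)
end

redis = FakeRedis.new
key = 'MY_MODEL_CONFIG_LOCKER_42'

# Ten threads race for the same key; each thread's value is its result.
results = (1..10).map do |n|
  Thread.new { try_lock(redis, key, "uuid-#{n}") }
end.map(&:value)

puts "winners: #{results.count(true)}, losers: #{results.count(false)}"
```

Whichever thread gets there first wins, and every other attempt is refused, no matter how the threads interleave.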

Testing

We can test the method with more than one process by starting more than one Rails console and holding the record in one of them. The expected result is that the rest of the consoles cannot modify the requested record. But to make this part of the application's automated tests, we have to create an integration test that somehow keeps an eye on each result and verifies that only one of the created processes could modify the requested record.

For this test, we decided to use two variables so users can decide whether they want to see the details of each iteration or a summary of each context.

$details = EnvarBoolean.enabled?('MY_MODEL_CONFIG_DETAILS')
$summary = EnvarBoolean.enabled?('MY_MODEL_CONFIG_SUMMARY')
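EnvarBoolean is a helper from our codebase and is not shown in this post. As a rough idea of what such a helper could look like, here is a hypothetical stand-in; the list of accepted values and the optional env argument are assumptions made for this sketch.

```ruby
# Hypothetical stand-in for the project's EnvarBoolean helper.
module EnvarBoolean
  TRUTHY = %w[1 true yes on].freeze

  # True when the variable is set to one of the TRUTHY values
  # (case-insensitive). The env argument exists only to ease testing.
  def self.enabled?(name, env = ENV)
    TRUTHY.include?(env.fetch(name, '').strip.downcase)
  end
end

puts EnvarBoolean.enabled?('MY_MODEL_CONFIG_DETAILS')
```

With a helper like this, running the spec with MY_MODEL_CONFIG_DETAILS=true turns the detailed output on, and leaving the variable unset keeps it off.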

Testing Redis

The first test we have to write makes sure that we at least have a connection to a Redis server. We did that the same way we would in a Rails console, with the ping command.

context 'redis server connection' do
  subject { redis.ping }
  it { is_expected.to eq 'PONG' }
end

The shared examples

The idea here is to run a few scenarios in which different numbers of processes try to modify the same record. So I wrote a shared example that takes the number of processes to spawn and enter into the race to change the record. That one parameter is named num_procs.

shared_examples_for 'set_workspace_status' do |num_procs|
  num_procs ||= 0
  title = "MyModelConfig Test: set_workspace_status: #procs: #{num_procs}\n"

In the next block, we use fork to create a new child process for each iteration. For each child process, we create a temp file where its logs and results are stored. Later, we read each file back and check that the data is what we expected.

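let(:all_proc_data) do
  puts title if $details
  outfiles = []
  my_model_company_config.reset # clear any leftover lock in Redis

  (1..num_procs).each do
    tempfile = Tempfile.new
    fork do
      # child process: write the log lines and the JSON result, then leave
      tempfile.puts my_subject
      tempfile.flush
      exit! # exit immediately, without running at_exit handlers
    end
    sleep TEST_WORK_DELAY # leave a gap before starting the next process

    outfiles << tempfile
  end
  Process.waitall # wait for every child process to finish
  ActiveRecord::Base.establish_connection
  outfiles.map do |outfile|
    outfile.rewind
    lines = outfile.read.split(/\n/)
    outfile.unlink
    lines.each { |line| puts line } if $details
    JSON.parse(lines.last) unless lines.last.nil?
  end
end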

The line my_model_company_config.reset calls a method in the model that removes any lock by deleting the related attribute in Redis.

You will also see that we use sleep to leave a gap between each process.

In this part, we had a little problem with the PostgreSQL connection. Each time a child process finished its work, it would close the connection to the database, causing an ActiveRecord error. The solution here was to get a fresh connection by calling the method ActiveRecord::Base.establish_connection once the child processes are done.

At the end of this block, we read the generated files and parse the last line of each one, which holds the JSON with the data we want.
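The fork-and-tempfile mechanics can be tried in plain Ruby, without Rails or Redis. The following is only a sketch: child_report is a hypothetical stand-in for my_subject, collect_from_children is a made-up name, and there is no locking at all, so "success" is simply faked for the first child. Note that fork is not available on every platform (Windows, for example).

```ruby
require 'json'
require 'tempfile'

# Hypothetical stand-in for my_subject: log lines first, JSON summary last.
def child_report(index)
  [
    "pid: #{Process.pid} start",
    "pid: #{Process.pid} stop",
    { pid: Process.pid, index: index, success: index.zero? }.to_json
  ]
end

def collect_from_children(count)
  outfiles = (0...count).map do |index|
    tempfile = Tempfile.new('child-report')
    fork do
      # Child: the open file is inherited from the parent.
      tempfile.puts child_report(index)
      tempfile.flush
      exit!(0) # leave immediately; no at_exit handlers run in the child
    end
    tempfile
  end

  Process.waitall # block until every child has exited

  outfiles.map do |outfile|
    outfile.rewind
    lines = outfile.read.split("\n")
    outfile.close! # close and delete the temp file
    JSON.parse(lines.last) unless lines.empty?
  end
end

reports = collect_from_children(3)
reports.each { |report| puts report.inspect }
```

Each child writes through the file handle it inherited, and the parent rewinds and reads the same file after Process.waitall, which is why no pipes or sockets are needed to get the results back.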

Now let’s check out what we have in the method my_subject.

my_subject method

def my_subject
  output = []
  thread_uuid = my_model_config.my_uuid
  output << "pid: #{Process.pid} start, uuid: #{thread_uuid}"

  req_status = true
  override = false
  locked_by_before = my_model_config.send(:cs_locked_by)

  output << "pid: #{Process.pid} before work: cs_locked_by: #{locked_by_before}"
  output << "pid: #{Process.pid} required work: lock?: #{req_status}, override: #{override}"
  success = my_model_config.set_workspace_status(req_status, override: override)
  output << "pid: #{Process.pid} stop"
  result_locked_by = my_model_config.send(:cs_locked_by)

  output << {
    pid: Process.pid,
    uuid: thread_uuid,
    req_status: req_status,
    override: override,
    locked_by_before: locked_by_before,
    result_locked_by: result_locked_by,
    success: success
  }.to_json
end

In this method, we collect the information about the lock before requesting the locking of the record, then we request the lock and collect the information again. We put that info together into a JSON string and push it onto the output array as its last element.

That is the data that we read at the end of the previous block.

outfiles.map do |outfile|
  outfile.rewind
  lines = outfile.read.split(/\n/)
  outfile.unlink
  lines.each { |line| puts line } if $details
  JSON.parse(lines.last) unless lines.last.nil?
end

Validating the results of the tests

let(:all_result_success) do
  show_summary(title, all_proc_data) if $summary
  all_proc_data.map do |data|
    data['success'] unless data.nil?
  end
end

it 'always makes expected changes' do
  expect(all_result_success.count(true)).to be(1)
  expect(all_result_success.count(false)).to be(num_procs - 1)
end

In these two blocks, we do the actual validation of the results. In the first block, we call show_summary, a method that prints a summary of the tests. I will leave the method below.

Also, we map over the response we got in all_proc_data to keep only the column that tells us whether each request succeeded.

In the second block (the it clause), we ensure that all but one of the results failed and that only one of the processes succeeded in locking the record.

This way, we can be sure that only one of N processes was able to lock and modify a single record, and that our code will work as expected when it is needed.
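It helps to see how the two count calls behave on concrete data, including the case where a child process wrote nothing and its entry is nil. The sketch below uses made-up sample results; tally is a hypothetical helper, not part of the spec.

```ruby
require 'json'

# Same mapping as all_result_success, plus a count of missing results.
def tally(all_proc_data)
  flags = all_proc_data.map { |data| data['success'] unless data.nil? }
  {
    locked: flags.count(true),
    refused: flags.count(false),
    missing: flags.count(nil)
  }
end

# Made-up sample data in the shape produced by JSON.parse.
healthy = [
  JSON.parse('{"pid": 101, "success": true}'),
  JSON.parse('{"pid": 102, "success": false}'),
  JSON.parse('{"pid": 103, "success": false}')
]
# The third child produced no output, so its entry is nil.
crashed = healthy.first(2) + [nil]

puts tally(healthy).inspect
puts tally(crashed).inspect
```

In the second case only one false is counted, so the expectation on num_procs - 1 would fail. A child that crashes before writing its result is therefore reported as a test failure rather than silently ignored.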

The formatter method

This is the method that shows the user the collected data in a formatted table.

def show_summary(title, data)
  # one list drives both the header and the row values, so they stay in sync
  columns = %w[pid uuid req_status override locked_by_before result_locked_by success]
  table = Text::Table.new
  table.head = columns.map(&:to_sym)
  table.rows = data.map { |row| row.values_at(*columns) }
  # right-align every column except the first
  (2..columns.size).each { |number| table.align_column number, :right }
  puts title
  puts table
end

And this is the data it shows:

[Image: data generated by show_summary]

Integration test file

# frozen_string_literal: true
# domain: PDFs

require 'rails_helper'
require 'procore_redis'
require 'time'
require 'active_support/core_ext/numeric/time'
require 'json'
require 'tempfile'
require 'text-table'

require 'support/rate_class_helpers'
require 'pry-byebug'

TEST_MIN_NUM_PROCS   = 2
TEST_WORK_DELAY      = 0.3.seconds # 300 milliseconds

# rubocop:disable Style/GlobalVars

# Use these envars to produce more output on the spec run, like this:
# MY_MODEL_CONFIG_SUMMARY=true bundle exec rspec -t manual_integration \
#   spec/models/my_model_config_integration_spec.rb

$details = EnvarBoolean.enabled?('MY_MODEL_CONFIG_DETAILS')
$summary = EnvarBoolean.enabled?('MY_MODEL_CONFIG_SUMMARY')

RSpec.describe MyModelConfig, :manual_integration do
  include_context 'rate-class-helpers'

  after do
    MyModelConfig.destroy_all
  end

  # unlike the usual specs, this _manual integration_ spec has connections
  # to a *real* Redis.
  let(:redis) do
    defined?(ProcoreRedis) && ProcoreRedis.new || Redis.new
  end

  let(:company) do
    Company.find_by(id: company_id) ||
      FactoryBot.create(
        :company,
        id:           company_id,
        name:         company_id == 1 ? 'Procore' : 'Example Company',
        country_code: 'US'
      )
  end
  let(:company_id) { 4242 }
  let(:test_config_company_id) { company&.id || 1 }
  let(:test_my_model_token) { nil }
  let(:test_my_model_token_created_at) { nil }

  let(:my_model_config) do
    MyModelConfig.new(
      company_id:                test_config_company_id,
      environment:               'test',
      default_pdf_filename:      'the-pdf',
      default_xml_filename:      'test_xml',
      default_template_filename: 'test_template',
      my_model_token:              test_my_model_token,
      my_model_token_created_at:   test_my_model_token_created_at
    )
  end

  context 'redis server connection' do
    subject { redis.ping }
    it { is_expected.to eq 'PONG' }
  end

  describe 'call set_my_model_status and respect the blockers' do
    # run the test for num_periods during which each
    # client tries to use the resource as fast as possible
    shared_examples_for 'set_workspace_status' do |num_procs|
      num_procs = (num_procs || 0) < TEST_MIN_NUM_PROCS ? TEST_MIN_NUM_PROCS : num_procs
      title = "MyModelConfig Test: set_my_model_status: #procs: #{num_procs}\n"

      let(:all_proc_data) do
        puts title if $details
        outfiles = []
        my_model_config.reset

        (1..num_procs).map do
          tempfile = Tempfile.new
          fork do
            tempfile.puts my_subject
            tempfile.flush
            exit!
          end
          sleep TEST_WORK_DELAY

          outfiles << tempfile
        end
        Process.waitall
        ActiveRecord::Base.establish_connection
        outfiles.map do |outfile|
          outfile.rewind
          lines = outfile.read.split(/\n/)
          outfile.unlink
          lines.each { |line| puts line } if $details
          JSON.parse(lines.last) unless lines.last.nil?
        end
      end

      let(:all_result_success) do
        show_summary(title, all_proc_data) if $summary
        all_proc_data.map do |data|
          data['success'] unless data.nil?
        end
      end

      it 'always makes expected changes' do
        expect(all_result_success.count(true)).to be(1)
        expect(all_result_success.count(false)).to be(num_procs - 1)
      end
    end

    def my_subject
      output = []
      thread_uuid = my_model_config.my_uuid
      output << "pid: #{Process.pid} start, uuid: #{thread_uuid}"

      req_status = true
      override = false
      locked_by_before = my_model_config.send(:cs_locked_by)

      output << "pid: #{Process.pid} before work: cs_locked_by: #{locked_by_before}"
      output << "pid: #{Process.pid} required work: override: #{override}"
      success = my_model_config.set_my_model_status(req_status, override: override)
      output << "pid: #{Process.pid} stop"
      result_locked_by = my_model_config.send(:cs_locked_by)

      output << {
        pid: Process.pid,
        uuid: thread_uuid,
        override: override,
        locked_by_before: locked_by_before,
        result_locked_by: result_locked_by,
        success: success
      }.to_json
    end

    def show_summary(title, data)
      table = Text::Table.new
      table.head = [:pid, :uuid, :req_status, :override, :status_before, :locked_by_before, :result_status, :result_locked_by, :success]
      table.rows = data.map do |row|
        row.values_at('pid', 'uuid', 'req_status', 'override', 'status_before',
          'locked_by_before', 'result_status', 'result_locked_by', 'success')
      end
      table.align_column 2, :right
      table.align_column 3, :right
      table.align_column 4, :right
      table.align_column 5, :right
      table.align_column 6, :right
      table.align_column 7, :right
      table.align_column 8, :right
      table.align_column 9, :right
      table.align_column 10, :right
      table.align_column 11, :right
      puts title
      puts table
    end

    context 'update status testing' do
      #                                       procs
      it_behaves_like 'set_workspace_status',   2
      it_behaves_like 'set_workspace_status',   3
      it_behaves_like 'set_workspace_status',   4
      it_behaves_like 'set_workspace_status',   10
    end
  end
end
# rubocop:enable Style/GlobalVars
