Responding to RabbitMQ Messages in Integration Tests

I’ve made a GitHub repository with a working example of this technique here.

At work today we wanted to write an integration test for some code that triggers a job by publishing a message to a RabbitMQ queue. This message is consumed by a worker program that performs a long-running job, expected to take two or three seconds to complete, and writes the result to a database. After the job completes, the worker publishes a second event to another queue to signal that the work is done.

In our integration test we could publish the first trigger message, but the processing would then take place asynchronously. This left us with the problem of figuring out when the job had completed, so that we could run our assert statements and make sure the data was in the correct state. The two obvious approaches were to poll the database at regular intervals to check for the result, or to respond to the completion event. We decided the second approach was the correct one, but this led to some real difficulties, as there are a few things happening asynchronously.
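For contrast, the polling approach we decided against could look roughly like the sketch below. Polling.WaitUntil and its parameters are names made up for illustration, and the condition delegate stands in for whatever database query would check for the result.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class Polling
{
    // Hypothetical helper: calls `condition` every `interval` until it returns
    // true or `timeout` has elapsed. Returns true if the condition was met.
    public static bool WaitUntil(Func<bool> condition, TimeSpan interval, TimeSpan timeout)
    {
        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            if (condition())
            {
                return true;
            }

            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }

            // The test sits idle here, so the result can be noticed up to one
            // full interval after it was actually written.
            Thread.Sleep(interval);
        }
    }
}
```

With a one second interval, a job that finishes just after a check is not noticed until almost a second later, which is the delay that responding to the completion event avoids.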

Here’s the code we ended up using (assume that the event message published to RabbitMQ when the job completes is called JobCompleted):

// Needs: using System; using System.Threading; using NUnit.Framework;
// plus the MySQL client namespace that provides MySqlConnection.

// Subscribing to JobCompleted events spawns a background thread where we cannot do any
// assertions (assertion exceptions thrown on the background thread will not bubble up to
// the foreground thread where the test is running). We need a signalling mechanism so
// the background thread can indicate when a message has been received, letting the
// foreground (test) thread go ahead and run assertions on the data...
using (var signal = new AutoResetEvent(false))
{
    // When a message is received, send a signal to the foreground thread...
    _bus.Subscribe<JobCompleted>("WorkerIntegrationTest", _ => { signal.Set(); });

    // Publish the message that triggers the job. StartJob is a placeholder name for
    // the trigger message type; substitute whatever your worker listens for.
    _bus.Publish(new StartJob
    {
        CreatedAt = DateTime.UtcNow,
        // Other message data goes here.
    });

    // Wait for a signal from the event subscriber thread. If we don't get one within
    // 30 seconds, fail the test...
    if (!signal.WaitOne(TimeSpan.FromSeconds(30)))
    {
        Assert.Fail("Did not see a JobCompleted event within 30 seconds.");
    }

    using (var connection = new MySqlConnection(_connectionString))
    {
        // Load data to check here.
        Assert.That(data, Is.Not.Null, "Data not present in the database within 30 seconds.");
        // More asserts go here.
        Assert.Pass("Data present in database and appears in correct shape");
    }
}

The AutoResetEvent’s WaitOne method blocks the test thread for up to 30 seconds, or until Set is called. The test subscribes to the JobCompleted event, and when it receives a message it calls Set, which sends the signal that unblocks the waiting thread. We can then safely proceed with the asserts on the original thread, because we know the data has been written to the database. If 30 seconds pass without a JobCompleted message being received, Assert.Fail is called with an appropriate timeout message.
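To see the WaitOne/Set handshake on its own, without RabbitMQ or a database, here is a minimal sketch. SignalDemo.WaitForSignal is a made-up helper, and the work delegate simply stands in for the worker finishing its job before the subscriber callback fires.

```csharp
using System;
using System.Threading;

public static class SignalDemo
{
    // Hypothetical helper: runs `work` on a background thread, then signals.
    // Returns true if the signal arrived within `timeout`, false otherwise.
    public static bool WaitForSignal(Action work, TimeSpan timeout)
    {
        using (var signal = new AutoResetEvent(false))
        {
            var worker = new Thread(() =>
            {
                work(); // stands in for the job running in the worker
                try
                {
                    signal.Set(); // stands in for the JobCompleted callback
                }
                catch (ObjectDisposedException)
                {
                    // The waiting thread already timed out and disposed the event.
                }
            });
            worker.IsBackground = true;
            worker.Start();

            // Blocks the calling thread until Set is called or the timeout expires.
            return signal.WaitOne(timeout);
        }
    }
}
```

Calling SignalDemo.WaitForSignal(() => Thread.Sleep(100), TimeSpan.FromSeconds(5)) should return true shortly after the sleep finishes, while a work delegate that outlasts the timeout makes it return false, which is the case the test turns into Assert.Fail.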

The end result is that the tests complete as quickly as possible (faster than waiting for the next database polling cycle), and we get a bit of performance testing thrown in for free!