Sergey Shishkin

on agile software development

Applying The Message Bus

In the last post I was pursuing Remoteness Ignorance using the Message Bus pattern. I would like to emphasize that I tend to use messaging not only for remote communication (e.g. client-server) but also for in-process communication between different modules of the application (or bounded contexts, in DDD terms). The Message Bus encapsulates the communication infrastructure, while the domain layer works with messages and the interface-based API of the Message Bus.

Message Bus API

What could this API look like? Starting with very simple cases, one could send a message like this:

bus.Send(new PatientSearchRequest("bob"));

Subscription Manager

To receive and handle this message, we first need a subscriber:

public class PatientSearchHandler : ISubscriber<PatientSearchRequest>
{
    public void Handle(PatientSearchRequest message) { … }
}

When we have the subscriber in place, we might want to register it with the bus. To avoid unnecessary object instantiation, the bus should support different subscription options: with an instance and with a type. Moreover, we might want to automatically subscribe all the ISubscriber implementers from the IoC container. I implemented a Windsor facility for auto-subscription, which in turn calls the Subscription Manager on the bus:

bus.SubscriptionManager.Subscribe(subscriberType);

The Subscription Manager is an essential element of the Message Bus. It handles subscriptions and dispatches the received messages to appropriate subscribers, so that the bus can focus on sending and receiving messages.
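To make the two subscription options concrete, here is a minimal sketch of what such a Subscription Manager might look like. It is not the actual implementation: the Dispatch method, the internal dictionary, the use of a parameterless constructor instead of the IoC container, and the Seen list on the sample handler are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Handler contract as used in the post.
public interface ISubscriber<TMessage>
{
    void Handle(TMessage message);
}

// Hypothetical sketch, not the author's implementation.
public class SubscriptionManager
{
    // Message type -> dispatch delegates registered for that type.
    private readonly Dictionary<Type, List<Action<object>>> handlers =
        new Dictionary<Type, List<Action<object>>>();

    // Option 1: subscribe an existing instance.
    public void Subscribe<TMessage>(ISubscriber<TMessage> subscriber)
    {
        Add(typeof(TMessage), message => subscriber.Handle((TMessage)message));
    }

    // Option 2: subscribe a type; an instance is created only when a message arrives.
    public void Subscribe(Type subscriberType)
    {
        foreach (Type contract in subscriberType.GetInterfaces())
        {
            if (!contract.IsGenericType ||
                contract.GetGenericTypeDefinition() != typeof(ISubscriber<>))
                continue;

            Type messageType = contract.GetGenericArguments()[0];
            var handle = contract.GetMethod("Handle");
            Add(messageType, message =>
            {
                // Assumption: parameterless constructor; a real bus would resolve from the container.
                object subscriber = Activator.CreateInstance(subscriberType);
                handle.Invoke(subscriber, new[] { message });
            });
        }
    }

    // Called by the bus for every received message.
    public void Dispatch(object message)
    {
        List<Action<object>> list;
        if (!handlers.TryGetValue(message.GetType(), out list)) return;
        foreach (var handler in list.ToArray()) handler(message);
    }

    private void Add(Type messageType, Action<object> handler)
    {
        List<Action<object>> list;
        if (!handlers.TryGetValue(messageType, out list))
        {
            list = new List<Action<object>>();
            handlers[messageType] = list;
        }
        list.Add(handler);
    }
}

// Sample message and handler, only to exercise the sketch.
public class PatientSearchRequest
{
    public readonly string Query;
    public PatientSearchRequest(string query) { Query = query; }
}

public class PatientSearchHandler : ISubscriber<PatientSearchRequest>
{
    public static readonly List<string> Seen = new List<string>();
    public void Handle(PatientSearchRequest message) { Seen.Add(message.Query); }
}
```

Subscribing by type keeps registration cheap: nothing is instantiated until a matching message is actually dispatched.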

Reply

Now it’s time to implement the subscriber’s Handle method. The implementation is very simple:

var patients = repository.FindByName(message.Query);

message.Reply(new PatientSearchResponse(patients));

The subscriber does not even have to know about the bus; the Message base class takes care of that.

The request has been sent, and the subscriber has responded. But how does the sender actually get the response? The sender could, of course, implement the ISubscriber interface in order to handle the reply, but we want to allow multiple senders to send different requests and to receive their (and only their) replies.

We obviously need to correlate requests and replies. The Reply method of the Message base class does exactly that. Messages are correlated by CorrelationId, so that the bus can dispatch the reply to the right sender. But wait, the bus does not know the sender!
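As an illustration of the correlation step, the following sketch shows one way a Message base class could copy the CorrelationId onto a reply and hand it to the bus. The IBus interface, the internal Bus property, and the RecordingBus test double are assumptions for this example; the post does not show how the real base class reaches the bus.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal bus contract; the real interface is not shown in the post.
public interface IBus
{
    void Send(Message message);
}

public abstract class Message
{
    protected Message() { CorrelationId = Guid.NewGuid(); }

    public Guid CorrelationId { get; internal set; }

    // Assumption: the bus attaches itself to every message it handles.
    internal IBus Bus { get; set; }

    // Stamps the response with this message's CorrelationId and sends it.
    public void Reply(Message response)
    {
        if (Bus == null)
            throw new InvalidOperationException("Message was not delivered by a bus.");
        response.CorrelationId = CorrelationId;
        Bus.Send(response);
    }
}

public class PatientSearchRequest : Message
{
    public PatientSearchRequest(string query) { Query = query; }
    public string Query { get; private set; }
}

public class PatientSearchResponse : Message
{
    public PatientSearchResponse(IEnumerable<string> patients) { Patients = patients; }
    public IEnumerable<string> Patients { get; private set; }
}

// Test double standing in for the real bus: it only records what is sent.
public class RecordingBus : IBus
{
    public readonly List<Message> Sent = new List<Message>();

    public void Send(Message message)
    {
        message.Bus = this; // lets the receiver call Reply without knowing the bus
        Sent.Add(message);
    }
}
```

With this shape, a handler only ever calls message.Reply(...); the bus reference stays hidden inside the base class.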

Callbacks

This is where Callbacks come into play. The sender registers a Callback, which is invoked when the response message is dispatched:

bus.Send(new PatientSearchRequest("bob"))
    .OnResponse<PatientSearchResponse>(
        response => AppendResults(response.Patients))
    .OnTimeout(() => ResultsComplete())
    .WithTimeoutInterval(TimeSpan.FromSeconds(5))
    .WaitForMultipleResponses;

Actually, this is more than just a simple callback delegate. It is a fluent API for configuring a Callback object, which is registered with the bus for dispatching responses. The Callback object takes care of timeout timers. It also takes care of thread context synchronization, because incoming messages are processed on worker threads, while the sender sometimes wants to touch the GUI in the callback.

Another important thing about Callbacks is that we allow multiple responses to be received (see the WaitForMultipleResponses property). This means that the delegate specified in the OnResponse method may be called many times. In this case timeouts become especially important: after the timeout, no further correlated responses are dispatched to the sender. The sender can be notified of the timeout through a delegate passed to the OnTimeout method.

This is a relatively complex scenario. The sender is awaiting search results from multiple services. Maybe a caching layer in the bus has intercepted the request and sent the reply from the cache. Maybe the patient registry is distributed and each node answers with an individual response. In either case the sender remains unaware of these infrastructure aspects.

A simplified version of the sample above waits for only a single response and uses the default timeout interval:

bus.Send(new PatientSearchRequest("bob"))
    .OnResponse<PatientSearchResponse>(
        response => ShowResults(response.Patients));

In this case the Callback object will be dismissed right after dispatching the first response message, or upon timeout, if no response has been received.
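The dismissal rules described above (dismiss after the first response in single-response mode, keep dispatching until the timeout in multiple-response mode) could be sketched roughly as follows. This is only an illustration under assumptions: the constructor arguments stand in for the fluent configuration, Dispatch and IsDismissed are hypothetical names, and thread context synchronization is left out entirely.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a Callback object; the internals are assumptions.
public class Callback<TResponse>
{
    private readonly object gate = new object();
    private readonly Action<TResponse> onResponse;
    private readonly Action onTimeout;
    private readonly bool multipleResponses;
    private readonly Timer timer;
    private bool dismissed;

    public Callback(Action<TResponse> onResponse, Action onTimeout,
                    TimeSpan timeout, bool multipleResponses)
    {
        this.onResponse = onResponse;
        this.onTimeout = onTimeout;
        this.multipleResponses = multipleResponses;
        // Create the timer disarmed, then arm it once the field is assigned.
        timer = new Timer(_ => Expire(), null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        timer.Change(timeout, Timeout.InfiniteTimeSpan); // one-shot
    }

    public bool IsDismissed
    {
        get { lock (gate) return dismissed; }
    }

    // Called by the bus for each correlated response.
    // Returns false when the callback has already been dismissed.
    public bool Dispatch(TResponse response)
    {
        lock (gate)
        {
            if (dismissed) return false;
            if (!multipleResponses) Dismiss(); // single-response mode: first reply wins
        }
        onResponse(response);
        return true;
    }

    // Timer callback: after the timeout no further responses are dispatched.
    private void Expire()
    {
        lock (gate)
        {
            if (dismissed) return;
            Dismiss();
        }
        if (onTimeout != null) onTimeout();
    }

    // Must be called while holding the gate.
    private void Dismiss()
    {
        dismissed = true;
        timer.Dispose();
    }
}
```

The delegates are invoked outside the lock so that a slow handler on the sender's side cannot block the timer thread.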

Summary

We just saw a simple Message Bus API for sending and dispatching messages. The Message Bus relies on the Subscription Manager and Callback objects for dispatching messages. The Message base class takes care of message correlation for replies.

In further posts I’ll show some implementation details of the API described above. Stay tuned 🙂


Written by Sergey Shishkin

11.08.2008 at 01:22

