Deferring Messages in NServiceBus 4

Recently, I had to figure out how to defer the processing of messages on an endpoint because of a last-minute business decision made just before the launch of some software.

This part of the system hooks into events published by the transactional side of the system, turns those events into web-service-friendly messages (aka XML), and then sends those messages to a web service.

In good form, we designed a “read” endpoint responsible for handling the events raised by our transactional system and reading the data we needed to build each message for the web service. We then designed a “send” endpoint that would build the XML from the data on the message and then invoke the web service. We had a tight SLA of 5 minutes in which to accomplish this.

Things were going swimmingly.

But when it came time to deploy the system, the business informed us (at the last minute, of course) that the web service provider was not ready on their end, and that we could not send them any information from our system (there goes our supposed 5-minute SLA!).

We didn’t want to lose the data on the events flowing out of the transactional side of the system, because if we did, we’d never be able to build the appropriate XML message to send to the web service.

Simple, I thought. We’ll just deploy the application with the send endpoint set to not start on deployment. That way, the read endpoint will place the messages in the send endpoint’s queue, and when the web service provider is ready we’ll turn the send endpoint on and those messages can be processed. This is exactly the type of scenario that NServiceBus was meant to solve. Even though our send endpoint is “down”, we lose no business data.

Unfortunately, I quickly learned two things. First, our fledgling deploy process was incapable of deploying the application with one endpoint set to not start. Second, I would have no remote access to any of our production servers, so I could not disable the send endpoint by hand immediately after deployment. At my company, that is the job of a group of “admins”, and you have to wait a month to talk to them to figure out how it could be done.

We didn’t have the time. We couldn’t risk the send endpoint invoking the web service, yet we had no way to launch the system with the endpoint configured to not start.

To solve this problem, we chose to defer all message processing for the send endpoint via two NServiceBus methods, bus.Defer and bus.DoNotContinueDispatchingCurrentMessageToHandlers.


bus.Defer

bus.Defer takes either a TimeSpan or a DateTime along with a message, and defers that message for the given amount of time (or until the given time). When the message is deferred, where does it go? It doesn’t stay on the queue. It’s actually serialized and stored in RavenDB, similar to how saga data is persisted to RavenDB. When the specified time is up, the message is placed back onto the queue. Once it is back on the queue, the same handling code runs and immediately defers it again, creating a “loop” in which the message can be deferred indefinitely. This is what we needed: a way to “hold off” processing these messages until our web service vendor was ready! We opted to defer the message for one hour.

bus.Defer(new TimeSpan(0, 1, 0, 0), message);  
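
The four-argument TimeSpan constructor takes days, hours, minutes and seconds, so the value above works out to one hour. As a quick sanity check outside of NServiceBus, here is a minimal console sketch (the Program class exists only for illustration) comparing it with the arguably more readable TimeSpan.FromHours:

```csharp
using System;

public static class Program
{
    public static void Main()
    {
        // new TimeSpan(days, hours, minutes, seconds)
        TimeSpan fromConstructor = new TimeSpan(0, 1, 0, 0);
        TimeSpan fromFactory = TimeSpan.FromHours(1);

        // Both values represent the same one-hour delay.
        Console.WriteLine(fromConstructor == fromFactory); // True
        Console.WriteLine(fromConstructor.TotalMinutes);   // 60
    }
}
```

Either form can be passed to bus.Defer; which one reads better is a matter of taste.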

bus.DoNotContinueDispatchingCurrentMessageToHandlers

But we weren’t done. Just because we’ve deferred the message doesn’t mean we’ve stopped other handlers from handling it. We needed a way to “drop the message on the floor” so that no other handler would see it. Enter bus.DoNotContinueDispatchingCurrentMessageToHandlers. By calling bus.DoNotContinueDispatchingCurrentMessageToHandlers, you’re essentially telling NServiceBus to let the current handler finish as if the message had been processed successfully, but to stop dispatching the message to any handlers further down the pipeline. It’s a way to stop the message dead in its tracks. The message is even forwarded to the audit queue, just like any other successfully processed message.

bus.DoNotContinueDispatchingCurrentMessageToHandlers();  

Now that we had figured out how to defer the processing of the message for an hour at a time, as well as how to shut down the rest of the processing pipeline for the message, we had to write a handler to host this code. We wrote a simple handler that handled our message and called both bus.Defer and bus.DoNotContinueDispatchingCurrentMessageToHandlers:

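using System;
using NServiceBus;

public class DeferMessagesUntilWebServiceImplementationIsReady :
    IHandleMessages<MessageIWantToDefer>
{
    private readonly IBus bus;

    // NServiceBus injects the IBus instance through the constructor.
    public DeferMessagesUntilWebServiceImplementationIsReady(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(MessageIWantToDefer message)
    {
        DeferMessageForOneHour(message);
    }

    private void DeferMessageForOneHour(object message)
    {
        // TimeSpan(days, hours, minutes, seconds): defer the message for one hour.
        bus.Defer(new TimeSpan(0, 1, 0, 0), message);
        // Stop any handlers ordered after this one from seeing the message.
        bus.DoNotContinueDispatchingCurrentMessageToHandlers();
    }
}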

We still needed to make sure this handler was invoked before any other handler of that same message. For this, we wrote a configuration class that implements the ISpecifyMessageHandlerOrdering interface. This interface lets you control the order in which the handlers inside an endpoint are invoked for a given message. Note that this is about handler order, not message order: under NServiceBus’s asynchronous messaging model, messages may still arrive in any order (aka, they are not guaranteed to arrive in the order they are sent). This is what our configuration class looks like:

public class DeferMessagesUntilWebServiceImplementationIsReadyConfiguration: ISpecifyMessageHandlerOrdering  
{  
    public void SpecifyOrder(Order order)  
    {  
        order.SpecifyFirst<DeferMessagesUntilWebServiceImplementationIsReady>();  
    }  
}  

When the send endpoint starts up, NServiceBus picks up the class implementing ISpecifyMessageHandlerOrdering and applies the handler ordering before the endpoint starts pulling messages off the queue.


What we’ve essentially done is allow our send endpoint to keep receiving messages from our read endpoint, but instead of letting those messages flow through the whole pipeline, we intercept them and defer them indefinitely without losing business data.
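
To make that flow concrete, here is a toy, in-memory sketch of the same idea. It does not use NServiceBus at all: ToyEndpoint, its InputQueue and TimeoutStore, and the bool-returning handlers are hypothetical stand-ins for the endpoint queue, the timeout storage, and the “stop dispatching” call. The webServiceReady flag is also an assumption, added only so the sketch can show the messages eventually flowing through; the handler in this post always defers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an endpoint; not an NServiceBus type.
public class ToyEndpoint
{
    private readonly List<Func<string, bool>> handlers = new List<Func<string, bool>>();
    public readonly Queue<string> InputQueue = new Queue<string>();
    public readonly List<string> TimeoutStore = new List<string>();

    // Handlers run in registration order; returning false stops the dispatch.
    public void AddHandler(Func<string, bool> handler) { handlers.Add(handler); }

    // Deferring moves the message off the queue and into the timeout store.
    public void Defer(string message) { TimeoutStore.Add(message); }

    // Simulates the timeouts expiring: every stored message goes back on the queue.
    public void ExpireTimeouts()
    {
        foreach (var message in TimeoutStore) InputQueue.Enqueue(message);
        TimeoutStore.Clear();
    }

    public void ProcessQueue()
    {
        while (InputQueue.Count > 0)
        {
            var message = InputQueue.Dequeue();
            foreach (var handler in handlers)
            {
                if (!handler(message)) break; // "do not continue dispatching"
            }
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var endpoint = new ToyEndpoint();
        var webServiceReady = false;
        var sent = new List<string>();

        // Registered first: defer and stop the pipeline while the web service is not ready.
        endpoint.AddHandler(message =>
        {
            if (webServiceReady) return true;
            endpoint.Defer(message);
            return false;
        });
        // Registered second: the "real" work of sending to the web service.
        endpoint.AddHandler(message => { sent.Add(message); return true; });

        endpoint.InputQueue.Enqueue("event-1");
        endpoint.InputQueue.Enqueue("event-2");

        endpoint.ProcessQueue();
        Console.WriteLine(sent.Count + " sent, " + endpoint.TimeoutStore.Count + " deferred"); // 0 sent, 2 deferred

        endpoint.ExpireTimeouts(); // the delay elapses, the messages come back
        endpoint.ProcessQueue();   // and are deferred again
        Console.WriteLine(sent.Count + " sent, " + endpoint.TimeoutStore.Count + " deferred"); // 0 sent, 2 deferred

        webServiceReady = true;
        endpoint.ExpireTimeouts();
        endpoint.ProcessQueue();
        Console.WriteLine(sent.Count + " sent, " + endpoint.TimeoutStore.Count + " deferred"); // 2 sent, 0 deferred
    }
}
```

The point of the sketch is that no message is ever lost: it is either waiting in the timeout store or sitting on the queue until the gate handler lets it through.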

An implementation of simple message deferral based on this post can be found on my GitHub account: https://github.com/mgmccarthy/DeferringMessagesUsingNServiceBus4

This solution defers a message for one minute, not one hour, for the sake of the code sample. When you stand up the solution, your endpoint’s console window should show a log message every minute:

DeferMessageForOneMinute

Michael McCarthy
