Handling File Import Processing with NServiceBus Part II: Using bus.Defer Instead of a Saga

In my first post on File Import Processing using NServiceBus, I used a saga based on a presentation I saw at NSBCon in Brooklyn, NY last year. After hearing some feedback on that post, I decided to take another crack at the solution, but this time, instead of relying on a saga, I ended up using bus.Defer.


Why Did I Use a Saga?

Looking back at the original blog post, you can see that ProcessImportFileSaga only carries two pieces of data:

  • ImportId
  • TotalNumberOfFilesInImport

public class SagaData : IContainSagaData  
{  
    public Guid Id { get; set; }  
    public string Originator { get; set; }  
    public string OriginalMessageId { get; set; }

    [Unique]  
    public Guid ImportId { get; set; }  
    public int TotalNumberOfFilesInImport { get; set; }  
}  

This is what the saga is “doing”:

  • it’s started by the FileImportInitiated message, which provides the ImportId and TotalNumberOfFilesInImport.
  • it handles FileImportSuccesAndFailureCount, which contains RowsSucceeded and RowsFailed. Combining those two pieces of data with TotalNumberOfFilesInImport, the saga determines if the import is finished. The saga checks for the updated RowsSucceeded and RowsFailed every 5 seconds via a timeout and pub/sub.

Stepping back from the implementation detail (using an NServiceBus saga), what do I need to get this file import to work if I boil it down to basics?

  • the total number of files in the import
  • the number of files successful so far
  • the number of files failed so far
  • a way to check the most current success/failure count, compare it to the total number of files in the import, and a way to mark the import as complete or not complete.
  • if it’s not complete, I need a way for this process to wait “n” seconds until it checks the different file counts again.
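
Stripped of any messaging, the checklist above comes down to one small comparison. The following is a minimal sketch and not code from the original solution; ImportProgress and IsComplete are hypothetical names used only for illustration.

```csharp
using System;

// Hypothetical helper for illustration; not part of the original solution.
public static class ImportProgress
{
    // Returns true once the succeeded and failed counts together
    // account for every file in the import.
    public static bool IsComplete(int totalNumberOfFilesInImport, int rowsSucceeded, int rowsFailed)
    {
        return rowsSucceeded + rowsFailed == totalNumberOfFilesInImport;
    }
}
```

Anything that can supply those three numbers and re-run this check after a delay is enough to drive the import to completion.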

We don’t need a saga for this.

We can use bus.Defer instead. (For a quick refresher on bus.Defer, check out my blog post Deferring Messages in NServiceBus 4.)


Bus.Defer to the Rescue

Instead of the overhead a saga can introduce, we can easily check for the “doneness” of the file import every “n” seconds using bus.Defer.

Head over to my GitHub account and take a look at the supporting code.

We’ll ignore the .Conventions, .Data, and .Messages projects for now, and instead, concentrate on the projects that show the change from using a saga to using bus.Defer.

ConsoleClient

ConsoleClient has not changed at all since this solution was saga-based. We’re still sending a ProcessImportFileRow command for every row in our import:

class Program  
{  
    static void Main(string[] args)  
    {  
        ServiceBus.Init();

        Console.WriteLine("Press 'Enter' to initiate a new file import. To exit, press Ctrl + C");

        while (Console.ReadLine() != null)  
        {  
            var importId = Guid.NewGuid();  
            const int totalNumberOfFilesInImport = 1000;

            for (var i = 1; i <= totalNumberOfFilesInImport; i++)  
            {  
                Thread.Sleep(50);  
                Console.WriteLine("Sending ProcessImportFileRow for Customer: {0}", i);

                var processFileRow = new ProcessImportFileRow  
                {  
                    ImportId = importId,  
                    CustomerId = i,  
                    CustomerName = string.Format("Customer: {0}", i),  
                    FirstImportRowForThisImport = i == 1,  
                    TotalNumberOfFilesInImport = totalNumberOfFilesInImport  
                };  
                ServiceBus.Bus.Send(processFileRow);  
            }  
        }  
    }  
}  

Let’s take a look at the ProcessImportFileRowHandler in the .FileImportInsertionEndpoint.

FileImportInsertionEndpoint

We handle ProcessImportFileRow in this endpoint. The handler does two things: it publishes FileImportInitiated if it’s the first row of a new import, and it writes each import row to the database:

public class ProcessImportFileRowHandler : IHandleMessages<ProcessImportFileRow>  
{  
    private readonly IDataStore dataStore;  
    private readonly IBus bus;

    public ProcessImportFileRowHandler(IDataStore dataStore, IBus bus)  
    {  
        this.dataStore = dataStore;  
        this.bus = bus;  
    }

    public void Handle(ProcessImportFileRow message)  
    {  
        if (message.FirstImportRowForThisImport)  
            bus.Publish(new FileImportInitiated { ImportId = message.ImportId, 
                TotalNumberOfFilesInImport = message.TotalNumberOfFilesInImport });

        // Simulate a random success or failure for this row.
        var success = new Random().Next(100) % 2 == 0;
        LogManager.GetLogger(typeof(ProcessImportFileRowHandler))
            .InfoFormat("Handling ProcessImportFileRow for Customer: {0}",
                message.CustomerId);

        using (var session = dataStore.OpenSession())  
        {  
            session.Add(new FileImport { Id = Guid.NewGuid(), 
                ImportId = message.ImportId, CustomerId = message.CustomerId, 
                CustomerName = message.CustomerName, Successful = success });  
            session.SaveChanges();  
        }  
    }  
}  

Let’s take a look at where FileImportInitiated is handled in the .FileImportInitiatedEndpoint.

FileImportInitiatedEndpoint

Before looking at the FileImportInitiatedHandler in this endpoint, let’s take a look at how this part of the file import used to work when it was in a saga. This is the code that was used in our ProcessImportFileSaga:

public void Handle(FileImportSuccesAndFailureCount message)  
{  
    Log.Warn("handling FileImportSuccesAndFailureCount");  
    Log.Warn(string.Format("RowsSucceeded: {0}, RowsFailed: {1}", message.RowsSucceeded, 
        message.RowsFailed));

    if (message.RowsSucceeded + message.RowsFailed == Data.TotalNumberOfFilesInImport)  
    {  
        Bus.Publish(new FileImportCompleted { ImportId = message.ImportId });  
        Log.Warn("Saga Complete");  
        MarkAsComplete();  
    }  
    else  
    {  
        RequestTimeout<TimeoutState>(TimeSpan.FromSeconds(5));  
    }  
}  

The saga used all the data at its disposal to determine whether or not the file import process was done, and if it was not done, checked again 5 seconds later via a timeout:

public void Timeout(TimeoutState state)  
{  
    Log.Warn("Sending CheckFileImportSuccessAndFailureCount.");  
    SendCheckFileImportSuccessAndFailureCount(Data.ImportId);  
}

private void SendCheckFileImportSuccessAndFailureCount(Guid importId)  
{  
    Bus.Send(new CheckFileImportSuccessAndFailureCount { ImportId = importId });  
}  

The CheckFileImportSuccessAndFailureCount message was handled in a separate handler, which looked up the RowsSucceeded and RowsFailed values and sent them back to the saga in a FileImportSuccesAndFailureCount message via bus.Reply.

This code has been consolidated into the FileImportInitiatedHandler:

public class FileImportInitiatedHandler : IHandleMessages<FileImportInitiated>  
{  
    private readonly IBus bus;  
    private readonly IDataStore dataStore;  
    private static readonly ILog Log = 
        LogManager.GetLogger(typeof(FileImportInitiatedHandler));

    public FileImportInitiatedHandler(IBus bus, IDataStore dataStore)  
    {  
        this.bus = bus;  
        this.dataStore = dataStore;  
    }

    public void Handle(FileImportInitiated message)  
    {  
        int rowsSucceeded;  
        int rowsFailed;  
        using (var session = dataStore.OpenSession())  
        {  
            rowsSucceeded = session.Query<FileImport>()
                .Where(x => x.ImportId == message.ImportId).Count(x => x.Successful);  
            rowsFailed = session.Query<FileImport>()
                .Where(x => x.ImportId == message.ImportId).Count(x => !x.Successful);  
        }

        if (rowsSucceeded + rowsFailed == message.TotalNumberOfFilesInImport)  
        {  
            Log.Warn("import completed");  
            bus.Publish(new FileImportCompleted { ImportId = message.ImportId });  
        }  
        else  
        {  
            Log.Warn("import not completed. Checking again for complete in 5 seconds");  
            bus.Defer(new TimeSpan(0, 0, 5), message);  
        }
    }  
}  

In this handler, we are:

  • querying FileImport to get the most recent rowsSucceeded and rowsFailed values.
  • if rowsSucceeded + rowsFailed is equal to TotalNumberOfFilesInImport, we publish FileImportCompleted and we’re done.
  • if rowsSucceeded + rowsFailed is less than TotalNumberOfFilesInImport, we’re not done. We bus.Defer the FileImportInitiated message for 5 seconds, and the handler will run again and again until the success and failure counts add up to TotalNumberOfFilesInImport.
  • The amount of time we’re bus.Defer’ing can be changed to meet the business’s SLA on the file import.
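
One way to make that delay adjustable without recompiling is to read it from configuration. This is only a sketch under assumptions: RecheckInterval and FromSetting are hypothetical names, and the 5-second fallback simply mirrors the value hard-coded in the handler above.

```csharp
using System;

// Hypothetical helper for illustration; not part of the original solution.
public static class RecheckInterval
{
    // Turns a configured number of seconds into a TimeSpan. Falls back to
    // 5 seconds when the setting is missing, non-numeric, or not positive.
    public static TimeSpan FromSetting(string configuredSeconds)
    {
        int seconds;
        if (int.TryParse(configuredSeconds, out seconds) && seconds > 0)
            return TimeSpan.FromSeconds(seconds);

        return TimeSpan.FromSeconds(5);
    }
}
```

In the handler, the result could be passed to bus.Defer in place of new TimeSpan(0, 0, 5), with the raw value read from an app setting whose key name (say, ImportRecheckSeconds) is likewise an assumption.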

We’ve essentially done the same thing the saga did, but this time we’re using bus.Defer on the message in a handler. When you use bus.Defer, the message goes through the same timeout storage and retrieval mechanism that a saga’s timeouts rely on. When the message is handled again after being deferred, it carries the same data it had the last time the handler handled it.

We’re accomplishing the same thing we did before with a saga, but with less messaging and less code.

And if there is anything I love, it’s doing something with less code 🙂


In Summary

Before jumping to a saga (like I did) to solve a problem, take a look at your data and behavior and determine if a saga is really needed, or if there is other functionality of the NServiceBus infrastructure you can use instead.

I want to thank our architect, Gary Stonerock II (@garystonerockii), for giving me the feedback that led me to refactor this solution and get rid of the saga.

Michael McCarthy
