I was lucky enough to attend NSBCon 2014 last October in Brooklyn, NY. All the presentations were wonderful, but the ones I liked the most dealt with very common problems that we as programmers face every day, and how those problems were solved using the asynchronous communication features of NServiceBus.

One of the most common things we as programmers are asked to do is import data into our system and export data out of it. There are myriad ways to do both, from something as simple as attaching a file in a UI and uploading it, to more complex scenarios where automated systems drop data into pre-defined folders and we need to “do something with it”.

Sam Martindale (of Architecting Innovation) gave a great presentation called “Building a Highly Scalable File Processing Platform with NServiceBus”. Jimmy Bogard (of Headspring) also gave a great presentation called “Scaling NServiceBus

Between the two presentations, we were shown some of the common missteps developers make when using NServiceBus sagas, such as saga contention around instantiation and look-up. We were also shown how batch jobs lack the scalability (locking databases to import a million-plus records), durability (on record #567,322 of my batch job import, the import failed, so what state is my data in?) and maintainability we get from NServiceBus sagas out of the box.

I wanted to combine ideas from Jimmy’s and Sam’s presentations and build a saga that imports files, but one in which the saga does not call out to the database, either to insert import rows or to create or update the “status” of the import. We should be able to do all of this via messaging.

What are sagas? They’re coordinators, not implementers.

I relied on Jimmy’s guidance to use the “observer pattern” style of saga, where instead of looking up the saga in RavenDb for every row import to insert every row into the database, there would be a “state” of the import stored in the database. That state would be checked by the saga (once started) every “n” seconds. The saga could then use this state to decide if the import was complete or not. If it’s not complete, it can set a timeout and check on the state again. It will continue to do this until it’s done.
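The check-wait-check loop described above can be sketched in a few lines. This is a language-neutral model written in Python, not NServiceBus code: `ImportState`, `observe_until_complete` and `process_some_rows` are hypothetical names, and the "wait n seconds" step is replaced by a function call so the sketch runs instantly.

```python
from dataclasses import dataclass


@dataclass
class ImportState:
    """Hypothetical stand-in for the import "state" kept in the database."""
    total_rows: int
    succeeded: int = 0
    failed: int = 0

    def is_complete(self) -> bool:
        # >= rather than == is a defensive choice made for this sketch.
        return self.succeeded + self.failed >= self.total_rows


def observe_until_complete(state, advance, max_checks=100):
    """Check the state repeatedly and return how many checks were needed.

    `advance` stands in for "set a timeout and wait n seconds" while the
    row handlers keep writing rows in the background.
    """
    checks = 0
    while checks < max_checks:
        checks += 1
        if state.is_complete():
            return checks
        advance(state)
    raise RuntimeError("import did not complete within max_checks")


def process_some_rows(state, batch=400):
    # Pretend the row handlers finished up to `batch` more rows between checks.
    remaining = state.total_rows - (state.succeeded + state.failed)
    state.succeeded += min(batch, remaining)


state = ImportState(total_rows=1000)
checks = observe_until_complete(state, process_some_rows)
print(f"import finished after {checks} checks")
```

The point of the pattern is that the saga is touched once per check rather than once per imported row.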

There are a couple of assumptions I made while designing it:

  • if one or more rows fail in the import, we don’t fail the whole import
  • I put a lot of data on the row import message sent from the client in order to keep the messages sending and processing asynchronously.
  • any type of “rules” that needed to be checked on the imported data were checked for each row.
  • the user is not waiting on a UI for this import to show it’s successful. Because the import could take a while to run, the use case for the import would be a user kicking off the import via a UI, or some other mechanism kicking off the import.

This is the solution I came up with: https://github.com/mgmccarthy/FileImportProcessingSagaNSB5. Download the project, and I’ll walk through the different messaging patterns and decisions about breaking out endpoints.

Here is an overview of the solution:

[Image: SagaImportProcessSolution]

  • ConsoleClient- this project contains our client code. In the client, we send one command per import row. Each command carries the ImportId, the total number of files in the import, a boolean flag stating whether this import row is the first row for the given ImportId, and the rest of the “business data” we want to import. I’ll talk later about the design decision that led me to “saddle” each import row command with the non-business data.
  • Conventions- contains the configuration for message conventions so I do not have to decorate my messages with “ICommand”, “IEvent” or “IMessage”.
  • FileImportInsertionEndpoint- the purpose of this endpoint is two-fold. One, it processes each file import row and inserts that row into the database via the ProcessImportFileRowHandler. Two, it checks the “completed” state of the import via the CheckFileImportSuccessAndFailureCountHandler.
  • Messages- this project contains all messages used in the solution.
  • SagaEndpoint- this is the orchestration of the entire process. The SagaEndpoint checks on the completion status of the file import via request/response messaging and keeps track of the number of successful and failed rows in the import. It uses the total number of rows in the import, together with the successful and failed row counts, to determine whether the saga is done.

Now that we’ve seen the overview, let’s explore each project in more detail.

ConsoleClient

The ConsoleClient creates a new importId and then ties that id to every ProcessImportFileRow command that is sent in the import. The importId will become the unique identifier for this file import. In this case, I’m not actually importing a real file from disk. Reading the file in and processing it is a technical concern that can be done in different ways depending on your situation (for example, is the import fired off manually or triggered by a file landing in a folder? how big is the file, what’s the format, etc…). We iterate through a thousand rows in the file to import and send ProcessImportFileRow for every row we want to import. In this case, we’re importing Customers into our system.

[Image: ConsoleClient]

NOTE: a design decision I made here was to include the ImportId on every ProcessImportFileRow sent. You’ll also notice the FirstImportRowForThisImport and TotalNumberOfFilesInImport properties on the commands. These let the handling endpoint know whether this is the first row sent for this import and how many rows overall are expected for this ImportId. You’ll see why in the next section.
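To make the shape of the command concrete, here is a minimal Python model of what the client builds. It is only a sketch: the three infrastructure fields mirror the property names mentioned above, while `customer_name` and `build_commands` are hypothetical names I made up for illustration, and nothing is actually sent anywhere.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class ProcessImportFileRow:
    # Infrastructure data carried on every row command.
    import_id: uuid.UUID
    first_import_row_for_this_import: bool
    total_number_of_files_in_import: int
    # Business data (hypothetical field; the real command carries Customer data).
    customer_name: str


def build_commands(customer_names):
    """Create one command per row, all tied to a single new import id."""
    import_id = uuid.uuid4()
    total = len(customer_names)
    return [
        ProcessImportFileRow(import_id, index == 0, total, name)
        for index, name in enumerate(customer_names)
    ]


commands = build_commands([f"Customer {n}" for n in range(1000)])
print(len(commands), "commands ready to send")
```

Only the first command has the flag set, so the receiving endpoint can announce the import exactly once without the client sending a separate "start" message.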

FileImportInsertionEndpoint

This endpoint handles ProcessImportFileRow via the ProcessImportFileRowHandler. The handler first checks the FirstImportRowForThisImport property and if it’s true, publishes FileImportInitiated. We’ll come back to this event after looking at the rest of the handler.

[Image: SagaImportProcessFileImportINsertionEndpoint]

I use a quick Random call to simulate the file succeeding or failing based on some type of rules and write the row to the database with the Successful flag set to true or false.
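The handler logic described above can be modelled roughly as follows. This is a Python sketch, not the NServiceBus handler itself: `FakeBus` is a hypothetical stand-in that only records published events, the table is a plain list, and the 50/50 success rate is an assumption of mine, not something taken from the project.

```python
import random
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ProcessImportFileRow:
    import_id: str
    first_import_row_for_this_import: bool
    total_number_of_files_in_import: int
    customer_name: str  # hypothetical business field


@dataclass
class FakeBus:
    """Hypothetical stand-in for the bus; it only records published events."""
    published: list = field(default_factory=list)

    def publish(self, event):
        self.published.append(event)


def handle_process_import_file_row(command, bus, file_import_table, rng):
    # Only the row flagged as first announces the import.
    if command.first_import_row_for_this_import:
        bus.publish({
            "type": "FileImportInitiated",
            "import_id": command.import_id,
            "total_number_of_files_in_import": command.total_number_of_files_in_import,
        })
    # Simulate the business rules passing or failing (assumed 50/50 here).
    successful = rng.random() < 0.5
    file_import_table.append({
        "import_id": command.import_id,
        "customer_name": command.customer_name,
        "successful": successful,
    })


bus = FakeBus()
file_import_table = []
rng = random.Random(42)
for index in range(10):
    command = ProcessImportFileRow("import-1", index == 0, 10, f"Customer {index}")
    handle_process_import_file_row(command, bus, file_import_table, rng)
print(len(bus.published), "event published,", len(file_import_table), "rows written")
```

Every row is written whether it passed or failed, which matches the assumption that a failed row does not fail the whole import.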

As promised, let’s see where the FileImportInitiated event is handled in the next section.

SagaEndpoint

The FileImportInitiated event starts the ProcessImportFileSaga.

[Image: SagaStartedByFileImportInitiated]

The ImportId on FileImportInitiated is used as a correlation identifier throughout the rest of the saga’s lifetime. The message also supplies the saga the total number of files to expect in the import. Both of these pieces of data are saved in the saga’s data property. After we set our saga’s state via saga data, the saga then sends a message to check on the current import’s success and failure count using the saga’s import id:

[Image: SendAMessageToGetOverCountOfSuccessfulAndFailedMessageForImport]
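As a rough model of the saga start, the following Python sketch stores the two pieces of data and immediately asks for the counts. It is not NServiceBus code: `FakeBus` and the dictionary-shaped messages are hypothetical, and correlation is reduced to simply keeping the import id on the saga data.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SagaData:
    import_id: str
    total_number_of_files_in_import: int


@dataclass
class FakeBus:
    """Hypothetical stand-in for the bus; it only records sent messages."""
    sent: list = field(default_factory=list)

    def send(self, message):
        self.sent.append(message)


class ProcessImportFileSaga:
    def __init__(self, bus):
        self.bus = bus
        self.data: Optional[SagaData] = None

    def handle_file_import_initiated(self, event):
        # Save the correlation id and the expected total in the saga data.
        self.data = SagaData(
            import_id=event["import_id"],
            total_number_of_files_in_import=event["total_number_of_files_in_import"],
        )
        # Coordinate, don't implement: ask another endpoint for the counts.
        self.bus.send({
            "type": "CheckFileImportSuccessAndFailureCount",
            "import_id": self.data.import_id,
        })


bus = FakeBus()
saga = ProcessImportFileSaga(bus)
saga.handle_file_import_initiated(
    {"import_id": "import-1", "total_number_of_files_in_import": 1000}
)
print(bus.sent)
```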

In keeping with sagas being coordinators and not implementers, the saga sends a message to get the data it needs. CheckFileImportSuccessAndFailureCount is sent, and is handled by CheckFileImportSuccessAndFailureCountHandler in the FileImportInsertionEndpoint:

[Image: HandlingCheckFileImportSuccessAndFailureCount]

In the CheckFileImportSuccessAndFailureCountHandler we call out to the database to look up the file import by id and get the counts of successful and failed rows for the import so far. The expectation is that, on a larger import, the file will most likely not be done processing yet. We then use Bus.Reply to send the data back to the saga that sent this command in the first place.
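The count-and-reply step can be sketched like this. It is a Python model under stated assumptions: the table is a list of dictionaries, `reply` is any callable standing in for Bus.Reply, and the shape of the reply message is hypothetical.

```python
def handle_check_file_import_success_and_failure_count(command, file_import_table, reply):
    """Count successful and failed rows for one import and reply to the sender."""
    rows = [row for row in file_import_table if row["import_id"] == command["import_id"]]
    succeeded = sum(1 for row in rows if row["successful"])
    failed = len(rows) - succeeded
    # The reply goes back to whoever sent the command (the saga).
    reply({
        "import_id": command["import_id"],
        "succeeded": succeeded,
        "failed": failed,
    })


file_import_table = [
    {"import_id": "import-1", "successful": True},
    {"import_id": "import-1", "successful": False},
    {"import_id": "import-1", "successful": True},
    {"import_id": "import-2", "successful": True},  # a different import, ignored
]
replies = []
handle_check_file_import_success_and_failure_count(
    {"import_id": "import-1"}, file_import_table, replies.append
)
print(replies[0])
```

The handler reports whatever has been written so far, so a partial count is a normal answer rather than an error.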

NOTE: I did not know this, but when using Bus.Reply, you cannot use ICommand or IEvent. The reply message needs to be an IMessage. Check this StackOverflow posting for more information: http://stackoverflow.com/questions/22973996/nservicebus-wcf-service-with-error-system-invalidoperationexception-reply-is-ne. Another nifty thing about Bus.Reply: no message endpoint mappings are needed for the message to find its way back to the initiator of the request. We get this for free from NServiceBus. Thanks Particular!

Let’s return to the ProcessImportFileSaga to see what the saga does with this data:

[Image: SagaHandlingFileImportSuccessAndFailureCount]

In this handler, we’re writing some info to the console, but more importantly, we’re adding the successful and failed counts together and comparing the sum to the total number of files in the import (which the saga saved when it was started by the FileImportInitiated message) to see if the saga can mark itself as complete. If not, the saga sets a 5-second timeout. When this timeout expires, the saga again sends the CheckFileImportSuccessAndFailureCount command and receives a reply with updated data. The timeout can be scaled to meet SLA needs or resource concerns.
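The complete-or-retry decision can be modelled as below. This is a Python sketch rather than the saga’s real code: `FakeBus.request_timeout` and `handle_timeout` are hypothetical stand-ins for the timeout mechanism, and using `>=` instead of an exact match is a defensive choice of mine.

```python
from dataclasses import dataclass, field


@dataclass
class FakeBus:
    """Hypothetical stand-in that records sent messages and requested timeouts."""
    sent: list = field(default_factory=list)
    timeouts: list = field(default_factory=list)

    def send(self, message):
        self.sent.append(message)

    def request_timeout(self, seconds):
        self.timeouts.append(seconds)


class ProcessImportFileSaga:
    CHECK_INTERVAL_SECONDS = 5  # tune to SLA needs or resource concerns

    def __init__(self, bus, import_id, total_number_of_files_in_import):
        self.bus = bus
        self.import_id = import_id
        self.total = total_number_of_files_in_import
        self.completed = False

    def handle_counts(self, succeeded, failed):
        if succeeded + failed >= self.total:
            self.completed = True  # every row is accounted for
        else:
            self.bus.request_timeout(self.CHECK_INTERVAL_SECONDS)

    def handle_timeout(self):
        # The timeout expired: ask for fresh counts again.
        self.bus.send({
            "type": "CheckFileImportSuccessAndFailureCount",
            "import_id": self.import_id,
        })


bus = FakeBus()
saga = ProcessImportFileSaga(bus, "import-1", 1000)
saga.handle_counts(succeeded=600, failed=50)  # not done yet, so a timeout is set
saga.handle_timeout()                         # timeout fires, counts requested again
saga.handle_counts(succeeded=930, failed=70)  # all 1000 rows accounted for
print("completed:", saga.completed)
```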

So where is the FileImport table being populated from if the saga can send a message to get the most recent successful and failed messages? Let’s return to the ProcessImportFileRowHandler:

[Image: SagaImportProcessFileImportINsertionEndpoint]

Picking up where we left off, after we publish FileImportInitiated, we take the data that represents a single row in the import and write it to the FileImport entity. The saga checks FileImport via request/response messaging, using the data that’s constantly being inserted into the FileImport table to determine whether it’s finished.

Things I still want to improve and/or change about this implementation:

  1. There should be a way for the saga to mark itself as complete if other processes fail in a way that jeopardizes the saga’s ability to get the most recent successful and failed counts. That way, we don’t have a saga stuck in a state in which it can never complete.
  2. I’d like to remove the ImportId, FirstImportRowForThisImport and TotalNumberOfFilesInImport properties from the ProcessImportFileRow command that’s sent for each row. To me, the business data being imported has nothing to do with the technical infrastructure that runs the import, and I’m combining both types of data on one message. I’m pretty sure this is wrong.
  3. When the saga is finished, most likely we’ll need to publish an event to let whoever is interested in this import know it’s done.

Expanding on number 2 some more, I was initially sending a single command to an endpoint, which then dispatched all the ProcessImportFileRow commands. But with NServiceBus creating an ambient transaction per endpoint, I had to wait for ALL the rows to be iterated through before any of the commands were sent. Since I wanted true “fire and forget”, and my entire import does not succeed or fail based on one or more failed rows, I stuck with the send-only client endpoint and saddled the command with the extra data I needed to get the saga up and running.

That’s it for my file import saga. Hopefully you found this post useful. Please feel free to leave any comments for things that are flat-out wrong, or things that could use some improving.

Thanks!