November 20th 2009


CQRS Domain Events

As you may have seen in my previous post, “CQRS à la Greg Young”, our domain aggregate root is now responsible for publishing domain events indicating that some internal state has changed. In fact, state changes within our aggregate root are only allowed through such domain events. Secondly, the internal event handlers are not allowed to contain any business logic; they are only supposed to set or update the internal state of the aggregate root directly from the data the event carries. Following these rules completely separates the business logic from the state changes. This separation lets us replay historical domain events without triggering any business logic, bringing a freshly loaded aggregate root back to the same state as the original.

Why is this important? Well, now we can use these same domain events for our persistence using an Event Store; Martin Fowler calls this pattern “Event Sourcing”. Obviously you don’t want to process a credit card or send an e-mail every time you load an aggregate root from the event store. Also, between the time the original domain event was recorded and the time the aggregate root is loaded from the event store, the business logic that decided a state change was needed could have changed; this should not affect the actual historical state change. So this separation is to be taken seriously.

Domain events can also be used to signal to the outside world (from the aggregate root’s point of view) that something has happened without there being an actual state change. When persisting our domain events we would not differentiate between these two kinds of domain events.

All domain events should be named with the ubiquitous language in mind, meaning that they should closely represent what the user intended to do in the same language as the user would use to explain it to you. By keeping all these domain events we gain a huge amount of knowledge about what happened and why it happened.

This means that our aggregate root gets the added responsibility of tracking these domain events, but I don’t see this as being any different than, for example, the proxy that NHibernate generates for you, except perhaps that it is not a proxy and that you have more control over what happens. But it is true that your aggregate root has these added responsibilities.

So let us take a look at how the aggregate root provides this functionality. For obvious reasons we use a base class for this, but really all that is needed is that the aggregate root implements the following two interfaces:

The IOrginator interface is for the snapshot functionality, which is an optimization technique for speeding up the loading of aggregate roots from the Event Store. As you can see, it uses the “Memento” pattern from the Gang of Four book. I wanted to get this interface out of the way first; it is not required, but it does provide a good optimization for loading aggregate roots.

The IEventProvider interface is the more interesting of the two. It defines how domain events can be retrieved from the aggregate root and how historical domain events can be loaded back. It also defines that each aggregate root must have an Id and a Version, both of which are used by the event store. The Id is obvious, so I won’t go into that; the Version, on the other hand, may not be. The Version is used to detect concurrency violations: conflicts that occur because, between the time the command was sent and the time the aggregate root was saved, another user or process has updated the same aggregate root. In this case we throw a Concurrency Violation Exception, which currently results in a rollback. In a future post I plan to look into how you could try to deal with these concurrency violations automatically.
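The original C# interface listings are not reproduced in this text, so here is a rough Python sketch of what the two contracts could look like. The member names GetChanges, Clear, UpdateVersion and LoadFromHistory come from the discussion in this post; `create_memento` and `set_memento` are my own assumed names for the Memento methods, not taken from the source code.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable, List
from uuid import UUID


class IOrginator(ABC):
    """Snapshot support (Memento pattern); an optional optimization."""

    @abstractmethod
    def create_memento(self) -> Any:  # assumed method name
        ...

    @abstractmethod
    def set_memento(self, memento: Any) -> None:  # assumed method name
        ...


class IEventProvider(ABC):
    """What the event store needs from an aggregate root."""

    @property
    @abstractmethod
    def id(self) -> UUID:
        ...

    @property
    @abstractmethod
    def version(self) -> int:
        ...

    @abstractmethod
    def get_changes(self) -> List[Any]:
        ...

    @abstractmethod
    def clear(self) -> None:
        ...

    @abstractmethod
    def update_version(self, version: int) -> None:
        ...

    @abstractmethod
    def load_from_history(self, events: Iterable[Any]) -> None:
        ...
```

An aggregate root (or its base class) would implement both; the event store would only ever talk to these two contracts.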

Using domain events for state change

Now we will take a look at how these interfaces are implemented and how the implementation is used. I am going to go through the code as if you were using R#, going from method to method. So in order to get a better overall impression I would encourage you to look at the source code. The source code can be found here:

Each aggregate root has to register its domain events and the internal event handlers with the base class. I am working on making this static information for the type, since it will not change between different instances of the same type. As you can see, I am registering the domain event type together with an action that handles that specific type; each action takes the specific domain event as its input parameter.

The RegisterEvent method is defined in the BaseAggregateRoot class. There is a little bit of interesting logic going on here: basically a new action is defined that takes an IDomainEvent as its input parameter, which is how all the actions can be stored in the same Dictionary. Inside this new action the provided action is invoked, with the input value cast from IDomainEvent to the actual expected type TEvent.

So if we were to write out what is actually happening for the Client Created Event, it would look like the example below, and this is what is being stored in _registeredEvents.

And below is the private apply method, which is called from two different methods in the aggregate root base class. The method retrieves the action that is registered for the provided domain event, and then it invokes the action with the domain event as the input parameter. The apply method takes an IDomainEvent instead of the specific domain event, and that is why we need the wrapping logic mentioned above.
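To make the registration and dispatch mechanics concrete, here is a minimal sketch in Python (the original project is C#). The names mirror the ones used in the text, but the payload field `client_name` and the error raised for an unregistered event are my own assumptions. Python has no cast, so an `isinstance` check stands in for the cast to TEvent.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Type, TypeVar

TEvent = TypeVar("TEvent")


@dataclass
class ClientCreatedEvent:
    client_name: str  # hypothetical payload field


class BaseAggregateRoot:
    def __init__(self) -> None:
        # Every handler is stored behind the same "takes any event" signature,
        # which is what lets them all share one dictionary.
        self._registered_events: Dict[type, Callable[[object], None]] = {}

    def register_event(
        self, event_type: Type[TEvent], handler: Callable[[TEvent], None]
    ) -> None:
        def wrapper(event: object) -> None:
            # Stand-in for the cast from IDomainEvent to TEvent.
            if not isinstance(event, event_type):
                raise TypeError(f"Expected {event_type.__name__}")
            handler(event)

        self._registered_events[event_type] = wrapper

    def _apply(self, event: object) -> None:
        # Look up the handler by the event's concrete type and invoke it.
        handler = self._registered_events.get(type(event))
        if handler is None:
            # Assumption: an unregistered event type is treated as an error.
            raise LookupError(f"No handler for {type(event).__name__}")
        handler(event)


class Client(BaseAggregateRoot):
    def __init__(self) -> None:
        super().__init__()
        self.name: Optional[str] = None
        self.register_event(ClientCreatedEvent, self._on_client_created)

    def _on_client_created(self, event: ClientCreatedEvent) -> None:
        # No business logic here: only copy data from the event into state.
        self.name = event.client_name
```

The wrapper is the whole trick: the dictionary only ever sees handlers of one shape, while each aggregate root still writes handlers against its specific event types.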

Let us take a look from where this apply method is being called, first we will look at some actual domain behavior in the aggregate root.

First we execute all our valuable domain logic, the business behavior. Then, when we are satisfied that everything is OK and we know what type of state change we need to make, we Apply a new domain event carrying the new internal state. The Apply method used here is a protected method on the BaseAggregateRoot. By the way, there is nothing stating that there can be only one outcome, i.e. only one type of domain event being Applied.

When we Apply a domain event we first assign the aggregate root Id to the event, so that we can keep track of which aggregate root the event belongs to. Secondly, we get a new version and assign it to the event; this maintains the correct order of the events. Then we call the private apply method, which makes the state change to the aggregate root. Finally, we add the domain event to the internal list of applied events. This is very similar to the dirty check of NHibernate (the idea, not the actual implementation).
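The four steps can be sketched as follows. This is a simplified Python illustration rather than the actual C# code: `Account`, `CashDepositedEvent` and `new_balance` are hypothetical names, `_apply_new` plays the role of the protected Apply, and `_apply` plays the role of the private apply.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
from uuid import UUID, uuid4


@dataclass
class DomainEvent:
    aggregate_id: Optional[UUID] = None
    version: int = 0


@dataclass
class CashDepositedEvent(DomainEvent):
    new_balance: int = 0  # the event carries the resulting state


class BaseAggregateRoot:
    def __init__(self) -> None:
        self.id: UUID = uuid4()
        self._event_version = 0
        self._handlers: Dict[type, Callable] = {}
        self._applied_events: List[DomainEvent] = []

    def _get_new_event_version(self) -> int:
        self._event_version += 1
        return self._event_version

    def _apply_new(self, event: DomainEvent) -> None:
        """Counterpart of the protected Apply: stamp, apply, track."""
        event.aggregate_id = self.id
        event.version = self._get_new_event_version()
        self._apply(event)
        self._applied_events.append(event)

    def _apply(self, event: DomainEvent) -> None:
        """Counterpart of the private apply: state change only."""
        self._handlers[type(event)](event)


class Account(BaseAggregateRoot):
    def __init__(self) -> None:
        super().__init__()
        self.balance = 0
        self._handlers[CashDepositedEvent] = self._on_cash_deposited

    def deposit(self, amount: int) -> None:
        # Business logic lives here, before any event is applied.
        if amount <= 0:
            raise ValueError("deposit must be positive")
        self._apply_new(CashDepositedEvent(new_balance=self.balance + amount))

    def _on_cash_deposited(self, event: CashDepositedEvent) -> None:
        # Handler only copies state out of the event.
        self.balance = event.new_balance
```

Note that the validation in `deposit` runs before anything is applied, so a rejected command leaves no event behind.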

This is all that is needed to execute domain behavior and keep track of the domain events that have been used to update the internal state of an aggregate root.

Getting the state changes

So now we have built up some internal state changes and we want to persist them to some sort of medium. I am not going to discuss how to actually persist these state changes; that is for a later post. All I want to show you now is how to get them out of the aggregate root. How about we start with the method GetChanges :)

Here we simply return all the applied domain events; by tracking these domain events we in effect track all the state changes that have happened since the aggregate root was instantiated. Here we also request all the applied domain events from all entities. Then we order the domain events by version. After having received and processed all the applied domain events, we should Clear the aggregate root of all applied domain events so they won’t be persisted again.

Just a quick word about the entity event providers. In some cases you want to have domain behavior inside entities that are part of the aggregate but are not the aggregate root. In that case you want those entities to generate domain events as well, and you want to get those events too when getting all changes. Likewise, when clearing the aggregate root, the changes inside each entity event provider should be cleared as well.

Before saving the changes in the aggregate root is finalized, we also need to update the version of the aggregate root. This version will match the version of the last applied domain event.
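Here is how the GetChanges, Clear and version update steps could fit together, again as a simplified Python sketch. The `save` function and the plain list used as a store are stand-ins I made up for illustration; the real persistence is the subject of a later post.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DomainEvent:
    version: int
    name: str


class EntityEventProvider:
    """An entity inside the aggregate that tracks its own applied events."""

    def __init__(self) -> None:
        self.applied_events: List[DomainEvent] = []

    def get_changes(self) -> List[DomainEvent]:
        return list(self.applied_events)

    def clear(self) -> None:
        self.applied_events.clear()


class AggregateRoot:
    def __init__(self) -> None:
        self.version = 0
        self.applied_events: List[DomainEvent] = []
        self.entity_event_providers: List[EntityEventProvider] = []

    def get_changes(self) -> List[DomainEvent]:
        # Own events plus the events of every registered entity, by version.
        events = list(self.applied_events)
        for provider in self.entity_event_providers:
            events.extend(provider.get_changes())
        return sorted(events, key=lambda e: e.version)

    def clear(self) -> None:
        self.applied_events.clear()
        for provider in self.entity_event_providers:
            provider.clear()

    def update_version(self, version: int) -> None:
        self.version = version


def save(aggregate: AggregateRoot, store: List[DomainEvent]) -> None:
    """Hypothetical persistence step; `store` is just a list here."""
    changes = aggregate.get_changes()
    if not changes:
        return
    store.extend(changes)
    aggregate.update_version(changes[-1].version)
    aggregate.clear()
```

Because the versions come from one shared sequence, sorting by version interleaves root and entity events back into the order in which they actually happened.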

Loading historical domain events

In a next post I’ll dig deeper into the event store, but for now let’s just assume you have a very nice way of storing these domain events and have the ability to retrieve them again.

As the IEventProvider interface nicely dictates, there is a LoadFromHistory method that takes an IEnumerable of domain events. Below is the implementation of this method.

When you take a look inside the foreach loop you will notice that we are calling apply again. Please note that this is the private variant, which is only responsible for updating the internal state of the aggregate root: these events are not added to the _appliedEvents collection, nor are their Id or Version touched. We don’t want to save these events again. After applying all the historical domain events we update the aggregate root version to the version of the last event.
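A small Python sketch of that replay logic, with a single hypothetical event type (`NameChangedEvent`) to keep it short. Keeping the internal event counter in sync with the loaded version is my assumption about how new events continue the sequence; it is not spelled out in the text.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class NameChangedEvent:
    version: int
    new_name: str


class Client:
    def __init__(self) -> None:
        self.name = ""
        self.version = 0
        self._event_version = 0
        self._applied_events: List[NameChangedEvent] = []

    def _apply(self, event: NameChangedEvent) -> None:
        # State change only; no business logic is triggered on replay.
        self.name = event.new_name

    def load_from_history(self, events: Iterable[NameChangedEvent]) -> None:
        last = None
        for event in events:
            self._apply(event)  # deliberately not added to _applied_events
            last = event
        if last is None:
            return  # an empty history leaves the version untouched
        self.version = last.version
        # Assumption: the event counter continues from the loaded version.
        self._event_version = last.version
```

After loading, the aggregate root reports no changes, which is exactly what keeps historical events from being saved a second time.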

The base class

I am using a base class to provide this functionality to all the different aggregate roots using inheritance and I know there are different opinions about this. So I wanted to highlight that you can achieve the same results by using composition. Your aggregate roots still need to implement the interfaces, but the implementation can be provided by using composition.

Another thing is that all the public methods in the BaseAggregateRoot are explicit interface implementations. I do this because I want to hide these details from any piece of code that is using the aggregate root as is; only when dealing with persistence do we use the interface and get access to the explicit interface methods. Nice and clean.

Aggregate entities

OK, I have already mentioned the terms entities and entity event providers, and I would like to focus on them for a bit. An entity is a domain object that is part of the same aggregate as the aggregate root but is not the aggregate root itself; it is, however, managed by the aggregate root. We manage state changes in these entities in exactly the same way as we do in the aggregate root, so each entity is also an event provider. The entities have a different interface than the aggregate root, so the two cannot be confused with each other.

You may have also noticed the hookup version provider method, remember it :)

The problem is that we should treat the aggregate as a whole, so all state changes need to be persisted within the same transaction. So we want a single point of access to the state changes of the whole aggregate, as well as a single point for loading the history back into the whole aggregate. To achieve this, the aggregate root needs to register all the entity event providers so it can track them. To enable that, we have an additional interface for our aggregate root to implement.

Let’s look at the implementation right away.

Now this is only a very simple collection that holds references to IEntityEventProviders, and it is used in the previously shown GetEntityEvents method. So getting the changes out is relatively simple this way. I also created a special collection that will automatically register entities when they are added to it.

Remember I mentioned the HookUpVersionProvider method? It is used to get a reference to the method on the aggregate root that deals with assigning a new version to each event. We want a reference to the same version generator that the aggregate root uses, because then the versions of all domain events will be in sequence regardless of whether the aggregate root or an entity created them.

The collection takes a reference to the aggregate root it is part of in the constructor to be able to add each added entity event provider to the collection in the aggregate root.
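The interplay between the self-registering collection and the shared version provider could look roughly like this in Python. `EntityList`, `register_child_event_provider` and the `(version, name)` tuples used as events are simplifications of my own; only the HookUpVersionProvider idea is taken from the text.

```python
from typing import Callable, Iterator, List, Optional, Tuple

Event = Tuple[int, str]  # (version, name), kept minimal for the sketch


class BaseEntity:
    def __init__(self) -> None:
        self._version_provider: Optional[Callable[[], int]] = None
        self.applied_events: List[Event] = []

    def hook_up_version_provider(self, provider: Callable[[], int]) -> None:
        self._version_provider = provider

    def apply(self, name: str) -> None:
        if self._version_provider is None:
            raise RuntimeError("entity is not attached to an aggregate root")
        self.applied_events.append((self._version_provider(), name))


class BaseAggregateRoot:
    def __init__(self) -> None:
        self._event_version = 0
        self.applied_events: List[Event] = []
        self.entity_event_providers: List[BaseEntity] = []

    def _get_new_event_version(self) -> int:
        self._event_version += 1
        return self._event_version

    def apply(self, name: str) -> None:
        self.applied_events.append((self._get_new_event_version(), name))

    def register_child_event_provider(self, entity: BaseEntity) -> None:
        # Hand the entity the root's own version generator.
        entity.hook_up_version_provider(self._get_new_event_version)
        self.entity_event_providers.append(entity)


class EntityList:
    """Collection that registers each added entity with its aggregate root."""

    def __init__(self, aggregate_root: BaseAggregateRoot) -> None:
        self._aggregate_root = aggregate_root
        self._items: List[BaseEntity] = []

    def add(self, entity: BaseEntity) -> None:
        self._aggregate_root.register_child_event_provider(entity)
        self._items.append(entity)

    def __iter__(self) -> Iterator[BaseEntity]:
        return iter(self._items)

    def __len__(self) -> int:
        return len(self._items)
```

With this in place an event applied by an entity between two root events gets the version in between, so the combined stream stays in one unbroken sequence.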

Currently there is some duplication between the BaseAggregateRoot and the BaseEntity because they implement different interfaces; I am sure there is some optimization possible there :)

Finally, there is one more thing to cover: how historical domain events are passed to the correct entity. Currently this is a bit ugly, but I am working on a more elegant solution. Take a look at the code.

So each entity domain event is registered here as well, and they are all passed to one specific event handler (one event handler for each entity type). What happens is that we look up the specific event provider in the collection by its Id; when the requested event provider is found, the historical domain event is loaded using the LoadFromHistory method on that event provider.
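As an illustration of that routing, here is a stripped-down Python sketch. `Account`, `BankCard` and `BankCardReportedStolenEvent` are hypothetical names, the cards are handed to the constructor only to keep the example short, and raising an error when no entity matches is an assumption on my part.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List


@dataclass
class BankCardReportedStolenEvent:  # hypothetical entity event
    entity_id: str
    version: int


class BankCard:
    def __init__(self, card_id: str) -> None:
        self.id = card_id
        self.disabled = False

    def load_from_history(self, events: Iterable[object]) -> None:
        for _ in events:
            self.disabled = True  # the only event type in this sketch


class Account:
    def __init__(self, cards: List[BankCard]) -> None:
        self.bank_cards = cards
        self.version = 0
        # Every bank card event type points at the same routing handler.
        self._handlers: Dict[type, Callable] = {
            BankCardReportedStolenEvent: self._on_any_bank_card_event,
        }

    def _on_any_bank_card_event(self, event: BankCardReportedStolenEvent) -> None:
        # Find the entity this event belongs to by its Id.
        card = next((c for c in self.bank_cards if c.id == event.entity_id), None)
        if card is None:
            # Assumption: an event for an unknown entity is an error.
            raise LookupError(f"No bank card with id {event.entity_id}")
        card.load_from_history([event])

    def load_from_history(self, events: Iterable[object]) -> None:
        for event in events:
            self._handlers[type(event)](event)
            self.version = event.version
```

The manual part is the handler dictionary: every event type an entity can produce has to be listed on the aggregate root too, which is the duplication I would like to get rid of.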

The problem here is that the information about which events an entity can produce is already registered, namely in the entities themselves. So by making that information statically available I should be able to auto-register the entity event handlers.


I hope that this was a useful explanation of how the aggregate root works with the internal domain events, and that you would agree with me that it really is not very difficult. Next time I want to discuss the event store so we can take a look at persisting the domain events. I will also post a blog about making the entity registration more elegant and less manual work.

Keywords: cqrs, design