The repository pattern via CQRS with Python/Django/Elasticsearch

Douwe van der Meij
14 min read · Feb 9, 2023


The repository pattern is probably one of the most commonly used patterns, especially within ORM frameworks (ORM is explained below).

However, many people take these ORM frameworks for granted, and with them the decisions behind their specific implementation of the repository pattern.

This article tries to describe the full potential of the repository pattern, an implementation using CQRS, and its powerful relation with the specification pattern.

Photo by Anna Hunko on Unsplash

In practice, a repository is nothing more than a place to store data. Like a library. Apart from the possibility to store a lot of data (in an efficient way), it should also be possible to find the right data quickly when you need it. This is also similar to a library.

A common way to efficiently find the data you need is to add indices. An index can be seen as a list of keywords that refer to specific data, like a list of book titles or author names. When you search for an author, the index can quickly direct you to all the books in the library that the author has written, without having to read all books every time you do a search.

Not having to read every book on each search might sound obvious, because humans intuitively create (mental) indices everywhere. Computers are not that smart. If you don’t tell a computer to keep track of an index, it won’t.
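To make the library analogy concrete, here is a minimal sketch in plain Python. The sample books and the helper names (find_by_author_scan, build_author_index) are made up for illustration; the only point is that a lookup in the index avoids scanning every book on every search.

```python
from collections import defaultdict

# Hypothetical sample data: (title, author) pairs standing in for a library.
BOOKS = [
    ("Domain-Driven Design", "Eric Evans"),
    ("Refactoring", "Martin Fowler"),
    ("Patterns of Enterprise Application Architecture", "Martin Fowler"),
]


def find_by_author_scan(books, author):
    # Without an index: look at every book on every search.
    return [title for title, book_author in books if book_author == author]


def build_author_index(books):
    # With an index: walk the collection once and remember the titles per author.
    index = defaultdict(list)
    for title, author in books:
        index[author].append(title)
    return index


author_index = build_author_index(BOOKS)

# A single dictionary lookup instead of a scan over the whole collection.
titles = author_index["Martin Fowler"]
```

The price of the index is that it has to be kept up to date whenever a book is added or removed, which is exactly the bookkeeping a computer only does when told to.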

The repository pattern offers more than just indices. Let’s dive in.

ORM frameworks

ORM stands for Object Relational Mapping. It’s a way to represent a data model in an object oriented way, which is easier to work with than direct access to (relational) databases. Using an ORM also cuts away a lot of implementation time and boilerplate code.

In fact, an ORM contains an in-memory representation of the database and can interact with the actual database whenever it needs to or upon request. For the user/developer this database interaction is transparent; you don’t have to bother.

Popular ORM frameworks in Python are SQLAlchemy and Django (the ORM is a large and important part of Django). In this article we focus on Django.

While ORMs can be really handy, they can become a burden when requirements change or load increases. Databases usually offer a lot of specific features. For an ORM, it’s hard to keep up with all the new database features on the one hand, but on the other hand it’s also hard to decide when to use a specific feature and when not to. Developers can “help” the ORM to use specific features, but this means that the ORM itself can quickly become hard to learn and maintain. Martin Fowler sums up pros and cons of ORMs nicely in his article called OrmHate.

One alternative Martin Fowler mentions in his article is CQRS. ORMs tend to be bi-directional; a single interface to read and write data. With CQRS these responsibilities are literally segregated. More about CQRS and segregation below.

My personal opinion about ORMs is that they are great for basic CRUD (create, read, update and delete) operations for relatively straightforward data models. But when the datasets become too large or requirements too exotic (with respect to CRUD), it’s probably best to look for a different solution, like CQRS.

What about document databases?

ORMs target relational databases. Document databases lack relations, so the implementation is far simpler. For a document database, a JSON structure can be enough. Such a structure can contain nested structures as well, each representing a relation.

Consider this JSON document about a sales order with order lines, which would typically be a relation between two tables in a relational database:

{
  "name": "order 123",
  "number": 123,
  "lines": [
    {
      "description": "product",
      "quantity": 1,
      "price_cents": 500
    }
  ]
}

Frameworks for document databases do exist; they’re called ODM frameworks. If you really fancy using frameworks you can pick one, but it’s not a necessity, and they’re not as popular as ORMs because there is less need for them.

The repository pattern is agnostic about whether the database is relational or document-based, and so is CQRS.

Domain Driven Design

A quick note about Domain Driven Design (DDD). In DDD the main entry point for data manipulation — regardless of being relational or not — is the aggregate root.

Aggregate root

Citing Martin Fowler:

A DDD aggregate is a cluster of domain objects that can be treated as a single unit… …An aggregate will have one of its component objects be the aggregate root. Any references from outside the aggregate should only go to the aggregate root. The root can thus ensure the integrity of the aggregate as a whole.

Martin Fowler also uses an order with order lines as an example of an aggregate with the order as its root.

Aggregate roots can be easily serialised into a document ready to be stored in a document database. See the JSON example above.
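As a minimal sketch of that serialisation step, the aggregate below is modelled with plain dataclasses. The class names, the add_line method and the quantity check are assumptions for illustration and not taken from a specific framework.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class OrderLine:
    description: str
    quantity: int
    price_cents: int


@dataclass
class Order:
    # The aggregate root: outside code only holds a reference to the order.
    name: str
    number: int
    lines: List[OrderLine] = field(default_factory=list)

    def add_line(self, line: OrderLine) -> None:
        # Lines are changed via the root, so the root can guard the integrity
        # of the whole aggregate (the rule below is a made-up example).
        if line.quantity <= 0:
            raise ValueError("quantity must be positive")
        self.lines.append(line)


order = Order(name="order 123", number=123)
order.add_line(OrderLine(description="product", quantity=1, price_cents=500))

# asdict() recurses into nested dataclasses, giving one document per aggregate.
document = json.dumps(asdict(order), indent=2)
```

The resulting document has the same shape as the JSON example earlier in this article.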

For relational databases this works a bit differently.

Unit of work

In the field of relational databases, the repository pattern (in the form of ORMs) comes with a lot of pain. Martin Fowler explains why in his OrmHate article:

The object/relational mapping problem is hard. Essentially what you are doing is synchronizing between two quite different representations of data, one in the relational database, and the other in-memory.

A unit of work can be seen as a single transaction, but on a higher level than the relational database.

It needs to be on a higher level, because in your aggregate you have an order with its lines. When a line changes, or a line is added or removed, you store the aggregate via its root (the order). The unit of work then needs to find out what has to be written to which table in the (relational) database. That final write should also happen as a single transaction, so everything remains consistent.
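The sketch below illustrates the idea with the sqlite3 module from the standard library. The table layout and the save_order function are assumptions for illustration. A real unit of work would track which lines actually changed; this one simply replaces all lines of the order, but it does write the order and its lines in one transaction.

```python
import sqlite3
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class OrderLine:
    description: Optional[str]
    quantity: int


@dataclass
class Order:
    number: int
    name: str
    lines: List[OrderLine] = field(default_factory=list)


def save_order(connection: sqlite3.Connection, order: Order) -> None:
    # "with connection" commits when the block succeeds and rolls back when
    # it raises, so the order row and its line rows are written together.
    with connection:
        connection.execute(
            "INSERT OR REPLACE INTO orders (number, name) VALUES (?, ?)",
            (order.number, order.name),
        )
        # Simplest possible mapping: replace all lines of this order.
        connection.execute(
            "DELETE FROM order_lines WHERE order_number = ?", (order.number,)
        )
        connection.executemany(
            "INSERT INTO order_lines (order_number, description, quantity) "
            "VALUES (?, ?, ?)",
            [(order.number, line.description, line.quantity) for line in order.lines],
        )


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE orders (number INTEGER PRIMARY KEY, name TEXT)")
connection.execute(
    "CREATE TABLE order_lines ("
    "order_number INTEGER NOT NULL, description TEXT NOT NULL, "
    "quantity INTEGER NOT NULL)"
)

save_order(connection, Order(number=123, name="order 123", lines=[OrderLine("product", 1)]))

# A line without a description violates NOT NULL, so the rename is rolled back too.
broken = Order(number=123, name="renamed", lines=[OrderLine(None, 1)])
try:
    save_order(connection, broken)
except sqlite3.IntegrityError:
    pass

stored_name = connection.execute(
    "SELECT name FROM orders WHERE number = 123"
).fetchone()[0]
line_count = connection.execute("SELECT COUNT(*) FROM order_lines").fetchone()[0]
```

After the failed second save, the database still holds the original order name and its single line, which is the consistency the unit of work is supposed to protect.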

The pain is in the mapping between the two different data structures and keeping them synchronised and consistent. This is a typical problem with any ORM. Choose wisely.

CQRS

CQRS stands for Command Query Responsibility Segregation and is a term first described by Greg Young. CQRS can be applied at multiple levels, which I will explain below. According to Greg Young himself, CQRS is a stepping stone towards Event Sourcing. Event Sourcing itself will not be covered in this article; perhaps I will write about it in another article at some point. Feel free to follow me to get notified.

CQRS can be intimidating when you’re used to ORMs, let alone Event Sourcing. When you’re used to Django, for example, you’re used to importing a model and calling all() or get(...) or filter(...) on it to read, and save() to write. In most cases this is also called straight from views.py, but that’s another discussion.

Splitting the interface

The first step to get to CQRS, in the case of Django, is to make the interface to the model more abstract. Specifically in the case of writing. So we will split the interface.

Consider the following Django code fragment:

import json

from django.http import JsonResponse
from django.views import View


class OrderView(View):
    def get(self, request, pk: int, *args, **kwargs):
        order = Order.objects.get(pk=pk)
        return JsonResponse({
            "name": order.name,
            ...
        })

    def post(self, request, *args, **kwargs):
        data = json.loads(request.body)
        order = Order(
            name=data.get("name"),
            ...
        )
        order.save()
        return JsonResponse({
            "name": order.name,
            ...
        })

Notice the Order model being queried in get and a new record being added in post. Let’s turn adding a new record into a command.

Consider the following command:

from dataclasses import dataclass


@dataclass
class AddOrderCommand:
    data: dict

    def handle(self):
        order = Order(
            name=self.data.get("name"),
            ...
        )
        order.save()
        return order

Now the view code could look like this:

class OrderView(View):
    def get(self, request, pk: int, *args, **kwargs):
        order = Order.objects.get(pk=pk)
        return JsonResponse({
            "name": order.name,
            ...
        })

    def post(self, request, *args, **kwargs):
        data = json.loads(request.body)
        order = AddOrderCommand(data).handle()
        return JsonResponse({
            "name": order.name,
            ...
        })

Notice the dependency on the Order model has been moved from the view to the command.

In this example not much happened, and one could call this unnecessary overhead. What people tend to forget is the opportunities this brings later on, while at the same time leaving cleaner code behind. Think about what it means when all database mutations are done via commands and all these commands live in separate classes inside a folder called commands. Don’t forget to write tests too; this also becomes easy when the work is done in an isolated command.
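To illustrate how testing an isolated command could look, here is a minimal sketch that runs without Django. It is a variation on the command above, not the same code: the command receives its storage as a parameter, and FakeOrderStore is a hypothetical stand-in for the Django model so that no database is needed.

```python
import unittest
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeOrderStore:
    # Hypothetical stand-in for the Django model, so the test needs no database.
    saved: List[Dict] = field(default_factory=list)

    def save(self, order: Dict) -> Dict:
        self.saved.append(order)
        return order


@dataclass
class AddOrderCommand:
    data: dict
    store: FakeOrderStore

    def handle(self) -> Dict:
        order = {"name": self.data.get("name")}
        return self.store.save(order)


class AddOrderCommandTest(unittest.TestCase):
    def test_handle_saves_exactly_one_order(self):
        store = FakeOrderStore()
        order = AddOrderCommand({"name": "order 123"}, store).handle()
        self.assertEqual(order["name"], "order 123")
        self.assertEqual(store.saved, [order])


# Run the test case programmatically so the sketch also works as a plain script.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(AddOrderCommandTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Because the command is the only place where an order is created, a test like this covers the whole mutation without going through a view or an HTTP request.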

Generating events

Since every database mutation is now maintained in a separate command, we can easily enhance the commands with generating events. This too might seem unnecessary overhead, but wait for it.

When we stay inside the Django ecosystem, events can be broadcast via the signalling system.

from dataclasses import dataclass

import django.dispatch

order_added_event = django.dispatch.Signal()


@dataclass
class AddOrderCommand:
    data: dict

    def handle(self):
        order = Order(
            name=self.data.get("name"),
            ...
        )
        order.save()
        # Emit the event only after the order has been saved successfully.
        order_added_event.send(sender=AddOrderCommand.__name__, order=order)
        return order

In the example above an event will be emitted after the order has been saved. The signal is declared outside of the command, so it can be easily imported/used elsewhere in the code. You might want to keep all of your signals in a separate signals.py file.

Django core also offers quite a lot of default signals, for example around the save operation (pre_save and post_save). Please note that these signals are also emitted outside of the command’s scope. This means you can’t be sure whether the event comes from this command, from another command such as UpdateOrderCommand, or from some call in the Django admin interface.

While using the Django signalling system is a very powerful mechanism to generate (internal) events, the Django documentation itself warns developers about using it:

Signals are implicit function calls which make debugging harder. If the sender and receiver of your custom signal are both within your project, you’re better off using an explicit function call.

I disagree, but only when you know what you’re doing.

When moving forward, you could make the events and the emitter more abstract and install different implementations to also emit to the outside world. For example, to Redis, Kafka, or any other (cloud native) event/messaging system.
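A minimal sketch of such an abstraction could look as follows. All names here (EventEmitter, InProcessEventEmitter, OutboxEventEmitter, OrderAdded) are hypothetical. The outbox emitter only collects events in a list; a real adapter would publish them to Redis, Kafka or a similar system instead.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class OrderAdded:
    # Hypothetical event object; the signal example passes the order itself.
    order_name: str


class EventEmitter(ABC):
    @abstractmethod
    def emit(self, event: object) -> None:
        ...


class InProcessEventEmitter(EventEmitter):
    """Calls subscribers directly, comparable to what a Django signal does."""

    def __init__(self) -> None:
        self._subscribers: Dict[type, List[Callable[[object], None]]] = {}

    def subscribe(self, event_type: type, callback: Callable[[object], None]) -> None:
        self._subscribers.setdefault(event_type, []).append(callback)

    def emit(self, event: object) -> None:
        for callback in self._subscribers.get(type(event), []):
            callback(event)


class OutboxEventEmitter(EventEmitter):
    """Collects events; a real adapter would publish them to a message broker."""

    def __init__(self) -> None:
        self.outbox: List[object] = []

    def emit(self, event: object) -> None:
        self.outbox.append(event)


def add_order(name: str, emitter: EventEmitter) -> None:
    # The command only knows the abstract emitter, not where events end up.
    emitter.emit(OrderAdded(order_name=name))


received: List[object] = []
in_process = InProcessEventEmitter()
in_process.subscribe(OrderAdded, received.append)
add_order("order 123", in_process)

outbox_emitter = OutboxEventEmitter()
add_order("order 124", outbox_emitter)
```

The command code is identical in both cases; only the emitter that is passed in decides whether the event stays inside the process or leaves it.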

Read models

Before we go and listen to the events being emitted, first a brief explanation about read models. Read models are, as the name suggests, (data) models that are optimised for reading, not for writing.

Going back to our order example, in our Django view we’ve got the following get endpoint:

class OrderView(View):
    def get(self, request, pk: int, *args, **kwargs):
        order = Order.objects.get(pk=pk)
        return JsonResponse({
            "name": order.name,
            ...
        })

Notice the call to the database, Order.objects.get(pk=pk). From experience I can tell that this query can’t be optimised much further; there’s no need to create a different read model for orders to make this call more performant.

Now consider a query where we search for (part of) the title of a book or, worse, part of a text somewhere in the book. While Django models, and the relational databases behind them, are capable of full-text search, they might not give the best performance, especially at scale (consider many thousands of books). A lot of developers I know immediately see an opportunity for Elasticsearch here, which is a data store optimised for full-text search.

Elasticsearch as a back-end for search queries, while not being the main database, is a read model. It’s optimised for reading and populated with data after the (write) transaction to the main database has succeeded.

There are several libraries and mechanisms available for Django and Elasticsearch to ensure Elasticsearch always has the latest data. This is usually called caching and cache invalidation, which in itself is a relatively hard problem in computer science; you have to orchestrate it in a consistent and structured way.

Elasticsearch isn’t the only back-end for read models of course. Any data store can act as a read model, even SQL databases. The only rule is that it’s not the main data store for writing and it’s populated after the original write transaction is completed. A very common back-end to be used as read-model is an in-memory collection (list, dictionary, object, etc.), even temporary.

Eventual consistency

Having commands emitting events and knowing about read models that need to be populated, we could listen to the events and just do that.

In Django listening to events would go as follows:

from signals import order_added_event


def elasticsearch_projector(sender, **kwargs):
    # callback function to populate read model
    ...


order_added_event.connect(elasticsearch_projector)

Since we know by event (or sender) from which command it came, we could even implement specific logic in elasticsearch_projector based on that. For example, if we would have a command called UpdateOrderStatus, then the callback would only have to update the status field in the read model(s).

You can of course connect multiple callback functions when you’re using multiple (different) read models.
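The sketch below shows sender-specific logic and multiple callbacks together. To keep it runnable without Django, MiniSignal is a simplified stand-in that only mimics connect() and send() of django.dispatch.Signal. The two read models (a search index and an activity log) are plain in-memory collections and purely illustrative.

```python
from typing import Any, Callable, Dict, List


class MiniSignal:
    # Simplified stand-in for django.dispatch.Signal, so the sketch runs
    # without Django. Only connect() and send() are mimicked.
    def __init__(self) -> None:
        self._receivers: List[Callable[..., Any]] = []

    def connect(self, receiver: Callable[..., Any]) -> None:
        self._receivers.append(receiver)

    def send(self, sender: Any, **named: Any) -> None:
        for receiver in self._receivers:
            receiver(sender=sender, **named)


order_changed_event = MiniSignal()

# Two hypothetical read models: a search index and an activity log.
search_index: Dict[int, Dict[str, Any]] = {}
recent_activity: List[str] = []


def search_projector(sender: str, **kwargs: Any) -> None:
    order = kwargs["order"]
    if sender == "UpdateOrderStatus":
        # Only the status changed, so only that field is touched.
        search_index[order["pk"]]["status"] = order["status"]
    else:
        search_index[order["pk"]] = dict(order)


def activity_projector(sender: str, **kwargs: Any) -> None:
    recent_activity.append(f"{sender}: order {kwargs['order']['pk']}")


# Every connected callback receives every event.
order_changed_event.connect(search_projector)
order_changed_event.connect(activity_projector)

order_changed_event.send(
    sender="AddOrderCommand",
    order={"pk": 1, "name": "order 123", "status": "open"},
)
order_changed_event.send(
    sender="UpdateOrderStatus",
    order={"pk": 1, "status": "closed"},
)
```

Each projector owns exactly one read model, so adding a new read model means adding a new callback rather than changing a command.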

When using read models or caching in general, there is a little caveat: the data might not be up-to-date, as mentioned before. The theoretical problem is described in the CAP theorem. This problem doesn’t exist when you use the same data store to read and write, but that again introduces other problems.

In essence, read models should be considered eventually consistent; at some point in time they will be consistent with the main database.
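A small sketch can make that gap visible. The dictionaries and the queue below are hypothetical in-memory stand-ins for the main database, the read model and the event stream between them.

```python
from collections import deque
from typing import Deque, Dict, Tuple

# Hypothetical in-memory stand-ins for the main database and a read model.
main_database: Dict[int, str] = {}
read_model: Dict[int, str] = {}
pending_events: Deque[Tuple[int, str]] = deque()


def update_status(pk: int, status: str) -> None:
    # The write is committed first; the read model only hears about it later.
    main_database[pk] = status
    pending_events.append((pk, status))


def project_pending_events() -> None:
    # Would typically run in an event listener or a background worker.
    while pending_events:
        pk, status = pending_events.popleft()
        read_model[pk] = status


update_status(1, "open")
update_status(1, "closed")

stale_read = read_model.get(1)  # nothing has been projected yet
project_pending_events()
consistent_read = read_model.get(1)
```

Between the write and the projection, a reader of the read model sees older data than the main database holds. Once all pending events are processed, both agree again.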

The following image probably depicts eventual consistency best:

https://twitter.com/gregyoung/status/1101642600342265857

Hexagonal architecture

Hexagonal architecture, also known as Ports and Adapters, is an architectural pattern to split logic/intent from the actual implementation and was first described by Alistair Cockburn. In general it’s about defining interfaces that are part of the core/heart of the application, the domain (in DDD), and adapters per interface surrounding the core.

Please check out my article about Hexagonal Architecture in Python for a more in-depth explanation with hands-on code examples.

In my applications I always use a folder called domain and a folder called adapters. In the domain folder all domain objects, aggregates, commands, logic, services and repositories are described. For repositories and services, only the interfaces are described when there is a dependency with something outside of the domain. For example, connecting to an external API or connecting to a real database. The implementations of these interfaces are located in the adapters folder.

The general rule is that within the domain folder, there may be no dependency on something external, outside of the domain. Adapters have a dependency on the domain interfaces and are allowed to have external dependencies. This complies with the SOLID dependency inversion principle.

The domain would be the place to write an interface for a repository that is able to work with order aggregates. This interface would have an implementation in Django which is located in the adapters folder.

The repository would describe a get function and a save function, among others. While the get function can be called directly, the save function should (must) be called only from a command.

Consider the following interface:

from abc import ABC, abstractmethod
from typing import Optional


class OrderRepository(ABC):
    @abstractmethod
    def get(self, pk: int) -> Optional[Order]:
        ...

    @abstractmethod
    def save(self, order: Order) -> Order:
        ...

Now consider the following implementation of the interface:

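class DjangoOrderRepository(OrderRepository):
    def get(self, pk: int) -> Optional[Order]:
        try:
            return Order.from_django(DjangoOrder.objects.get(pk=pk))
        except DjangoOrder.DoesNotExist:
            return None
        except DjangoOrder.MultipleObjectsReturned:
            return None

    def save(self, order: Order) -> Order:
        # Model.save() returns None, so convert the saved instance back
        # into a domain object instead of returning the result of save().
        django_order = order.to_django()
        django_order.save()
        return Order.from_django(django_order)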

Notice that we have to convert the internal order object from the domain back and forth to a Django representation. Also note that this example is simplified as it doesn’t include a proper unit of work for the order lines for example.

A quick look at the Order class:

@dataclass
class Order:
    pk: int
    ...

    @staticmethod
    def from_django(order: DjangoOrder):
        return Order(pk=order.pk, ...)

    def to_django(self):
        return DjangoOrder(pk=self.pk, ...)

The whole application can be built based on the OrderRepository interface. The specific Django implementation is merely an infrastructural decision. When respecting the SOLID Liskov substitution principle, any specific OrderRepository implementation would do. Changing it at a later stage should not affect the domain.
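As an illustration of that substitutability, the sketch below adds a second, hypothetical adapter that keeps orders in a dictionary. The name field on Order and the rename_order function are assumptions for the example; the interface is the one shown above.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Order:
    pk: int
    name: str


class OrderRepository(ABC):
    @abstractmethod
    def get(self, pk: int) -> Optional[Order]:
        ...

    @abstractmethod
    def save(self, order: Order) -> Order:
        ...


class InMemoryOrderRepository(OrderRepository):
    # Hypothetical adapter: a dictionary instead of a Django model.
    def __init__(self) -> None:
        self._orders: Dict[int, Order] = {}

    def get(self, pk: int) -> Optional[Order]:
        return self._orders.get(pk)

    def save(self, order: Order) -> Order:
        self._orders[order.pk] = order
        return order


def rename_order(repository: OrderRepository, pk: int, name: str) -> Optional[Order]:
    # Domain code depends on the interface only, never on a concrete adapter.
    order = repository.get(pk)
    if order is None:
        return None
    order.name = name
    return repository.save(order)


repository = InMemoryOrderRepository()
repository.save(Order(pk=1, name="order 123"))

renamed = rename_order(repository, pk=1, name="order 123 (rush)")
missing = rename_order(repository, pk=2, name="does not exist")
```

An adapter like this is also convenient in unit tests, because rename_order behaves the same whether it receives the in-memory or the Django implementation.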

A common problem though, with this generic approach, is propagating business logic to the infrastructure layer, e.g., filter queries to the database. In the OrderRepository example above I deliberately left such functions out for simplicity, but consider a generic find function to search for data based on combinations of different fields and values. A solution to this problem is offered by the specification pattern.

Specification pattern

The specification pattern is a technique to encapsulate business rules in an object-oriented way. The specification pattern offers an extensible set of operators to build arbitrarily complex structures of functional logic.

The most basic operator is the equality operator. Let’s say we want to find all orders with status open. The predicate would be:

order.status == "open"

In Django this would be written as:

Order.objects.filter(status="open")

Because that would create a dependency on Django, we don’t want the Django notation to be present in the domain. But we also don’t want to evaluate the predicate in the domain; we want to let the underlying database do the comparison. This is where specifications come in handy.

Consider the class EqualsSpecification:

class EqualsSpecification(Specification):
    def __init__(self, field: str, value: Any):
        self.field = field
        self.value = value

    def is_satisfied_by(self, obj: Any) -> bool:
        return getattr(obj, self.field) == self.value

Notice the is_satisfied_by function. By definition, an implementation of the specification pattern has to implement an is_satisfied_by function to evaluate objects. This could even be done in the application domain, if necessary.

Now consider the following specification object:

spec = EqualsSpecification("status", "open")

We could now call spec.is_satisfied_by(order) with an order object and get a boolean result.

More complex specification structures can be made with the AndSpecification and OrSpecification, which essentially make nested collections (tree structures) possible.
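A minimal sketch of such a tree is shown below. The constructor signatures of AndSpecification and OrSpecification, and the customer field on the order, are assumptions for illustration; an actual library may define them differently.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


class Specification(ABC):
    @abstractmethod
    def is_satisfied_by(self, obj: Any) -> bool:
        ...


class EqualsSpecification(Specification):
    def __init__(self, field: str, value: Any):
        self.field = field
        self.value = value

    def is_satisfied_by(self, obj: Any) -> bool:
        return getattr(obj, self.field) == self.value


class AndSpecification(Specification):
    def __init__(self, *specifications: Specification):
        self.specifications = specifications

    def is_satisfied_by(self, obj: Any) -> bool:
        # Satisfied only when every child specification is satisfied.
        return all(spec.is_satisfied_by(obj) for spec in self.specifications)


class OrSpecification(Specification):
    def __init__(self, *specifications: Specification):
        self.specifications = specifications

    def is_satisfied_by(self, obj: Any) -> bool:
        # Satisfied when at least one child specification is satisfied.
        return any(spec.is_satisfied_by(obj) for spec in self.specifications)


@dataclass
class Order:
    status: str
    customer: str


# Open orders that belong to either of two customers.
spec = AndSpecification(
    EqualsSpecification("status", "open"),
    OrSpecification(
        EqualsSpecification("customer", "acme"),
        EqualsSpecification("customer", "globex"),
    ),
)

matches = spec.is_satisfied_by(Order(status="open", customer="globex"))
wrong_status = spec.is_satisfied_by(Order(status="closed", customer="acme"))
wrong_customer = spec.is_satisfied_by(Order(status="open", customer="initech"))
```

Because composite specifications are specifications themselves, they can be nested to any depth.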

Django unfortunately can’t work with such specifications (yet), but we can transform our specification object(s) into something that can be used in Django. For example the Django Q object.

Please check out my article about the specification pattern for more in-depth information and to learn how to use it in practice with Django.

We can create a Q object by transforming the specification with a helper class and use it in Django:

q = DjangoOrmSpecificationBuilder.build(spec)
DjangoOrder.objects.filter(q)

Specifications are durable. This is also the power of the specification pattern. You don’t have to touch existing specifications when you change the infrastructure. Consider having an ElasticsearchSpecificationBuilder, or any other target platform specification builder as well.
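To give an impression of what a second builder could look like, here is a hedged sketch that turns a specification tree into an Elasticsearch query body built from term and bool queries. The builder and the bare-bones specification classes are written for this example and are not the ones from the open source project mentioned in this article.

```python
from typing import Any, Dict


class EqualsSpecification:
    def __init__(self, field: str, value: Any):
        self.field = field
        self.value = value


class AndSpecification:
    def __init__(self, *specifications: Any):
        self.specifications = specifications


class OrSpecification:
    def __init__(self, *specifications: Any):
        self.specifications = specifications


class ElasticsearchSpecificationBuilder:
    # Hypothetical builder: walks the specification tree and emits a query
    # body in Elasticsearch's query DSL.
    @classmethod
    def build(cls, spec: Any) -> Dict[str, Any]:
        if isinstance(spec, EqualsSpecification):
            # A term query matches exact values.
            return {"term": {spec.field: spec.value}}
        if isinstance(spec, AndSpecification):
            return {"bool": {"must": [cls.build(s) for s in spec.specifications]}}
        if isinstance(spec, OrSpecification):
            return {
                "bool": {
                    "should": [cls.build(s) for s in spec.specifications],
                    "minimum_should_match": 1,
                }
            }
        raise TypeError(f"unsupported specification: {type(spec).__name__}")


spec = AndSpecification(
    EqualsSpecification("status", "open"),
    OrSpecification(
        EqualsSpecification("customer", "acme"),
        EqualsSpecification("customer", "globex"),
    ),
)

query = ElasticsearchSpecificationBuilder.build(spec)
```

The specification itself is untouched; only the builder knows about the target platform, which is why the same spec object can serve both the Django ORM and Elasticsearch.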

The specification class, the operators and builders are part of an open source project I’m maintaining. You can find it on GitHub.

Interface segregation

Now that we know about commands, read models, hexagonal architecture and specifications, we can talk about the SOLID interface segregation principle. This is where we will get to the full potential of the repository pattern.

Until now, we’ve used a single OrderRepository with an example implementation in Django. Earlier in this article Elasticsearch was mentioned, as a read model. We want our Elasticsearch read model of course to also implement (parts of) the same OrderRepository interface.

For simplicity, we split our OrderRepository into an OrderReadRepository and an OrderWriteRepository.

class OrderReadRepository(ABC):
    @abstractmethod
    def get(self, pk: int) -> Optional[Order]:
        ...

    @abstractmethod
    def find(self, specification: Specification = None) -> Iterator[Order]:
        ...


class OrderWriteRepository(ABC):
    @abstractmethod
    def save(self, order: Order) -> Order:
        ...

The DjangoOrderWriteRepository will implement the OrderWriteRepository interface and the ElasticsearchOrderReadRepository will implement the OrderReadRepository interface.

Next we could define a CompoundOrderRepository, which expects an OrderReadRepository and an OrderWriteRepository upon instantiation, so from the perspective of the domain there’s only one single interface for reading and writing orders.

Without interface segregation we would have:

class DjangoOrderRepository(OrderRepository):
    def get(self, pk: int) -> Optional[Order]:
        ...

    def find(self, specification: Specification = None) -> Iterator[Order]:
        ...

    def save(self, order: Order) -> Order:
        ...


order_repository = DjangoOrderRepository()

With interface segregation we will have:

class ElasticsearchOrderReadRepository(OrderReadRepository):
    def get(self, pk: int) -> Optional[Order]:
        ...

    def find(self, specification: Specification = None) -> Iterator[Order]:
        ...


class DjangoOrderWriteRepository(OrderWriteRepository):
    def save(self, order: Order) -> Order:
        ...


class OrderRepository(OrderReadRepository, OrderWriteRepository, ABC):
    ...


class CompoundOrderRepository(OrderRepository):
    def __init__(
        self,
        order_read_repository: OrderReadRepository,
        order_write_repository: OrderWriteRepository,
    ):
        self.order_read_repository = order_read_repository
        self.order_write_repository = order_write_repository

    def get(self, pk: int) -> Optional[Order]:
        return self.order_read_repository.get(pk=pk)

    def find(self, specification: Specification = None) -> Iterator[Order]:
        return self.order_read_repository.find(specification=specification)

    def save(self, order: Order) -> Order:
        return self.order_write_repository.save(order=order)


order_repository = CompoundOrderRepository(
    order_read_repository=ElasticsearchOrderReadRepository(),
    order_write_repository=DjangoOrderWriteRepository(),
)

Notice that the find function is available in the OrderReadRepository and takes a specification parameter.

Note that this example is simplified. Elasticsearch still needs to be populated from events, with eventual consistency. So there will be a save function somewhere in the repository. However, it doesn’t have to be called save; it can also have another name, defined by another interface, to prevent confusion or misuse.
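One way to express this is a separate interface for populating the read model. In the sketch below the names OrderProjection and project are hypothetical, and an in-memory dictionary stands in for Elasticsearch so the example runs without a cluster.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Order:
    pk: int
    name: str


class OrderReadRepository(ABC):
    @abstractmethod
    def get(self, pk: int) -> Optional[Order]:
        ...


class OrderProjection(ABC):
    # Hypothetical name for the write side of a read model: only event
    # listeners should depend on this interface, never commands.
    @abstractmethod
    def project(self, order: Order) -> None:
        ...


class InMemoryOrderReadModel(OrderReadRepository, OrderProjection):
    # Stand-in for an Elasticsearch-backed read model.
    def __init__(self) -> None:
        self._documents: Dict[int, Order] = {}

    def get(self, pk: int) -> Optional[Order]:
        return self._documents.get(pk)

    def project(self, order: Order) -> None:
        self._documents[order.pk] = order


def show_order(repository: OrderReadRepository, pk: int) -> str:
    # Query side: sees get() only, so it cannot write by accident.
    order = repository.get(pk)
    return order.name if order else "not found"


def on_order_added(projection: OrderProjection, order: Order) -> None:
    # Event listener side: sees project() only.
    projection.project(order)


read_model = InMemoryOrderReadModel()

before = show_order(read_model, 1)
on_order_added(read_model, Order(pk=1, name="order 123"))
after = show_order(read_model, 1)
```

The same object implements both interfaces, but each caller is typed against the one it needs, which is the interface segregation principle applied to the read model itself.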

Conclusion

This article ended up as a fairly large article with a long introduction to get to the apotheosis of interface segregation. In my opinion, this was necessary to explain the full potential of the repository pattern.

Building applications this way ensures compliance with the SOLID principles, or at least three of them. This comes with a lot of other benefits, like maintainability and testability, among many others.

I’m a big fan of clean code, and I hope this article inspired you to also make tomorrow a little bit better.

Feel free to reach out if you have any questions or remarks regarding this article. I’m happy to help. You can reach out to me here or on my personal website.
