Aggregates and Consistency Boundaries
In this chapter, we’d like to revisit our domain model to talk about invariants and constraints, and see how our domain objects can maintain their own internal consistency, both conceptually and in persistent storage. We’ll discuss the concept of a consistency boundary and show how making it explicit can help us to build high-performance software without compromising maintainability.
Adding the Product aggregate shows a preview of where we’re headed: we’ll introduce a new model object called Product to wrap multiple batches, and we’ll make the old allocate() domain service available as a method on Product instead.

Why? Let’s find out.
Tip
|
The code for this chapter is in the chapter_07_aggregate branch on GitHub:

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_07_aggregate
# or to code along, checkout the previous chapter:
git checkout chapter_06_uow |
Why Not Just Run Everything in a Spreadsheet?
What’s the point of a domain model, anyway? What’s the fundamental problem we’re trying to address?

Couldn’t we just run everything in a spreadsheet? Many of our users would be delighted by that. Business users like spreadsheets because they’re simple, familiar, and yet enormously powerful.

In fact, an enormous number of business processes do operate by manually sending spreadsheets back and forth over email. This "CSV over SMTP" architecture has low initial complexity but tends not to scale very well because it’s difficult to apply logic and maintain consistency.

Who is allowed to view this particular field? Who’s allowed to update it? What happens when we try to order –350 chairs, or 10,000,000 tables? Can an employee have a negative salary?

These are the constraints of a system. Much of the domain logic we write exists to enforce these constraints in order to maintain the invariants of the system. The invariants are the things that have to be true whenever we finish an operation.
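To make this concrete, here is a tiny, hypothetical sketch (the names are invented for illustration and are not part of our allocation model) of enforcing one such constraint at the moment of the state change, so the invariant holds whenever the operation finishes:

```python
class InvalidOrder(Exception):
    """Raised when a requested change would violate a constraint."""


def place_order(sku: str, qty: int) -> dict:
    # Constraint: order quantities must be positive. Checking it here
    # maintains the invariant that no stored order ever has a nonsensical
    # quantity (no orders for -350 chairs).
    if qty <= 0:
        raise InvalidOrder(f"Cannot order {qty} units of {sku}")
    return {"sku": sku, "qty": qty}
```

Rejecting the operation outright is the simplest way to keep an invariant true: the system never enters the bad state, so there is nothing to clean up.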
Invariants, Constraints, and Consistency
The two words are somewhat interchangeable, but a constraint is a rule that restricts the possible states our model can get into, while an invariant is defined a little more precisely as a condition that is always true.

If we were writing a hotel-booking system, we might have the constraint that double bookings are not allowed. This supports the invariant that a room cannot have more than one booking for the same night.

Of course, sometimes we might need to temporarily bend the rules. Perhaps we need to shuffle the rooms around because of a VIP booking. While we’re moving bookings around in memory, we might be double booked, but our domain model should ensure that, when we’re finished, we end up in a final consistent state, where the invariants are met. If we can’t find a way to accommodate all our guests, we should raise an error and refuse to complete the operation.
Let’s look at a couple of concrete examples from our business requirements; we’ll start with this one:

An order line can be allocated to only one batch at a time.
This is a business rule that imposes an invariant. The invariant is that an order line is allocated to either zero or one batch, but never more than one. We need to make sure that our code never accidentally calls Batch.allocate() on two different batches for the same line, and currently, there’s nothing there to explicitly stop us from doing that.
Invariants, Concurrency, and Locks
Let’s look at another one of our business rules:

We can’t allocate to a batch if the available quantity is less than the quantity of the order line.

Here the constraint is that we can’t allocate more stock than is available to a batch, so we never oversell stock by allocating two customers to the same physical cushion, for example. Every time we update the state of the system, our code needs to ensure that we don’t break the invariant, which is that the available quantity must be greater than or equal to zero.

In a single-threaded, single-user application, it’s relatively easy for us to maintain this invariant. We can just allocate stock one line at a time, and raise an error if there’s no stock available.

This gets much harder when we introduce the idea of concurrency. Suddenly we might be allocating stock for multiple order lines simultaneously. We might even be allocating order lines at the same time as processing changes to the batches themselves.

We usually solve this problem by applying locks to our database tables. This prevents two operations from happening simultaneously on the same row or same table.
As we start to think about scaling up our app, we realize that our model of allocating lines against all available batches may not scale. If we process tens of thousands of orders per hour, and hundreds of thousands of order lines, we can’t hold a lock over the whole batches table for every single one—we’ll get deadlocks or performance problems at the very least.
What Is an Aggregate?
OK, so if we can’t lock the whole database every time we want to allocate an order line, what should we do instead? We want to protect the invariants of our system but allow for the greatest degree of concurrency. Maintaining our invariants inevitably means preventing concurrent writes; if multiple users can allocate DEADLY-SPOON at the same time, we run the risk of overallocating.

On the other hand, there’s no reason we can’t allocate DEADLY-SPOON at the same time as FLIMSY-DESK. It’s safe to allocate two products at the same time because there’s no invariant that covers them both. We don’t need them to be consistent with each other.
The Aggregate pattern is a design pattern from the DDD community that helps us to resolve this tension. An aggregate is just a domain object that contains other domain objects and lets us treat the whole collection as a single unit.

The only way to modify the objects inside the aggregate is to load the whole thing, and to call methods on the aggregate itself.
As a model gets more complex and grows more entity and value objects, referencing each other in a tangled graph, it can be hard to keep track of who can modify what. Especially when we have collections in the model as we do (our batches are a collection), it’s a good idea to nominate some entities to be the single entrypoint for modifying their related objects. It makes the system conceptually simpler and easy to reason about if you nominate some objects to be in charge of consistency for the others.

For example, if we’re building a shopping site, the Cart might make a good aggregate: it’s a collection of items that we can treat as a single unit. Importantly, we want to load the entire basket as a single blob from our data store. We don’t want two requests to modify the basket at the same time, or we run the risk of weird concurrency errors. Instead, we want each change to the basket to run in a single database transaction.

We don’t want to modify multiple baskets in a transaction, because there’s no use case for changing the baskets of several customers at the same time. Each basket is a single consistency boundary responsible for maintaining its own invariants.
An AGGREGATE is a cluster of associated objects that we treat as a unit for the purpose of data changes.

Domain-Driven Design blue book

Per Evans, our aggregate has a root entity (the Cart) that encapsulates access to items. Each item has its own identity, but other parts of the system will always refer to the Cart only as an indivisible whole.
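As a rough sketch of that idea (hypothetical names, not code from our allocation system), a Cart aggregate might look like this:

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class CartItem:
    # A value object inside the aggregate; outside code never holds one
    # directly, it always goes through the Cart.
    sku: str
    qty: int


class Cart:
    """Aggregate root: the single entrypoint for modifying its items."""

    def __init__(self, cart_id: str):
        self.cart_id = cart_id
        self._items: List[CartItem] = []

    def add_item(self, sku: str, qty: int) -> None:
        # The root enforces the aggregate's invariants in one place.
        if qty <= 0:
            raise ValueError("quantity must be positive")
        self._items.append(CartItem(sku, qty))

    @property
    def item_count(self) -> int:
        return sum(item.qty for item in self._items)
```

Because every change goes through the root, loading the whole cart and saving it back in one transaction is enough to keep it internally consistent.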
Tip
|
Just as we sometimes use _leading_underscores to mark methods or functions as "private," you can think of aggregates as being the "public" classes of our model, and the rest of the entities and value objects as "private."
|
Choosing an Aggregate
What aggregate should we use for our system? The choice is somewhat arbitrary, but it’s important. The aggregate will be the boundary where we make sure every operation ends in a consistent state. This helps us to reason about our software and prevent weird race issues. We want to draw a boundary around a small number of objects—the smaller, the better, for performance—that have to be consistent with one another, and we need to give this boundary a good name.
The object we’re manipulating under the covers is Batch. What do we call a collection of batches? How should we divide all the batches in the system into discrete islands of consistency?
We could use Shipment as our boundary. Each shipment contains several batches, and they all travel to our warehouse at the same time. Or perhaps we could use Warehouse as our boundary: each warehouse contains many batches, and counting all the stock at the same time could make sense.
Neither of these concepts really satisfies us, though. We should be able to allocate DEADLY-SPOONs or FLIMSY-DESKs in one go, even if they’re not in the same warehouse or the same shipment. These concepts have the wrong granularity.
When we allocate an order line, we’re interested only in batches that have the same SKU as the order line. Some sort of concept like GlobalSkuStock could work: a collection of all the batches for a given SKU.
It’s an unwieldy name, though, so after some bikeshedding via SkuStock, Stock, ProductStock, and so on, we decided to simply call it Product—after all, that was the first concept we came across in our exploration of the domain language back in [chapter_01_domain_model].
So the plan is this: when we want to allocate an order line, instead of Before: allocate against all batches using the domain service, where we look up all the Batch objects in the world and pass them to the allocate() domain service…
[plantuml, apwp_0702, config=plantuml.cfg]
@startuml
scale 4
hide empty members

package "Service Layer" as services {
    class "allocate()" as allocate {
    }
    hide allocate circle
    hide allocate members
}

package "Domain Model" as domain_model {
    class Batch {
    }
    class "allocate()" as allocate_domain_service {
    }
    hide allocate_domain_service circle
    hide allocate_domain_service members
}

package Repositories {
    class BatchRepository {
        list()
    }
}

allocate -> BatchRepository: list all batches
allocate --> allocate_domain_service: allocate(orderline, batches)
@enduml
…we’ll move to the world of After: ask Product to allocate against its batches, in which there is a new Product object for the particular SKU of our order line, and it will be in charge of all the batches for that SKU, and we can call a .allocate() method on that instead.
[plantuml, apwp_0703, config=plantuml.cfg]
@startuml
scale 4
hide empty members

package "Service Layer" as services {
    class "allocate()" as allocate {
    }
}
hide allocate circle
hide allocate members

package "Domain Model" as domain_model {
    class Product {
        allocate()
    }
    class Batch {
    }
}

package Repositories {
    class ProductRepository {
        get()
    }
}

allocate -> ProductRepository: get me the product for this SKU
allocate --> Product: product.allocate(orderline)
Product o- Batch: has
@enduml
Let’s see how that looks in code form:
class Product:
def __init__(self, sku: str, batches: List[Batch]):
self.sku = sku #(1)
self.batches = batches #(2)
def allocate(self, line: OrderLine) -> str: #(3)
try:
batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
batch.allocate(line)
return batch.reference
except StopIteration:
raise OutOfStock(f"Out of stock for sku {line.sku}")
- (1) Product’s main identifier is the sku.

- (2) Our Product class holds a reference to a collection of batches for that SKU.

- (3) Finally, we can move the allocate() domain service to be a method on the Product aggregate.
Note
|
This Product might not look like what you’d expect a Product model to look like. No price, no description, no dimensions. Our allocation service doesn’t care about any of those things. This is the power of bounded contexts; the concept of a product in one app can be very different from another. See the following sidebar for more discussion.
|
One Aggregate = One Repository
Once you define certain entities to be aggregates, we need to apply the rule that they are the only entities that are publicly accessible to the outside world. In other words, the only repositories we are allowed should be repositories that return aggregates.
Note
|
The rule that repositories should only return aggregates is the main place where we enforce the convention that aggregates are the only way into our domain model. Be wary of breaking it! |
In our case, we’ll switch from BatchRepository to ProductRepository:
class AbstractUnitOfWork(abc.ABC):
products: repository.AbstractProductRepository
...
class AbstractProductRepository(abc.ABC):
@abc.abstractmethod
def add(self, product):
...
@abc.abstractmethod
def get(self, sku) -> model.Product:
...
The ORM layer will need some tweaks so that the right batches automatically get loaded and associated with Product objects. The nice thing is, the Repository pattern means we don’t have to worry about that yet. We can just use our FakeRepository and then feed through the new model into our service layer to see how it looks with Product as its main entrypoint:
def add_batch(
ref: str, sku: str, qty: int, eta: Optional[date],
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get(sku=sku)
if product is None:
product = model.Product(sku, batches=[])
uow.products.add(product)
product.batches.append(model.Batch(ref, sku, qty, eta))
uow.commit()
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batchref = product.allocate(line)
uow.commit()
return batchref
What About Performance?
We’ve mentioned a few times that we’re modeling with aggregates because we want to have high-performance software, but here we are loading all the batches when we only need one. You might expect that to be inefficient, but there are a few reasons why we’re comfortable here.

First, we’re purposefully modeling our data so that we can make a single query to the database to read, and a single update to persist our changes. This tends to perform much better than systems that issue lots of ad hoc queries. In systems that don’t model this way, we often find that transactions slowly get longer and more complex as the software evolves.

Second, our data structures are minimal and comprise a few strings and integers per row. We can easily load tens or even hundreds of batches in a few milliseconds.

Third, we expect to have only 20 or so batches of each product at a time. Once a batch is used up, we can discount it from our calculations. This means that the amount of data we’re fetching shouldn’t get out of control over time.

If we did expect to have thousands of active batches for a product, we’d have a couple of options. For one, we could use lazy-loading for the batches in a product. From the perspective of our code, nothing would change, but in the background, SQLAlchemy would page through data for us. This would lead to more requests, each fetching a smaller number of rows. Because we need to find only a single batch with enough capacity for our order, this might work pretty well.
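For instance, with SQLAlchemy’s classical mappings (the style used elsewhere in this book), the lazy-loading tweak might be as small as the sketch below; products_table and the relationship name are assumptions about our mapping, not code from the book’s listings:

```python
from sqlalchemy.orm import mapper, relationship

# Hypothetical mapper configuration: lazy="dynamic" turns product.batches
# into a query object that fetches rows on demand, instead of loading
# every batch as soon as the Product is loaded.
mapper(
    Product,
    products_table,
    properties={"batches": relationship(Batch, lazy="dynamic")},
)
```

This is a configuration fragment tied to a live schema, so treat it as a sketch: the domain code keeps iterating over product.batches, while SQLAlchemy decides when the rows are actually fetched.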
You’ve just seen the main top layers of the code, so this shouldn’t be too hard, but we’d like you to implement the Product aggregate starting from Batch, just as we did.

Of course, you could cheat and copy/paste from the previous listings, but even if you do that, you’ll still have to solve a few challenges on your own, like adding the model to the ORM and making sure all the moving parts can talk to each other, which we hope will be instructive.
You’ll find the code on GitHub. We’ve put in a "cheating" implementation that delegates to the existing allocate() function, so you should be able to evolve that toward the real thing.
We’ve marked a couple of tests with @pytest.skip(). After you’ve read the rest of this chapter, come back to these tests to have a go at implementing version numbers. Bonus points if you can get SQLAlchemy to do them for you by magic!
If all else failed, we’d just look for a different aggregate. Maybe we could split up batches by region or by warehouse. Maybe we could redesign our data access strategy around the shipment concept. The Aggregate pattern is designed to help manage some technical constraints around consistency and performance. There isn’t one correct aggregate, and we should feel comfortable changing our minds if we find our boundaries are causing performance woes.
Optimistic Concurrency with Version Numbers
We have our new aggregate, so we’ve solved the conceptual problem of choosing an object to be in charge of consistency boundaries. Let’s now spend a little time talking about how to enforce data integrity at the database level.
Note
|
This section has a lot of implementation details; for example, some of it is Postgres-specific. But more generally, we’re showing one way of managing concurrency issues, but it is just one approach. Real requirements in this area vary a lot from project to project. You shouldn’t expect to be able to copy and paste code from here into production. |
We don’t want to hold a lock over the entire batches table, but how will we implement holding a lock over just the rows for a particular SKU?
One answer is to have a single attribute on the Product model that acts as a marker for the whole state change being complete and to use it as the single resource that concurrent workers can fight over. If two transactions read the state of the world for batches at the same time, and both want to update the allocations tables, we force both to also try to update the version_number in the products table, in such a way that only one of them can win and the world stays consistent.
Sequence diagram: two transactions attempting a concurrent update on Product illustrates two concurrent transactions doing their read operations at the same time, so they see a Product with, for example, version=3. They both call Product.allocate() in order to modify a state. But we set up our database integrity rules such that only one of them is allowed to commit the new Product with version=4, and the other update is rejected.
Tip
|
Version numbers are just one way to implement optimistic locking. You could achieve the same thing by setting the Postgres transaction isolation level to SERIALIZABLE, but that often comes at a severe performance cost. Version numbers also make implicit concepts explicit.
|
Sequence diagram: two transactions attempting a concurrent update on Product

[plantuml, apwp_0704, config=plantuml.cfg]
@startuml
scale 4

entity Model
collections Transaction1
collections Transaction2
database Database

Transaction1 -> Database: get product
Database -> Transaction1: Product(version=3)
Transaction2 -> Database: get product
Database -> Transaction2: Product(version=3)
Transaction1 -> Model: Product.allocate()
Model -> Transaction1: Product(version=4)
Transaction2 -> Model: Product.allocate()
Model -> Transaction2: Product(version=4)
Transaction1 -> Database: commit Product(version=4)
Database -[#green]> Transaction1: OK
Transaction2 -> Database: commit Product(version=4)
Database -[#red]>x Transaction2: Error! version is already 4
@enduml
What we’ve implemented here is called optimistic concurrency control because our default assumption is that everything will be fine when two users want to make changes to the database. We think it’s unlikely that they will conflict with each other, so we let them go ahead and just make sure we have a way to notice if there is a problem.
Pessimistic concurrency control works under the assumption that two users are going to cause conflicts, and we want to prevent conflicts in all cases, so we lock everything just to be safe. In our example, that would mean locking the whole batches table, or using SELECT FOR UPDATE—we’re pretending that we’ve ruled those out for performance reasons, but in real life you’d want to do some evaluations and measurements of your own.
With pessimistic locking, you don’t need to think about handling failures because the database will prevent them for you (although you do need to think about deadlocks). With optimistic locking, you need to explicitly handle the possibility of failures in the (hopefully unlikely) case of a clash.
The usual way to handle a failure is to retry the failed operation from the beginning. Imagine we have two customers, Harry and Bob, and each submits an order for SHINY-TABLE. Both threads load the product at version 1 and allocate stock. The database prevents the concurrent update, and Bob’s order fails with an error. When we retry the operation, Bob’s order loads the product at version 2 and tries to allocate again. If there is enough stock left, all is well; otherwise, he’ll receive OutOfStock. Most operations can be retried this way in the case of a concurrency problem.
Read more on retries in [recovering_from_errors] and [footguns].
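The retry loop itself can be generic. Here’s a minimal, hypothetical sketch (ConcurrencyError stands in for whatever exception your database driver raises on a serialization failure); we rerun the whole operation, which reloads the Product at its new version on each attempt:

```python
import time


class ConcurrencyError(Exception):
    """Stand-in for the driver's 'could not serialize access' error."""


def with_retry(operation, max_attempts=3, backoff=0.01):
    # Retry the failed operation from the beginning: each attempt starts a
    # fresh unit of work, so it sees the latest version of the aggregate.
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except ConcurrencyError:
            if attempt == max_attempts:
                raise
            time.sleep(backoff * attempt)  # brief pause before trying again
```

A service-layer call could then be wrapped as with_retry(lambda: allocate(orderid, sku, qty, uow)), leaving the domain model unaware that retries happen at all.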
Implementation Options for Version Numbers
There are essentially three options for implementing version numbers:
- version_number lives in the domain; we add it to the Product constructor, and Product.allocate() is responsible for incrementing it.

- The service layer could do it! The version number isn’t strictly a domain concern, so instead our service layer could assume that the current version number is attached to Product by the repository, and the service layer will increment it before it does the commit().

- Since it’s arguably an infrastructure concern, the UoW and repository could do it by magic. The repository has access to version numbers for any products it retrieves, and when the UoW does a commit, it can increment the version number for any products it knows about, assuming them to have changed.
Option 3 isn’t ideal, because there’s no real way of doing it without having to assume that all products have changed, so we’ll be incrementing version numbers when we don’t have to.[1]

[1] Perhaps we could get some ORM/SQLAlchemy magic to tell us when an object is dirty, but how would that work in the generic case, for example, for a CsvRepository?
Option 2 involves mixing the responsibility for mutating state between the service layer and the domain layer, so it’s a little messy as well.
So in the end, even though version numbers don’t have to be a domain concern, you might decide the cleanest trade-off is to put them in the domain:
class Product:
def __init__(self, sku: str, batches: List[Batch], version_number: int = 0): #(1)
self.sku = sku
self.batches = batches
self.version_number = version_number #(1)
def allocate(self, line: OrderLine) -> str:
try:
batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
batch.allocate(line)
self.version_number += 1 #(1)
return batch.reference
except StopIteration:
raise OutOfStock(f"Out of stock for sku {line.sku}")
- (1) There it is!
Tip
|
If you’re scratching your head at this version number business, it might help to remember that the number isn’t important. What’s important is that the Product database row is modified whenever we make a change to the Product aggregate. The version number is a simple, human-comprehensible way to model a thing that changes on every write, but it could equally be a random UUID every time.
|
Testing for Our Data Integrity Rules
Now to make sure we can get the behavior we want: if we have two concurrent attempts to do allocation against the same Product, one of them should fail, because they can’t both update the version number.
First, let’s simulate a "slow" transaction using a function that does allocation and then does an explicit sleep:[2]

[2] time.sleep() works well in our use case, but it’s not the most reliable or efficient way to reproduce concurrency bugs. Consider using semaphores or similar synchronization primitives shared between your threads to get stronger guarantees of behavior.
def try_to_allocate(orderid, sku, exceptions):
line = model.OrderLine(orderid, sku, 10)
try:
with unit_of_work.SqlAlchemyUnitOfWork() as uow:
product = uow.products.get(sku=sku)
product.allocate(line)
time.sleep(0.2)
uow.commit()
except Exception as e:
print(traceback.format_exc())
exceptions.append(e)
Then we have our test invoke this slow allocation twice, concurrently, using threads:
def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory):
sku, batch = random_sku(), random_batchref()
session = postgres_session_factory()
insert_batch(session, batch, sku, 100, eta=None, product_version=1)
session.commit()
order1, order2 = random_orderid(1), random_orderid(2)
exceptions = [] # type: List[Exception]
try_to_allocate_order1 = lambda: try_to_allocate(order1, sku, exceptions)
try_to_allocate_order2 = lambda: try_to_allocate(order2, sku, exceptions)
thread1 = threading.Thread(target=try_to_allocate_order1) #(1)
thread2 = threading.Thread(target=try_to_allocate_order2) #(1)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
[[version]] = session.execute(
"SELECT version_number FROM products WHERE sku=:sku",
dict(sku=sku),
)
assert version == 2 #(2)
[exception] = exceptions
assert "could not serialize access due to concurrent update" in str(exception) #(3)
orders = session.execute(
"SELECT orderid FROM allocations"
" JOIN batches ON allocations.batch_id = batches.id"
" JOIN order_lines ON allocations.orderline_id = order_lines.id"
" WHERE order_lines.sku=:sku",
dict(sku=sku),
)
assert orders.rowcount == 1 #(4)
with unit_of_work.SqlAlchemyUnitOfWork() as uow:
uow.session.execute("select 1")
1. We start two threads that will reliably produce the concurrency behavior we want: read1, read2, write1, write2. 我们启动两个线程,这将可靠地重现我们想要的并发行为:read1, read2, write1, write2。
2. We assert that the version number has been incremented only once. 我们断言版本号只增加了一次。
3. We can also check on the specific exception if we like. 如果需要,我们还可以检验具体的异常情况。
4. And we double-check that only one allocation has gotten through. 我们进一步确认只有一个分配操作成功了。
通过使用数据库事务隔离级别来强制执行并发规则
To get the test to pass as it is, we can set the transaction isolation level on our session:
为了让测试按预期通过,我们可以在会话上设置事务隔离级别:
DEFAULT_SESSION_FACTORY = sessionmaker(
bind=create_engine(
config.get_postgres_uri(),
isolation_level="REPEATABLE READ",
)
)
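Under REPEATABLE READ, the losing transaction simply aborts with a serialization error, so callers typically wrap the unit of work in a retry loop. Below is a hedged sketch of that pattern; the helper names, retry count, and backoff are our own assumptions, not the book's API: 在 REPEATABLE READ 下,落败的事务会直接因序列化错误而中止,因此调用方通常会把工作单元包在一个重试循环里。下面是该模式的一个示意性草图;其中的辅助函数名、重试次数和退避参数都是我们自己的假设,并非书中的 API:

```python
# A sketch of retry-on-serialization-failure, a common companion to
# REPEATABLE READ. `is_retryable` and the parameters are illustrative.
import time

def with_retries(do_work, is_retryable, attempts=3, backoff=0.05):
    for attempt in range(attempts):
        try:
            return do_work()
        except Exception as e:
            if attempt == attempts - 1 or not is_retryable(e):
                raise
            time.sleep(backoff * (attempt + 1))  # brief backoff before retrying

# Example: a fake "transaction" that fails once with a serialization error.
calls = []
def flaky_allocate():
    calls.append(1)
    if len(calls) == 1:
        raise RuntimeError("could not serialize access due to concurrent update")
    return "allocated"

result = with_retries(
    flaky_allocate,
    is_retryable=lambda e: "could not serialize" in str(e),
)
assert result == "allocated" and len(calls) == 2  # succeeded on the second attempt
```

This only makes sense when the unit of work is safe to re-run from the top, which is one more reason to keep each transaction small and focused on a single aggregate. 这只有在工作单元可以安全地从头重跑时才有意义——这也是让每个事务保持小巧、只关注单个聚合的又一个理由。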
Tip
|
Transaction isolation levels are tricky stuff, so it’s worth spending time understanding the Postgres documentation.[3] 事务隔离级别是比较复杂的内容,因此值得花些时间阅读和理解 Postgres 文档。[3] |
悲观并发控制示例:SELECT FOR UPDATE
There are multiple ways to approach this, but we’ll show one. SELECT FOR UPDATE produces different behavior; two concurrent transactions will not be allowed to do a read on the same rows at the same time:
有多种方法可以实现这一点,但我们将展示其中一种方法。SELECT FOR UPDATE 会产生不同的行为:两个并发事务将不能同时读取相同的行:
SELECT FOR UPDATE is a way of picking a row or rows to use as a lock (although those rows don’t have to be the ones you update). If two transactions both try to SELECT FOR UPDATE a row at the same time, one will win, and the other will wait until the lock is released. So this is an example of pessimistic concurrency control.
SELECT FOR UPDATE 是一种选择一行或多行用作锁的方法(尽管这些行不一定是你要更新的行)。如果两个事务同时尝试对同一行执行 SELECT FOR UPDATE,其中一个会成功,而另一个则会等待直到锁被释放。因此,这就是一个悲观并发控制的示例。
Here’s how you can use the SQLAlchemy DSL to specify FOR UPDATE at query time:
以下是如何使用 SQLAlchemy 的 DSL 在查询时指定 FOR UPDATE:
def get(self, sku):
return (
self.session.query(model.Product)
.filter_by(sku=sku)
.with_for_update()
.first()
)
This will have the effect of changing the concurrency pattern from read1, read2, write1, write2(fail) to read1, write1, read2, write2(succeed).
这会将并发模式从 read1, read2, write1, write2(失败) 改变为 read1, write1, read2, write2(成功)。
Some people refer to this as the "read-modify-write" failure mode. Read "PostgreSQL Anti-Patterns: Read-Modify-Write Cycles" for a good overview.
有些人将这种模式称为“读-修改-写”失败模式。阅读 "PostgreSQL Anti-Patterns: Read-Modify-Write Cycles" 以获得一个很好的 概述。
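The reordering above can be illustrated with a toy in-process simulation — ours, not the book's: holding a lock across the whole read-modify-write cycle (as a row lock taken by SELECT FOR UPDATE does) serializes the two "transactions", so neither update is lost. 上面的重新排序可以用一个玩具级的进程内模拟来说明——这是我们自己的示例,并非书中的代码:在整个“读-修改-写”周期内持有锁(正如 SELECT FOR UPDATE 获得的行锁那样)会把两个“事务”串行化,因此任何一次更新都不会丢失:

```python
# A toy simulation of pessimistic locking: the lock stands in for the
# row lock taken by SELECT FOR UPDATE. Names and quantities are illustrative.
import threading

lock = threading.Lock()
stock = {"RED-CHAIR": 20}
events = []

def allocate(qty):
    with lock:  # the second thread blocks here until the first "commits"
        current = stock["RED-CHAIR"]
        events.append("read")
        stock["RED-CHAIR"] = current - qty
        events.append("write")

t1 = threading.Thread(target=allocate, args=(10,))
t2 = threading.Thread(target=allocate, args=(10,))
t1.start(); t2.start(); t1.join(); t2.join()

assert stock["RED-CHAIR"] == 0                       # no lost update
assert events == ["read", "write", "read", "write"]  # read1, write1, read2, write2
```

Without the lock, both threads could read 20 before either wrote, and one decrement would be silently lost — the read-modify-write failure mode in miniature. 如果没有这把锁,两个线程可能都在写入前读到 20,其中一次扣减会被悄悄丢失——这正是“读-修改-写”失败模式的缩影。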
We don’t really have time to discuss all the trade-offs between REPEATABLE READ and SELECT FOR UPDATE, or optimistic versus pessimistic locking in general. But if you have a test like the one we’ve shown, you can specify the behavior you want and see how it changes. You can also use the test as a basis for performing some performance experiments.
我们没有足够的时间来详细讨论 REPEATABLE READ 和 SELECT FOR UPDATE 之间的所有权衡,或者一般情况下乐观锁与悲观锁的对比。但如果你有一个像我们展示的那样的测试,你可以指定你想要的行为并观察其变化。你还可以将该测试作为进行一些性能实验的基础。
总结
Specific choices around concurrency control vary a lot based on business circumstances and storage technology choices, but we’d like to bring this chapter back to the conceptual idea of an aggregate: we explicitly model an object as being the main entrypoint to some subset of our model, and as being in charge of enforcing the invariants and business rules that apply across all of those objects.
关于并发控制的具体选择因业务环境和存储技术的不同而存在很大差异,但我们希望将本章的重点回归到聚合的概念性思想上: 我们通过显式建模将一个对象作为模型中某个子集的主要入口,并将其负责强制执行适用于所有这些对象的不变量和业务规则。
Choosing the right aggregate is key, and it’s a decision you may revisit over time. You can read more about it in multiple DDD books. We also recommend these three online papers on effective aggregate design by Vaughn Vernon (the "red book" author).
选择合适的聚合是关键,这一决策可能会随着时间的推移而不断重新评估。有关更多内容,你可以查阅多本 DDD(领域驱动设计)相关的书籍。 我们还推荐阅读 Vaughn Vernon(“红皮书”作者)撰写的关于 有效的聚合设计 的三篇在线论文。
Aggregates: the trade-offs(聚合:权衡取舍) has some thoughts on the trade-offs of implementing the Aggregate pattern.
Aggregates: the trade-offs(聚合:权衡取舍) 提供了一些关于实现聚合模式时权衡取舍的思考。
Pros(优点) | Cons(缺点)
---|---
- Aggregates are your entrypoints into the domain model(聚合是你进入领域模型的入口点): By restricting the number of ways that things can be changed, we make the system easier to reason about. 通过限制可以更改事物的方式数量,我们使系统更容易理解。
- Aggregates are in charge of a consistency boundary(聚合负责一致性边界): An aggregate’s job is to be able to manage our business rules about invariants as they apply to a group of related objects. It’s the aggregate’s job to check that the objects within its remit are consistent with each other and with our rules, and to reject changes that would break the rules. 聚合的职责是管理适用于一组相关对象的不变量业务规则。聚合的任务是检查其管辖范围内的对象彼此之间以及与我们的规则保持一致,并拒绝那些会破坏规则的更改。
- Aggregates and concurrency issues go together(聚合与并发问题密切相关): When thinking about implementing these consistency checks, we end up thinking about transactions and locks. Choosing the right aggregate is about performance as well as conceptual organization of your domain. 在考虑实现这些一致性检查时,我们最终会涉及对事务和锁的思考。选择合适的聚合不仅关系到性能,也关系到领域概念的组织。
第一部分回顾
Do you remember A component diagram for our app at the end of Part I(第一部分结束时我们应用程序的组件图), the diagram we showed at the beginning of [part1] to preview where we were heading?
你还记得 A component diagram for our app at the end of Part I(第一部分结束时我们应用程序的组件图) 吗?这是我们在 [part1] 开头展示的一个图,用来预览我们的学习方向。
So that’s where we are at the end of Part I. What have we achieved? We’ve seen how to build a domain model that’s exercised by a set of high-level unit tests. Our tests are living documentation: they describe the behavior of our system—the rules upon which we agreed with our business stakeholders—in nice readable code. When our business requirements change, we have confidence that our tests will help us to prove the new functionality, and when new developers join the project, they can read our tests to understand how things work.
这就是我们在第一部分结束时所处的位置。我们取得了哪些成就呢?我们已经了解了如何构建由一组高层次单元测试驱动的领域模型。 我们的测试是活的文档:它们以清晰可读的代码描述了我们系统的行为——那些我们与业务相关方达成一致的规则。当业务需求发生变化时, 我们有信心相信测试将帮助我们验证新的功能;而当新开发者加入项目时,他们可以阅读我们的测试以了解系统是如何工作的。
We’ve decoupled the infrastructural parts of our system, like the database and API handlers, so that we can plug them into the outside of our application. This helps us to keep our codebase well organized and stops us from building a big ball of mud.
我们已经将系统的基础设施部分(如数据库和 API 处理程序)解耦,使其能够作为外部组件连接到我们的应用程序。这有助于保持代码库的良好组织, 防止我们构建出一团混乱的代码结构。
By applying the dependency inversion principle, and by using ports-and-adapters-inspired patterns like Repository and Unit of Work, we’ve made it possible to do TDD in both high gear and low gear and to maintain a healthy test pyramid. We can test our system edge to edge, and the need for integration and end-to-end tests is kept to a minimum.
通过应用依赖反转原则,并使用受端口和适配器(Ports-and-Adapters)启发的模式,如仓储(Repository)和工作单元(Unit of Work),我们实现了在高速挡和低速挡两种节奏下进行测试驱动开发(TDD),并维护了一个健康的测试金字塔。我们可以对系统进行端到端的测试,同时将对集成测试和端到端测试的需求降至最低。
Lastly, we’ve talked about the idea of consistency boundaries. We don’t want to lock our entire system whenever we make a change, so we have to choose which parts are consistent with one another.
最后,我们讨论了一致性边界的概念。我们不希望在每次进行更改时都锁定整个系统,因此必须选择哪些部分需要彼此保持一致。
For a small system, this is everything you need to go and play with the ideas of domain-driven design. You now have the tools to build database-agnostic domain models that represent the shared language of your business experts. Hurrah!
对于一个小型系统来说,这已经是探索领域驱动设计(DDD)理念所需的一切了。你现在拥有了构建与数据库无关的领域模型的工具, 这些模型能够体现你的业务专家之间的通用语言。芜湖!
Note
|
At the risk of laboring the point—we’ve been at pains to point out that each pattern comes at a cost. Each layer of indirection has a price in terms of complexity and duplication in our code and will be confusing to programmers who’ve never seen these patterns before. If your app is essentially a simple CRUD wrapper around a database and isn’t likely to be anything more than that in the foreseeable future, you don’t need these patterns. Go ahead and use Django, and save yourself a lot of bother. 冒着重复强调这一点的风险——我们一直致力于指出,每种模式都伴随着一定的代价。每一层间接抽象都会在代码中带来复杂性和重复性, 同时也会让从未见过这些模式的程序员感到困惑。如果你的应用本质上只是一个围绕数据库的简单 CRUD 封装,并且在可预见的未来也不会变得比这更复杂, 你完全不需要这些模式。尽管使用 Django 吧,这样可以为自己省去许多麻烦。 |
In Part II, we’ll zoom out and talk about a bigger topic: if aggregates are our boundary, and we can update only one at a time, how do we model processes that cross consistency boundaries?
在第二部分,我们将放大视角,讨论一个更大的主题:如果聚合是我们的边界,并且我们一次只能更新一个,那么我们该如何为跨越一致性边界的流程建模?
CsvRepository?
time.sleep() works well in our use case, but it’s not the most reliable or efficient way to reproduce concurrency bugs. Consider using semaphores or similar synchronization primitives shared between your threads to get better guarantees of behavior.
SERIALIZABLE is equivalent to Postgres’s REPEATABLE READ, for example.