当前位置: 移动技术网 > IT编程>软件设计>领域驱动 > CQRS之旅——旅程6(我们系统的版本管理)

CQRS之旅——旅程6(我们系统的版本管理)

2019年06月27日  | 移动技术网IT编程  | 我要评论

旅程6:我们系统的版本管理

准备下一站:升级和迁移

“变化是生活的调味品。”威廉·考珀

此阶段的最高目标是了解如何升级包含实现cqrs模式和事件源的限界上下文的系统。团队在这一阶段实现的用户场景包括对代码的更改和对数据的更改:更改了一些现有的数据模式并添加了新的数据模式。除了升级系统和迁移数据外,团队还计划在没有停机时间的情况下进行升级和迁移,以便在microsoft azure中运行实时系统。

本章的工作术语定义:

本章使用了一些术语,我们将在下面进行描述。有关更多细节和可能的替代定义,请参阅参考指南中的“深入cqrs和es”。

  • command(命令):命令是要求系统执行更改系统状态的操作。命令是必须服从(执行)的一种指令,例如:makeseatreservation。在这个限界上下文中,命令要么来自用户发起请求时的ui,要么来自流程管理器(当流程管理器指示聚合执行某个操作时)。单个接收方处理一个命令。命令总线(command bus)传输命令,然后命令处理程序将这些命令发送到聚合。发送命令是一个没有返回值的异步操作。

  • 事件(event):一个事件,比如orderconfirmed,描述了系统中发生的一些事情,通常是一个命令的结果。领域模型中的聚合引发事件。事件也可以来自其他限界上下文。多个订阅者可以处理特定的事件。聚合将事件发布到事件总线。处理程序在事件总线上注册特定类型的事件,然后将事件传递给订阅服务器。在订单和注册限界上下文中,订阅者是流程管理器和读取模型生成器。

  • 幂等性(idempotency):幂等性是一个操作的特性,这意味着该操作可以多次应用而不改变结果。例如,“将x的值设置为10”的操作是幂等的,而“将x的值加1”的操作不是幂等的。在消息传递环境中,如果消息可以多次传递而不改变结果,则消息是幂等的:这可能是因为消息本身的性质,也可能是因为系统处理消息的方式。

用户故事:

在这个过程的这个阶段,团队实现了下面描述的用户故事。

不停机升级

v2版本的目标是升级系统,包括任何必要的数据迁移,而不需要把系统停机。如果这在当前实现中不可行,那么停机时间应该最小化,并且应该修改系统,以便在将来支持零停机时间升级(从v3版本开始)。

beth(业务经理)发言:

确保我们能够在不停机的情况下进行升级,这对我们在市场中的信誉至关重要。

显示剩余座位数量

目前,当注册者创建一个订单时,没有显示每种座位类型的剩余座位数量。当注册者选择购买座位时,ui应该显示此信息。

处理不需要付费的座位

目前,当注册者选择不需要付费的座位时,ui流仍然会将注册者带到支付页面,即使不需要支付任何费用。系统应该检测什么时候没有支付,并调整流程,让注册者直接进入订单的确认页面。

架构

该应用程序旨在部署到microsoft azure。在旅程的那个阶段,应用程序由两个角色组成,一个包含asp.net mvc web应用程序的web角色和一个包含消息处理程序和领域对象的工作角色。应用程序在写端和读端都使用azure sql database实例进行数据存储。应用程序使用azure服务总线来提供其消息传递基础设施。下图展示了这个高级体系结构。

在研究和测试解决方案时,可以在本地运行它,可以使用azure compute emulator,也可以直接运行mvc web应用程序,并运行承载消息处理程序和领域域对象的控制台应用程序。在本地运行应用程序时,可以使用本地sql server express数据库,并使用一个在sql server express数据库实现的简单的消息传递基础设施。

有关运行应用程序的选项的更多信息,请参见附录1“”。

模式和概念

在旅程的这个阶段,团队处理的大多数关键挑战都与如何最好地执行从v1到v2的迁移有关。本节将介绍其中的一些挑战。

处理“事件定义发生更改"的情况

当团队检查v2的发布需求,很明显,我们需要改变在订单和注册限界上下文中使用的一些事件来适应一些新特性:registrationprocessmanager将会改变,当订单有一个不需要付费的座位时系统将提供一个更好的用户体验。

订单和注册限界上下文使用事件源,因此在迁移到v2之后,事件存储将包含旧事件,但将开始保存新事件。当系统事件被重放时,系统必须能正确处理所有的旧事件和新事件。

团队考虑了两种方法来处理系统中的这类更改。

在基础设施中进行事件映射或过滤

在基础设施中映射和过滤事件消息是一种选择。此方法是对旧的事件消息和消息格式进行处理,在它们到达领域之前在基础设施的某个位置处理它们。您可以过滤掉不再相关的旧消息,并使用映射将旧格式的消息转换为新格式。这种方法最初比较复杂,因为它需要对基础设施进行更改,但是它可以保持领域域的纯粹,领域只需要理解当前的新事件集合就可以了。

在聚合中处理多个版本的消息

在聚合中处理多个版本的消息是另一种选择。在这种方法中,所有消息类型(包括旧消息和新消息)都传递到领域,每个聚合必须能够处理旧消息和新消息。从短期来看,这可能是一个合适的策略,但它最终会导致域模型受到遗留事件处理程序的污染。

团队为v2版本选择了这个选项,因为它包含了最少数量的代码更改。

jana(软件架构师)发言:

当前在聚合中处理旧事件和新事件并不妨碍您以后使用第一种选择:在基础设施中使用映射/过滤机制。

履行消息幂等性

v2版本中要解决的一个关键问题是使系统更加健壮。在v1版本中,在某些场景中,可能会多次处理某些消息,导致系统中的数据不正确或不一致。

jana(软件架构师)发言:

消息幂等性在任何使用消息传递的系统中都很重要,这不仅仅是在实现cqrs模式或使用事件源的系统中。

在某些场景中,设计幂等消息是可能的,例如:使用“将座位配额设置为500”的消息,而不是“在座位配额中增加100”的消息。您可以安全地多次处理第一个消息,但不能处理第二个消息。

然而,并不总是能够使用幂等消息,因此团队决定使用azure服务总线的重复删除特性,以确保它只传递一次消息。团队对基础设施进行了一些更改,以确保azure服务总线能够检测重复消息,并配置azure服务总线来执行重复消息检测。

要了解contoso是如何实现这一点的,请参阅下面的“不让命令消息重复”一节。此外,我们需要考虑系统中的消息处理程序如何从队列和topic检索消息。当前的方法使用azure服务总线peek/lock机制。这是一个分成三个阶段的过程:

  1. 处理程序从队列或topic检索消息,并在其中留下消息的锁定副本。其他客户端无法看到或访问锁定的消息。
  2. 处理程序处理消息。
  3. 处理程序从队列中删除锁定的消息。如果锁定的消息在固定时间后没有解锁或删除,则解锁该消息并使其可用,以便再次检索。

如果步骤由于某种原因失败,这意味着系统可以不止一次地处理消息。

jana(软件架构师)发言:

该团队计划在旅程的下一阶段解决这个问题(步骤失败的问题)。更多信息,请参见第7章“”。

阻止多次处理事件

在v1中,在某些场景里,如果在处理事件时发生错误,系统可能多次处理事件。为了避免这种情况,团队修改了体系结构,以便每个事件处理程序都有自己对azure topic的订阅。下图显示了两个不同的模型。

在v1中,可能发生以下行为:

  1. eventprocessor实例从服务总线中的所有订阅者那里接收到orderplaced事件。
  2. eventprocessor实例有两个已注册的处理程序,registrationprocessmanagerrouterorderviewmodelgenerator处理程序类,所以会在两个里都触发调用handle方法。
  3. orderviewmodelgenerator类中的handle方法执行成功。
  4. registrationprocessmanagerrouter类中的handle方法抛出异常。
  5. eventprocessor实例捕获到异常然后抛弃掉事件消息。消息将自动放回订阅中。
  6. eventprocessor实例第二次从所有订阅者那里接收到orderplaced事件。
  7. 事件又触发两个处理方法,导致registrationprocessmanagerrouter类和orderviewmodelgenerator第二次处理事件消息。
  8. 每当registrationprocessmanagerrouter类抛出异常时,orderviewmodelgenerator类都会触发处理该事件。

在v2模型中,如果处理程序类抛出异常,eventprocessor实例将事件消息放回与该处理程序类关联的订阅。重试逻辑现在只会导致eventprocessor实例重试引发异常的处理程序,因此没有其他处理程序会重新处理消息。

集成事件的持久化

在v1版本中提出的一个问题是,系统如何持久化从会议管理限界上下文发送到订单和注册限界上下文的集成事件。这些事件包括关于会议创建和发布的信息,以及座位类型和配额更改的详细信息。

在v1版本中,订单和注册上下文中的conferenceviewmodelgenerator类通过更新视图模型并向seatsavailability聚合发送命令来处理这些事件,以告诉它更改座位配额值。

这种方法意味着订单和注册限界上下文不存储任何历史记录,这可能会导致问题。例如,其他视图从这里中查找座椅类型描述时,这里只包含座椅类型描述的最新值。因此,在其他地方重播一组事件可能会重新生成另一个包含不正确座椅类型描述的读取模型投影。

团队考虑了以下五个方法来纠正这种情况:

  • 将所有事件保存在原始限界上下文中(会议管理限界上下文中),并使用共享的事件存储,订单和注册限界上下文中可以访问该存储来重播这些事件。接收限界上下文可以重放事件流,直到它需要查看的之前的座椅类型描述时为止。
  • 当所有事件到达接收限界上下文(订单和注册限界上下文)时保存它们。
  • 让视图模型生成器中的命令处理程序保存事件,只选择它需要的那些。
  • 让视图模型生成器中的命令处理程序保存不同的事件,实际上就是为此视图模型使用事件源。
  • 将来自所有限界上下文的所有命令和事件消息存储在消息日志中。

第一种选择并不总是可行的。在这种特殊情况下,它可以工作,因为同一个团队同时实现了限界上下文和基础设施,使得使用共享事件存储变得很容易。

gary(cqrs专家)发言:

尽管从纯粹主义者的角度来看,第一个选项破坏了限界上下文之间的严格隔离,但在某些场景中,它可能是一个可接受的实用解决方案。

第三种选择可能存在的风险是,所需的事件集合可能在未来发生变化。如果我们现在不保存事件,它们将永远丢失。

尽管第五个选项存储了所有命令和事件,其中一些可能永远都不需要再次引用,但它确实提供了一个完整的日志,记录了系统中发生的所有事情。这对于故障诊断很有用,还可以帮助您满足尚未确定的需求。该团队选择了这个选项而不是选项二,因为它提供了一个更通用的机制,可能具有未来的好处。

持久化事件的目的是,当订单和注册上下文需要有关当前座位配额的信息时,可以回放这些事件,以便计算剩余座位的数量。要一致地计算这些数字,必须始终以相同的顺序回放事件。这种顺序有几种选择:

  • 会议管理限界上下文发送事件的顺序。
  • 订单和注册上下文接收事件的顺序。
  • 订单和注册上下文处理事件的顺序。

大多数情况下,这些顺序是相同的。没有什么正确的顺序。你只需要选择一个和它保持一致就行了。因此,选择由简单性决定。在本例中,最简单的方法是按照订单和注册限界上下文中处理程序接收事件的顺序持久化事件(第二个选项)。

markus(软件开发人员)发言:

这种选择通常不会出现在事件源中。每个聚合会都以固定的顺序创建事件,这就是系统用于持久存储事件的顺序。在此场景中,集成事件不是由单个聚合创建的。

为这些事件保存时间戳也有类似的问题。如果将来需要查看特定时间剩余的座位数量,那么时间戳可能会很有用。这里的选择是,当事件在会议管理限界上下文中创建时,还是在订单和注册限界上下文中接收时,应该创建时间戳?当会议管理限界上下文创建事件时,订单和注册限界上下文可能由于某种原因离线。因此,团队决定在会议管理有界上下文发布事件时创建时间戳。

消息排序

团队创建并运行来验证v1版本的验收测试,凸显出了消息排序的一个潜在问题:执行会议管理限界上下文的验收测试向订单和注册限界上下文发送了一系列命令,这些命令有时会出现顺序错误。

markus(软件开发人员)发言:

当人类用户真实测试系统的这一部分时,不太会注意到这种效果,因为发出命令的时间间隔要长得多,这使得消息不太可能无序地到达。

团队考虑了两种方法来确保消息以正确的顺序到达。

  • 第一个方法是使用消息会话,这是azure服务总线的一个特性。如果您使用消息会话,这将确保会话内的消息以与它们发送时相同的顺序传递。
  • 第二种方法是修改应用程序中的处理程序,通过使用发送消息时添加到消息中的序列号或时间戳来检测无序消息。如果接收处理程序检测到一条无序消息,它将拒绝该消息,并在处理了在被拒绝消息之前发送的消息之后,将其放回稍后处理的队列或topic。

在这种情况下,首选的解决方案是使用azure服务总线消息会话,因为这只需要对现有代码进行更少的更改。这两种方法都会给消息传递带来一些额外的延迟,但是团队并不认为这会对系统的性能产生显著的影响。

实现细节

本节描述订单和注册限界上下文的实现的一些重要功能。您可能会发现拥有一份代码拷贝很有用,这样您就可以继续学习了。您可以从下载一个副本,或者在github上查看存储库:。您可以从github上的tags页面下载v2版本的代码。

备注:不要期望代码示例与参考实现中的代码完全匹配。本章描述了cqrs过程中的一个步骤,随着我们了解更多并重构代码,实现可能会发生变化。

**添加对“不需要支付的订单”的支持

做出这一改变有三个具体的目标,它们都是相关的。我们希望:

  • 修改registrationprocessmanager类和相关聚合,以处理不需要支付的订单。
  • 修改ui中的导航,当订单不需要支付时跳过付款步骤。
  • 确保系统在升级到v2之后能够正确地工作,包括使用新事件和旧事件。

registrationprocessmanager类的更改

在此之前,registrationprocessmanager类在收到来自ui的注册者已完成支付的通知后发送了一个confirmorderpayment命令。现在,如果有一个不需要支付订单,ui将直接向订单聚合发送一个confirmorder命令。如果订单需要支付,registrationprocessmanager类在从ui接收到成功支付的通知后,再向订单聚合发送一个confirmorder命令。

jana(软件架构师)发言:

注意,命令的名称已从confirmorderpayment更改为confirmorder。这反映了订单不需要知道任何关于付款的信息。它只需要知道订单已经确认。类似地,现在有一个新的orderconfirmed事件用于替代旧的orderpaymentconfirmed事件。

当订单聚合接收到confirmorder命令时,它将引发一个orderconfirmed事件。除被持久化外,该事件还由以下对象处理:

  • orderviewmodelgenerator类,它在其中更新读取模型中的订单状态。
  • seatassignments聚合,在其中初始化一个新的seatassignments实例。
  • registrationprocessmanager类,它在其中触发一个提交座位预订的命令。

ui的更改

ui中的主要更改是在registrationcontroller mvc控制器类中的specifyregistrantandpaymentdetails action里的。之前,此action方法返回initiateregistrationwiththirdpartyprocessorpayment(action result)。现在,如果order对象的新isfreeofcharge属性为true,它将返回一个completeregistrationwithoutpayment(action result)。否则,它返回一个completeregistrationwiththirdpartyprocessorpayment(action result)。

[httppost]
public actionresult specifyregistrantandpaymentdetails(assignregistrantdetails command, string paymenttype, int orderversion)
{
    ...

    var pricedorder = this.orderdao.findpricedorder(orderid);
    if (pricedorder.isfreeofcharge)
    {
        return completeregistrationwithoutpayment(command, orderid);
    }

    switch (paymenttype)
    {
        case thirdpartyprocessorpayment:

            return completeregistrationwiththirdpartyprocessorpayment(command, pricedorder, orderversion);

        case invoicepayment:
            break;

        default:
            break;
    }

    ...
}

completeregistrationwiththirdpartyprocessorpayment将用户重定向到thirdpartyprocessorpayment action,completeregistrationwithoutpayment方法将用户直接重定向到thankyou action。

数据迁移

会议管理限界上下文在其azure sql数据库实例中的pricedorders表中存储来自订单和注册限界上下文的订单信息。以前,会议管理限界上下文接收orderpaymentconfirmed事件,现在它接收orderconfirmed事件,该事件包含一个附加的isfreeofcharge属性。这将成为数据库中的一个新列。

markus(软件开发人员)发言:

在迁移过程中,我们不需要修改该表中的现有数据,因为布尔值的默认值为false。所有现有条目都是在系统支持不需要付费的订单之前创建的。

在迁移过程中,任何正在运行的confirmorderpayment命令都可能丢失,因为它们不再由订单聚合处理。您应该验证当前的命令总线没有这些命令。

poe(it运维人员)发言:

我们需要仔细计划如何部署v2版本,以便确保所有现有的、正在运行的confirmorderpayment命令都由运行v1版本的工作角色实例处理。

系统将registrationprocessmanager类实例的状态保存到sql数据库表中。这个表的架构没有变化。迁移后您将看到的惟一更改是statevalue列中的一个新添加值。这反映了registrationprocessmanager类中的processstate枚举中额外的paymentconfirmationreceived值,如下面的代码示例所示:

public enum processstate
{
    notstarted = 0,
    awaitingreservationconfirmation = 1,
    reservationconfirmationreceived = 2,
    paymentconfirmationreceived = 3,
}

在v1版本中,事件源系统为订单聚合保存的事件包括orderpaymentconfirmed事件。因此,事件存储区包含此事件类型的实例。在v2版本中,orderpaymentconfirmed事件被替换为orderconfirmed事件。

团队决定在v2版本中,当反序列化事件时,不在基础设施级别映射和过滤事件。这意味着,当系统从事件存储中重播这些事件时,处理程序必须同时理解旧事件和新事件。下面的代码示例在seatassignmentshandler类中显示了这一点:

static seatassignmentshandler()
{
    mapper.createmap<orderpaymentconfirmed, orderconfirmed>();
}

public seatassignmentshandler(ieventsourcedrepository<order> ordersrepo, ieventsourcedrepository<seatassignments> assignmentsrepo)
{
    this.ordersrepo = ordersrepo;
    this.assignmentsrepo = assignmentsrepo;
}

public void handle(orderpaymentconfirmed @event)
{
    this.handle(mapper.map<orderconfirmed>(@event));
}

public void handle(orderconfirmed @event)
{
    var order = this.ordersrepo.get(@event.sourceid);
    var assignments = order.createseatassignments();
    assignmentsrepo.save(assignments);
}

您还可以在orderviewmodelgenerator类中看到同样的技术。

order类中的方法略有不同,因为这是持久化到事件存储中的事件之一。下面的代码示例显示了order类中受保护构造函数的一部分:

protected order(guid id)
    : base(id)
{
    ...
    base.handles<orderpaymentconfirmed>(e => this.onorderconfirmed(mapper.map<orderconfirmed>(e)));
    base.handles<orderconfirmed>(this.onorderconfirmed);
    ...
}

jana(软件架构师)发言:

以这种方式处理旧事件对于这个场景非常简单,因为惟一需要更改的是事件的名称。如果事件的属性也发生了变化,情况会更加复杂。将来,contoso将考虑在基础设施中进行映射,以避免遗留事件污染领域模型。

在ui中显示剩余座位

做出这一改变有三个具体的目标,它们都是相关的。我们想要:

  • 修改系统,在会议系统的读模型中包含每个座位类型的剩余座位数量信息。
  • 修改ui以显示每种座位类型的剩余座位数量。
  • 确保升级到v2后系统功能正常。

向读模型添加关于剩余座位数量的信息

系统要能显示剩余座位数量的信息来自两个地方:

  • 当业务客户创建新的座位类型或修改座位配额时,会议管理限界上下文将引发seatcreatedseatupdated事件。
  • 在订单和注册限界上下文中,当注册者创建一个订单的时候,可用座位(seatsavailability)聚合将引发seatsreserved、seatsreservationcancelled和availableseatschanged事件。

备注:conferenceviewmodelgenerator类不使用seatcreatedseatupdated事件。

订单和注册限界上下文中的conferenceviewmodelgenerator类现在处理这些事件,并使用它们来计算和存储读模型中的座位类型数量。下面的代码示例显示了conferenceviewmodelgenerator类中的相关处理程序:

public void handle(availableseatschanged @event)
{
    this.updateavailablequantity(@event, @event.seats);
}

public void handle(seatsreserved @event)
{
    this.updateavailablequantity(@event, @event.availableseatschanged);
}

public void handle(seatsreservationcancelled @event)
{
    this.updateavailablequantity(@event, @event.availableseatschanged);
}

private void updateavailablequantity(iversionedevent @event, ienumerable<seatquantity> seats)
{
    using (var repository = this.contextfactory.invoke())
    {
        var dto = repository.set<conference>().include(x => x.seats).firstordefault(x => x.id == @event.sourceid);
        if (dto != null)
        {
            if (@event.version > dto.seatsavailabilityversion)
            {
                foreach (var seat in seats)
                {
                    var seatdto = dto.seats.firstordefault(x => x.id == seat.seattype);
                    if (seatdto != null)
                    {
                        seatdto.availablequantity += seat.quantity;
                    }
                    else
                    {
                        trace.traceerror("failed to locate seat type read model being updated with id {0}.", seat.seattype);
                    }
                }

                dto.seatsavailabilityversion = @event.version;

                repository.save(dto);
            }
            else
            {
                trace.tracewarning ...
            }
        }
        else
        {
            trace.traceerror ...
        }
    }
}

updateavailablequantity方法将事件上的版本与读模型的当前版本进行比较,以检测可能的重复消息。

markus(软件开发人员)发言:

此检查仅检测重复的消息,而不是超出序列的消息。

修改ui以显示剩余的座位数量

现在,当ui向会议的读模型查询座位类型列表时,列表包括当前可用的座位数量。下面的代码示例显示了registrationcontroller mvc控制器如何使用seattype类的availablequantity

private orderviewmodel createviewmodel()
{
    var seattypes = this.conferencedao.getpublishedseattypes(this.conferencealias.id);
    var viewmodel =
        new orderviewmodel
        {
            conferenceid = this.conferencealias.id,
            conferencecode = this.conferencealias.code,
            conferencename = this.conferencealias.name,
            items =
                seattypes.select(
                    s =>
                        new orderitemviewmodel
                        {
                            seattype = s,
                            orderitem = new draftorderitem(s.id, 0),
                            availablequantityfororder = s.availablequantity,
                            maxselectionquantity = math.min(s.availablequantity, 20)
                        }).tolist(),
        };

    return viewmodel;
}

数据迁移

保存会议读模型数据的数据库有一个新列来保存用于检查重复事件的版本号,而保存座位类型读模型数据有一个新列来保存可用的座椅数量。

作为数据迁移的一部分,有必要为每个可用座位(seatsavailability)聚合重放事件存储中的所有事件,以便正确计算可用数量。

不让命令消息重复

系统目前使用azure服务总线传输消息。当系统从conferenceprocessor类的启动代码初始化azure服务总线时,它配置topic来检测重复的消息,如下面的servicebusconfig类的代码示例所示:

private void createtopicifnotexists() 
{     
    var topicdescription =         
        new topicdescription(this.topic)         
        {             
            requiresduplicatedetection = true,
            duplicatedetectionhistorytimewindow = topic.duplicatedetectionhistorytimewindow,         
        };     
    try     
    {         
        this.namespacemanager.createtopic(topicdescription);     
    }     
    catch (messagingentityalreadyexistsexception) { } 
} 
备注:您可以在settings.xml文件中配置duplicatedetectionhistorytimewindow
可以向topic元素添加这个属性。默认值是1小时。

但是,为了使重复检测工作正常,您必须确保每个消息都有一个惟一的id。下面的代码示例显示了markseatsasreserved命令:

public class markseatsasreserved : icommand
{
    public markseatsasreserved()
    {
        this.id = guid.newguid();
        this.seats = new list<seatquantity>();
    }

    public guid id { get; set; }

    public guid orderid { get; set; }

    public list<seatquantity> seats { get; set; }

    public datetime expiration { get; set; }
}

commandbus类中的buildmessage方法使用命令id创建一个惟一的消息id, azure服务总线可以使用这个消息id来检测重复:

private brokeredmessage buildmessage(envelope command) 
{ 
    var stream = new memorystream(); 
    ...

    var message = new brokeredmessage(stream, true);
    if (!default(guid).equals(command.body.id))
    {
        message.messageid = command.body.id.tostring();
    }

...

    return message;
} 

保证消息顺序

团队决定使用azure服务总线消息会话来保证系统中的消息顺序。

系统从conferenceprocessor类中的onstart方法配置azure服务总线topic和订阅。settings.xml配置文件中的配置指定了具体的订阅使用会话。servicebusconfig类中的以下代码示例显示了系统如何创建和配置订阅。

private void createsubscriptionifnotexists(namespacemanager namespacemanager, topicsettings topic, subscriptionsettings subscription)
{
    var subscriptiondescription =
        new subscriptiondescription(topic.path, subscription.name)
        {
            requiressession = subscription.requiressession
        };

    try
    {
        namespacemanager.createsubscription(subscriptiondescription);
    }
    catch (messagingentityalreadyexistsexception) { }
}

以下来自sessionsubscriptionreceiver类的代码示例演示了如何使用会话接收消息:

private void receivemessages(cancellationtoken cancellationtoken)
{
    while (!cancellationtoken.iscancellationrequested)
    {
        messagesession session;
        try
        {
            session = this.receiveretrypolicy.executeaction<messagesession>(this.doacceptmessagesession);
        }
        catch (exception e)
        {
            ...
        }

        if (session == null)
        {
            thread.sleep(100);
            continue;
        }


        while (!cancellationtoken.iscancellationrequested)
        {
            brokeredmessage message = null;
            try
            {
                try
                {
                    message = this.receiveretrypolicy.executeaction(() => session.receive(timespan.zero));
                }
                catch (exception e)
                {
                    ...
                }

                if (message == null)
                {
                    // if we have no more messages for this session, exit and try another.
                    break;
                }

                this.messagereceived(this, new brokeredmessageeventargs(message));
            }
            finally
            {
                if (message != null)
                {
                    message.dispose();
                }
            }
        }

        this.receiveretrypolicy.executeaction(() => session.close());
    }
}

private messagesession doacceptmessagesession()
{
    try
    {
        return this.client.acceptmessagesession(timespan.fromseconds(45));
    }
    catch (timeoutexception)
    {
        return null;
    }
}

markus(软件开发人员)发言:

您可能会发现,将使用消息会话的receivemessages方法的这个版本与subscriptionreceiver类中的原始版本进行比较是很有用的。

您必须确保当你发送消息包含一个会话id,这样才能使用消息会话接收一条消息。系统使用事件的sourceid作为会话id,如下面的代码示例所示的eventbus类中的buildmessage方法:

var message = new brokeredmessage(stream, true);
message.sessionid = @event.sourceid.tostring();

通过这种方式,您可以确保以正确的顺序接收来自单个源的所有消息。

poe(it运维人员)发言:

在v2版本中,团队更改了系统创建azure服务总线topic和订阅的方式。之前,subscriptionreceiver类创建了它们(如果它们还不存在)。现在,系统在应用程序启动时使用配置数据创建它们。这发生在启动过程的早期,以避免在系统初始化订阅之前将消息发送到topic时丢失消息的风险。

然而,只有当消息按正确的顺序传递到总线上时,会话才能保证按顺序传递消息。如果系统异步发送消息,则必须特别注意确保消息以正确的顺序放在总线上。在我们的系统中,来自每个单独聚合实例的事件按顺序到达是很重要的,但是我们不关心来自不同聚合实例的事件的顺序。因此,尽管系统异步发送事件,eventstorebuspublisher实例仍然会在发送下一个事件之前等待前一个事件已发送的确认。以下来自topicsender类的示例说明了这一点:

public void send(func<brokeredmessage> messagefactory)
{
    var resetevent = new manualresetevent(false);
    exception exception = null;
    this.retrypolicy.executeaction(
        ac =>
        {
            this.dobeginsendmessage(messagefactory(), ac);
        },
        ar =>
        {
            this.doendsendmessage(ar);
        },
        () => resetevent.set(),
        ex =>
        {
            trace.traceerror("an unrecoverable error occurred while trying to send a message:\r\n{0}", ex);
            exception = ex;
            resetevent.set();
        });

    resetevent.waitone();
    if (exception != null)
    {
        throw exception;
    }
}

jana(软件架构师)发言:

此代码示例展示了系统如何使用transient fault handling application block来让异步调用可靠。

有关消息排序和azure服务总线的更多信息,请参见

有关异步发送消息和排序的信息,请参阅博客文章

从会议管理限界上下文中持久化事件

团队决定创建一个包含所有发送的命令和事件的消息日志。这将使订单和注册限界上下文能够从会议管理限界上下文查询此日志,以获取其构建读模型所需的事件。这不是事件源,因为我们没有使用这些事件来重建聚合的状态,尽管我们使用类似的技术来捕获和持久化这些集成事件。

gary(cqrs专家)发言:

此消息日志确保不会丢失任何消息,以便将来能够满足其他需求。

向消息添加额外元数据

系统现在将所有消息保存到消息日志中。为了方便查询特定命令或事件,系统现在向每个消息添加了更多的元数据。以前,惟一的元数据是事件类型,现在,事件元数据包括事件类型、命名空间、程序集和路径。系统将元数据添加到eventbus类中的事件和commandbus类中的命令中。

捕获消息并将消息持久化到消息日志中

系统使用azure服务总线中对会议/命令和会议/事件topic的额外订阅来接收系统中每条消息的副本。然后,它将消息附加到azure表存储中。下面的代码示例显示了azuremessagelogwriter类的实例,它用于将消息保存到表中:

public class messagelogentity : tableserviceentity 
{ 
    public string kind { get; set; }     
    public string correlationid { get; set; }     
    public string messageid { get; set; }     
    public string sourceid { get; set; }     
    public string assemblyname { get; set; }     
    public string namespace { get; set; }     
    public string fullname { get; set; }     
    public string typename { get; set; }     
    public string sourcetype { get; set; }     
    public string creationdate { get; set; }     
    public string payload { get; set; } 
} 

kind属性指定消息是命令还是事件。messageid和correlationid属性由消息传递基础设施设置的,其余属性是从消息元数据中设置的。

下面的代码示例显示了这些消息的分区和rowkey的定义:

partitionkey = message.enqueuedtimeutc.tostring("yyymm"),
rowkey = message.enqueuedtimeutc.ticks.tostring("d20") + "_" + message.messageid

注意,rowkey保存了消息最初发送的顺序,并添加到消息id上,以确保惟一性,以防两条消息同时入队。

jana(软件架构师)发言:

这与事件存储不同,在事件存储区中,分区键标识聚合实例,而rowkey标识聚合的版本号。

数据迁移

当contoso将系统从v1迁移到v2时,它将使用消息日志在订单和注册限界上下文中重建会议和价格订单的读模型。

gary(cqrs专家)发言:

contoso可以在需要重建与聚合无关的事件构建的读模型时来使用消息日志,例如来自会议管理限界上下文的集成事件。

会议读模型包含会议的信息,并包含来自会议管理限界上下文的conferencecreated、conferenceupdated、conferencepublished、conferenceunpublished、seatcreated和seatupdated事件的信息。

价格订单读模型持有来自于seatcreated和seatupdated事件的信息,这些事件来自于会议管理限界上下文。

然而,在v1中,这些事件消息没有被持久化,因此读模型不能在v2中重新填充。为了解决这个问题,团队实现了一个数据迁移实用程序,它使用一种最佳方法来生成包含要存储在消息日志中的丢失数据的事件。例如,在迁移到v2之后,消息日志不包含conferencecreated事件,因此迁移实用程序在会议管理限界上下文使用的数据库中找到这些信息,并创建丢失的事件。您可以在migrationtov2项目的migrator类中的generatepasteventlogmessagesforconferencemanagement方法中看到这是如何完成的。

markus(软件开发人员)发言:

您可以在这个类中看到,contoso还将所有现有的事件源事件复制到消息日志中。

如下面所示,migrator类中的regenerateviewmodels方法重新构建读取的模型。它通过调用query方法从消息日志中检索所有事件,然后使用conferenceviewmodelgeneratorpricedorderviewmodelupdater类来处理消息。

internal void regenerateviewmodels(azureeventlogreader logreader, string dbconnectionstring)
{
    var commandbus = new nullcommandbus();

    database.setinitializer<conferenceregistrationdbcontext>(null);

    var handlers = new list<ieventhandler>();
    handlers.add(new conferenceviewmodelgenerator(() => new conferenceregistrationdbcontext(dbconnectionstring), commandbus));
    handlers.add(new pricedorderviewmodelupdater(() => new conferenceregistrationdbcontext(dbconnectionstring)));

    using (var context = new conferenceregistrationmigrationdbcontext(dbconnectionstring))
    {
        context.updatetables();
    }

    try
    {
        var dispatcher = new messagedispatcher(handlers);
        var events = logreader.query(new querycriteria { });

        dispatcher.dispatchmessages(events);
    }
    catch
    {
        using (var context = new conferenceregistrationmigrationdbcontext(dbconnectionstring))
        {
            context.rollbacktablesmigration();
        }

        throw;
    }
}

jana(软件架构师)发言:

查询可能不会很快,因为它将从多个分区检索实体。

注意这个方法如何使用nullcommandbus实例来接收来自conferenceviewmodelgenerator实例的任何命令,因为我们只是在这里重新构建读模型。

以前,pricedorderviewmodelgenerator使用conferencedao类来获取关于座位的信息。现在,它是自治的,并直接处理seatcreatedseatupdated事件来维护这些信息。作为迁移的一部分,必须将此信息添加到读模型中。在前面的代码示例中,pricedorderviewmodelupdater类只处理seatcreatedseatupdated事件,并将缺失的信息添加到价格订单读模型中。

从v1迁移到v2

从v1迁移到v2需要更新已部署的应用程序代码并迁移数据。在生产环境中执行迁移之前,应该始终在测试环境中演练迁移。以下是所需步骤:

  1. 将v2版本部署到azure的staging环境中。v2版本有一个maintenancemode属性,最初设置为true。在此模式下,应用程序向用户显示一条消息,说明站点当前正在进行维护,而工作角色将不处理消息。
  2. 准备好之后,将v2版本(仍然处于维护模式,maintenancemode为true)切换到azure生产环境中。
  3. 让v1版本(现在在staging环境中运行)运行几分钟,以确保所有正在运行的消息都完成了它们的处理。
  4. 运行迁移程序来迁移数据(参见下面)。
  5. 成功完成数据迁移后,将每种工作角色的maintenancemode属性更改为false。
  6. v2版本现在运行在azure中。

jana(软件架构师)发言:

团队考虑使用单独的应用程序在升级过程中向用户显示一条消息,告诉他们站点正在进行维护。然而,在v2版本中使用maintenancemode属性提供了一个更简单的过程,并为应用程序添加了一个潜在有用的新特性。

poe(it运维人员)发言:

由于对事件存储的更改,不可能执行从v1到v2的无停机升级。然而,团队所做的更改将确保从v2迁移到v3将不需要停机时间。

markus(软件开发人员)发言:

团队对迁移实用程序应用了各种优化,例如批处理操作,以最小化停机时间。

下面几节总结了从v1到v2的数据迁移。这些步骤中的一些在前面已经讨论过,涉及到应用程序的特定更改或增强。

团队为v2引入的一个更改是,将所有命令和事件消息的副本保存在消息日志中,以便作为未来的证据,通过捕获将来可能使用的所有内容来保证应用程序的安全性。迁移过程考虑到了这个新特性。

因为迁移过程复制了大量的数据,所以您应该在azure工作角色中运行迁移过程,以最小化成本。迁移实用程序是一个控制台应用程序,因此您可以使用azure和远程桌面服务。有关如何在azure角色实例中运行应用程序的信息,请参见。

poe(it运维人员)发言:

在一些组织中,安全策略不允许您在azure生产环境使用远程桌面服务。但是,您只需要一个在迁移期间承载远程桌面会话的工作角色,您可以在迁移完成后删除它。您还可以将迁移代码作为工作角色而不是控制台应用程序运行,并确保它记录迁移的状态,以便您验证。

为会议管理限界上下文生成过去的日志消息

迁移过程的一部分是在可能的情况下重新创建v1版本处理后丢弃的消息,然后将它们添加到消息日志中。在v1版本中,所有从会议管理限界上下文发送到订单和注册限界上下文的集成事件都以这种方式丢失了。系统不能重新创建所有丢失的事件,但可以创建表示迁移时系统状态的事件。

有关更多信息,请参见本章前面的“从会议管理限界上下文中持久化事件”一节。

迁移事件源里的事件

在v2版本中,事件存储为每个事件存储额外的元数据,以便于查询事件。迁移过程将所有事件从现有事件存储复制到具有新模式的新事件存储。

jana(软件架构师)发言:

原始事件不会以任何方式更新,而是被视为不可变的。

同时,系统将所有这些事件的副本添加到v2版本中引入的消息日志中。

有关更多信息,请参见migrationtov2项目中migrator类中的migrateeventsourcedandgeneratepasteventlogs

重建读模型**

v2版本包括对订单和注册限界上下文中读模型定义的几个更改。migrationtov2项目在订单和注册限界上下文中重新构建会议的读模型和价格订单的读模型。

有关更多信息,请参见本章前面的“从会议管理限界上下文中持久化事件”一节。

对测试的影响

在这个过程的这个阶段,测试团队继续扩展验收测试集合。他们还创建了一组测试来验证数据迁移过程。

再说specflow

之前,这组specflow测试以两种方式实现:通过自动化web浏览器模拟用户交互,或者直接在mvc控制器上操作。这两种方法都有各自的优缺点,我们在第4章“”中讨论过。

在与另一位专家讨论了这些测试之后,团队还实现了第三种方法。从领域驱动设计(ddd)方法的角度来看,ui不是领域模型的一部分,核心团队的重点应该是在领域专家的帮助下理解领域,并在领域中实现业务逻辑。ui只是机械部分,用于使用户能够与领域进行交互。因此,验收测试应该包括验证领域模型是否以领域专家期望的方式工作。因此,团队使用specflow创建了一组验收测试,这些测试旨在在不影响系统ui部分的情况下测试领域。

下面的代码示例显示了selfregistrationendtoendwithdomain.feature文件,该文件在conference.acceptancetests项目中的features\domain\registration文件夹里,注意when和then子句怎么使用命令和事件的。

gary(cqrs专家)发言:

通常,如果您的领域模型只使用聚合,您会期望when子句发送命令,then子句查看事件或异常。然而,在本例中,领域模型包含一个通过发送命令来响应事件的流程管理器。测试将检查是否发送了所有预期的命令,并引发了所有预期的事件。

feature: self registrant end to end scenario for making a registration for a conference site with domain commands and events
    in order to register for a conference
    as an attendee
    i want to be able to register for the conference, pay for the registration order and associate myself with the paid order automatically


scenario: make a reservation with the selected order items
given the list of the available order items for the cqrs summit 2012 conference
    | seat type                 | rate | quota |
    | general admission         | $199 | 100   |
    | cqrs workshop             | $500 | 100   |
    | additional cocktail party | $50  | 100   |
and the selected order items
    | seat type                 | quantity |
    | general admission         | 1        |
    | additional cocktail party | 1        |
when the registrant proceeds to make the reservation
    # command:registertoconference
then the command to register the selected order items is received 
    # event: orderplaced
and the event for order placed is emitted
    # command: makeseatreservation
and the command for reserving the selected seats is received
    # event: seatsreserved
and the event for reserving the selected seats is emitted
    # command: markseatsasreserved
and the command for marking the selected seats as reserved is received
    # event: orderreservationcompleted 
and the event for completing the order reservation is emitted
    # event: ordertotalscalculated
and the event for calculating the total of $249 is emitted

下面的代码示例显示了feature文件的一些步骤实现。这些步骤使用命令总线发送命令。

[when(@"the registrant proceed to make the reservation")]
public void whentheregistrantproceedtomakethereservation()
{
    registertoconference = scenariocontext.current.get<registertoconference>();
    var conferencealias = scenariocontext.current.get<conferencealias>();

    registertoconference.conferenceid = conferencealias.id;
    orderid = registertoconference.orderid;
    this.commandbus.send(registertoconference);

    // wait for event processing
    thread.sleep(constants.waittimeout);
}

[then(@"the command to register the selected order items is received")]
public void thenthecommandtoregistertheselectedorderitemsisreceived()
{
    var orderrepo = eventsourcehelper.getrepository<registration.order>();
    registration.order order = orderrepo.find(orderid);

    assert.notnull(order);
    assert.equal(orderid, order.id);
}

[then(@"the event for order placed is emitted")]
public void thentheeventfororderplacedisemitted()
{
    var orderplaced = messageloghelper.getevents<orderplaced>(orderid).singleordefault();

    assert.notnull(orderplaced);
    assert.true(orderplaced.seats.all(
        os => registertoconference.seats.count(cs => cs.seattype == os.seattype && cs.quantity == os.quantity) == 1));
}

在迁移过程中发现的bug

当测试团队在迁移之后在系统上运行测试时,我们发现订单和注册限界上下文中座位类型的数量与迁移之前的数量不同。调查揭示了以下原因。

如果会议从未发布过,则会议管理限界上下文允许业务客户删除座位类型,但不会引发集成事件向订单和注册限界上下文报告这一情况。所以,当业务客户创建新的座位类型时,订单和注册限界上下文从会议管理限界上下文接收事件,而不是当业务客户删除座位类型时。

迁移过程的一部分创建一组集成事件,以替换v1版本处理后丢弃的事件。它通过读取会议管理限界上下文使用的数据库来创建这些事件。此过程没有为已删除的座位类型创建集成事件。

总之,在v1版本中,已删除的座位类型错误地出现在订单和注册限界上下文的读模型中。在迁移到v2版本之后,这些已删除的座位类型没有出现在订单和注册限界上下文的读模型中。

poe(it运维人员)发言:

测试迁移过程不仅验证迁移是否按预期运行,而且可能揭示应用程序本身的bug。

总结

在我们旅程的这个阶段,我们对系统进行了版本控制,并完成了v2伪生产版本。这个新版本包含了一些额外的功能和特性,比如支持不需要付费的订单和在ui中显示更多信息。

我们还对基础设施做了一些改变。例如,我们使更多的消息具有幂等性,现在持久化集成事件。下一章将描述我们旅程的最后阶段,我们将继续增强基础设施,并在准备发布v3版本时加强系统。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网