第33课:集成事件:使用RabbitMQ来实现EventBus
学习分享 丨作者 / 郑 子 铭 丨公众号 / DotNet NB / CloudNative NB
Last updated
学习分享 丨作者 / 郑 子 铭 丨公众号 / DotNet NB / CloudNative NB
Last updated
这一节我们来讲解如何通过 CAP 组件和 RabbitMQ 来实现 EventBus
要实现 EventBus,我们这里借助了 RabbitMQ,它的整个安装和使用的体验是非常人性化的,如果是在 Windows 下开发的话,它可以有 Windows 的 installer,也可以在其它的操作系统下安装和使用,当然它也支持 Docker 的模式,我们可以在以下的地址获取到安装包和安装方法的说明
https://www.rabbitmq.com/download.html
另一个就是在 .NET Core 社区比较知名的 CAP 框架,这个框架是由我们国人开发的,它实现了开箱即用的 EventBus 的实现,我们可以通过简单的配置,就能把 RabbitMQ 集成进来,并且实现我们的集成事件的处理
https://github.com/dotnetcore/CAP
我们来看一下 CAP 框架的实现架构
它实际上实现了一个叫 OutBox 的设计模式,就是在我们的每个微服务,比如说微服务 A 的数据库 A,在这个数据库内部它建立了两张表,一张叫 publish 事件表和一张叫 receiver 事件表,这两张事件表用来记录微服务 A 发出的和微服务 A 收到的事件
当我们要发出事件时,我们会把事件的存储逻辑与我们的业务逻辑的事务合并,在同一个事务里提交,也就意味着当我们的业务逻辑提交成功时,我们的事件表里面的事件是一定存在的,它是与我们的业务逻辑的事务是强绑定的
如果说我们的业务逻辑失败了,事务回滚了,这条事件是不会出现在我们的事件表里的,这样子就可以做到说我们要发送的事件一定是与业务逻辑是一致的
接下来由我们组件来负责将事件表里的事件全部都发送到 EventBus,比如说 RabbitMQ 消息队列里面去,由接收方订阅
对于订阅的事件的话,设计的模式也是同理,当我们的应用程序在消息队列获取到信息的时候,它就会将这些消息持久化到我们的数据库的 Receive 事件表里,这样我们就可以在本地进行事件的处理,失败重试等操作,这些都是由 CAP 框架完成的,我们仅需要去做简单的配置,关注发布和订阅的业务逻辑即可
我们看一下代码,刚才有提到 CAP 的架构,关键的一点是需要事件的存储与我们的业务逻辑在同一个事务里,所以说我们在处理事务的逻辑部分的话,需要嵌入 CAP 的一部分代码,我们看一下 EFContext 的定义
之前有关注到有一个叫 ICapPublisher 这个入参,关键的是这一行代码我们需要关注一下
这一行代码的作用是创建事务,我们可以看到创建事务的过程中,我们把 ICapPublisher 也传入给了这个方法的构造函数,实际上这个方法是由 CAP 的组件提供的,它的核心作用就是将我们要发送的事件与我们的业务的存储都放在同一个事务内部,这样子我们就可以使得事务提交时或者回滚时,我们的事件与业务逻辑的存取都是一致的
然后我们再来看一下配置的部分,写在 ServiceCollectionExtensions 下面
我们这里定义了一个 AddEventBus,可以看到将我们之前演示的代码订阅服务注入进来,然后 Services 最重点的代码是 AddCap,我们需要告诉 CAP 框架我们是针对 DomainContext 来实现我们的 EventBus,EventBus 与 DomainContext 共享我们的数据库连接,下面一行代码是指我们要用 RabbitMQ 来作为我们 EventBus 的消息队列的存储,这里可以看到使用了一个 Bind 的方法将我们的配置绑定到 RabbitMQ 的 options 上面去
我们可以看一下我们的配置
这里可以看到我们定义了一个 RabbitMQ 的配置,然后这里面会有我们的 host,因为是本地安装的,所以访问地址就是 localhost,VirtualHost 是 RabbitMQ 一个比较特殊的设置,它的作用是将 RabbitMQ 的空间区分为不同的空间,你可以认为这是一个租户,相同的 VirtualHost,大家都可以认为是一个 RabbitMQ 的集群,最下面的 ExchangeName 就是队列需要订阅的 Exchange 的名称,消息的发布和订阅都是通过这个 Exchange 来的
然后我们在 Startup 这里添加一行
这样我们就配置完成了
为了演示我们的发布和订阅的话,我们在这里的代码做一些稍微的调整
这里我们发布了一个 OrderCreated 的集成事件,然后订阅一个 OrderCreated
通过标注属性,我们就可以完成订阅
也就是说我们创建一个订单的时,我们会触发订单创建的领域事件,订单创建的领域事件又发送了一个订单创建的集成事件,然后我们在订阅服务里面订阅了订单创建的集成事件
在发布和订阅的地方分别打上一个断点,启动程序,可以看到整个流程
我们再梳理一下整个流程,首先我们创建了一个订单,这个订单触发了我们的 OrderCreated 的领域事件,OrderCreated 的领域事件的处理器像我们的 EventBus 发布了一个 OrderCreated 的集成事件,我们在订阅服务的地方订阅了这个事件,所以我们可以接收到并且做出相应的处理
我们观察一下数据库的表,一共有四张表,cap.publish 和 cap.received 这两张表分别对应发送事件表和接收事件表,order 和 user 这两张表是我们的领域模型表
整个 CAP 的框架,它的实现原理其实有两个关键点,一个是事件表,一个就是事务控制,也就是说将事件的存储嵌入到我们的业务逻辑的事务里面去,这样子我们就可以保证我们的业务与事件是要么都能存储成功,要么都失败
整个 CAP 框架它的应用性是非常强的,非常建议在处理集成事件的时候使用这个框架
GitHub源码链接: