.NET Core 开发实战
  • 目录
  • 第1课:课程介绍
  • 第2课:内容综述
  • 第3课:.NET Core的现状、未来以及环境搭建
  • 第4课:Startup:掌握ASP.NET Core的启动过程
  • 第5课:依赖注入:良好架构的起点
  • 第6课:作用域与对象释放行为
  • 第7课:用Autofac增强容器能力
  • 第8课:配置框架:让服务无缝适应各种环境
  • 第9课:命令行配置提供程序
  • 第10课:环境变量配置提供程序
  • 第11课:文件配置提供程序
  • 第12课:配置变更监听
  • 第13课:配置绑定:使用强类型对象承载配置数据
  • 第14课:自定义配置数据源:低成本实现定制化配置方案
  • 第15课:选项框架:服务组件集成配置的最佳实践
  • 第16课:选项数据热更新:让服务感知配置的变化
  • 第17课:为选项数据添加验证:避免错误配置的应用接收用户流量
  • 第18课:日志框架:聊聊记日志的最佳姿势
  • 第19课:日志作用域:解决不同请求之间的日志干扰
  • 第20课:结构化日志组件Serilog:记录对查询分析友好的日志
  • 第21课:中间件:掌控请求处理过程的关键
  • 第22课:异常处理中间件:区分真异常与逻辑异常
  • 第23课:静态文件中间件:前后端分离开发合并部署骚操作
  • 第24课:文件提供程序:让你可以将文件放在任何地方
  • 第25课:路由与终结点:如何规划好你的Web API
  • 第26课:工程结构概览:定义应用分层及依赖关系
  • 第27课:定义Entity:区分领域模型的内在逻辑和外在行为
  • 第28课:工作单元模式(UnitOfWork):管理好你的事务
  • 第29课:定义仓储:使用EF Core实现仓储层
  • 第30课:领域事件:提升业务内聚,实现模块解耦
  • 第31课:APIController:定义API的最佳实践
  • 第32课:集成事件:解决跨微服务的最终一致性
  • 第33课:集成事件:使用RabbitMQ来实现EventBus
  • 第34课:MediatR:轻松实现命令查询职责分离模式(CQRS)
  • 第35课:MediatR:让领域事件处理更加优雅
Powered by GitBook
On this page

第33课:集成事件:使用RabbitMQ来实现EventBus

学习分享 丨作者 / 郑 子 铭 丨公众号 / DotNet NB / CloudNative NB

Previous第32课:集成事件:解决跨微服务的最终一致性Next第34课:MediatR:轻松实现命令查询职责分离模式(CQRS)

Last updated 3 years ago

这一节我们来讲解如何通过 CAP 组件和 RabbitMQ 来实现 EventBus

要实现 EventBus,我们这里借助了 RabbitMQ,它的整个安装和使用的体验是非常人性化的,如果是在 Windows 下开发的话,它可以有 Windows 的 installer,也可以在其它的操作系统下安装和使用,当然它也支持 Docker 的模式,我们可以在以下的地址获取到安装包和安装方法的说明

另一个就是在 .NET Core 社区比较知名的 CAP 框架,这个框架是由我们国人开发的,它实现了开箱即用的 EventBus 的实现,我们可以通过简单的配置,就能把 RabbitMQ 集成进来,并且实现我们的集成事件的处理

我们来看一下 CAP 框架的实现架构

它实际上实现了一个叫 OutBox 的设计模式,就是在我们的每个微服务,比如说微服务 A 的数据库 A,在这个数据库内部它建立了两张表,一张叫 publish 事件表和一张叫 receiver 事件表,这两张事件表用来记录微服务 A 发出的和微服务 A 收到的事件

当我们要发出事件时,我们会把事件的存储逻辑与我们的业务逻辑的事务合并,在同一个事务里提交,也就意味着当我们的业务逻辑提交成功时,我们的事件表里面的事件是一定存在的,它是与我们的业务逻辑的事务是强绑定的

如果说我们的业务逻辑失败了,事务回滚了,这条事件是不会出现在我们的事件表里的,这样子就可以做到说我们要发送的事件一定是与业务逻辑是一致的

接下来由我们组件来负责将事件表里的事件全部都发送到 EventBus,比如说 RabbitMQ 消息队列里面去,由接收方订阅

对于订阅的事件的话,设计的模式也是同理,当我们的应用程序在消息队列获取到信息的时候,它就会将这些消息持久化到我们的数据库的 Receive 事件表里,这样我们就可以在本地进行事件的处理,失败重试等操作,这些都是由 CAP 框架完成的,我们仅需要去做简单的配置,关注发布和订阅的业务逻辑即可

我们看一下代码,刚才有提到 CAP 的架构,关键的一点是需要事件的存储与我们的业务逻辑在同一个事务里,所以说我们在处理事务的逻辑部分的话,需要嵌入 CAP 的一部分代码,我们看一下 EFContext 的定义

public EFContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options)
{
    _mediator = mediator;
    _capBus = capBus;
}

之前有关注到有一个叫 ICapPublisher 这个入参,关键的是这一行代码我们需要关注一下

_currentTransaction = Database.BeginTransaction(_capBus, autoCommit: false);

这一行代码的作用是创建事务,我们可以看到创建事务的过程中,我们把 ICapPublisher 也传入给了这个方法的构造函数,实际上这个方法是由 CAP 的组件提供的,它的核心作用就是将我们要发送的事件与我们的业务的存储都放在同一个事务内部,这样子我们就可以使得事务提交时或者回滚时,我们的事件与业务逻辑的存取都是一致的

然后我们再来看一下配置的部分,写在 ServiceCollectionExtensions 下面

public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
    services.AddTransient<ISubscriberService, SubscriberService>();
    services.AddCap(options =>
    {
        options.UseEntityFramework<DomainContext>();

        options.UseRabbitMQ(options =>
        {
            configuration.GetSection("RabbitMQ").Bind(options);
        });
        //options.UseDashboard();
    });

    return services;
}

我们这里定义了一个 AddEventBus,可以看到将我们之前演示的代码订阅服务注入进来,然后 Services 最重点的代码是 AddCap,我们需要告诉 CAP 框架我们是针对 DomainContext 来实现我们的 EventBus,EventBus 与 DomainContext 共享我们的数据库连接,下面一行代码是指我们要用 RabbitMQ 来作为我们 EventBus 的消息队列的存储,这里可以看到使用了一个 Bind 的方法将我们的配置绑定到 RabbitMQ 的 options 上面去

我们可以看一下我们的配置

  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "admin",
    "Password": "123456",
    "VirtualHost": "geektime",
    "ExchangeName": "geek_queue"
  }

这里可以看到我们定义了一个 RabbitMQ 的配置,然后这里面会有我们的 host,因为是本地安装的,所以访问地址就是 localhost,VirtualHost 是 RabbitMQ 一个比较特殊的设置,它的作用是将 RabbitMQ 的空间区分为不同的空间,你可以认为这是一个租户,相同的 VirtualHost,大家都可以认为是一个 RabbitMQ 的集群,最下面的 ExchangeName 就是队列需要订阅的 Exchange 的名称,消息的发布和订阅都是通过这个 Exchange 来的

然后我们在 Startup 这里添加一行

services.AddEventBus(Configuration);

这样我们就配置完成了

为了演示我们的发布和订阅的话,我们在这里的代码做一些稍微的调整

namespace GeekTime.API.Application.DomainEventHandlers
{
    public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
    {
        ICapPublisher _capPublisher;
        public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
        {
            _capPublisher = capPublisher;
        }

        public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
        {
            await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
        }
    }
}

这里我们发布了一个 OrderCreated 的集成事件,然后订阅一个 OrderCreated

namespace GeekTime.API.Application.IntegrationEvents
{
    public class SubscriberService : ISubscriberService, ICapSubscribe
    {
        IMediator _mediator;
        public SubscriberService(IMediator mediator)
        {
            _mediator = mediator;
        }


        [CapSubscribe("OrderPaymentSucceeded")]
        public void OrderPaymentSucceeded(OrderPaymentSucceededIntegrationEvent @event)
        {
            //Do SomeThing
        }

        [CapSubscribe("OrderCreated")]
        public void OrderCreated(OrderCreatedIntegrationEvent @event)
        {
            //Do SomeThing
        }
    }
}

通过标注属性,我们就可以完成订阅

也就是说我们创建一个订单的时,我们会触发订单创建的领域事件,订单创建的领域事件又发送了一个订单创建的集成事件,然后我们在订阅服务里面订阅了订单创建的集成事件

在发布和订阅的地方分别打上一个断点,启动程序,可以看到整个流程

我们再梳理一下整个流程,首先我们创建了一个订单,这个订单触发了我们的 OrderCreated 的领域事件,OrderCreated 的领域事件的处理器像我们的 EventBus 发布了一个 OrderCreated 的集成事件,我们在订阅服务的地方订阅了这个事件,所以我们可以接收到并且做出相应的处理

我们观察一下数据库的表,一共有四张表,cap.publish 和 cap.received 这两张表分别对应发送事件表和接收事件表,order 和 user 这两张表是我们的领域模型表

整个 CAP 的框架,它的实现原理其实有两个关键点,一个是事件表,一个就是事务控制,也就是说将事件的存储嵌入到我们的业务逻辑的事务里面去,这样子我们就可以保证我们的业务与事件是要么都能存储成功,要么都失败

整个 CAP 框架它的应用性是非常强的,非常建议在处理集成事件的时候使用这个框架

GitHub源码链接:

https://github.com/witskeeper/geektime
https://www.rabbitmq.com/download.html
https://github.com/dotnetcore/CAP