模块二 基础巩固 RabbitMQ Masstransit 介绍

学习分享 丨作者 / 郑 子 铭 丨公众号 / DotNet NB / CloudNative NB

2.6.6 RabbitMQ -- Masstransit 介绍

  • Masstransit 是什么

  • Quickstart

  • 消息 Message

Masstransit 是什么

Masstransit 是一个 .NET 免费开源的分布式应用框架

  • 集成多种消息中间件(Rabbitmq, Azure, Service Bus, ActiveMQ, Kafka, In-Memory)

  • 强大且完整的消息模式(发布与订阅,saga,event-driven state machine,最终一致性支持)

  • 端到端解决方案(消息路由,异常,重试,并发控制,连接与消费生命周期管理)

  • 使用简单

  • 单元测试友好

  • 内置监控

Quickstart

新建控制台程序 mt-001,引入 Masstransit 包

namespace mt_001
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingInMemory(sbc =>
            {
                sbc.ReceiveEndpoint("test_queue", ep =>
                {
                    ep.Handler<Message>(context => Console.Out.WriteLineAsync($"Received: {context.Message.Text}"));
                });
            });

            await bus.StartAsync();// This is important !

            await bus.Publish(new Message { Text = "Hi" });

            Console.WriteLine("Please input your message with enter:");
            string message = Console.ReadLine();

            while (message != "EXIT")
            {
                await bus.Publish(new Message() {Text = message});
                message = Console.ReadLine();
            }

            await bus.StopAsync();

            Console.WriteLine("Hello World!");
        }
    }

    public class Message
    {
        public string Text { get; set; }
    }
}

启动程序,收发消息

新建控制台程序 mt-002,引入 MassTransit.RabbitMQ 包

方法改为 CreateUsingRabbitMq,并且添加 rabbitmq host

var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
    sbc.Host("rabbitmq://localhost");

    sbc.ReceiveEndpoint("test_queue", ep =>
    {
        ep.Handler<Message>(context => Console.Out.WriteLineAsync($"Received: {context.Message.Text}"));
    });
});

启动两个客户端,消息是轮询接收的

消息 Message

  • 消息

  • 消息类型

  • 消息头

  • 最佳实践

消息

MassTransit 使用 C# 强类型来定义,一个消息可以被定义为接口,通常我们也称之为消息契约

消息分为 command 命令与 event 事件,分别对应 send 和 publish 方法

在不同项目里面创建类来消费消息时确保命名空间一致,否则消费不到

命名空间:Company.Application.Contracts

namespace Company.Application.Contracts
{
	using System;

	public interface UpdateCustomerAddress
	{
		Guid CommandId { get; }
		DateTime Timestamp { get; }
		string CustomerId { get; }
		string HouseNumber { get; }
		string Street { get; }
		string City { get; }
		string State { get; }
		string PostalCode { get; }
	}
}

消息类型

Command

  • 通过 send 发送到一个 endpoint

Event

  • 通过 publish,不直接发送到 endpoint,发布到多个消费者

  • 通常以名称短语(过去式的形式来命名)比如 OrderCreatedEvent, OrderSubmitted, OrderPaid, OrderDeliveried

消息头

最佳实践

尽量使用接口来定义消息类型,使用消息初始化器(有点困难)

使用类以及继承时需要特别注意:

  • 通过消费基类并利用多态行为来处理,总会遇到很多问题

  • 消息格式设计不是面向对象设计,消息中应该只包含状态而不应该包含行为

  • 大的基类也会产生很多问题,特别是在支持消息版本的时候

GitHub源码链接:

https://github.com/MingsonZheng/ArchitectTrainingCamp

Last updated