# 模块二 基础巩固 RabbitMQ Masstransit 详解

## 2.6.7 RabbitMQ -- Masstransit 详解 <a href="#id-267rabbitmqmasstransit-xiang-jie" id="id-267rabbitmqmasstransit-xiang-jie"></a>

* Consumer 消费者
* Producer 生产者
* Request-Response 请求-响应

### Consumer 消费者 <a href="#consumer-xiao-fei-zhe" id="consumer-xiao-fei-zhe"></a>

在 MassTransit 中，一个消费者可以消费一种或多种消息

消费者的类型包括：普通消费者，saga，saga 状态机，路由活动（分布式追踪），处理器 handlers，工作消费者 job comsumers

* Consumer
* Instance
* Handler
* Others

#### Consumer <a href="#consumer" id="consumer"></a>

```
public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Consumer<SubmitOrderConsumer>();
            });
        });
    }
}
```

继承 IConsumer，实现 Consume 方法

```
class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}
```

三个原则：

* 拥抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
* Consume 方法是一个被等待的方法，在执行中时其他消费者无法接收到这个消息，当这个方法完成的时候，消息被 ack，并且从队列中移除
* Task 方法异常会导致消息触发 retry，如果没有配置重试，消息将被投递到失败队列

#### Instance <a href="#instance" id="instance"></a>

```
public class Program
{
    public static async Task Main()
    {
        var submitOrderConsumer = new SubmitOrderConsumer();

        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Instance(submitOrderConsumer);
            });
        });
    }
}
```

所有接收到的消息都由一个消费者来实例来处理（请确保这个消费者类是线程安全）

Consumer 每次接收到消息都会 new 一个实例

#### Handler <a href="#handler" id="handler"></a>

```
public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Handler<SubmitOrder>(async context =>
                {
                    await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                });
            });
        });
    }
}
```

通过一个委托 Lambda 方法，来消费消息

#### Others <a href="#others" id="others"></a>

* Saga<>
* StateMachineSaga<>

### Producer 生产者 <a href="#producer-sheng-chan-zhe" id="producer-sheng-chan-zhe"></a>

消息的生产可以通过两种方式产生：发送和发布

发送的时候需要指定一个具体的地址 DestinationAddress，发布的时候消息会被广播给所有订阅了这个消息类型的消费者

基于这两种规则，消息被定义为：命令 command 和事件 event

* send
* publish

#### send <a href="#send" id="send"></a>

可以调用以下对象的 send 方法来发送 command：

* ConsumeContext （在 Consumer 的 Consumer 方法参数中传递）
* ISendEndpointProvider（可以从 DI 中获取）
* IBusControl（最顶层的控制对象，用来启动和停止 masstransit 的控制器）

**ConsumeContext**

```
public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;

    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;

    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);

        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}
```

**ISendEndpointProvider**

```
public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);

    await endpoint.Send(new SubmitOrder { OrderId = "123" });
}
```

#### publish <a href="#publish" id="publish"></a>

* 发送地址
* 短地址
* Convention Map

**发送地址**

* rabbitmq://localhost/input-queue
* rabbitmq://localhost/input-queue?durable=false

**短地址**

* GetSendEndpoint(new Uri("queue:input-queue"))

![](https://3083743005-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F8gwpNo3eyzHkX0O40HRA%2Fuploads%2FoxkZS1g4XZwhXZfPo2bK%2F228.jpg?alt=media\&token=d0719150-aed3-48f6-bcdf-4043f86a5680)

**Convention Map**

在配置文件中指定 map 规则

```
EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));
```

直接发送

```
public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;

    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;

    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);

        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}
```

可以调用以下对象的 publish 方法来发送 event：

* ConsumeContext （在 Consumer 的 Consumer 方法参数中传递）
* IPublishEndpoint（可以从 DI 中获取）
* IBusControl（最顶层的控制对象，用来启动和停止 masstransit 的控制器）

IPublishEndpoint

```
public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.Publish<OrderSubmitted>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
    });
}
```

### Request-Response 请求-响应 <a href="#requestresponse-qing-qiu-xiang-ying" id="requestresponse-qing-qiu-xiang-ying"></a>

Request-Response 模式让应用程序之间解耦之后，依然采用同步的方式

* Consumer
* IClientFactory
* IRequestClient
* Send a request

#### Consumer <a href="#consumer-1" id="consumer-1"></a>

```
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
    var order = await _orderRepository.Get(context.Message.OrderId);
    if (order == null)
        throw new InvalidOperationException("Order not found");
    
    await context.RespondAsync<OrderStatusResult>(new 
    {
        OrderId = order.Id,
        order.Timestamp,
        order.StatusCode,
        order.StatusText
    });
}
```

需要处理返回类型 OrderStatusResult，异步方式模拟同步，实际上同样有消息队列，消费者处理过程

#### IClientFactory <a href="#iclientfactory" id="iclientfactory"></a>

```
public interface IClientFactory 
{
    IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);

    IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);

    RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);

    RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}
```

通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

#### IRequestClient <a href="#irequestclient" id="irequestclient"></a>

```
public interface IRequestClient<TRequest>
    where TRequest : class
{
    RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);

    Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}
```

RequestClient 可以创建请求，或者直接获得响应

#### Send a request <a href="#send-a-request" id="send-a-request"></a>

```
var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);

var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
```
