# 模块二 基础巩固 RabbitMQ Masstransit 异常处理

## 2.6.8 RabbitMQ -- Masstransit 异常处理 <a href="#id-268rabbitmqmasstransit-yi-chang-chu-li" id="id-268rabbitmqmasstransit-yi-chang-chu-li"></a>

* 异常处理
* 其他
* 高级功能

### 异常处理 <a href="#yi-chang-chu-li" id="yi-chang-chu-li"></a>

* 异常与重试
* 重试配置
* 重试条件
* 重新投递信息
* 信箱

#### 异常与重试 <a href="#yi-chang-yu-zhong-shi" id="yi-chang-yu-zhong-shi"></a>

Exception

```
public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        throw new Exception("Very bad things happened");
    }
}
```

UseMessageRetry

```
var sessionFactory = CreateSessionFactory();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("rabbitmq://localhost/");

    cfg.ReceiveEndpoint("submit-order", e =>
    {
        e.UseMessageRetry(r => r.Immediate(5));

        e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
    });
});
```

#### 重试配置 <a href="#zhong-shi-pei-zhi" id="zhong-shi-pei-zhi"></a>

![](https://3083743005-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F8gwpNo3eyzHkX0O40HRA%2Fuploads%2FxYbxlauSuMsddJmhv0Xp%2F229.jpg?alt=media\&token=8ab62635-fce6-433a-87a6-31bed83e1626)

```
// 立即重试：一共连续重试10次
ep.UseMessageRetry(r => r.Immediate(10));

// 间隔重试：一共重试10次，每次间隔10秒
ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10)));

// 多个间隔重试：5秒后第一次，5+10秒后第二次，5+10+15秒后第三次
ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15)));

// 指数级间隔重试：共10次，每次间隔：当前重试次数 * 60秒
ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60)));

// 每次叠加50秒
ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));
```

#### 重试条件 <a href="#zhong-shi-tiao-jian" id="zhong-shi-tiao-jian"></a>

```
e.UseMessageRetry(r => 
{
    r.Handle<ArgumentNullException>();
    r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
    r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
});
```

过滤某些异常类型不进行重试

#### 重新投递信息 <a href="#zhong-xin-tou-di-xin-xi" id="zhong-xin-tou-di-xin-xi"></a>

```
cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
    e.UseMessageRetry(r => r.Immediate(5));
    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});
```

消息冲队列移除之后，在一定时间之后重新投入消息队列。需要配置调度模块（scheduling）

### 信箱 <a href="#xin-xiang" id="xin-xiang"></a>

```
cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
    e.UseMessageRetry(r => r.Immediate(5));
    e.UseInMemoryOutbox();

    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});
```

有些消息是在 consume 方法中发送或发布的，如果在发送之后 consume 中产生了异常，那原来发出去的消息就需要撤回，如果使用信箱之后，在 consume 中要发布/发送的消息就会先暂存在内存中直到 consume 方法成功之后才真正发出去

### 其他 <a href="#qi-ta" id="qi-ta"></a>

* Fault
* Consuming Faults
* Error Pipe
* Dead-Letter Pipe

#### Fault <a href="#fault" id="fault"></a>

```
public interface Fault<T>
    where T : class
{
    Guid FaultId { get; }
    Guid? FaultedMessageId { get; }
    DateTime Timestamp { get; }
    ExceptionInfo[] Exceptions { get; }
    HostInfo Host { get; }
    T Message { get; }
}
```

Fault 消息在异常的时候会发布出来

#### Consuming Faults <a href="#consuming-faults" id="consuming-faults"></a>

```
public class DashboardFaultConsumer :
    IConsumer<Fault<SubmitOrder>>
{
    public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
    {
        // update the dashboard
    }
}
```

Fault 消息也是可以进行订阅的

#### Error Pipe <a href="#error-pipe" id="error-pipe"></a>

```
cfg.ReceiveEndpoint("input-queue", ec =>
{
    ec.DiscardFaultedMessages();
});
```

默认情况下错误的消息会被投递到了 \_error 队列，可以配置直接抛弃错误信息

#### Dead-Letter Pipe <a href="#dead-letter-pipe" id="dead-letter-pipe"></a>

```
cfg.ReceiveEndpoint("input-queue", ec =>
{
    ec.DiscardSkippedMessages();
});
```

死信队列：没有消费者的消息会被移到 \_skipped 队列，但可以配置为不移到 \_skipped 队列

### 高级功能 <a href="#gao-ji-gong-neng" id="gao-ji-gong-neng"></a>

* 持久化
* Saga 事件串
* 调度
* Courier 最终一致性
* 监控
