# 模块二 基础巩固 RabbitMQ 工作队列和交换机

## 2.6.4 RabbitMQ -- 工作队列和交换机 <a href="#id-264rabbitmq-gong-zuo-dui-lie-he-jiao-huan-ji" id="id-264rabbitmq-gong-zuo-dui-lie-he-jiao-huan-ji"></a>

* WorkQueue
* Publish/Subscribe
* Routing
* EmitLog

### WorkQueue <a href="#workqueue" id="workqueue"></a>

WorkQueue：<https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html>

* 一个消息生产者，多个消息消费者
* exchange 交换机自动恢复
* 对消息进行持久化
* 手动确认消息

#### 对消息进行持久化 <a href="#dui-xiao-xi-jin-hang-chi-jiu-hua" id="dui-xiao-xi-jin-hang-chi-jiu-hua"></a>

```
var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);
```

#### 手动确认消息 <a href="#shou-dong-que-ren-xiao-xi" id="shou-dong-que-ren-xiao-xi"></a>

autoAck: false

```
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
```

手动调用 BasicAck

```
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
```

修改接收端为手动确认消息

autoAck: false

```
channel.BasicConsume(queue: "hello",
    autoAck: false,
    consumer: consumer);
```

BasicAck

```
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);

    Thread.Sleep(2000);// 演示多个接收端

    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine(" [x] Received {0}", message);
};
```

启动多个接收端

![](https://3083743005-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F8gwpNo3eyzHkX0O40HRA%2Fuploads%2FrthrE8Iluo08EmorymZ5%2F222.jpg?alt=media\&token=54d88b4f-ccd9-43dd-b31b-9eb44234e9ea)

### Publish/Subscribe <a href="#publishsubscribe" id="publishsubscribe"></a>

Publish/Subscribe：<https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html>

Fanout 交换机，每个队列都会收到

```
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
```

### Routing <a href="#routing" id="routing"></a>

Routing：<https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html>

Bindings

```
channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");
```

Direct exchange

```
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
```

### EmitLog <a href="#emitlog" id="emitlog"></a>

新建控制台项目 EmitLogDirect，ReceiveLogsDirect

发送端

```
namespace EmitLogDirect
{
    class EmitLogDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机

                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1)
                    ? string.Join(" ", args.Skip( 1 ).ToArray())
                    : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "direct_logs",
                    routingKey: severity,// 路由 Key 自动带上严重级别
                    basicProperties: null,
                    body: body);

                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
```

![](https://3083743005-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F8gwpNo3eyzHkX0O40HRA%2Fuploads%2FljpE1QVp3ehgF5lTNLj3%2F223.jpg?alt=media\&token=9a710e8b-6a65-4140-a7ca-53e0913fc63d)

error 级别单独发送到一个队列

接收端

```
namespace ReceiveLogsDirect
{
    class ReceiveLogsDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机

                var queueName = channel.QueueDeclare().QueueName;

                if (args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                        Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }

                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName,
                        exchange: "direct_logs",
                        routingKey: severity);// 路由 Key 自动带上严重级别
                }

                Console.WriteLine(" [*] Waiting for messages.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                        routingKey, message);
                };
                channel.BasicConsume(queue: queueName,
                    autoAck: true,
                    consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
```

替换发送端，接收端的 localhost 为服务器地址

接收端控制台启动

```
dotnet run info waring error
```

发送端控制台启动

```
dotnet run info

dotnet run error

dotnet run waring test
```

接收端输出

```
 [x] Received 'info':'Hello World!'
 [x] Received 'error':'Hello World!'
 [x] Received 'waring':'test'
```

### GitHub源码链接： <a href="#github-yuan-ma-lian-jie" id="github-yuan-ma-lian-jie"></a>

<https://github.com/MingsonZheng/ArchitectTrainingCamp>
