大话RabbitMQ 基础入门

C#

浏览数:8

2019-5-15

———-写在前面———-

近些年微服务越来越火,让我也忍不住想去一窥微服务究竟,讲到微服务,就离不开分布式,而分布式,也离不开消息队列,在消息队列中,RabbitMQ可以说是比较具有代表性的一款。

这里是一篇介绍消息队列以及各种消息队列产品对比的文章,讲得很好,有兴趣的可以看一看。

https://cloud.tencent.com/developer/article/1006035

在讲RabbitMQ之前,首先需要在电脑上安装和配置RabbitMQ,网络上已经有很多这类文章,如果懒得去搜索,可以看看这篇介绍如何安装配置RabbitMQ的文章。

https://blog.csdn.net/weixin_39735923/article/details/79288578

其中,在安装RabbitMQ的过程中,遇到了一个坑,在启用RabbltMQ的管理界面执行

rabbitmq-plugins enable rabbitmq_management

命令时,出现了以下这样的报错

 

可以在该指令前加上 .\ 即

.\rabbitmq-plugins enable rabbitmq_management

祝安装顺利 !!

 ——-正文——

基本概念

下面是在.Net中使用RabbitMQ要明白的一些名词概念。

 

综上所诉,他们之间的关系可以用我下面的 丑图 表示。

 

在图中,没有吧Routing key画出。Producer每一次发送消息,除了发出消息本身,还会随着消息带上一个routingKey,而且每一次将Exchange和Queue绑定,大体需要三个参数,

string queueName, string exchangeName, string routingKey

其中也有一个routingKey,但此RoutingKey非彼Routingkey。

大白话

对这个过程,我们可以理解为国家给灾区发送救灾物资,国家给当地政府划拨物资的时候,会规定,谁才能拿到这批物资,如(房子倒了的.家里有人受伤了的.家庭经济困难的)。

而当地政府在分配这批物资之前,为了方便物资的分配,会给每个家庭贴上一个标签,如

家庭A 经济困难

家庭B 房子倒了.经济困难

家庭C 家庭富有.房子倒了

家庭D 房子倒了的.家里有人受伤了的.家庭经济困难的

所以,发送消息时候的routingKey就是国家规定的那批物质分配规则。

而Exchange和Queue绑定时的RoutingKey可以理解为当地政府给每个家庭贴上的一个标签。

Exchange(交换机)转发消息的规则也有很多种:direct, topic, headers(不常用) 和 fanout,我们称之为交换类型。

我们可以把Exchange理解为分配这批物质的政府,现在国家规定了宏观的分配方向(发送消息时的routingKey),每个家庭也有了家庭情况的标签(绑定Exchange时的routingKey),但是这个物资具体怎么分,还是当地政府说了算。

Direct 严格按照国家规定来,只有房子倒了的,家里有人受伤了的而且家庭经济困难的才能分到救灾物资。    家庭D能分到

Fanout 只要是灾区的居民都能分到, 不管家庭情况如何。 家庭A\B\C\D都能分到

Topic 主题匹配: 只要家庭情况在国家规定分配规则内的,都能分到物资,但是家庭C分不到,因为他家太有钱了,这个条件不在国家的分配规则里。家庭A\B\D能分到

所以,我们在声明一个Exchange(交换机)的同时,还要指定该交换机的类型,即(当地政府怎么来分救灾物资)

其实,用这个例子,我是想说,生产者和消费者之间,就像国家与难民之间一样,国家只知道,我要帮助难民,但是难民有谁,物资能不能分到难民手里,还得当地政府说了算,你就说我这个例子恰不恰当吧!哈哈😄

 好了,懂了概念,我们再来结合具体例子看看。

Fanout

Producer.cs的代码

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var chancel = connection.CreateModel())
                {
                    //生命交换机
                    chancel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //给交换机发送消息
                        chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body);
                        Console.WriteLine($"成功发送消息{readMsg}");
                        Console.WriteLine("请输入要发送的内容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

Customer.cs代码

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var channel = connection.CreateModel())
                {
                    //声明一个Fanout类型的交换机
                    channel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout);

                    //声明一个消息队列并获取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    //把消息队列和交换机绑定
                    channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: "");

                    //创建消费者
                    var consume = new EventingBasicConsumer(channel);

                    //把消费者和队列绑定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

在上面的代码中,无论是在生产者的发送消息里

                         //给交换机发送消息

                        chancel.BasicPublish(exchange: “FanoutDemo”, routingKey: “”, body: body);

还是消费者所在的Queue的绑定里

                 //把消息队列和交换机绑定

                    channel.QueueBind(exchange: “FanoutDemo”, queue: queueName, routingKey: “”);

我们都没有制定routingKey,因为没个人都能获取消息,所以此处,声明routingKey就没有意义了。

我们看看运行效果。

运行了三个消费者,当生产者发出消息时,三个消费者都收到了相同的消息。可以理解为广播模式。(Customer单词拼写错了,图片修改不方便,就不改了,大家将就一下)

Direct

Direct时严格匹配的,只有队列绑定的RoutingKey与生产者发送消息时指定的RoutingKey完全相同,才能接收成功。

Producer.cs

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var chancel = connection.CreateModel())
                {
                    //生命交换机
                    chancel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //给交换机发送消息
                        chancel.BasicPublish(exchange: "DirectDemo", routingKey: "Direct.Key", body: body);
                        Console.WriteLine($"成功发送消息{readMsg}");
                        Console.WriteLine("请输入要发送的内容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

我把Exchange的类型更改为Direct类型,并且发送消息的routingKey设置为Direct.Key。

然后我们来定义消费者

Customer.CS

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var channel = connection.CreateModel())
                {
                    //声明一个Fanout类型的交换机
                    channel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct);

                    //声明一个消息队列并获取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    Console.WriteLine("请输入RoutingKey!");
                    var routingKey = Console.ReadLine();
                    //把消息队列和交换机绑定
                    channel.QueueBind(exchange: "DirectDemo", queue: queueName, routingKey: routingKey);

                    //创建消费者
                    var consume = new EventingBasicConsumer(channel);

                    //把消费者和队列绑定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    Console.WriteLine("开始监听消息");

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

消费者的RoutingKey再控制台输入,

运行效果如下:

可以看到,只有RoutingKey为Direct.Key的消费者才收到了生产者发出的消息。

Topic

RabbitMQ 中的 RouteKey 支持绑定键表达式写法,有两种主要的绑定键:

*(星号)可以代替一个单词.

# (井号) 可以代替0个或多个单词.

比如在下面这个图中(P为发送者,X为RabbitMQ中的Exchange,C为消费者,Q为队列)

在这个示例中,我们将发送一条关于动物描述的消息,也就是说 Name(routeKey) 字段中的内容包含 3 个单词。第一个单词是描述速度的(celerity),第二个单词是描述颜色的(colour),第三个是描述哪种动物的(species),它们组合起来类似:“..”。

然后在使用 CapSubscribe 绑定的时候,Q1绑定为 CapSubscribe["*.orange.*"], Q2 绑定为CapSubscribe["*.*.rabbit"] 和 [CapSubscribe["lazy.#]

那么,当发送一个名为 “quick.orange.rabbit” 消息的时候,这两个队列将会同时收到该消息。同样名为 lazy.orange.elephant的消息也会被同时收到。另外,名为 “quick.orange.fox” 的消息将仅会被发送到Q1队列,名为 “lazy.brown.fox” 的消息仅会被发送到Q2。”lazy.pink.rabbit” 仅会被发送到Q2一次,即使它被绑定了2次。”quick.brown.fox” 没有匹配到任何绑定的队列,所以它将会被丢弃。

另外一种情况,如果你违反约定,比如使用 4个单词进行组合,例如 “quick.orange.male.rabbit”,那么它将匹配不到任何的队列,消息将会被丢弃。

但是,假如你的消息名为 “lazy.orange.male.rabbit”,那么他们将会被发送到Q2,因为 #(井号)可以匹配 0 或者多个单词。

我们结合代码来看一看。

Producer.cs

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var chancel = connection.CreateModel())
                {
                    //生命交换机
                    chancel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //给交换机发送消息
                        chancel.BasicPublish(exchange: "TopicDemo", routingKey: "Topic.Demo.Key", body: body);
                        Console.WriteLine($"成功发送消息{readMsg}");
                        Console.WriteLine("请输入要发送的内容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

我给发送消息的routingKey指定为Topic.Demo.Key

再来看看消费者

Cuustomer.cs

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建连接工厂
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //创建连接
            using (var connection = factory.CreateConnection())
            {
                //创建会话
                using (var channel = connection.CreateModel())
                {
                    //声明一个Fanout类型的交换机
                    channel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic);

                    //声明一个消息队列并获取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    Console.WriteLine("请输入RoutingKey!");
                    var routingKey = Console.ReadLine();
                    //把消息队列和交换机绑定
                    channel.QueueBind(exchange: "TopicDemo", queue: queueName, routingKey: routingKey);

                    //创建消费者
                    var consume = new EventingBasicConsumer(channel);

                    //把消费者和队列绑定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    Console.WriteLine("开始监听消息");

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

其RoutingKey也是在外部输入。

我们看看运行效果

因为Producer发布消息的RoutingKey是Topic.Demo.Key

又因为#可以代表0个或者多个单词 ,*能代表一个单词

所以*.*.Key    Topic.#与Topic.Demo.Key匹配,而其他两个*.Key和test.1.2当然是不匹配的,所以没有收到消息。

总结

对于上面的例子,我们可以总结出,编写一个生产者的过程如下:

创建连接工厂-》创建连接-》创建会话(Chanel)-》创建交换机(Exchange)-》发送消息

编写一个生产者的过程如下:

创建连接工厂-》创建连接-》创建会话(Chanel)-》创建交换机(Exchange)-》创建队列-》绑定队列和交换机-》创建消费者-》把消费者和队列绑定-》监听消息

掌握这个大的方向,不管交换机怎么分配,代码应该都会写了。

为什么在生产者中和消费者中都要创建交换机呢? 因为我们不确定是生产者先执行还是消费者先执行,所以提前创建一下,避免连接时发现没有创建交换机,出现错误,如果交换机已经创建了,那么默认不会再次创建的。

另外,交换机创建后,同一名称的交换机使用完不会自动删除,但是第二次如果创建的名称和上次一样,但是交换机类型不一样了,那么便会出现报错。

这里总结的是一些RabbitMQ的基础知识,后面还会继续写一些更深入的使用技巧,如果不想错过精彩信息,点击关注一下吧(๑¯◡¯๑)!

 

作者:码农阿宇