首页 > 其他 > 详细

RabbitMQ消息列队的认识与应用

时间:2019-12-25 17:27:28      阅读:103      评论:0      收藏:0      [点我收藏+]

一. 背景

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现. AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息队列中间件主要用于组件之间的解耦 ,消息的发送者无需知道消息使用者的存在,消息使用者亦不需要知道发送者的存在. 以及 异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构

 

二. 应用场景 (简单介绍几种)

1异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

 技术分享图片

b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

 技术分享图片

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

 技术分享图片

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

2应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

 技术分享图片

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图:

 技术分享图片

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

3流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用

 技术分享图片

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理

 

三. rabbitMq基本安装使用

1.安装rabbitMQ之前,需要先安装Erlang 官方下载地址: https://www.erlang.org/downloads  全部点击下一步就行 可自行选择安装路径

2.安装rabbitMq 官方下载地址:  https://www.rabbitmq.com/download.html 可自行选择安装路径

 需要注意:默认安装的RabbitMQ 监听端口是5672

3.激活RabbitMQ

  命令: 例 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat enable rabbitmq_management

4.重启RabbitMQ

  命令: net stop RabbitMQ && net start RabbitMQ

5.查询 创建用户和分配管理员权限

   查看已有用户及用户的角色:

   rabbitmqctl.bat list_users

   新增一个用户:

   rabbitmqctl.bat add_user username password

   给创建的用户分配超级管理员权限

   rabbitmqctl.bat set_user_tags username administrator

 

四.  Rabbit Mq的管理控制台

1.使用浏览器打开 http://localhost:15672访问Rabbit Mq的管理控制台,使用刚才创建的账号登陆系统

 技术分享图片

 

 

 技术分享图片

 

 

 2.添加Virtual Host

 技术分享图片

 

五. 上代码

 生产者端代码

 static void Main(string[] args)

        {

        string exchangeName = "TestChange";

        string queueName = "queueName";

        string routeKey = "routeKey";

 

//创建连接工厂

ConnectionFactory factory = new ConnectionFactory

        {

                UserName = "yan",//用户名

                Password = "123456",//密码

                HostName = "127.0.0.1",//rabbitmq ip

                Port = 5672,//监听端口

                VirtualHost = "TestHost"

            };

 

        //创建连接

        var connection = factory.CreateConnection();

        //创建通道

        var channel = connection.CreateModel();

 

            //定义一个Direct类型交换机

            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);  //参数  durable: 是否队列持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失

 

            //定义一个队列

            channel.QueueDeclare(queueName, true, false, false, null); // 参数 durable: 是否队列持久化

 

            //将队列绑定到交换机

            channel.QueueBind(queueName, exchangeName, routeKey, null);

 

            var basicProperties = channel.CreateBasicProperties();

            basicProperties.DeliveryMode = 2; //传输模式即DeliveryModel属性为1,1代表非持久化,2为可持久化

 

            Console.WriteLine($"\nRabbitMQ连接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n请输入消息,输入exit退出!");

 

        string input;

            do

            {

                input = Console.ReadLine();

 

                var sendBytes = Encoding.UTF8.GetBytes(input);

 

                //发送方确认机制

                channel.ConfirmSelect();

 

                //发布消息

                channel.BasicPublish(exchangeName, routeKey, basicProperties, sendBytes);

 

                if (channel.WaitForConfirms())//此函数必须在开启确认机制下使用

                {

                    Console.WriteLine("发布成功");

                }

 

                } while (input.Trim().ToLower() != "exit");

 

            channel.Close();

        connection.Close();

}

 

消费者代码

        static void Main(string[] args)

        {

            //创建连接工厂

            ConnectionFactory factory = new ConnectionFactory

            {

                UserName = "yan",//用户名

                Password = "123456",//密码

                HostName = "127.0.0.1",//rabbitmq ip

                Port = 5672,

                VirtualHost = "TestHost"

            };

 

            //创建连接

            var connection = factory.CreateConnection();

            //创建通道

            var channel = connection.CreateModel();

 

            //事件基本消费者

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

 

            //接收到消息事件

            consumer.Received += (ch, ea) =>

                {

 

                    var message = Encoding.UTF8.GetString(ea.Body);

 

                    Console.WriteLine($"收到消息: {message} ");

 

                    //确认该消息已被消费

                    channel.BasicAck(ea.DeliveryTag, false);  //这是ack确认机制 当BasicConsume里的autoAck(true时自动确认) 设置为false即手动确认 如果不确认返回该消费成功的消息 则该队列会一直存在内存当中 会有内存溢出的风险  

 

                };

 

            //启动消费者 设置为手动应答消息

            channel.BasicConsume("queueName", false, consumer);  //BasicConsume里的autoAck(true时自动确认) 设置为false即手动确认

            Console.WriteLine("消费者已启动");

 

            Console.ReadKey();

            channel.Dispose();

            connection.Close();

 

        }

 

生产者发布两条消息

 技术分享图片

 

消费者接收消息

 技术分享图片

  

六: 注意事项

生产者端

1.定义接换机类型和队列时是否开启队列持久化 没有则rabbitMq服务重启时 队列丢失

 技术分享图片

 2.传输模式是否开启消费持久化 没有则rabbitMq服务重启时消息丢失

 技术分享图片

3.channel.WaitForConfirms()发送成功与否回调函数 必须在开启发送方确认机制下使用

 技术分享图片

消费者端

4 消息消费确认机制  channel.BasicAck  当设为手动消费时 必须返回消息是否被确认 否则 则该队列消息会一直存在内存当中 有内存溢出的风险  

 技术分享图片

 

七: 建议

1 也许你并不需要使用消息列队

消息队列是一个能让你获得容错性,分布式,解耦等架构能力的系统, 光听起来好像很不错 但是实际应用中时 如 用户注册成功需要发送信息和邮件 使用MQ确实可以节省依次执行的时间 但是如果用户发送信息后在发邮件用的时间为0.2秒 使用MQ发送为0.1秒 那在用户体验度上就毫无区别 反之还会增大使用MQ增大项目的复杂度

2 系统可用性降低
系统引入的外部依赖越多,越容易挂掉。例如: 本来你是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,为了解耦你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用 点击这里查看

3 一致性问题

A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。 (解决方案 可以将消费失败的异常信息收集起来 做后续处理)

所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许复杂了 N多 倍。但是关键的时候,用,还是得用的

 

 

 

技术分享图片

RabbitMQ消息列队的认识与应用

原文:https://www.cnblogs.com/fuyuncode/p/12097847.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!