一. 背景
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现. AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息队列中间件主要用于组件之间的解耦 ,消息的发送者无需知道消息使用者的存在,消息使用者亦不需要知道发送者的存在. 以及 异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构
二. 应用场景 (简单介绍几种)
1异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。
2应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:
传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合
如何解决以上问题呢?引入应用消息队列后的方案,如下图:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
3流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理
三. rabbitMq基本安装使用
1.安装rabbitMQ之前,需要先安装Erlang 官方下载地址: https://www.erlang.org/downloads 全部点击下一步就行 可自行选择安装路径
2.安装rabbitMq 官方下载地址: https://www.rabbitmq.com/download.html 可自行选择安装路径
需要注意:默认安装的RabbitMQ 监听端口是5672
3.激活RabbitMQ
命令: 例 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat enable rabbitmq_management
4.重启RabbitMQ
命令: net stop RabbitMQ && net start RabbitMQ
5.查询 创建用户和分配管理员权限
查看已有用户及用户的角色:
rabbitmqctl.bat list_users
新增一个用户:
rabbitmqctl.bat add_user username password
给创建的用户分配超级管理员权限
rabbitmqctl.bat set_user_tags username administrator
四. Rabbit Mq的管理控制台
1.使用浏览器打开 http://localhost:15672 访问Rabbit Mq的管理控制台,使用刚才创建的账号登陆系统
2.添加Virtual Host
五. 上代码
生产者端代码
static void Main(string[] args)
{
string exchangeName = "TestChange";
string queueName = "queueName";
string routeKey = "routeKey";
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "yan",//用户名
Password = "123456",//密码
HostName = "127.0.0.1",//rabbitmq ip
Port = 5672,//监听端口
VirtualHost = "TestHost"
};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null); //参数 durable: 是否队列持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失
//定义一个队列
channel.QueueDeclare(queueName, true, false, false, null); // 参数 durable: 是否队列持久化
//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);
var basicProperties = channel.CreateBasicProperties();
basicProperties.DeliveryMode = 2; //传输模式即DeliveryModel属性为1,1代表非持久化,2为可持久化
Console.WriteLine($"\nRabbitMQ连接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n请输入消息,输入exit退出!");
string input;
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
//发送方确认机制
channel.ConfirmSelect();
//发布消息
channel.BasicPublish(exchangeName, routeKey, basicProperties, sendBytes);
if (channel.WaitForConfirms())//此函数必须在开启确认机制下使用
{
Console.WriteLine("发布成功");
}
} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();
}
消费者代码
static void Main(string[] args)
{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "yan",//用户名
Password = "123456",//密码
HostName = "127.0.0.1",//rabbitmq ip
Port = 5672,
VirtualHost = "TestHost"
};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"收到消息: {message} ");
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false); //这是ack确认机制 当BasicConsume里的autoAck(true时自动确认) 设置为false即手动确认 如果不确认返回该消费成功的消息 则该队列会一直存在内存当中 会有内存溢出的风险
};
//启动消费者 设置为手动应答消息
channel.BasicConsume("queueName", false, consumer); //BasicConsume里的autoAck(true时自动确认) 设置为false即手动确认
Console.WriteLine("消费者已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
生产者发布两条消息
消费者接收消息
六: 注意事项
生产者端
1.定义接换机类型和队列时是否开启队列持久化 没有则rabbitMq服务重启时 队列丢失
2.传输模式是否开启消费持久化 没有则rabbitMq服务重启时消息丢失
3.channel.WaitForConfirms()发送成功与否回调函数 必须在开启发送方确认机制下使用
消费者端
4 消息消费确认机制 channel.BasicAck 当设为手动消费时 必须返回消息是否被确认 否则 则该队列消息会一直存在内存当中 有内存溢出的风险
七: 建议
1 也许你并不需要使用消息列队
消息队列是一个能让你获得容错性,分布式,解耦等架构能力的系统, 光听起来好像很不错 但是实际应用中时 如 用户注册成功需要发送信息和邮件 使用MQ确实可以节省依次执行的时间 但是如果用户发送信息后在发邮件用的时间为0.2秒 使用MQ发送为0.1秒 那在用户体验度上就毫无区别 反之还会增大使用MQ增大项目的复杂度
2 系统可用性降低
系统引入的外部依赖越多,越容易挂掉。例如: 本来你是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,为了解耦你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用 点击这里查看
3 一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。 (解决方案 可以将消费失败的异常信息收集起来 做后续处理)
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许复杂了 N多 倍。但是关键的时候,用,还是得用的
原文:https://www.cnblogs.com/fuyuncode/p/12097847.html