AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,所以安装RabbitMq服务器端前需要先安装Erlang。以下记录在windows 7环境下的学习笔记。
Erlang下载地址:https://www.erlang.org/downloads
RabbitMQ服务端下载地址:https://www.rabbitmq.com/install-windows.html
安装好Erlang和RabbitMQ之后都需要新建系统变量,计算机-->属性-->高级系统设置-->环境变量-->系统变量新建
建系统变量时值为安装目录,我的安装目录:
Erlang:C:\Program Files\erl10.4
RabbitMQ:C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15
新建系统变量后,还需要在已有系统变量PATH中增加Erlang和RabbitMQ的配置(注:是追加,PATH原有的值不要删除),以分号分隔。
Erlang:%ERLANG_HOME%\bin
RabbitMQ:%RABBITMQ_SERVER%\sbin
在cmd中运行如下命令:
"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
默认会生成一个账号guest,密码也是guest。
查看已有用户及用户的角色命令:rabbitmqctl.bat list_users
RabbitMQ安装后是默认启动服务的,若要手动启动、停止服务,执行如下命令:
启动:net start rabbitmq
停止:net stop rabbitmq
默认地址是本地端口15672,地址:http://127.0.0.1:15672,用guest账号登录。
消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
Default exchange(默认交换机):一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
Direct exchange(直连交换机):根据消息携带的路由键(routing key)将消息投递给对应队列的。
Fanout exchange(扇型交换机):将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
Topic exchange(主题交换机):通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
routing_key格式是以点号“."分割的字符表。对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
*代表任意一个单词
#代表0个或者多个单词
由于有"*"和"#",Topic exchange 非常强大并且可以转化为其他的exchange:
如果binding_key是"#",它会接收所有的Message,不管routing_key是什么,就像是Fanout exchange。
如果"*"和"#"都没有被使用,那么topic exchange就变成了Direct exchange。
Headers exchange(头交换机):有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
创建2个项目,1个发送端Send,1个接收端Receive,通过Nuget安装RabbitMQ.Client。
发送端代码:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
for (int i = 0; i < 10; i++)
{
var body = Encoding.UTF8.GetBytes(message + i);
channel.BasicPublish(exchange: "",//使用默认交换机
routingKey: "hello",
basicProperties: null, //new BasicProperties() { DeliveryMode = 2 },
body: body);
}
}
接收端代码:
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//限制流量,向消费者一条条发送消息
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var directory = AppContext.BaseDirectory + "/Log";
if (!Directory.Exists(directory))
{
Directory.CreateDirectory(directory);
}
using (FileStream fs = System.IO.File.Open(directory + $"/{DateTime.Now:yyyyMMdd}{ea.ConsumerTag}.txt", FileMode.Append, FileAccess.Write, FileShare.ReadWrite))
{
var sr = new StreamWriter(fs);
sr.WriteLine($"[{DateTime.Now:yyyyMMddHHmmssfff}]");
sr.WriteLine(ea.ConsumerTag + ":" + message);
sr.Flush();
sr.Close();
Thread.Sleep(100);//可复制一份代码,将等待的时间设置成200,测试多消费者
}
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动消息确认
};
channel.BasicConsume(queue: "hello",
autoAck: false,//消息确认设置成手动
consumer: consumer);
更多的内容可以看以下文章:
https://blog.csdn.net/anzhsoft2008/column/info/rabbitmq
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html
原文:https://www.cnblogs.com/hsybs/p/11165627.html