消息队列核心目的:作为第三方服务,完成消息的(时序)处理。
RabbitMQ中,将这个消息中间件的流程,抽象成几种元素的调度关系:
Producer: 创建消息的主体
Exchange,交换器,负责按照消息的属性,分配对应的队列
Queue,队列,负责存放消息的数据结构
Consumer,负责处理消息的主体
RabbitMQ地址:https://www.rabbitmq.com/download.html
Erlang地址:https://www.erlang.org/downloads
下载安装即可,值得注意的时,RabbitMQ是基于Erlang完成的,所以还需要先安装Erlang
安装完毕,可以启用web管理
执行命令:
rabbitmq-plugins enable rabbitmq_management重启RabbitMQ:
rabbitmq-service stop rabbitmq-service start
Web管理默认端口为15672
默认用户名:guest / guest
若使用云服务器,记得查看端口安全规则,是否放行15672,5672两个端口
实现一个单队列的消息队列
通过网页管理,创建一个指定的队列TestMsgQueue,避免在后续代码中申明,编写额外代码。
1、生成者,随机生成10条消息
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace RabbitMQ_Producer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { for (int i = 0; i < 10; i++) { System.Threading.Thread.Sleep(20); Task.Factory.StartNew(() => { using (var channel = connection.CreateModel()) { Random r = new Random(); int n = r.Next(2000); string message = "Hello World! Message Num:" + n; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TestMsgExchange", routingKey: "TestMsgQueue", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } }); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
2、消费者,监听消息队列进行处理
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace RabbitRQ_Consumer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //模拟消费者的处理时间 Random r = new Random(); int n = r.Next(2000); System.Threading.Thread.Sleep(n); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "TestMsgQueue", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
原文:https://www.cnblogs.com/longt/p/11811310.html