之前待了7年的公司倒闭,终于找了一份真正的程序员工作,总算体验了996的感觉,现在项目接近尾声了,总算有点时间下写博客了。找工作时一直看到要求RabbitMQ跟CAP的要求,这几天找了时间学习了下。
以下我的理解说法不知规不规范,只是用我最通俗的理解写出来
RabbitMQ是一种底层队列的实现,CAP提供了一种通用队列发布、订阅使用方法。
可以理解为SqlServer、MySql跟EF的关系吧(EF比作为CAP),你不通过EF,也可以用SqlClient相关类来使用SqlServer,但EF提供一种通用的代码使用方法,同样的代码可以同时用于SqlServer、MySql,代码使用者不用关心我底层是用了SqlServer还是MySql。
RabbitMQ简单介绍
我先说下RabbitMQ原生的使用方法,然后再说下怎么跟CAP结合
说到队列,先理解下以下几个概念
消息从发布到订阅的流程步骤:
生产者发布消息给交换器(传递一个key值),交换器在它绑定的队列中根据key值及交换器模式找到匹配的队列发送消息,订阅了此队列的消费者就可以获取消息进行处理,并返回应答;
交换器模式
topic匹配规则:队列的key为TestRouteKey.#,可以匹配到 TestRouteKey.A.B,队列的key为TestRouteKey.*,可以匹配到TestRouteKey.A
以下是需要注意理解的点
以下演示,可以创建两个控制台程序,然后在Main里面写相关代码进行测试
生产者发布消息代码
安装包 RabbitMQ.Client
//连接工厂 var factory = new ConnectionFactory(){ UserName="", Password="", HostName="", Port=0 }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //声明交换器,模式为direct channel.ExchangeDeclare("exchangeName","direct"); //声明队列 channel.QueueDeclare("queueName",durable:true); //将交换器跟队列进行绑定 channel.QueueBind("queueName","exchangeName","routeKey",null); //发布消息 channel.BasicPublish("exchangeName","routeKey",null,Encoding.UTF8.GetBytes("hello world")); channel.Close(); connection.Close();
消费者订阅消息
var factory = new ConnectionFactory { UserName = "", Password = "", HostName = "", }; //创建个连接 var connection = factory.CreateConnection(); //创建个通道 var channel = connection.CreateModel(); var consumer = new EventingBasicConsumer(channel); //定义事件消费者,及消费接收事件(返回应答) consumer.Received += (o, e) => { var message = Encoding.UTF8.GetString(e.Body.ToArray()); Console.WriteLine($"收到消息:{message}"); channel.BasicAck(e.DeliveryTag, false); }; //启动消费者,第二个参数是代表是否自动应答,false就得手动调用BasicAck方法 channel.BasicConsume("hello", false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Close(); connection.Close();
CAP简单介绍
上面简单的介绍完RabbitMQ使用方法,下面再来简单说下CAP是干什么的
CAP可用于微服务分布式事务解决方案,就是可以搭建不同站点,使用CAP,连接同一个RabbitMQ,部署在不同的服务器上,实现分布式部署。
那要实现CAP,需要一个数据库来记录事件,需要一个队列来存放事件消息。
CAP更详情的文档可查看它的官网,重点有中文的 http://cap.dotnetcore.xyz/
创建一个WebApi初始项目来演示一下。
安装包 DotNetCore.CAP
安装包DotNetCore.CAP.SqlServer,这是提供Sqlserver来记录事件的包
安装包 DotNetCore.CAP.RabbitMQ,这是提供RabbitMQ来存放事件消息的包
安装包 DotNetCore.CAP.Dashboard,这是提供一个Web管理后台可查看发布、订阅消息情况
在Startup.cs的ConfigureServices方法中注入
services.AddCap(o=>{ o.UseSqlServer(""); o.UseRabbitMQ(mq => { mq.HostName = "";//RabbitMQ服务器地址 mq.Port=5672; mq.UserName = "admin"; mq.Password = "admin"; }); o.UseDashboard(); //添加监控仪表盘,通过http://localhost/cap访问 o.FailedRetryInterval = 30;//失败后的重拾间隔,默认60秒 o.FailedRetryCount = 10;//失败后的重试次数,默认50次;在FailedRetryInterval默认60秒的情况下,即默认重试50*60秒(50分钟)之后放弃失败重试 o.SucceedMessageExpiredAfter = 60 * 60; //设置成功信息的删除时间默认24*3600秒 });
然后在Controllers目录下创建一个测试控制器
[ApiController] [Route("[controller]/[action]")] public class TestController : ControllerBase { private readonly ICapPublisher _capPublisher; public TestController(ICapPublisher capPublisher) { _capPublisher = capPublisher; } [HttpPost] public void Test1() { //发布消息,消息被订阅处理后,会回调到Test.Callback _capPublisher.Publish<string>("Test.Event", "Hello,World","Test.Callback"); } [NonAction] [CapSubscribe("Test.Event")] //订阅Test.Event事件 public string Test2(string message) { //进行订阅消息处理 Console.WriteLine(message); return "OK"; } [NonAction] [CapSubscribe("Test.Callback")] public void TestCallback(string result) { //发布消息完成后的回调 Console.WriteLine(result); } }
好了,上面就简单的介绍了RabbitMQ跟CAP的使用方法,本来还在想这些东西适用于哪些场景,然后今天项目上线后出现问题了,里面涉及到两个系统的调用,一个系统A因为接口被频繁地调用超时,导致另一个系统B一直显示出错,我就发现这个场景就很适合用这个CAP了。
系统A的崩溃不应影响到系统B,而系统A崩溃时也可以自动进行重试,当系统B发布消息后,也不用等待系统A,显示处理中,等系统A处理成功后再通知系统B,B再显示成功就可以了。
原文:https://www.cnblogs.com/caijt/p/14211271.html