目录
什么是生产端的可靠性投递?
如果想保障消息百分百投递成功,只做到前三步不一定能够保障。有些时候或者说有些极端情况,比如生产端在投递消息时可能就失败了,或者说生产端投递了消息,MQ也收到了,MQ在返回确认应答时,由于网络闪断导致生产端没有收到应答,此时这条消息就不知道投递成功了还是失败了,所以针对这些情况我们需要做一些补偿机制。
?
?
(confirm)
给生产端生产端有一个Confirm Listener
,去异步的监听Broker
回送的响应,从而判断消息是否投递成功,如果成功,去数据库查询该消息,并将消息状态更新为1,表示消息投递成功
?
假设第二步OK了,在第三步回送响应时,网络突然出现了闪断,导致生产端的Listener就永远收不到这条消息的confirm应答了,也就是说这条消息的状态就一直为0了
(Retry Send)
,也就是从第二步开始继续往下走当然有些消息可能就是由于一些实际的问题无法路由到Broker,比如routingKey设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重试次数做限制,比如限制3次,如果投递次数大于三次,那么就将消息状态更新为2,表示这个消息最终投递失败。
?
针对这种情况如何去做补偿呢,可以有一个补偿系统去查询这些最终失败的消息,然后给出失败的原因,当然这些可能都需要人工去操作。
第一种可靠性投递,在高并发的场景下是否适合?
对于第一种方案,我们需要做两次数据库的持久化操作,在高并发场景下显然数据库存在着性能瓶颈。其实在我们的核心链路中只需要对业务进行入库就可以了,消息就没必要先入库了,我们可以做消息的延迟投递,做二次确认,回调检查。
当然这种方案不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务去做都可以。
?
?
Upstream Service
上游服务也就是生产端,Downstream service
下游服务也就是消费端,Callback service
就是回调服务。
?
(Second Send Delay Check)
,即延迟消息投递检查,这里需要设置一个延迟时间,比如5分钟之后进行投递。confirm
消息,也就是回送响应,但是这里响应不是正常的ACK,而是重新生成一条消息,投递到MQ中。Callback service
是一个单独的服务,其实它扮演了第一种方案的存储消息的DB角色,它通过MQ去监听下游服务发送的confirm
消息,如果Callback service
收到confirm
消息,那么就对消息做持久化存储,即将消息持久化到DB中。Callback service
还是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,如果存在,则不需要做任何处理,如果不存在或者消费失败了,那么Callback service
就需要主动发起RPC通信给上游服务,告诉它延迟检查的这条消息我没有找到,你需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去。这么做的目的是少做了一次DB的存储,在高并发场景下,最关心的不是消息100%投递成功,而是一定要保证性能,保证能抗得住这么大的并发量。所以能节省数据库的操作就尽量节省,可以异步的进行补偿。
?
其实在主流程里面是没有这个Callback service的,它属于一个补偿的服务,整个核心链路就是生产端入库业务消息,发送消息到MQ,消费端监听队列,消费消息。其他的步骤都是一个补偿机制。
第二种方案也是互联网大厂更为经典和主流的解决方案。但是若对性能要求不是那么高,第一种方案要更简单
?
简单来说就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的。
我们可以借鉴数据库的乐观锁机制来举个例子:
首先为表添加一个版本字段version
在执行更新操作前呢,会先去数据库查询这个version
然后执行更新语句,以version作为条件,例如:
UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1
如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种乐观锁的机制来保障幂等性。
?
重复消费问题:
当消费者消费完消息时,在给生产端返回ack时由于网络中断,导致生产端未收到确认信息,该条消息会重新发送并被消费者消费,但实际上该消费者已成功消费了该条消息,这就是重复消费问题。
?
唯一ID:业务表唯一的主键,如商品ID
指纹码:为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间戳+业务编号或者标志位(具体视业务场景而定)
?
整个思路就是首先我们需要根据消息生成一个全局唯一的ID,然后还需要加上一个指纹码。这个指纹码它并不一定是系统去生成的,而是一些外部的规则或者内部的业务规则去拼接,它的目的就是为了保障这次操作是绝对唯一的。
将ID + 指纹码拼接好的值作为数据库主键,就可以进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操作,如果有就代表已经被消费了,就不需要管了。
?
这里只提用Redis的原子性去解决MQ幂等性重复消费的问题
注意:MQ的幂等性问题 根本在于的是生产端未正常接收ACK,可能是网络抖动、网络中断导致
?
我的方案:
MQ消费端在消费开始时 将 ID放入到Redis的BitMap中,MQ生产端每次生产数据时,从Redis的BitMap对应位置若不能取出ID,则生产消息发送,否则不进行消息发送。
但是有人可能会说,万一消费端,生产端Redis命令执行失败了怎么办,虽然又出现重复消费又出现Redis非正常执行命令的可能性极低,但是万一呢?
OK,我们可以在Redis命令执行失败时,将消息落库,每日用定时器,对这种极特殊的消息进行处理。
?
的核心保障
?
确认机制流程图
生产端发送消息到Broker,然后Broker接收到了消息后,进行回送响应,生产端有一个Confirm Listener
,去监听应答,当然这个操作是异步进行的,生产端将消息发送出去就可以不用管了,让内部监听器去监听Broker给我们的响应。
?
channel.confirmSelect()
addConfirmListener
,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!public class Producer {
public static void main(String[] args) throws Exception {
//创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//获取Connection
Connection connection = connectionFactory.newConnection();
//通过connection创建一个新的Channel
Channel channel = connection.createChannel();
//指定我们的消息投递模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingkey = "confirm.save";
//发送一条信息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
//添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple)
throws IOException {
System.out.println("-------no ack!---------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple)
throws IOException {
System.out.println("--------ack!----------");
}
});
}
}
?
public class Consumer {
public static void main(String[] args) throws Exception{
//创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//获取Connection
Connection connection = connectionFactory.newConnection();
//通过connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingkey = "confirm.#";
String queueName = "test_confirm_queue";
//声明交换机和队列 然后进行绑定和 设置 最后制定路由key
channel.exchangeDeclare(exchangeName, "topic",true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingkey);
//创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true,queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:" + msg);
}
}
}
?
运行说明
先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,然后启动生产端,消息被消费端消费,生产端也成功监听到了ACK响应。
?
Return Listener
用于处理一些不可路由的消息!Return Listener
!?
addReturnListener
,生产端去监听这些不可达的消息,做一些后续处理,比如说,记录下消息日志,或者及时去跟踪记录,有可能重新设置一下就好了Mandatory
:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!?
public class ReturnProducer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
//String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello RabbitMQ Return Message";
//添加return监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, BasicProperties properties, byte[] body) throws IOException {
//replyCode:响应码 replyText:响应信息
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
//System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
//5 发送一条消息,第三个参数mandatory:必须设置为true
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
}
?
public class ReturnConsumer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
//4 声明交换机和队列,然后进行绑定设置路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费者: " + msg);
}
}
}
?
运行说明
先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,然后启动生产端。
由于生产端设置的是一个错误的路由key,所以消费端没有任何打印,而生产端打印了如下内容
如果我们将 Mandatory
属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印了。如果我们的路由key设置为正确的,那么消费端能够正确消费,生产端也不会进行任何打印。
原文:https://www.cnblogs.com/dwlovelife/p/10991371.html